Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def start(self) -> int:
"""
This starts the worker gRPC server.
"""
self.worker_pool.start_metrics_thread()
self.worker_pool.start_result_thread()
self.worker_pool.start_spawn_children_thread()

Expand Down Expand Up @@ -556,6 +557,7 @@ def start(self) -> int:
"""
This starts a loop that runs until the worker completes its `max_task_count` or it is killed.
"""
self.worker_pool.start_metrics_thread()
self.worker_pool.start_result_thread()
self.worker_pool.start_spawn_children_thread()

Expand Down Expand Up @@ -732,6 +734,7 @@ def __init__(
self._children: list[BaseProcess] = []
self._shutdown_event = self._mp_context.Event()
self._result_thread: threading.Thread | None = None
self._metrics_thread: threading.Thread | None = None
self._spawn_children_thread: threading.Thread | None = None

def send_results(self, results: list[ProcessingResult], is_draining: bool = False) -> None:
Expand All @@ -754,6 +757,43 @@ def send_results(self, results: list[ProcessingResult], is_draining: bool = Fals
for result in results:
self.put_result(result)

def start_metrics_thread(self) -> None:
"""
Start a thread that emits metrics on an interval.
"""
Comment thread
sentry[bot] marked this conversation as resolved.

def metrics_thread() -> None:
tags = {
"processing_pool": self._processing_pool_name,
"pod_name": self._pod_name,
}

while True:
try:
# 'qsize' is not implemented on all platforms, such as macOS
self._metrics.gauge(
"taskworker.child_tasks.size",
float(self._child_tasks.qsize()),
tags=tags,
)

self._metrics.gauge(
"taskworker.processed_tasks.size",
float(self._processed_tasks.qsize()),
tags=tags,
)
except Exception as e:
logger.debug(
"taskworker.worker.queue_gauges.error",
extra={"error": e, "processing_pool": self._processing_pool_name},
)

time.sleep(1)

self._metrics_thread = threading.Thread(name="metrics", target=metrics_thread, daemon=True)

self._metrics_thread.start()
Comment thread
cursor[bot] marked this conversation as resolved.

def start_result_thread(self) -> None:
"""
Start a thread that delivers results and fetches new tasks.
Expand All @@ -770,30 +810,6 @@ def result_thread() -> None:
iopool = ThreadPoolExecutor(max_workers=self._concurrency)
with iopool as executor:
while not self._shutdown_event.is_set():
tags = {
"processing_pool": self._processing_pool_name,
"pod_name": self._pod_name,
}

try:
# 'qsize' is not implemented on all platforms, such as macOS
self._metrics.gauge(
"taskworker.child_tasks.size",
float(self._child_tasks.qsize()),
tags=tags,
)

self._metrics.gauge(
"taskworker.processed_tasks.size",
float(self._processed_tasks.qsize()),
tags=tags,
)
except Exception as e:
logger.debug(
"taskworker.worker.queue_gauges.error",
extra={"error": e, "processing_pool": self._processing_pool_name},
)

results = []
while True:
try:
Expand Down
Loading