diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index f62c371f..f15d3755 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -312,6 +312,7 @@ def start(self) -> int: """ This starts the worker gRPC server. """ + self.worker_pool.start_metrics_thread() self.worker_pool.start_result_thread() self.worker_pool.start_spawn_children_thread() @@ -556,6 +557,7 @@ def start(self) -> int: """ This starts a loop that runs until the worker completes its `max_task_count` or it is killed. """ + self.worker_pool.start_metrics_thread() self.worker_pool.start_result_thread() self.worker_pool.start_spawn_children_thread() @@ -732,6 +734,7 @@ def __init__( self._children: list[BaseProcess] = [] self._shutdown_event = self._mp_context.Event() self._result_thread: threading.Thread | None = None + self._metrics_thread: threading.Thread | None = None self._spawn_children_thread: threading.Thread | None = None def send_results(self, results: list[ProcessingResult], is_draining: bool = False) -> None: @@ -754,6 +757,43 @@ def send_results(self, results: list[ProcessingResult], is_draining: bool = Fals for result in results: self.put_result(result) + def start_metrics_thread(self) -> None: + """ + Start a thread that emits metrics on an interval. + """ + + def metrics_thread() -> None: + tags = { + "processing_pool": self._processing_pool_name, + "pod_name": self._pod_name, + } + + while True: + try: + # 'qsize' is not implemented on all platforms, such as macOS + self._metrics.gauge( + "taskworker.child_tasks.size", + float(self._child_tasks.qsize()), + tags=tags, + ) + + self._metrics.gauge( + "taskworker.processed_tasks.size", + float(self._processed_tasks.qsize()), + tags=tags, + ) + except Exception as e: + logger.debug( + "taskworker.worker.queue_gauges.error", + extra={"error": e, "processing_pool": self._processing_pool_name}, + ) + + time.sleep(1) + + self._metrics_thread = threading.Thread(name="metrics", target=metrics_thread, daemon=True) + + self._metrics_thread.start() + def start_result_thread(self) -> None: """ Start a thread that delivers results and fetches new tasks. @@ -770,30 +810,6 @@ def result_thread() -> None: iopool = ThreadPoolExecutor(max_workers=self._concurrency) with iopool as executor: while not self._shutdown_event.is_set(): - tags = { - "processing_pool": self._processing_pool_name, - "pod_name": self._pod_name, - } - - try: - # 'qsize' is not implemented on all platforms, such as macOS - self._metrics.gauge( - "taskworker.child_tasks.size", - float(self._child_tasks.qsize()), - tags=tags, - ) - - self._metrics.gauge( - "taskworker.processed_tasks.size", - float(self._processed_tasks.qsize()), - tags=tags, - ) - except Exception as e: - logger.debug( - "taskworker.worker.queue_gauges.error", - extra={"error": e, "processing_pool": self._processing_pool_name}, - ) - results = [] while True: try: