diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 46896763..5e8e1ba9 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -271,12 +271,14 @@ def __init__( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + processing_pool_name: str | None = None, ) -> None: assert len(hosts) > 0, "You must provide at least one RPC host to connect to" self._application = application self._hosts = hosts self._rpc_secret = rpc_secret self._metrics = metrics + self._processing_pool_name = processing_pool_name or "unknown" self._grpc_options: list[tuple[str, Any]] = [ ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) @@ -322,6 +324,7 @@ def _emit_health_check(self) -> None: self._health_check_settings.file_path.touch() self._metrics.incr( "taskworker.client.health_check.touched", + tags={"processing_pool": self._processing_pool_name}, ) self._timestamp_since_touch = cur_time @@ -374,7 +377,10 @@ def _get_cur_stub(self) -> tuple[str, ConsumerServiceStub]: self._num_consecutive_unavailable_errors = 0 self._metrics.incr( "taskworker.client.loadbalancer.rebalance", - tags={"reason": "unavailable_count_reached"}, + tags={ + "reason": "unavailable_count_reached", + "processing_pool": self._processing_pool_name, + }, ) elif self._num_tasks_before_rebalance == 0: self._cur_host = random.choice(available_hosts) @@ -382,7 +388,7 @@ def _get_cur_stub(self) -> tuple[str, ConsumerServiceStub]: self._num_consecutive_unavailable_errors = 0 self._metrics.incr( "taskworker.client.loadbalancer.rebalance", - tags={"reason": "max_tasks_reached"}, + tags={"reason": "max_tasks_reached", "processing_pool": self._processing_pool_name}, ) stub = self._get_stub(self._cur_host) @@ -402,11 +408,19 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non request = GetTaskRequest(application=self._application, namespace=namespace) try: host, stub = self._get_cur_stub() - with self._metrics.timer("taskworker.get_task.rpc", tags={"host": host}): + with self._metrics.timer( + "taskworker.get_task.rpc", + tags={"host": host, "processing_pool": self._processing_pool_name}, + ): response = stub.GetTask(request) except grpc.RpcError as err: self._metrics.incr( - "taskworker.client.rpc_error", tags={"method": "GetTask", "status": err.code().name} + "taskworker.client.rpc_error", + tags={ + "method": "GetTask", + "status": err.code().name, + "processing_pool": self._processing_pool_name, + }, ) if err.code() == grpc.StatusCode.NOT_FOUND: # Because our current broker doesn't have any tasks, try rebalancing. @@ -421,7 +435,10 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non if response.HasField("task"): self._metrics.incr( "taskworker.client.get_task", - tags={"namespace": response.task.namespace}, + tags={ + "namespace": response.task.namespace, + "processing_pool": self._processing_pool_name, + }, ) return InflightTaskActivation( activation=response.task, host=host, receive_timestamp=time.monotonic() @@ -443,7 +460,11 @@ def update_task( fetch_next_task.application = self._application self._metrics.incr( - "taskworker.client.fetch_next", tags={"next": fetch_next_task is not None} + "taskworker.client.fetch_next", + tags={ + "next": fetch_next_task is not None, + "processing_pool": self._processing_pool_name, + }, ) self._clear_temporary_unavailable_hosts() request = SetTaskStatusRequest( @@ -458,7 +479,10 @@ def update_task( if processing_result.host in self._temporary_unavailable_hosts: self._metrics.incr( "taskworker.client.skipping_set_task_due_to_unavailable_host", - tags={"broker_host": processing_result.host}, + tags={ + "broker_host": processing_result.host, + "processing_pool": self._processing_pool_name, + }, ) raise HostTemporarilyUnavailable( f"Host: {processing_result.host} is temporarily unavailable" @@ -466,13 +490,21 @@ def update_task( stub = self._get_stub(processing_result.host) with self._metrics.timer( - "taskworker.update_task.rpc", tags={"host": processing_result.host} + "taskworker.update_task.rpc", + tags={ + "host": processing_result.host, + "processing_pool": self._processing_pool_name, + }, ): response = stub.SetTaskStatus(request) except grpc.RpcError as err: self._metrics.incr( "taskworker.client.rpc_error", - tags={"method": "SetTaskStatus", "status": err.code().name}, + tags={ + "method": "SetTaskStatus", + "status": err.code().name, + "processing_pool": self._processing_pool_name, + }, ) if err.code() == grpc.StatusCode.NOT_FOUND: # The current broker is empty, switch. diff --git a/clients/python/src/taskbroker_client/worker/push_clients.py b/clients/python/src/taskbroker_client/worker/push_clients.py index 5a13b6de..b0f95e0e 100644 --- a/clients/python/src/taskbroker_client/worker/push_clients.py +++ b/clients/python/src/taskbroker_client/worker/push_clients.py @@ -43,11 +43,13 @@ def __init__( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + processing_pool_name: str | None = None, ) -> None: self._application = application self._service = service self._rpc_secret = rpc_secret self._metrics = metrics + self._processing_pool_name = processing_pool_name or "unknown" self._grpc_options: list[tuple[str, Any]] = [ ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) @@ -81,6 +83,7 @@ def _emit_health_check(self) -> None: self._health_check_settings.file_path.touch() self._metrics.incr( "taskworker.client.health_check.touched", + tags={"processing_pool": self._processing_pool_name}, ) self._timestamp_since_touch = cur_time @@ -121,7 +124,11 @@ def _update_task_single( while retries < 3: try: with self._metrics.timer( - "taskworker.update_task.rpc", tags={"service": self._service} + "taskworker.update_task.rpc", + tags={ + "service": self._service, + "processing_pool": self._processing_pool_name, + }, ): self._stub.SetTaskStatus(request) exception = None @@ -130,7 +137,11 @@ def _update_task_single( exception = err self._metrics.incr( "taskworker.client.rpc_error", - tags={"method": "SetTaskStatus", "status": err.code().name}, + tags={ + "method": "SetTaskStatus", + "status": err.code().name, + "processing_pool": self._processing_pool_name, + }, ) finally: retries += 1 @@ -174,7 +185,11 @@ def update_tasks( while retries < 3: try: with self._metrics.timer( - "taskworker.update_task_batch.rpc", tags={"service": self._service} + "taskworker.update_task_batch.rpc", + tags={ + "service": self._service, + "processing_pool": self._processing_pool_name, + }, ): self._stub.SetBatchActivationStatus(request) exception = None @@ -183,7 +198,11 @@ def update_tasks( exception = err self._metrics.incr( "taskworker.client.rpc_error", - tags={"method": "SetBatchActivationStatus", "status": err.code().name}, + tags={ + "method": "SetBatchActivationStatus", + "status": err.code().name, + "processing_pool": self._processing_pool_name, + }, ) finally: retries += 1 diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 9c9b7fb6..f62c371f 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -181,6 +181,7 @@ def __init__( ), rpc_secret=app.config["rpc_secret"], grpc_config=app.config["grpc_config"], + processing_pool_name=processing_pool_name, ) self._metrics = app.metrics self._concurrency = concurrency @@ -207,6 +208,7 @@ def _create_client( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + processing_pool_name: str | None = None, ) -> PushTaskbrokerClient: return PushTaskbrokerClient( service=service, @@ -215,6 +217,7 @@ def _create_client( health_check_settings=health_check_settings, rpc_secret=rpc_secret, grpc_config=grpc_config, + processing_pool_name=processing_pool_name, ) def _send_results( @@ -408,6 +411,7 @@ def _create_client( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + processing_pool_name: str | None = None, ) -> PushTaskbrokerClient: return BatchPushTaskbrokerClient( service=service, @@ -416,6 +420,7 @@ def _create_client( health_check_settings=health_check_settings, rpc_secret=rpc_secret, grpc_config=grpc_config, + processing_pool_name=processing_pool_name, ) def _send_results( @@ -536,6 +541,7 @@ def __init__( ), rpc_secret=app.config["rpc_secret"], grpc_config=app.config["grpc_config"], + processing_pool_name=processing_pool_name, ) self._metrics = app.metrics