Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,14 @@ def __init__(
health_check_settings: HealthCheckSettings | None = None,
rpc_secret: str | None = None,
grpc_config: str | None = None,
processing_pool_name: str | None = None,
) -> None:
assert len(hosts) > 0, "You must provide at least one RPC host to connect to"
self._application = application
self._hosts = hosts
self._rpc_secret = rpc_secret
self._metrics = metrics
self._processing_pool_name = processing_pool_name or "unknown"

self._grpc_options: list[tuple[str, Any]] = [
("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
Expand Down Expand Up @@ -322,6 +324,7 @@ def _emit_health_check(self) -> None:
self._health_check_settings.file_path.touch()
self._metrics.incr(
"taskworker.client.health_check.touched",
tags={"processing_pool": self._processing_pool_name},
)
self._timestamp_since_touch = cur_time

Expand Down Expand Up @@ -374,15 +377,18 @@ def _get_cur_stub(self) -> tuple[str, ConsumerServiceStub]:
self._num_consecutive_unavailable_errors = 0
self._metrics.incr(
"taskworker.client.loadbalancer.rebalance",
tags={"reason": "unavailable_count_reached"},
tags={
"reason": "unavailable_count_reached",
"processing_pool": self._processing_pool_name,
},
)
elif self._num_tasks_before_rebalance == 0:
self._cur_host = random.choice(available_hosts)
self._num_tasks_before_rebalance = self._max_tasks_before_rebalance
self._num_consecutive_unavailable_errors = 0
self._metrics.incr(
"taskworker.client.loadbalancer.rebalance",
tags={"reason": "max_tasks_reached"},
tags={"reason": "max_tasks_reached", "processing_pool": self._processing_pool_name},
)

stub = self._get_stub(self._cur_host)
Expand All @@ -402,11 +408,19 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non
request = GetTaskRequest(application=self._application, namespace=namespace)
try:
host, stub = self._get_cur_stub()
with self._metrics.timer("taskworker.get_task.rpc", tags={"host": host}):
with self._metrics.timer(
"taskworker.get_task.rpc",
tags={"host": host, "processing_pool": self._processing_pool_name},
):
response = stub.GetTask(request)
except grpc.RpcError as err:
self._metrics.incr(
"taskworker.client.rpc_error", tags={"method": "GetTask", "status": err.code().name}
"taskworker.client.rpc_error",
tags={
"method": "GetTask",
"status": err.code().name,
"processing_pool": self._processing_pool_name,
},
)
if err.code() == grpc.StatusCode.NOT_FOUND:
# Because our current broker doesn't have any tasks, try rebalancing.
Expand All @@ -421,7 +435,10 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non
if response.HasField("task"):
self._metrics.incr(
"taskworker.client.get_task",
tags={"namespace": response.task.namespace},
tags={
"namespace": response.task.namespace,
"processing_pool": self._processing_pool_name,
},
)
return InflightTaskActivation(
activation=response.task, host=host, receive_timestamp=time.monotonic()
Expand All @@ -443,7 +460,11 @@ def update_task(
fetch_next_task.application = self._application

self._metrics.incr(
"taskworker.client.fetch_next", tags={"next": fetch_next_task is not None}
"taskworker.client.fetch_next",
tags={
"next": fetch_next_task is not None,
"processing_pool": self._processing_pool_name,
},
)
self._clear_temporary_unavailable_hosts()
request = SetTaskStatusRequest(
Expand All @@ -458,21 +479,32 @@ def update_task(
if processing_result.host in self._temporary_unavailable_hosts:
self._metrics.incr(
"taskworker.client.skipping_set_task_due_to_unavailable_host",
tags={"broker_host": processing_result.host},
tags={
"broker_host": processing_result.host,
"processing_pool": self._processing_pool_name,
},
)
raise HostTemporarilyUnavailable(
f"Host: {processing_result.host} is temporarily unavailable"
)

stub = self._get_stub(processing_result.host)
with self._metrics.timer(
"taskworker.update_task.rpc", tags={"host": processing_result.host}
"taskworker.update_task.rpc",
tags={
"host": processing_result.host,
"processing_pool": self._processing_pool_name,
},
):
response = stub.SetTaskStatus(request)
except grpc.RpcError as err:
self._metrics.incr(
"taskworker.client.rpc_error",
tags={"method": "SetTaskStatus", "status": err.code().name},
tags={
"method": "SetTaskStatus",
"status": err.code().name,
"processing_pool": self._processing_pool_name,
},
)
if err.code() == grpc.StatusCode.NOT_FOUND:
# The current broker is empty, switch.
Expand Down
27 changes: 23 additions & 4 deletions clients/python/src/taskbroker_client/worker/push_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def __init__(
health_check_settings: HealthCheckSettings | None = None,
rpc_secret: str | None = None,
grpc_config: str | None = None,
processing_pool_name: str | None = None,
) -> None:
self._application = application
self._service = service
self._rpc_secret = rpc_secret
self._metrics = metrics
self._processing_pool_name = processing_pool_name or "unknown"

self._grpc_options: list[tuple[str, Any]] = [
("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
Expand Down Expand Up @@ -81,6 +83,7 @@ def _emit_health_check(self) -> None:
self._health_check_settings.file_path.touch()
self._metrics.incr(
"taskworker.client.health_check.touched",
tags={"processing_pool": self._processing_pool_name},
)
self._timestamp_since_touch = cur_time

Expand Down Expand Up @@ -121,7 +124,11 @@ def _update_task_single(
while retries < 3:
try:
with self._metrics.timer(
"taskworker.update_task.rpc", tags={"service": self._service}
"taskworker.update_task.rpc",
tags={
"service": self._service,
"processing_pool": self._processing_pool_name,
},
):
self._stub.SetTaskStatus(request)
exception = None
Expand All @@ -130,7 +137,11 @@ def _update_task_single(
exception = err
self._metrics.incr(
"taskworker.client.rpc_error",
tags={"method": "SetTaskStatus", "status": err.code().name},
tags={
"method": "SetTaskStatus",
"status": err.code().name,
"processing_pool": self._processing_pool_name,
},
)
finally:
retries += 1
Expand Down Expand Up @@ -174,7 +185,11 @@ def update_tasks(
while retries < 3:
try:
with self._metrics.timer(
"taskworker.update_task_batch.rpc", tags={"service": self._service}
"taskworker.update_task_batch.rpc",
tags={
"service": self._service,
"processing_pool": self._processing_pool_name,
},
):
self._stub.SetBatchActivationStatus(request)
exception = None
Expand All @@ -183,7 +198,11 @@ def update_tasks(
exception = err
self._metrics.incr(
"taskworker.client.rpc_error",
tags={"method": "SetBatchActivationStatus", "status": err.code().name},
tags={
"method": "SetBatchActivationStatus",
"status": err.code().name,
"processing_pool": self._processing_pool_name,
},
)
finally:
retries += 1
Expand Down
6 changes: 6 additions & 0 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def __init__(
),
rpc_secret=app.config["rpc_secret"],
grpc_config=app.config["grpc_config"],
processing_pool_name=processing_pool_name,
)
self._metrics = app.metrics
self._concurrency = concurrency
Expand All @@ -207,6 +208,7 @@ def _create_client(
health_check_settings: HealthCheckSettings | None = None,
rpc_secret: str | None = None,
grpc_config: str | None = None,
processing_pool_name: str | None = None,
) -> PushTaskbrokerClient:
return PushTaskbrokerClient(
service=service,
Expand All @@ -215,6 +217,7 @@ def _create_client(
health_check_settings=health_check_settings,
rpc_secret=rpc_secret,
grpc_config=grpc_config,
processing_pool_name=processing_pool_name,
)

def _send_results(
Expand Down Expand Up @@ -408,6 +411,7 @@ def _create_client(
health_check_settings: HealthCheckSettings | None = None,
rpc_secret: str | None = None,
grpc_config: str | None = None,
processing_pool_name: str | None = None,
) -> PushTaskbrokerClient:
return BatchPushTaskbrokerClient(
service=service,
Expand All @@ -416,6 +420,7 @@ def _create_client(
health_check_settings=health_check_settings,
rpc_secret=rpc_secret,
grpc_config=grpc_config,
processing_pool_name=processing_pool_name,
)

def _send_results(
Expand Down Expand Up @@ -536,6 +541,7 @@ def __init__(
),
rpc_secret=app.config["rpc_secret"],
grpc_config=app.config["grpc_config"],
processing_pool_name=processing_pool_name,
)
self._metrics = app.metrics

Expand Down
Loading