Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ readme = "README.md"
dependencies = [
"sentry-arroyo>=2.38.7",
"sentry-sdk[http2]>=2.43.0",
"sentry-protos>=0.15.0",
"sentry-protos>=0.26.1",
"confluent_kafka>=2.3.0",
"cronsim>=2.6",
"grpcio>=1.67.1",
Expand Down
27 changes: 23 additions & 4 deletions clients/python/src/examples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,37 @@ def scheduler() -> None:
@click.option(
"--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True
)
@click.option(
"--batch-push-mode", help="Whether to run in BATCH PUSH mode.", default=False, is_flag=True
)
@click.option(
"--grpc-port",
help="Port for the gRPC server to listen on.",
default=50052,
type=int,
)
def worker(rpc_host: str, concurrency: int, push_mode: bool, grpc_port: int) -> None:
from taskbroker_client.worker import PushTaskWorker, TaskWorker
def worker(
rpc_host: str, concurrency: int, push_mode: bool, batch_push_mode: bool, grpc_port: int
) -> None:
from taskbroker_client.worker import BatchPushTaskWorker, PushTaskWorker, TaskWorker

click.echo("Starting worker")
if push_mode:
worker: PushTaskWorker | TaskWorker = PushTaskWorker(
if batch_push_mode:
worker: PushTaskWorker | TaskWorker = BatchPushTaskWorker(
app_module="examples.app:app",
broker_service=rpc_host,
max_child_task_count=100,
concurrency=concurrency,
child_tasks_queue_maxsize=concurrency * 2,
result_queue_maxsize=concurrency * 2,
rebalance_after=32,
processing_pool_name="examples",
process_type="forkserver",
grpc_port=grpc_port,
update_in_batches=True,
)
Comment thread
evanh marked this conversation as resolved.
elif push_mode:
worker = PushTaskWorker(
app_module="examples.app:app",
broker_service=rpc_host,
max_child_task_count=100,
Expand Down
4 changes: 2 additions & 2 deletions clients/python/src/taskbroker_client/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .worker import PushTaskWorker, TaskWorker
from .worker import BatchPushTaskWorker, PushTaskWorker, TaskWorker

__all__ = ("TaskWorker", "PushTaskWorker")
__all__ = ("TaskWorker", "PushTaskWorker", "BatchPushTaskWorker")
108 changes: 0 additions & 108 deletions clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,111 +493,3 @@ def update_task(
receive_timestamp=time.monotonic(),
)
return None


class PushTaskbrokerClient:
    """
    RPC client wrapper used by taskworkers running against push brokers.

    Push brokers are a deployment, so there is no need to connect to each one
    individually: a single service address works for all of the brokers.
    """

    def __init__(
        self,
        service: str,
        application: str,
        metrics: MetricsBackend,
        health_check_settings: HealthCheckSettings | None = None,
        rpc_secret: str | None = None,
        grpc_config: str | None = None,
    ) -> None:
        """
        Store the configuration and open the channel to ``service``.

        ``grpc_config``, when truthy, is forwarded to gRPC as the
        ``grpc.service_config`` channel option. ``health_check_settings``
        may be None, in which case no health check file is ever touched.
        """
        self._application = application
        self._service = service
        self._rpc_secret = rpc_secret
        self._metrics = metrics

        channel_options: list[tuple[str, Any]] = [
            ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
        ]
        if grpc_config:
            channel_options.append(("grpc.service_config", grpc_config))
        self._grpc_options = channel_options

        logger.info(
            "taskworker.push_client.start",
            extra={"service": service, "options": self._grpc_options},
        )

        # The options and secret must be set before connecting, because
        # _connect_to_host reads both of them.
        self._stub = self._connect_to_host(service)

        self._health_check_settings = health_check_settings
        self._timestamp_since_touch_lock = threading.Lock()
        self._timestamp_since_touch = 0.0

    def _emit_health_check(self) -> None:
        """
        Touch the health check file if the touch interval has elapsed.

        Does nothing when no health check settings were provided. The
        elapsed-time check and the timestamp update happen under a lock.
        """
        settings = self._health_check_settings
        if settings is None:
            return

        with self._timestamp_since_touch_lock:
            now = time.time()
            elapsed = now - self._timestamp_since_touch
            if elapsed < settings.touch_interval_sec:
                # Touched recently enough; nothing to do yet.
                return

            settings.file_path.touch()
            self._metrics.incr("taskworker.client.health_check.touched")
            self._timestamp_since_touch = now

    def _connect_to_host(self, host: str) -> ConsumerServiceStub:
        """
        Build a ConsumerServiceStub on an insecure channel to ``host``.

        When the configured RPC secret parses to a non-empty list, the
        channel is wrapped with a RequestSignatureInterceptor.
        """
        logger.info("taskworker.push_client.connect", extra={"host": host})
        plain_channel = grpc.insecure_channel(host, options=self._grpc_options)
        secrets = parse_rpc_secret_list(self._rpc_secret)
        if not secrets:
            return ConsumerServiceStub(plain_channel)

        signed_channel = grpc.intercept_channel(
            plain_channel, RequestSignatureInterceptor(secrets)
        )
        return ConsumerServiceStub(signed_channel)

    def emit_health_check(self) -> None:
        """Public entry point for the rate-limited health check touch."""
        self._emit_health_check()

    def update_task(
        self,
        processing_result: ProcessingResult,
    ) -> None:
        """
        Update the status for a given task activation.

        The SetTaskStatus RPC is attempted up to three times, back to back.
        If every attempt fails with a grpc.RpcError, the last error is raised.
        """
        self._emit_health_check()

        status_request = SetTaskStatusRequest(
            id=processing_result.task_id,
            status=processing_result.status,
            fetch_next_task=None,
            max_attempts=processing_result.max_attempts,
            delay_on_retry=processing_result.delay_on_retry,
        )

        last_error = None
        for _attempt in range(3):
            try:
                with self._metrics.timer(
                    "taskworker.update_task.rpc", tags={"service": self._service}
                ):
                    self._stub.SetTaskStatus(status_request)
            except grpc.RpcError as err:
                last_error = err
                self._metrics.incr(
                    "taskworker.client.rpc_error",
                    tags={"method": "SetTaskStatus", "status": err.code().name},
                )
            else:
                # Success: forget any error from an earlier attempt.
                last_error = None
                break

        if last_error:
            raise last_error
192 changes: 192 additions & 0 deletions clients/python/src/taskbroker_client/worker/push_clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import logging
import threading
import time
from typing import TYPE_CHECKING, Any

import grpc
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
SetBatchActivationStatusRequest,
SetTaskStatusRequest,
)
from sentry_protos.taskbroker.v1.taskbroker_pb2_grpc import ConsumerServiceStub

from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.types import ProcessingResult
from taskbroker_client.worker.client import (
MAX_ACTIVATION_SIZE,
HealthCheckSettings,
RequestSignatureInterceptor,
parse_rpc_secret_list,
)

if TYPE_CHECKING:
ServerInterceptor = grpc.ServerInterceptor[Any, Any]
else:
ServerInterceptor = grpc.ServerInterceptor

logger = logging.getLogger(__name__)


class PushTaskbrokerClient:
    """
    RPC client wrapper used by taskworkers running against push brokers.

    Push brokers are a deployment, so there is no need to connect to each one
    individually: a single service address works for all of the brokers.
    """

    def __init__(
        self,
        service: str,
        application: str,
        metrics: MetricsBackend,
        health_check_settings: HealthCheckSettings | None = None,
        rpc_secret: str | None = None,
        grpc_config: str | None = None,
    ) -> None:
        """
        Store the configuration and open the channel to ``service``.

        ``grpc_config``, when truthy, is forwarded to gRPC as the
        ``grpc.service_config`` channel option. ``health_check_settings``
        may be None, in which case no health check file is ever touched.
        """
        self._application = application
        self._service = service
        self._rpc_secret = rpc_secret
        self._metrics = metrics

        channel_options: list[tuple[str, Any]] = [
            ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
        ]
        if grpc_config:
            channel_options.append(("grpc.service_config", grpc_config))
        self._grpc_options = channel_options

        logger.info(
            "taskworker.push_client.start",
            extra={"service": service, "options": self._grpc_options},
        )

        # The options and secret must be set before connecting, because
        # _connect_to_host reads both of them.
        self._stub = self._connect_to_host(service)

        self._health_check_settings = health_check_settings
        self._timestamp_since_touch_lock = threading.Lock()
        self._timestamp_since_touch = 0.0

    def _emit_health_check(self) -> None:
        """
        Touch the health check file if the touch interval has elapsed.

        Does nothing when no health check settings were provided. The
        elapsed-time check and the timestamp update happen under a lock.
        """
        settings = self._health_check_settings
        if settings is None:
            return

        with self._timestamp_since_touch_lock:
            now = time.time()
            elapsed = now - self._timestamp_since_touch
            if elapsed < settings.touch_interval_sec:
                # Touched recently enough; nothing to do yet.
                return

            settings.file_path.touch()
            self._metrics.incr("taskworker.client.health_check.touched")
            self._timestamp_since_touch = now

    def _connect_to_host(self, host: str) -> ConsumerServiceStub:
        """
        Build a ConsumerServiceStub on an insecure channel to ``host``.

        When the configured RPC secret parses to a non-empty list, the
        channel is wrapped with a RequestSignatureInterceptor.
        """
        logger.info("taskworker.push_client.connect", extra={"host": host})
        plain_channel = grpc.insecure_channel(host, options=self._grpc_options)
        secrets = parse_rpc_secret_list(self._rpc_secret)
        if not secrets:
            return ConsumerServiceStub(plain_channel)

        signed_channel = grpc.intercept_channel(
            plain_channel, RequestSignatureInterceptor(secrets)
        )
        return ConsumerServiceStub(signed_channel)

    def emit_health_check(self) -> None:
        """Public entry point for the rate-limited health check touch."""
        self._emit_health_check()

    def update_tasks(self, processing_results: list[ProcessingResult]) -> None:
        """
        Report each processing result with its own SetTaskStatus RPC.

        Results are sent in list order. If one of them fails after its
        retries, the error propagates and the remaining results are not sent.
        """
        for result in processing_results:
            self._update_task_single(result)

    def _update_task_single(
        self,
        processing_result: ProcessingResult,
    ) -> None:
        """
        Update the status for a given task activation.

        The SetTaskStatus RPC is attempted up to three times, back to back.
        If every attempt fails with a grpc.RpcError, the last error is raised.
        """
        self._emit_health_check()

        status_request = SetTaskStatusRequest(
            id=processing_result.task_id,
            status=processing_result.status,
            fetch_next_task=None,
            max_attempts=processing_result.max_attempts,
            delay_on_retry=processing_result.delay_on_retry,
        )

        last_error = None
        for _attempt in range(3):
            try:
                with self._metrics.timer(
                    "taskworker.update_task.rpc", tags={"service": self._service}
                ):
                    self._stub.SetTaskStatus(status_request)
            except grpc.RpcError as err:
                last_error = err
                self._metrics.incr(
                    "taskworker.client.rpc_error",
                    tags={"method": "SetTaskStatus", "status": err.code().name},
                )
            else:
                # Success: forget any error from an earlier attempt.
                last_error = None
                break

        if last_error:
            raise last_error


class BatchPushTaskbrokerClient(PushTaskbrokerClient):
    """
    Taskworker RPC client wrapper that reports activation statuses in batches.

    Push brokers are a deployment so they don't need to be connected to individually. There is one service provided
    that works for all the brokers. This client pushes batches of activation updates.
    """

    def update_tasks(
        self,
        processing_results: list[ProcessingResult],
    ) -> None:
        """
        Update the status of several task activations with one batched RPC.

        All results are packed into a single SetBatchActivationStatusRequest
        and sent through SetBatchActivationStatus. The RPC is attempted up to
        three times, back to back.

        An empty ``processing_results`` list sends nothing; the health check
        is still emitted.

        Raises:
            grpc.RpcError: the error from the last attempt, if all three failed.
        """
        self._emit_health_check()

        # An empty batch carries no updates, so skip the RPC (and its retries).
        if not processing_results:
            return

        request = SetBatchActivationStatusRequest(
            updates=[
                SetTaskStatusRequest(
                    id=processing_result.task_id,
                    status=processing_result.status,
                    fetch_next_task=None,
                    max_attempts=processing_result.max_attempts,
                    delay_on_retry=processing_result.delay_on_retry,
                )
                for processing_result in processing_results
            ]
        )

        last_error = None
        for _attempt in range(3):
            try:
                with self._metrics.timer(
                    "taskworker.update_task_batch.rpc", tags={"service": self._service}
                ):
                    self._stub.SetBatchActivationStatus(request)
            except grpc.RpcError as err:
                last_error = err
                self._metrics.incr(
                    "taskworker.client.rpc_error",
                    tags={"method": "SetBatchActivationStatus", "status": err.code().name},
                )
            else:
                # Success: forget any error from an earlier attempt.
                last_error = None
                break

        if last_error is not None:
            raise last_error
Loading
Loading