diff --git a/pyproject.toml b/pyproject.toml index d3d722d79a36f3..4a8a502dd058de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,7 +107,7 @@ dependencies = [ "statsd>=3.3.0", "structlog>=22.1.0", "symbolic>=13.1.1", - "taskbroker-client>=0.18.6", + "taskbroker-client>=0.19.1", "tiktoken>=0.8.0", "tokenizers>=0.22.0", "tldextract>=5.1.2", diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index ca3140d64ade31..ac043f6906e4d3 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -138,6 +138,9 @@ def taskworker_scheduler(redis_cluster: str, **options: Any) -> None: @click.option( "--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True ) +@click.option( + "--batch-push-mode", help="Whether to run in BATCH PUSH mode.", default=False, is_flag=True +) @click.option( "--rpc-host", help="The hostname and port for the taskbroker gRPC server. When using num-brokers the hostname will be appended with `-{i}` to connect to individual brokers.", @@ -213,6 +216,7 @@ def taskworker(**options: Any) -> None: def run_taskworker( push_mode: bool, + batch_push_mode: bool, worker_rpc_port: int, rpc_host: str, num_brokers: int | None, @@ -232,7 +236,7 @@ def run_taskworker( """ taskworker factory that can be reloaded """ - from taskbroker_client.worker import PushTaskWorker, TaskWorker + from taskbroker_client.worker import BatchPushTaskWorker, PushTaskWorker, TaskWorker from taskbroker_client.worker.client import make_broker_hosts with managed_bgtasks(role="taskworker"): @@ -252,6 +256,23 @@ def run_taskworker( health_check_sec_per_touch=health_check_sec_per_touch, grpc_port=worker_rpc_port, ) + elif batch_push_mode: + worker = BatchPushTaskWorker( + app_module="sentry.taskworker.bootstrap:app", + broker_service=rpc_host, + max_child_task_count=max_child_task_count, + namespace=namespace, + concurrency=concurrency, + child_tasks_queue_maxsize=child_tasks_queue_maxsize, + result_queue_maxsize=result_queue_maxsize, + rebalance_after=rebalance_after, + processing_pool_name=processing_pool_name, + pod_name=pod_name, + health_check_file_path=health_check_file_path, + health_check_sec_per_touch=health_check_sec_per_touch, + grpc_port=worker_rpc_port, + update_in_batches=True, + ) else: worker = TaskWorker( app_module="sentry.taskworker.bootstrap:app", diff --git a/uv.lock b/uv.lock index 82043e6b69c77c..3659da2817304d 100644 --- a/uv.lock +++ b/uv.lock @@ -2427,7 +2427,7 @@ requires-dist = [ { name = "stripe", specifier = ">=6.7.0" }, { name = "structlog", specifier = ">=22.1.0" }, { name = "symbolic", specifier = ">=13.1.1" }, - { name = "taskbroker-client", specifier = ">=0.18.6" }, + { name = "taskbroker-client", specifier = ">=0.19.1" }, { name = "tiktoken", specifier = ">=0.8.0" }, { name = "tldextract", specifier = ">=5.1.2" }, { name = "tokenizers", specifier = ">=0.22.0" }, @@ -2820,7 +2820,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.18.6" +version = "0.19.1" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2838,7 +2838,7 @@ dependencies = [ { name = "zstandard", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/taskbroker_client-0.18.6-py3-none-any.whl", hash = "sha256:a7e7cb63031e3ef470c1c9ef29f03766fdec86b299c6d386c9fccfd23d181886" }, + { url = "https://pypi.devinfra.sentry.io/wheels/taskbroker_client-0.19.1-py3-none-any.whl", hash = "sha256:fc20b0aa19b0dad7caf8f1182b615b624158194e766f7d84abdaa40633b2d03d" }, ] [[package]]