diff --git a/pyproject.toml b/pyproject.toml index bea0db43..edd66eea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk>=2.36.0", "sortedcontainers>=2.4.0", - "taskbroker-client>=0.1.9,<1", + "taskbroker-client>=0.19.3,<1", "typing-extensions>=4.15.0", "zipfile-zstd==0.0.4", ] diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index c46c07ea..e34aca90 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -1,96 +1,10 @@ import os -import platform -import resource -import time - -from collections.abc import Generator -from contextlib import contextmanager from arroyo.backends.kafka import KafkaProducer from taskbroker_client.app import TaskbrokerApp -from taskbroker_client.metrics import MetricsBackend, Tags +from taskbroker_client.metrics import DatadogMetrics, MetricsBackend from taskbroker_client.router import TaskRouter -from launchpad.utils.statsd import create_dogstatsd_client - -_RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024 - - -def _convert_tags(tags: Tags | None) -> list[str] | None: - if tags is None: - return None - return [f"{k}:{v}" for k, v in tags.items()] - - -class TaskworkerMetricsBackend(MetricsBackend): - def __init__(self) -> None: - self._dogstatsd = create_dogstatsd_client("launchpad") - - def incr( - self, - name: str, - value: int | float = 1, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.increment(name, int(value), **kwargs) - - def distribution( - self, - name: str, - value: int | float, - tags: Tags | None = None, - unit: str | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.distribution(name, value, **kwargs) - - def gauge( - self, - name: str, - value: int | float, - tags: Tags | None = None, - sample_rate: float | None = None, - ) -> None: - kwargs: dict = {"tags": _convert_tags(tags)} - if sample_rate is not None: - kwargs["sample_rate"] = sample_rate - self._dogstatsd.gauge(name, value, **kwargs) - - @contextmanager - def timer( - self, - key: str, - tags: Tags | None = None, - sample_rate: float | None = None, - stacklevel: int = 0, - ) -> Generator[None]: - start = time.monotonic() - try: - yield - finally: - duration_ms = (time.monotonic() - start) * 1000 - self.distribution(key, duration_ms, tags=tags, unit="millisecond", sample_rate=sample_rate) - - @contextmanager - def track_memory_usage( - self, - key: str, - tags: Tags | None = None, - ) -> Generator[None]: - before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - try: - yield - finally: - after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - self.distribution(key, (after - before) * _RUSAGE_TO_BYTES, tags=tags, unit="byte") - class CustomRouter(TaskRouter): def route_namespace(self, name: str) -> str: @@ -107,11 +21,28 @@ def producer_factory(topic: str) -> KafkaProducer: return KafkaProducer(config) +def create_metrics() -> MetricsBackend: + host = os.getenv("STATSD_HOST", "127.0.0.1") + port_str = os.getenv("STATSD_PORT", "8125") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") + + return DatadogMetrics( + application="launchpad", + processing_pool="launchpad", + statsd_host=host, + statsd_port=port, + enable_prefixed_metrics=True, + ) + + app = TaskbrokerApp( name="launchpad", producer_factory=producer_factory, router_class=CustomRouter(), - metrics_class=TaskworkerMetricsBackend(), + metrics_class=create_metrics(), ) app.set_config( diff --git a/uv.lock b/uv.lock index 55e481b8..b63a3462 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ requires-dist = [ { name = "sentry-arroyo", specifier = ">=2.38.7" }, { name = "sentry-sdk", specifier = ">=2.36.0" }, { name = "sortedcontainers", specifier = ">=2.4.0" }, - { name = "taskbroker-client", specifier = ">=0.1.9,<1" }, + { name = "taskbroker-client", specifier = ">=0.19.3,<1" }, { name = "typing-extensions", specifier = ">=4.15.0" }, { name = "zipfile-zstd", specifier = "==0.0.4" }, ] @@ -1877,16 +1877,16 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.22.1" +version = "0.32.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "grpc-stubs" }, { name = "grpcio" }, { name = "protobuf" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/10/6c/9345de057b9c2b7eaae2ebc28f3234df9560f0151c6e7204508d1d12566b/sentry_protos-0.22.1.tar.gz", hash = "sha256:17821439e47d22df493d6350084d0c9fc6966aaa7f9d419261597f4fe66ebf4c", size = 135668, upload-time = "2026-06-03T23:29:05.479Z" } +sdist = { url = "https://files.pythonhosted.org/packages/54/69/d0c32256fe22470eb2e09d1136b97f452e1c9d50c790680807e881e72e94/sentry_protos-0.32.2.tar.gz", hash = "sha256:e76f89de38485b81052f1cb5049c428cb300faf11f56670c3fbf6e0301804c65", size = 151052, upload-time = "2026-06-18T20:44:10.344Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/85/99/edf6f7e43a73e65ca5144c957f4790a346ef6f798c53463436d3e20a5c65/sentry_protos-0.22.1-py3-none-any.whl", hash = "sha256:c6bb445ab288d9309b1e5e57c7a43b545eb5aa0c805dff83b8d8d9a18c001531", size = 352722, upload-time = "2026-06-03T23:29:04.204Z" }, + { url = "https://files.pythonhosted.org/packages/bf/89/7f2d40aa4ef6300d03f5cf550635364500f2bf2ea3cc86400deefd1dc1f0/sentry_protos-0.32.2-py3-none-any.whl", hash = "sha256:4ae5023f27ea05b21b6a8fb5c7ef724921fd209f81c5d64a11ef346077274b66", size = 402246, upload-time = "2026-06-18T20:44:08.987Z" }, ] [[package]] @@ -1967,7 +1967,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.18.2" +version = "0.19.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "confluent-kafka" }, @@ -1984,9 +1984,9 @@ dependencies = [ { name = "sentry-sdk", extra = ["http2"] }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0e/4d/8feea0a9ee72e492f63aaaee480ab2fb6cf1ce1afcabe41c86fa0dc7e956/taskbroker_client-0.18.2.tar.gz", hash = "sha256:8679fc0832d2a6ae085086f247249266bbea1a255a9fc5783c1d979d5be98cef", size = 35153, upload-time = "2026-06-04T20:34:55.437Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f2/04/6cb05381207feb41f5257c3964feb939b23308e9d08133fe15c39ccc9b5d/taskbroker_client-0.19.3.tar.gz", hash = "sha256:8e3c8f159a832d919c6fc06c1363d37019a37fbf329f2414f120b246d8089ab5", size = 39250, upload-time = "2026-06-18T19:51:05.012Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/35/b7/6b38da76f535731d30dbe1b2dc48cbb379dc5ea7231c4a6084e91d30ba20/taskbroker_client-0.18.2-py3-none-any.whl", hash = "sha256:a968e1da5f778af5136894f3d67c1bc42e349b9197852d50d611df19739675c0", size = 44913, upload-time = "2026-06-04T20:34:54.544Z" }, + { url = "https://files.pythonhosted.org/packages/35/d0/2ca1e4d268f9876514e06979dffc92f61a9c8d93f6bf3c8c1ebb5877e2d3/taskbroker_client-0.19.3-py3-none-any.whl", hash = "sha256:4359b5906902e254d8c3efa1492dc249d4ed0678146e87b14c82769742da433f", size = 50155, upload-time = "2026-06-18T19:51:03.943Z" }, ] [[package]]