Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies = [
"sentry-arroyo>=2.38.7",
"sentry-sdk>=2.36.0",
"sortedcontainers>=2.4.0",
"taskbroker-client>=0.1.9,<1",
"taskbroker-client>=0.19.3,<1",
"typing-extensions>=4.15.0",
"zipfile-zstd==0.0.4",
]
Expand Down
107 changes: 19 additions & 88 deletions src/launchpad/worker/app.py
Original file line number Diff line number Diff line change
@@ -1,96 +1,10 @@
import os
import platform
import resource
import time

from collections.abc import Generator
from contextlib import contextmanager

from arroyo.backends.kafka import KafkaProducer
from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.metrics import MetricsBackend, Tags
from taskbroker_client.metrics import DatadogMetrics, MetricsBackend
from taskbroker_client.router import TaskRouter

from launchpad.utils.statsd import create_dogstatsd_client

# Multiplier applied to ru_maxrss deltas before they are reported with
# unit="byte": 1 on Darwin (macOS), 1024 on every other platform.
_RUSAGE_TO_BYTES = 1024 if platform.system() != "Darwin" else 1


def _convert_tags(tags: Tags | None) -> list[str] | None:
    """Flatten a tag mapping into a list of ``"key:value"`` strings.

    Args:
        tags: Mapping of tag names to tag values, or ``None``.

    Returns:
        One ``"key:value"`` string per mapping entry, in the mapping's
        iteration order, or ``None`` when ``tags`` is ``None``.
    """
    if tags is not None:
        return [f"{tag_name}:{tag_value}" for tag_name, tag_value in tags.items()]
    return None


class TaskworkerMetricsBackend(MetricsBackend):
    """Metrics backend that forwards every call to a DogStatsD client.

    The client is obtained from ``create_dogstatsd_client("launchpad")`` when
    the backend is constructed.
    """

    def __init__(self) -> None:
        self._dogstatsd = create_dogstatsd_client("launchpad")

    @staticmethod
    def _client_options(tags: Tags | None, sample_rate: float | None) -> dict:
        """Build the keyword arguments shared by all client calls.

        ``tags`` is always present (converted via ``_convert_tags``);
        ``sample_rate`` is included only when the caller supplied one.
        """
        options: dict = {"tags": _convert_tags(tags)}
        if sample_rate is not None:
            options["sample_rate"] = sample_rate
        return options

    def incr(
        self,
        name: str,
        value: int | float = 1,
        tags: Tags | None = None,
        sample_rate: float | None = None,
    ) -> None:
        """Increment counter ``name``; ``value`` is passed through ``int()`` first."""
        options = self._client_options(tags, sample_rate)
        self._dogstatsd.increment(name, int(value), **options)

    def distribution(
        self,
        name: str,
        value: int | float,
        tags: Tags | None = None,
        unit: str | None = None,
        sample_rate: float | None = None,
    ) -> None:
        """Record ``value`` in distribution ``name``.

        ``unit`` is accepted for interface compatibility but is not forwarded
        to the client.
        """
        options = self._client_options(tags, sample_rate)
        self._dogstatsd.distribution(name, value, **options)

    def gauge(
        self,
        name: str,
        value: int | float,
        tags: Tags | None = None,
        sample_rate: float | None = None,
    ) -> None:
        """Set gauge ``name`` to ``value``."""
        options = self._client_options(tags, sample_rate)
        self._dogstatsd.gauge(name, value, **options)

    @contextmanager
    def timer(
        self,
        key: str,
        tags: Tags | None = None,
        sample_rate: float | None = None,
        stacklevel: int = 0,
    ) -> Generator[None]:
        """Time the wrapped block and report it as a distribution in milliseconds.

        The measurement is emitted even if the block raises. ``stacklevel`` is
        accepted but unused here.
        """
        started_at = time.monotonic()
        try:
            yield
        finally:
            elapsed_seconds = time.monotonic() - started_at
            self.distribution(
                key,
                elapsed_seconds * 1000,
                tags=tags,
                unit="millisecond",
                sample_rate=sample_rate,
            )

    @contextmanager
    def track_memory_usage(
        self,
        key: str,
        tags: Tags | None = None,
    ) -> Generator[None]:
        """Report how much ``ru_maxrss`` changed across the wrapped block.

        The difference between the reading taken after the block and the one
        taken before it is scaled by ``_RUSAGE_TO_BYTES`` and emitted as a
        distribution, even if the block raises.
        """
        rss_at_entry = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        try:
            yield
        finally:
            rss_at_exit = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            delta = rss_at_exit - rss_at_entry
            self.distribution(key, delta * _RUSAGE_TO_BYTES, tags=tags, unit="byte")


class CustomRouter(TaskRouter):
def route_namespace(self, name: str) -> str:
Expand All @@ -107,11 +21,28 @@ def producer_factory(topic: str) -> KafkaProducer:
return KafkaProducer(config)


def create_metrics() -> MetricsBackend:
host = os.getenv("STATSD_HOST", "127.0.0.1")
port_str = os.getenv("STATSD_PORT", "8125")
try:
port = int(port_str)
except ValueError:
raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}")

return DatadogMetrics(
application="launchpad",
processing_pool="launchpad",
statsd_host=host,
statsd_port=port,
enable_prefixed_metrics=True,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once this has rolled out and dashboards have been updated, I'll do another change to remove this.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded processing pool tag

Medium Severity

create_metrics always sets processing_pool to launchpad, but the worker can be started with a different --processing-pool-name that is only passed to TaskWorker. Unprefixed taskbroker metrics will then carry the wrong processing_pool tag relative to the pool the worker actually joins.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit c9b8fbc. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unprefixed taskbroker metrics will then carry the wrong processing_pool tag relative to the pool the worker actually joins.

Not true: when a worker has a processing pool name provided, that name will be used instead. The value set here is only meant as a fallback.

)


app = TaskbrokerApp(
name="launchpad",
producer_factory=producer_factory,
router_class=CustomRouter(),
metrics_class=TaskworkerMetricsBackend(),
metrics_class=create_metrics(),
)

app.set_config(
Expand Down
14 changes: 7 additions & 7 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading