From 5e0fcacd3814a2951b395857acae738cd14bae8e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 7 May 2026 15:55:47 +0200 Subject: [PATCH 1/2] feat(profiling): Add task for taskbroker passthrough mode Add `process_profile_from_kafka` task that accepts raw Kafka message bytes for use with taskbroker's passthrough mode. This allows taskbroker to read directly from the ingest-profiles Kafka topic. Changes: - New task `process_profile_from_kafka` in `ingest.profiling.passthrough` namespace - Refactored `_process_profile_message` to share logic between consumer and task - Removed base64 encoding - tasks now accept bytes directly - Use Kafka headers for killswitch/sampled when available, fall back to message body Depends on getsentry/taskbroker#623 for exposing Kafka headers to tasks. STREAM-882 --- .../profiles/consumers/process/factory.py | 49 +++++++++++++------ src/sentry/profiles/task.py | 28 +++++++++-- src/sentry/tasks/base.py | 3 ++ src/sentry/taskworker/namespaces.py | 5 ++ .../sentry/profiles/consumers/test_process.py | 3 +- 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index f8df79d78a54..9fd1e5f510a5 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -1,4 +1,3 @@ -from base64 import b64encode from collections.abc import Iterable, Mapping from arroyo.backends.kafka.consumer import KafkaPayload @@ -12,16 +11,32 @@ from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step from sentry.profiles.task import process_profile_task +# Headers from consumer are Iterable[tuple[str, str | bytes]], from taskbroker are dict[str, str] +Headers = Iterable[tuple[str, str | bytes]] | dict[str, str] -def process_message(message: Message[KafkaPayload]) -> None: - if should_drop(message.payload.headers): + +def _process_profile_message( + message_bytes: bytes, + headers: Headers, + inline: bool = False, +) -> None: + """Process a profile message from Kafka. Used by both consumer and taskbroker passthrough.""" + if should_drop(headers): + return + + sampled = is_sampled(headers) + + if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"): return - sampled = is_sampled(message.payload.headers) + if inline: + process_profile_task(payload=message_bytes, sampled=sampled) + else: + process_profile_task.delay(payload=message_bytes, sampled=sampled) - if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - b64encoded = b64encode(message.payload.value).decode("utf-8") - process_profile_task.delay(payload=b64encoded, sampled=sampled) + +def process_message(message: Message[KafkaPayload]) -> None: + _process_profile_message(message.payload.value, message.payload.headers) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): @@ -44,7 +59,9 @@ def create_with_partitions( ) -def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool: +def is_sampled(headers: Headers) -> bool: + if isinstance(headers, dict): + return headers.get("sampled", "true") == "true" for k, v in headers: if k == "sampled": if isinstance(v, bytes): @@ -52,14 +69,14 @@ def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool: return True -HEADER_KEYS = {"project_id"} - - -def should_drop(headers: Iterable[tuple[str, str | bytes]]) -> bool: - context = {} - for k, v in headers: - if k == "project_id" and isinstance(v, bytes): - context[k] = v.decode("utf-8") +def should_drop(headers: Headers) -> bool: + if isinstance(headers, dict): + context = {"project_id": headers["project_id"]} if "project_id" in headers else {} + else: + context = {} + for k, v in headers: + if k == "project_id" and isinstance(v, bytes): + context[k] = v.decode("utf-8") if "project_id" in context and killswitch_matches_context( "profiling.killswitch.ingest-profiles", context diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index 981584567890..ee8f9a7936cb 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -63,7 +63,7 @@ from sentry.signals import first_profile_received from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task -from sentry.taskworker.namespaces import ingest_profiling_tasks +from sentry.taskworker.namespaces import ingest_profiling_passthrough_tasks, ingest_profiling_tasks from sentry.utils import json, metrics from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer from sentry.utils.eap import hex_to_item_id @@ -131,6 +131,25 @@ def encode_payload(message: dict[str, Any]) -> str: ).decode("utf-8") +@instrumented_task( + name="sentry.profiles.task.process_profile_from_kafka", + namespace=ingest_profiling_passthrough_tasks, + processing_deadline_duration=60, + retry=Retry(times=2, delay=5), + compression_type=CompressionType.ZSTD, + silo_mode=SiloMode.CELL, + pass_headers=True, +) +def process_profile_from_kafka( + message_bytes: bytes, + headers: dict[str, str], +) -> None: + """Process a profile from raw Kafka message bytes (taskbroker passthrough mode).""" + from sentry.profiles.consumers.process.factory import _process_profile_message + + _process_profile_message(message_bytes, headers, inline=True) + + @instrumented_task( name="sentry.profiles.task.process_profile", namespace=ingest_profiling_tasks, @@ -141,7 +160,7 @@ def encode_payload(message: dict[str, Any]) -> str: ) def process_profile_task( profile: Profile | None = None, - payload: str | None = None, + payload: bytes | str | None = None, sampled: bool = True, **kwargs: Any, ) -> None: @@ -149,7 +168,10 @@ def process_profile_task( return if payload: - message_dict = msgpack.unpackb(b64decode(payload.encode("utf-8")), use_list=False) + # Handle both bytes (new) and base64 string (legacy) payloads + if isinstance(payload, str): + payload = b64decode(payload.encode("utf-8")) + message_dict = msgpack.unpackb(payload, use_list=False) profile = json.loads(message_dict["payload"], use_rapid_json=True) diff --git a/src/sentry/tasks/base.py b/src/sentry/tasks/base.py index 438d3cabc534..3b5bc4c86adb 100644 --- a/src/sentry/tasks/base.py +++ b/src/sentry/tasks/base.py @@ -49,6 +49,7 @@ def instrumented_task( report_timeout_errors: bool = True, silenced_exceptions: tuple[type[BaseException], ...] | None = None, silo_mode: SiloMode | None = None, + pass_headers: bool = False, **kwargs, ) -> Callable[[Callable[P, R]], Task[P, R]]: """ @@ -131,6 +132,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: compression_type=compression_type, report_timeout_errors=report_timeout_errors, silenced_exceptions=silenced_exceptions, + pass_headers=pass_headers, )(func) if silo_mode: @@ -154,6 +156,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: compression_type=compression_type, report_timeout_errors=report_timeout_errors, silenced_exceptions=silenced_exceptions, + pass_headers=pass_headers, )(func) if silo_mode: diff --git a/src/sentry/taskworker/namespaces.py b/src/sentry/taskworker/namespaces.py index 31502074bcae..fbd36618dda9 100644 --- a/src/sentry/taskworker/namespaces.py +++ b/src/sentry/taskworker/namespaces.py @@ -92,6 +92,11 @@ app_feature="profiles", ) +ingest_profiling_passthrough_tasks = app.taskregistry.create_namespace( + "ingest.profiling.passthrough", + app_feature="profiles", +) + ingest_transactions_tasks = app.taskregistry.create_namespace( "ingest.transactions", app_feature="transactions", diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py index 0250565bb603..22bffaf1e0a0 100644 --- a/tests/sentry/profiles/consumers/test_process.py +++ b/tests/sentry/profiles/consumers/test_process.py @@ -1,6 +1,5 @@ from __future__ import annotations -from base64 import b64encode from collections.abc import MutableSequence from datetime import datetime from typing import Any @@ -57,7 +56,7 @@ def test_basic_profile_to_task( processing_strategy.terminate() process_profile_task.assert_called_with( - payload=b64encode(payload).decode("utf-8"), + payload=payload, sampled=True, ) From 58e7a753250539fa1e6d7e60efcb429d240533ff Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 11 May 2026 15:33:25 +0200 Subject: [PATCH 2/2] chore: Bump taskbroker-client to 0.1.14 for pass_headers support --- pyproject.toml | 2 +- uv.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index abca24de70f9..bb88d9722726 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,7 +102,7 @@ dependencies = [ "statsd>=3.3.0", "structlog>=22.1.0", "symbolic>=12.14.1", - "taskbroker-client>=0.1.13,<1", + "taskbroker-client>=0.1.14,<1", "tiktoken>=0.8.0", "tokenizers>=0.22.0", "tldextract>=5.1.2", diff --git a/uv.lock b/uv.lock index 4e7b247329fe..4cddd6ce1977 100644 --- a/uv.lock +++ b/uv.lock @@ -2344,7 +2344,7 @@ requires-dist = [ { name = "stripe", specifier = ">=6.7.0" }, { name = "structlog", specifier = ">=22.1.0" }, { name = "symbolic", specifier = ">=12.14.1" }, - { name = "taskbroker-client", specifier = ">=0.1.13,<1" }, + { name = "taskbroker-client", specifier = ">=0.1.14,<1" }, { name = "tiktoken", specifier = ">=0.8.0" }, { name = "tldextract", specifier = ">=5.1.2" }, { name = "tokenizers", specifier = ">=0.22.0" }, @@ -2729,7 +2729,7 @@ wheels = [ [[package]] name = "taskbroker-client" -version = "0.1.13" +version = "0.1.14" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2747,7 +2747,7 @@ dependencies = [ { name = "zstandard", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/taskbroker_client-0.1.13-py3-none-any.whl", hash = "sha256:1983895279909c74cce2c15316cd03c0e671a89ded8d3e65584b08ebc1282845" }, + { url = "https://pypi.devinfra.sentry.io/wheels/taskbroker_client-0.1.14-py3-none-any.whl", hash = "sha256:668395bccb219fafc55d17cc483babd1c888174d951b7fccf2a2bce3d9b87373" }, ] [[package]]