diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index 48f2c4421cbc..9fd1e5f510a5 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -11,15 +11,32 @@ from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step from sentry.profiles.task import process_profile_task +# Headers from consumer are Iterable[tuple[str, str | bytes]], from taskbroker are dict[str, str] +Headers = Iterable[tuple[str, str | bytes]] | dict[str, str] -def process_message(message: Message[KafkaPayload]) -> None: - if should_drop(message.payload.headers): + +def _process_profile_message( + message_bytes: bytes, + headers: Headers, + inline: bool = False, +) -> None: + """Process a profile message from Kafka. Used by both consumer and taskbroker passthrough.""" + if should_drop(headers): + return + + sampled = is_sampled(headers) + + if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"): return - sampled = is_sampled(message.payload.headers) + if inline: + process_profile_task(payload=message_bytes, sampled=sampled) + else: + process_profile_task.delay(payload=message_bytes, sampled=sampled) - if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - process_profile_task.delay(payload=message.payload.value, sampled=sampled) + +def process_message(message: Message[KafkaPayload]) -> None: + _process_profile_message(message.payload.value, message.payload.headers) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): @@ -42,7 +59,9 @@ def create_with_partitions( ) -def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool: +def is_sampled(headers: Headers) -> bool: + if isinstance(headers, dict): + return headers.get("sampled", "true") == "true" for k, v in headers: if k == "sampled": if isinstance(v, bytes): @@ -50,14 +69,14 @@ def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool: return True -HEADER_KEYS = {"project_id"} - - -def should_drop(headers: Iterable[tuple[str, str | bytes]]) -> bool: - context = {} - for k, v in headers: - if k == "project_id" and isinstance(v, bytes): - context[k] = v.decode("utf-8") +def should_drop(headers: Headers) -> bool: + if isinstance(headers, dict): + context = {"project_id": headers["project_id"]} if "project_id" in headers else {} + else: + context = {} + for k, v in headers: + if k == "project_id" and isinstance(v, bytes): + context[k] = v.decode("utf-8") if "project_id" in context and killswitch_matches_context( "profiling.killswitch.ingest-profiles", context diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index dd25fa2ca2e7..99df17d9dc9e 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -62,7 +62,7 @@ from sentry.signals import first_profile_received from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task -from sentry.taskworker.namespaces import ingest_profiling_tasks +from sentry.taskworker.namespaces import ingest_profiling_passthrough_tasks, ingest_profiling_tasks from sentry.utils import json, metrics from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer from sentry.utils.eap import hex_to_item_id @@ -121,6 +121,25 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer: logger = logging.getLogger(__name__) +@instrumented_task( + name="sentry.profiles.task.process_profile_from_kafka", + namespace=ingest_profiling_passthrough_tasks, + processing_deadline_duration=60, + retry=Retry(times=2, delay=5), + compression_type=CompressionType.ZSTD, + silo_mode=SiloMode.CELL, + pass_headers=True, +) +def process_profile_from_kafka( + message_bytes: bytes, + headers: dict[str, str], +) -> None: + """Process a profile from raw Kafka message bytes (taskbroker passthrough mode).""" + from sentry.profiles.consumers.process.factory import _process_profile_message + + _process_profile_message(message_bytes, headers, inline=True) + + @instrumented_task( name="sentry.profiles.task.process_profile", namespace=ingest_profiling_tasks, diff --git a/src/sentry/tasks/base.py b/src/sentry/tasks/base.py index b696acbfec37..988172abfc7f 100644 --- a/src/sentry/tasks/base.py +++ b/src/sentry/tasks/base.py @@ -45,6 +45,7 @@ def instrumented_task( report_timeout_errors: bool = True, silenced_exceptions: tuple[type[BaseException], ...] | None = None, silo_mode: SiloMode | None = None, + pass_headers: bool = False, **kwargs, ) -> Callable[[Callable[P, R]], Task[P, R]]: """ @@ -127,6 +128,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: compression_type=compression_type, report_timeout_errors=report_timeout_errors, silenced_exceptions=silenced_exceptions, + pass_headers=pass_headers, )(func) if silo_mode: @@ -150,6 +152,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: compression_type=compression_type, report_timeout_errors=report_timeout_errors, silenced_exceptions=silenced_exceptions, + pass_headers=pass_headers, )(func) if silo_mode: diff --git a/src/sentry/taskworker/namespaces.py b/src/sentry/taskworker/namespaces.py index 374dfdf7cdcb..bfadb09ea488 100644 --- a/src/sentry/taskworker/namespaces.py +++ b/src/sentry/taskworker/namespaces.py @@ -87,6 +87,11 @@ app_feature="profiles", ) +ingest_profiling_passthrough_tasks = app.taskregistry.create_namespace( + "ingest.profiling.passthrough", + app_feature="profiles", +) + ingest_transactions_tasks = app.taskregistry.create_namespace( "ingest.transactions", app_feature="transactions",