Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions src/sentry/profiles/consumers/process/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,32 @@
from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
from sentry.profiles.task import process_profile_task

# Header shapes accepted by the helpers in this module:
#   - from the Kafka consumer: Iterable[tuple[str, str | bytes]]
#   - from the taskbroker:     dict[str, str]
Headers = Iterable[tuple[str, str | bytes]] | dict[str, str]

def _process_profile_message(
    message_bytes: bytes,
    headers: Headers,
    inline: bool = False,
) -> None:
    """Filter a raw profile message and hand it to ``process_profile_task``.

    Used by both the Kafka consumer (``process_message``) and the taskbroker
    passthrough task.

    Args:
        message_bytes: Raw message value; forwarded unchanged as ``payload``.
        headers: Message headers, either the consumer's iterable of
            ``(key, value)`` tuples or the taskbroker's ``dict[str, str]``.
        inline: When true, call ``process_profile_task`` directly in the
            current process; otherwise enqueue it with ``.delay``.
    """
    if should_drop(headers):
        return

    sampled = is_sampled(headers)

    # Unsampled profiles are only processed when the option enables them.
    if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
        return

    if inline:
        process_profile_task(payload=message_bytes, sampled=sampled)
    else:
        process_profile_task.delay(payload=message_bytes, sampled=sampled)


def process_message(message: Message[KafkaPayload]) -> None:
    """Consumer entry point: forward the payload's value and headers for processing."""
    _process_profile_message(message.payload.value, message.payload.headers)


class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
Expand All @@ -42,22 +59,24 @@ def create_with_partitions(
)


def is_sampled(headers: Headers) -> bool:
    """Return whether the ``sampled`` header marks the profile as sampled.

    A missing ``sampled`` header counts as sampled. Otherwise the result is
    true only when the header value equals ``"true"``.

    Args:
        headers: Either a ``dict[str, str]`` or an iterable of
            ``(key, value)`` tuples whose values are ``str`` or ``bytes``.

    Returns:
        ``False`` only when a ``sampled`` header is present with a value
        other than ``"true"``; ``True`` otherwise.
    """
    if isinstance(headers, dict):
        return headers.get("sampled", "true") == "true"
    for key, value in headers:
        if key != "sampled":
            continue
        # Tuple headers may carry str or bytes; normalise so a str value is
        # honoured the same way the dict branch honours it.
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        return value == "true"
    return True


# Set of header key names; currently only "project_id".
# NOTE(review): no reference to this set is visible in the shown hunks — confirm it is still used.
HEADER_KEYS = {"project_id"}


def should_drop(headers: Iterable[tuple[str, str | bytes]]) -> bool:
context = {}
for k, v in headers:
if k == "project_id" and isinstance(v, bytes):
context[k] = v.decode("utf-8")
def should_drop(headers: Headers) -> bool:
if isinstance(headers, dict):
context = {"project_id": headers["project_id"]} if "project_id" in headers else {}
else:
context = {}
for k, v in headers:
if k == "project_id" and isinstance(v, bytes):
context[k] = v.decode("utf-8")

if "project_id" in context and killswitch_matches_context(
"profiling.killswitch.ingest-profiles", context
Expand Down
21 changes: 20 additions & 1 deletion src/sentry/profiles/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from sentry.signals import first_profile_received
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import ingest_profiling_tasks
from sentry.taskworker.namespaces import ingest_profiling_passthrough_tasks, ingest_profiling_tasks
from sentry.utils import json, metrics
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
from sentry.utils.eap import hex_to_item_id
Expand Down Expand Up @@ -121,6 +121,25 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
logger = logging.getLogger(__name__)


@instrumented_task(
    name="sentry.profiles.task.process_profile_from_kafka",
    namespace=ingest_profiling_passthrough_tasks,
    processing_deadline_duration=60,
    retry=Retry(times=2, delay=5),
    compression_type=CompressionType.ZSTD,
    silo_mode=SiloMode.CELL,
    pass_headers=True,
)
def process_profile_from_kafka(message_bytes: bytes, headers: dict[str, str]) -> None:
    """Taskbroker passthrough entry point for a raw Kafka profile message.

    Forwards the message bytes and header dict to ``_process_profile_message``
    with ``inline=True``, so ``process_profile_task`` is called directly
    instead of being enqueued with ``.delay``.
    """
    # Imported at call time. The factory module itself imports
    # ``process_profile_task`` from ``sentry.profiles.task``.
    # NOTE(review): presumably deferred to avoid a circular import — confirm.
    from sentry.profiles.consumers.process.factory import _process_profile_message

    _process_profile_message(message_bytes=message_bytes, headers=headers, inline=True)


@instrumented_task(
name="sentry.profiles.task.process_profile",
namespace=ingest_profiling_tasks,
Expand Down
3 changes: 3 additions & 0 deletions src/sentry/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def instrumented_task(
report_timeout_errors: bool = True,
silenced_exceptions: tuple[type[BaseException], ...] | None = None,
silo_mode: SiloMode | None = None,
pass_headers: bool = False,
**kwargs,
) -> Callable[[Callable[P, R]], Task[P, R]]:
"""
Expand Down Expand Up @@ -127,6 +128,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
compression_type=compression_type,
report_timeout_errors=report_timeout_errors,
silenced_exceptions=silenced_exceptions,
pass_headers=pass_headers,
)(func)

if silo_mode:
Expand All @@ -150,6 +152,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
compression_type=compression_type,
report_timeout_errors=report_timeout_errors,
silenced_exceptions=silenced_exceptions,
pass_headers=pass_headers,
)(func)

if silo_mode:
Expand Down
5 changes: 5 additions & 0 deletions src/sentry/taskworker/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
app_feature="profiles",
)

# Task namespace "ingest.profiling.passthrough" under the "profiles" app feature.
# The `process_profile_from_kafka` task in sentry/profiles/task.py registers under it.
ingest_profiling_passthrough_tasks = app.taskregistry.create_namespace(
"ingest.profiling.passthrough",
app_feature="profiles",
)

ingest_transactions_tasks = app.taskregistry.create_namespace(
"ingest.transactions",
app_feature="transactions",
Expand Down
Loading