From 83444efbe13368b66763cebf54586d99eb3c904f Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 7 May 2026 16:05:02 +0200 Subject: [PATCH 1/3] ref(profiling): Remove base64 encoding from profile task payload Pass raw bytes directly to process_profile_task instead of base64 encoding. The task now accepts both bytes (new) and base64 string (legacy) for backwards compatibility during deployment. STREAM-882 --- src/sentry/profiles/consumers/process/factory.py | 4 +--- src/sentry/profiles/task.py | 7 +++++-- tests/sentry/profiles/consumers/test_process.py | 3 +-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index f8df79d78a5466..48f2c4421cbc60 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -1,4 +1,3 @@ -from base64 import b64encode from collections.abc import Iterable, Mapping from arroyo.backends.kafka.consumer import KafkaPayload @@ -20,8 +19,7 @@ def process_message(message: Message[KafkaPayload]) -> None: sampled = is_sampled(message.payload.headers) if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - b64encoded = b64encode(message.payload.value).decode("utf-8") - process_profile_task.delay(payload=b64encoded, sampled=sampled) + process_profile_task.delay(payload=message.payload.value, sampled=sampled) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index 981584567890b6..cb7c3eef25db54 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -141,7 +141,7 @@ def encode_payload(message: dict[str, Any]) -> str: ) def process_profile_task( profile: Profile | None = None, - payload: str | None = None, + payload: bytes | str | None = None, sampled: bool = True, **kwargs: Any, ) -> None: @@ -149,7 +149,10 @@ def process_profile_task( return if payload: - message_dict = msgpack.unpackb(b64decode(payload.encode("utf-8")), use_list=False) + # Handle both bytes (new) and base64 string (legacy) payloads + if isinstance(payload, str): + payload = b64decode(payload.encode("utf-8")) + message_dict = msgpack.unpackb(payload, use_list=False) profile = json.loads(message_dict["payload"], use_rapid_json=True) diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py index 0250565bb603a6..22bffaf1e0a0a2 100644 --- a/tests/sentry/profiles/consumers/test_process.py +++ b/tests/sentry/profiles/consumers/test_process.py @@ -1,6 +1,5 @@ from __future__ import annotations -from base64 import b64encode from collections.abc import MutableSequence from datetime import datetime from typing import Any @@ -57,7 +56,7 @@ def test_basic_profile_to_task( processing_strategy.terminate() process_profile_task.assert_called_with( - payload=b64encode(payload).decode("utf-8"), + payload=payload, sampled=True, ) From 9465fda02560bc12d2bdc4ddba29962362bf29c2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 8 May 2026 22:04:01 +0200 Subject: [PATCH 2/3] ref(profiling): Add option to gate raw bytes payload Add profiling.process.raw_bytes_payload.enabled option to control whether the profile consumer sends raw bytes or base64-encoded payload to taskbroker. Defaults to off (base64) for safe rollout. Co-Authored-By: Claude Opus 4.5 --- src/sentry/options/defaults.py | 8 ++++++++ src/sentry/profiles/consumers/process/factory.py | 7 ++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 1156ca9162e622..dffaaf67fe33e2 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3108,6 +3108,14 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +# Enable sending raw bytes payload to taskbroker instead of base64 encoded +register( + "profiling.process.raw_bytes_payload.enabled", + default=False, + type=Bool, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) + # Enable sending a post update signal after we update groups using a queryset update register( "groups.enable-post-update-signal", diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index 48f2c4421cbc60..13e9fd521f4582 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -1,3 +1,4 @@ +from base64 import b64encode from collections.abc import Iterable, Mapping from arroyo.backends.kafka.consumer import KafkaPayload @@ -19,7 +20,11 @@ def process_message(message: Message[KafkaPayload]) -> None: sampled = is_sampled(message.payload.headers) if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - process_profile_task.delay(payload=message.payload.value, sampled=sampled) + if options.get("profiling.process.raw_bytes_payload.enabled"): + payload = message.payload.value + else: + payload = b64encode(message.payload.value).decode("utf-8") + process_profile_task.delay(payload=payload, sampled=sampled) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): From f51a58b1424365bebbe2441f7dfff09cf8a6eb4c Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 8 May 2026 22:55:42 +0200 Subject: [PATCH 3/3] fix: Update tests to work with new profiling option - Fix typing issue by avoiding reassignment with different types - Update test_basic_profile_to_task to expect base64 (default behavior) - Add profiling.process.raw_bytes_payload.enabled to backpressure test overrides Co-Authored-By: Claude Opus 4.5 --- src/sentry/profiles/consumers/process/factory.py | 6 +++--- tests/sentry/processing/backpressure/test_checking.py | 3 +++ tests/sentry/profiles/consumers/test_process.py | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index 13e9fd521f4582..5b2df56e674613 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -21,10 +21,10 @@ def process_message(message: Message[KafkaPayload]) -> None: if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): if options.get("profiling.process.raw_bytes_payload.enabled"): - payload = message.payload.value + process_profile_task.delay(payload=message.payload.value, sampled=sampled) else: - payload = b64encode(message.payload.value).decode("utf-8") - process_profile_task.delay(payload=payload, sampled=sampled) + encoded = b64encode(message.payload.value).decode("utf-8") + process_profile_task.delay(payload=encoded, sampled=sampled) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py index 82400d3356a610..eaaa7a10e62a61 100644 --- a/tests/sentry/processing/backpressure/test_checking.py +++ b/tests/sentry/processing/backpressure/test_checking.py @@ -30,6 +30,7 @@ "backpressure.checking.interval": 5, "backpressure.monitoring.enabled": False, "backpressure.status_ttl": 60, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_bad_config() -> None: @@ -44,6 +45,7 @@ def test_bad_config() -> None: "backpressure.checking.interval": 5, "backpressure.monitoring.enabled": True, "backpressure.status_ttl": 60, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_backpressure_healthy_profiles(process_profile_task: MagicMock) -> None: @@ -112,6 +114,7 @@ def test_backpressure_healthy_events(preprocess_event: MagicMock) -> None: { "backpressure.checking.enabled": False, "backpressure.checking.interval": 5, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_backpressure_not_enabled(process_profile_task: MagicMock) -> None: diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py index 22bffaf1e0a0a2..0250565bb603a6 100644 --- a/tests/sentry/profiles/consumers/test_process.py +++ b/tests/sentry/profiles/consumers/test_process.py @@ -1,5 +1,6 @@ from __future__ import annotations +from base64 import b64encode from collections.abc import MutableSequence from datetime import datetime from typing import Any @@ -56,7 +57,7 @@ def test_basic_profile_to_task( processing_strategy.terminate() process_profile_task.assert_called_with( - payload=payload, + payload=b64encode(payload).decode("utf-8"), sampled=True, )