diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index d38af44dd4b0..65bbc0c321a9 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3108,6 +3108,14 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +# Enable sending raw bytes payload to taskbroker instead of base64 encoded +register( + "profiling.process.raw_bytes_payload.enabled", + default=False, + type=Bool, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) + # Enable sending a post update signal after we update groups using a queryset update register( "groups.enable-post-update-signal", diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py index f8df79d78a54..5b2df56e6746 100644 --- a/src/sentry/profiles/consumers/process/factory.py +++ b/src/sentry/profiles/consumers/process/factory.py @@ -20,8 +20,11 @@ def process_message(message: Message[KafkaPayload]) -> None: sampled = is_sampled(message.payload.headers) if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"): - b64encoded = b64encode(message.payload.value).decode("utf-8") - process_profile_task.delay(payload=b64encoded, sampled=sampled) + if options.get("profiling.process.raw_bytes_payload.enabled"): + process_profile_task.delay(payload=message.payload.value, sampled=sampled) + else: + encoded = b64encode(message.payload.value).decode("utf-8") + process_profile_task.delay(payload=encoded, sampled=sampled) class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index 981584567890..cb7c3eef25db 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -141,7 +141,7 @@ def encode_payload(message: dict[str, Any]) -> str: ) def process_profile_task( profile: Profile | None = None, - payload: str | None = None, + payload: bytes | str | None = None, sampled: bool = True, **kwargs: Any, ) -> None: @@ -149,7 +149,10 @@ def process_profile_task( return if payload: - message_dict = msgpack.unpackb(b64decode(payload.encode("utf-8")), use_list=False) + # Handle both bytes (new) and base64 string (legacy) payloads + if isinstance(payload, str): + payload = b64decode(payload.encode("utf-8")) + message_dict = msgpack.unpackb(payload, use_list=False) profile = json.loads(message_dict["payload"], use_rapid_json=True) diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py index 82400d3356a6..eaaa7a10e62a 100644 --- a/tests/sentry/processing/backpressure/test_checking.py +++ b/tests/sentry/processing/backpressure/test_checking.py @@ -30,6 +30,7 @@ "backpressure.checking.interval": 5, "backpressure.monitoring.enabled": False, "backpressure.status_ttl": 60, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_bad_config() -> None: @@ -44,6 +45,7 @@ def test_bad_config() -> None: "backpressure.checking.interval": 5, "backpressure.monitoring.enabled": True, "backpressure.status_ttl": 60, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_backpressure_healthy_profiles(process_profile_task: MagicMock) -> None: @@ -112,6 +114,7 @@ def test_backpressure_healthy_events(preprocess_event: MagicMock) -> None: { "backpressure.checking.enabled": False, "backpressure.checking.interval": 5, + "profiling.process.raw_bytes_payload.enabled": False, } ) def test_backpressure_not_enabled(process_profile_task: MagicMock) -> None: