Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions sentry_sdk/integrations/dramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import (
BAGGAGE_HEADER_NAME,
SENTRY_TRACE_HEADER_NAME,
TransactionSource,
)
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
AnnotatedValue,
capture_internal_exceptions,
Expand Down Expand Up @@ -114,7 +116,8 @@ def before_enqueue(
}

def before_process_message(self, broker: "Broker", message: "Message[R]") -> None:
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(DramatiqIntegration)
if integration is None:
return

Expand All @@ -129,21 +132,35 @@ def before_process_message(self, broker: "Broker", message: "Message[R]") -> Non
# start new trace in case of retrying
sentry_headers = {}

transaction = continue_trace(
sentry_headers,
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
origin=DramatiqIntegration.origin,
)
transaction.set_status(SPANSTATUS.OK)
sentry_sdk.start_transaction(
transaction,
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
)
transaction.__enter__()
if has_span_streaming_enabled(client.options):
sentry_sdk.traces.continue_trace(sentry_headers)
span = sentry_sdk.traces.start_span(
name=message.actor_name,
attributes={
"sentry.op": OP.QUEUE_TASK_DRAMATIQ,
"sentry.origin": DramatiqIntegration.origin,
"sentry.span.source": SegmentSource.TASK.value,
},
)
message._sentry_span_ctx = span
span.__enter__()
else:
transaction = continue_trace(
sentry_headers,
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
origin=DramatiqIntegration.origin,
)
transaction.set_status(SPANSTATUS.OK)
sentry_sdk.start_transaction(
transaction,
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
)
transaction.__enter__()
message._sentry_span_ctx = transaction

def after_process_message(
self,
Expand All @@ -161,8 +178,8 @@ def after_process_message(
throws = message.options.get("throws") or actor.options.get("throws")

scope_manager = message._scope_manager
transaction = sentry_sdk.get_current_scope().transaction
if not transaction:
span_ctx = getattr(message, "_sentry_span_ctx", None)
if span_ctx is None:
return None

is_event_capture_required = (
Expand All @@ -172,7 +189,7 @@ def after_process_message(
)
if not is_event_capture_required:
# normal transaction finish
transaction.__exit__(None, None, None)
span_ctx.__exit__(None, None, None)
scope_manager.__exit__(None, None, None)
return

Expand All @@ -186,7 +203,7 @@ def after_process_message(
)
sentry_sdk.capture_event(event, hint=hint)
# transaction error
transaction.__exit__(type(exception), exception, None)
span_ctx.__exit__(type(exception), exception, None)
scope_manager.__exit__(type(exception), exception, None)

after_skip_message = after_process_message
Expand Down
226 changes: 184 additions & 42 deletions tests/integrations/dramatiq/test_dramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

@pytest.fixture(scope="function")
def broker(request, sentry_init):
sentry_init(
integrations=[DramatiqIntegration()],
traces_sample_rate=getattr(request, "param", None),
)
param = getattr(request, "param", None)
if isinstance(param, dict):
sentry_init(integrations=[DramatiqIntegration()], **param)
else:
sentry_init(
integrations=[DramatiqIntegration()],
traces_sample_rate=param,
)
broker = StubBroker()
broker.emit_after("process_boot")
dramatiq.set_broker(broker)
Expand Down Expand Up @@ -66,22 +70,77 @@ def dummy_actor(x, y):


@pytest.mark.parametrize(
"broker,expected_span_status,fail_fast",
"broker,expected_span_status,fail_fast,span_streaming",
[
(1.0, SPANSTATUS.INTERNAL_ERROR, False),
(1.0, SPANSTATUS.OK, False),
(1.0, SPANSTATUS.INTERNAL_ERROR, True),
(1.0, SPANSTATUS.OK, True),
({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, False, False),
({"traces_sample_rate": 1.0}, SPANSTATUS.OK, False, False),
({"traces_sample_rate": 1.0}, SPANSTATUS.INTERNAL_ERROR, True, False),
({"traces_sample_rate": 1.0}, SPANSTATUS.OK, True, False),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
SPANSTATUS.INTERNAL_ERROR,
False,
True,
),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
SPANSTATUS.OK,
False,
True,
),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
SPANSTATUS.INTERNAL_ERROR,
True,
True,
),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
SPANSTATUS.OK,
True,
True,
),
],
ids=[
"error",
"success",
"error_fail_fast",
"success_fail_fast",
"error_stream",
"success_stream",
"error_fail_fast_stream",
"success_fail_fast_stream",
],
ids=["error", "success", "error_fail_fast", "success_fail_fast"],
indirect=["broker"],
)
def test_task_transaction(
broker, worker, capture_events, expected_span_status, fail_fast
broker,
worker,
capture_events,
capture_items,
expected_span_status,
fail_fast,
span_streaming,
):
events = capture_events()
task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR

if span_streaming:
items = capture_items("event", "span")
else:
events = capture_events()

@dramatiq.actor(max_retries=0)
def dummy_actor(x, y):
return x / y
Expand All @@ -95,37 +154,93 @@ def dummy_actor(x, y):
broker.join(dummy_actor.queue_name, fail_fast=fail_fast)

worker.join()
sentry_sdk.flush()

if span_streaming:
if task_fails:
error_item, segment_item = items
error_event = error_item.payload
exception = error_event["exception"]["values"][0]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
else:
(segment_item,) = items

segment = segment_item.payload
assert segment_item.type == "span"
assert segment["name"] == "dummy_actor"
assert segment["is_segment"] is True
assert segment["attributes"]["sentry.op"] == "queue.task.dramatiq"
assert segment["attributes"]["sentry.span.source"] == "task"
assert segment["status"] == ("error" if task_fails else "ok")
else:
if task_fails:
error_event = events.pop(0)
exception = error_event["exception"]["values"][0]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier

if task_fails:
error_event = events.pop(0)
exception = error_event["exception"]["values"][0]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "dummy_actor"
assert event["transaction_info"] == {"source": TransactionSource.TASK}
assert event["contexts"]["trace"]["status"] == expected_span_status

(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "dummy_actor"
assert event["transaction_info"] == {"source": TransactionSource.TASK}
assert event["contexts"]["trace"]["status"] == expected_span_status

@pytest.mark.parametrize(
"broker,span_streaming",
[
({"traces_sample_rate": 1.0}, False),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
True,
),
],
ids=["static", "stream"],
indirect=["broker"],
)
def test_dramatiq_propagate_trace(
broker, worker, capture_events, capture_items, span_streaming
):
if span_streaming:
items = capture_items("span")

@pytest.mark.parametrize("broker", [1.0], indirect=True)
def test_dramatiq_propagate_trace(broker, worker, capture_events):
events = capture_events()
with sentry_sdk.traces.start_span(name="outer") as outer_span:

@dramatiq.actor(max_retries=0)
def propagated_trace_task():
pass
@dramatiq.actor(max_retries=0)
def propagated_trace_task():
pass

with start_transaction() as outer_transaction:
propagated_trace_task.send()
broker.join(propagated_trace_task.queue_name)
worker.join()
propagated_trace_task.send()
broker.join(propagated_trace_task.queue_name)
worker.join()

assert (
events[0]["transaction"] == "propagated_trace_task"
) # the "inner" transaction
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id
sentry_sdk.flush()

inner_segment, outer_segment = [i.payload for i in items]
assert inner_segment["name"] == "propagated_trace_task"
assert inner_segment["attributes"]["sentry.op"] == "queue.task.dramatiq"
assert inner_segment["trace_id"] == outer_span.trace_id
assert outer_segment["name"] == "outer"
else:
events = capture_events()

@dramatiq.actor(max_retries=0)
def propagated_trace_task():
pass

with start_transaction() as outer_transaction:
propagated_trace_task.send()
broker.join(propagated_trace_task.queue_name)
worker.join()

assert (
events[0]["transaction"] == "propagated_trace_task"
) # the "inner" transaction
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id


@pytest.mark.parametrize(
Expand Down Expand Up @@ -389,19 +504,39 @@ def dummy_actor():
assert events == []


@pytest.mark.parametrize("broker", [1.0], indirect=True)
@pytest.mark.parametrize(
"broker,span_streaming",
[
({"traces_sample_rate": 1.0}, False),
(
{
"traces_sample_rate": 1.0,
"_experiments": {"trace_lifecycle": "stream"},
},
True,
),
],
ids=["static", "stream"],
indirect=["broker"],
)
def test_that_skip_message_cleans_up_scope_and_transaction(
broker, worker, capture_events
broker, worker, capture_events, capture_items, span_streaming
):
transactions: list[Transaction] = []
captured_spans: list = []

class SkipMessageMiddleware(Middleware):
def before_process_message(self, broker, message):
transactions.append(sentry_sdk.get_current_scope().transaction)
if span_streaming:
captured_spans.append(sentry_sdk.get_current_span())
else:
captured_spans.append(sentry_sdk.get_current_scope().transaction)
raise SkipMessage()

broker.add_middleware(SkipMessageMiddleware())

if span_streaming:
items = capture_items("span")

@dramatiq.actor(max_retries=0)
def skipped_actor(): ...

Expand All @@ -410,5 +545,12 @@ def skipped_actor(): ...
broker.join(skipped_actor.queue_name)
worker.join()

(transaction,) = transactions
assert transaction.timestamp is not None
if span_streaming:
sentry_sdk.flush()
(segment_payload,) = [i.payload for i in items]
assert segment_payload["name"] == "skipped_actor"
assert segment_payload["end_timestamp"] is not None
else:
(transaction,) = captured_spans
assert isinstance(transaction, Transaction)
assert transaction.timestamp is not None
Loading