Skip to content
82 changes: 60 additions & 22 deletions sentry_sdk/integrations/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

import sentry_sdk
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.scope import Scope, should_send_default_pii
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
format_timestamp,
parse_version,
Expand Down Expand Up @@ -61,30 +62,59 @@ def setup_once() -> None:

old_perform_job = worker_cls.perform_job

@ensure_integration_enabled(RqIntegration, old_perform_job)
def sentry_patched_perform_job(
self: "Any", job: "Job", *args: "Queue", **kwargs: "Any"
) -> bool:
client = sentry_sdk.get_client()
if client.get_integration(RqIntegration) is None:
return old_perform_job(self, job, *args, **kwargs)

with sentry_sdk.new_scope() as scope:
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(weakref.ref(job)))

transaction = continue_trace(
job.meta.get("_sentry_trace_headers") or {},
op=OP.QUEUE_TASK_RQ,
name="unknown RQ task",
source=TransactionSource.TASK,
origin=RqIntegration.origin,
)

with capture_internal_exceptions():
transaction.name = job.func_name

with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={"rq_job": job},
):
rv = old_perform_job(self, job, *args, **kwargs)
if has_span_streaming_enabled(client.options):
sentry_sdk.traces.continue_trace(
job.meta.get("_sentry_trace_headers") or {}
)
Comment thread
sentry[bot] marked this conversation as resolved.

Scope.set_custom_sampling_context({"rq_job": job})

func_name = None
with capture_internal_exceptions():
func_name = job.func_name

with sentry_sdk.traces.start_span(
name="unknown RQ task" if func_name is None else func_name,
attributes={
"sentry.op": OP.QUEUE_TASK_RQ,
"sentry.origin": RqIntegration.origin,
"sentry.span.source": SegmentSource.TASK,
SPANDATA.MESSAGING_MESSAGE_ID: job.id,
},
parent_span=None,
) as span:
if func_name is not None:
span.set_attribute(SPANDATA.CODE_FUNCTION_NAME, func_name)

rv = old_perform_job(self, job, *args, **kwargs)
Comment thread
alexander-alderman-webb marked this conversation as resolved.
else:
transaction = continue_trace(
job.meta.get("_sentry_trace_headers") or {},
op=OP.QUEUE_TASK_RQ,
name="unknown RQ task",
source=TransactionSource.TASK,
origin=RqIntegration.origin,
)

with capture_internal_exceptions():
transaction.name = job.func_name

with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={"rq_job": job},
):
rv = old_perform_job(self, job, *args, **kwargs)

if self.is_horse:
# We're inside of a forked process and RQ is
Expand Down Expand Up @@ -116,12 +146,20 @@ def sentry_patched_handle_exception(

old_enqueue_job = Queue.enqueue_job

@ensure_integration_enabled(RqIntegration, old_enqueue_job)
def sentry_patched_enqueue_job(
self: "Queue", job: "Any", **kwargs: "Any"
) -> "Any":
client = sentry_sdk.get_client()
Comment thread
alexander-alderman-webb marked this conversation as resolved.
if client.get_integration(RqIntegration) is None:
return old_enqueue_job(self, job, **kwargs)

scope = sentry_sdk.get_current_scope()
if scope.span is not None:
span = (
scope.streamed_span
if has_span_streaming_enabled(client.options)
else scope.span
)
if span is not None:
job.meta["_sentry_trace_headers"] = dict(
scope.iter_trace_propagation_headers()
)
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ def append(envelope):

def flush(timeout=None, callback=None):
real_flush(timeout=timeout, callback=callback)
items_w.write(json.dumps(telemetry).encode("utf-8"))
items_w.write(b"\n")
items_w.write(json.dumps(telemetry).encode("utf-8") + b"\n")
items_w.write(b"flush\n")

monkeypatch.setattr(test_client.transport, "capture_envelope", append)
monkeypatch.setattr(test_client, "flush", flush)
Expand Down
Loading
Loading