Skip to content

Commit a6decda

Browse files
authored
feat: wire TracingInterceptor, sandbox passthrough, replay-safe time, plugin name (#1)
1 parent 2d98c0b commit a6decda

9 files changed

Lines changed: 163 additions & 109 deletions

File tree

README.md

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ pip install temporal-parseable
1717
```python
1818
from temporalio.client import Client
1919
from temporalio.worker import Worker
20-
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions
2120
from temporal_parseable import ParseablePlugin, ParseableConfig
2221

2322
config = ParseableConfig(
@@ -30,16 +29,12 @@ plugin = ParseablePlugin(config)
3029

3130
client = await Client.connect("localhost:7233", plugins=[plugin])
3231

33-
sandbox = SandboxedWorkflowRunner(
34-
restrictions=SandboxRestrictions.default.with_passthrough_modules("temporal_parseable")
35-
)
36-
3732
async with Worker(
3833
client,
3934
task_queue="my-queue",
4035
workflows=[MyWorkflow],
4136
activities=[my_activity],
42-
workflow_runner=sandbox,
37+
plugins=[plugin],
4338
):
4439
await asyncio.Event().wait()
4540
```

pyproject.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ classifiers = [
2424
"Topic :: System :: Monitoring",
2525
]
2626
dependencies = [
27-
"temporalio>=1.7",
27+
"temporalio>=1.23.0",
2828
# OTel pinned to 1.x — Temporal's contrib.opentelemetry rides 1.x.
2929
# Bump the ceiling once temporalio.contrib.opentelemetry moves to 2.x.
3030
"opentelemetry-sdk>=1.25,<2",
@@ -51,8 +51,6 @@ packages = ["src/temporal_parseable"]
5151
[tool.pytest.ini_options]
5252
asyncio_mode = "auto"
5353
testpaths = ["tests"]
54-
markers = ["integration: requires live Temporal server (skipped in CI)"]
55-
addopts = "-m 'not integration'"
5654

5755
[tool.ruff]
5856
line-length = 100

src/temporal_parseable/__init__.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@
3434

3535
from typing import Optional, Type
3636

37+
from opentelemetry import trace as _otel_trace
38+
from temporalio.contrib.opentelemetry import TracingInterceptor
3739
from temporalio.plugin import SimplePlugin
3840
from temporalio.worker import ActivityInboundInterceptor, Interceptor, WorkflowInterceptorClassInput, WorkflowOutboundInterceptor
41+
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
3942

4043
from .config import ParseableConfig, LogsConfig, TracesConfig
4144
from .exporters import build_tracer_provider, build_logger_provider
@@ -47,6 +50,12 @@
4750
from . import workflow as _workflow_module
4851
from ._version import PLUGIN_VERSION
4952

53+
_PASSTHROUGH_MODULES = (
54+
"temporal_parseable",
55+
"opentelemetry",
56+
"google.protobuf",
57+
)
58+
5059
__version__ = PLUGIN_VERSION
5160
__all__ = [
5261
"ParseablePlugin",
@@ -69,10 +78,17 @@ def __init__(self, config: Optional[ParseableConfig] = None) -> None:
6978
_workflow_module._set_emitter(self._emitter)
7079

7180
worker_interceptor = _ParseableWorkerInterceptor(self._emitter)
81+
interceptors: list[Interceptor] = [worker_interceptor]
82+
83+
if self._tracer_provider is not None:
84+
_otel_trace.set_tracer_provider(self._tracer_provider)
85+
tracer = self._tracer_provider.get_tracer(__name__)
86+
interceptors.append(TracingInterceptor(tracer=tracer))
7287

7388
super().__init__(
74-
name="parseable.temporal",
75-
interceptors=[worker_interceptor],
89+
name="parseable.ParseablePlugin",
90+
interceptors=interceptors,
91+
workflow_runner=_apply_passthrough,
7692
)
7793

7894
@property
@@ -86,6 +102,15 @@ def shutdown(self) -> None:
86102
self._logger_provider.shutdown()
87103

88104

105+
def _apply_passthrough(existing: object) -> SandboxedWorkflowRunner:
106+
base = existing if isinstance(existing, SandboxedWorkflowRunner) else SandboxedWorkflowRunner()
107+
restrictions = base.restrictions.with_passthrough_modules(*_PASSTHROUGH_MODULES)
108+
return SandboxedWorkflowRunner(
109+
restrictions=restrictions,
110+
runner_class=base.runner_class,
111+
)
112+
113+
89114
class _ParseableWorkerInterceptor(Interceptor):
90115
def __init__(self, emitter: ParseableEmitter) -> None:
91116
self._emitter = emitter

src/temporal_parseable/activity_interceptor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from typing import Any, Dict, cast
1717

1818
from temporalio import activity
19+
from temporalio.exceptions import ApplicationError
1920
from temporalio.worker import ActivityInboundInterceptor, ExecuteActivityInput
2021

2122
from ._emitter import ParseableEmitter, _now_iso
@@ -71,7 +72,8 @@ async def execute_activity(self, input: ExecuteActivityInput) -> Any:
7172
status="failed",
7273
timestamp=_now_iso(),
7374
duration_ms=round(duration_ms, 3),
74-
error=str(exc),
75+
error=exc.message if isinstance(exc, ApplicationError) else str(exc),
76+
error_type=exc.type if isinstance(exc, ApplicationError) else type(exc).__name__,
7577
))
7678
raise
7779

src/temporal_parseable/exporters.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ def _sanitize_span(span: ReadableSpan) -> ReadableSpan:
5151
clean[key] = sanitised
5252
try:
5353
object.__setattr__(span, "_attributes", clean)
54-
except (AttributeError, TypeError):
55-
pass
54+
except (AttributeError, TypeError) as exc:
55+
logger.warning("Could not sanitize span attributes: %s", exc)
5656
return span
5757

5858

src/temporal_parseable/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ParseableEventRecord(TypedDict, total=False):
5656
# ── Completion / failure fields ─────────────────────────────────────────
5757
duration_ms: float
5858
error: str # failure message
59+
error_type: str # ApplicationError.type or exception class name
5960

6061
# ── Message records (signal/query/update/child_workflow/continue_as_new) ─
6162
direction: EventDirection

src/temporal_parseable/workflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,5 @@ def workflow_event(name: str, data: Optional[Dict[str, Any]] = None) -> None:
7979
"workflow_id": info.workflow_id,
8080
"run_id": info.run_id,
8181
"workflow_name": info.workflow_type,
82+
"timestamp": workflow.now().isoformat(),
8283
})

0 commit comments

Comments
 (0)