Skip to content

Commit 1c63bed

Browse files
BloggerBustwsehl
authored andcommitted
feat(docs): python support agent cookbook example (#3623)
* feat(python): add support_agent example with durable reply handling - add support_agent durable workflow example - add triage, reply generation, and escalation tasks - use consider_events_since for early scoped reply events - add trigger script for the example - add E2E tests for resolved and timeout paths - register support_agent workflows in the central example worker * docs(cookbooks): add support agent workflow cookbook and example snippets * docs(cookbooks): add support agent sidebar entry * docs(cookbooks): minor improvement to support agent intro and fallback wording * docs(cookbooks): document support agent worker registration * docs(cookbooks): document support agent worker registration * docs(lint): Fixed black errors * docs(cookbooks): Improve phrasing and explanations Also include how to run the tests themselves. * docs(cookbooks): replace ASCII diagram with Mermaid * docs(cookbooks): Generalize intro beyond support workflows * docs(cookbooks): add links to support agent cookbook * docs(cookbooks): clarify durable workflow rationale
1 parent 8f3a0cc commit 1c63bed

10 files changed

Lines changed: 663 additions & 0 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# > Trigger the workflow
2+
from examples.support_agent.worker import (
3+
REPLY_EVENT_KEY,
4+
SupportTicketInput,
5+
hatchet,
6+
support_agent,
7+
)
8+
9+
ticket = SupportTicketInput(
10+
ticket_id="ticket-42",
11+
customer_email="alice@example.com",
12+
subject="Login broken",
13+
body="I can't log in since this morning.",
14+
)
15+
16+
# Start the support agent workflow
17+
ref = support_agent.run(ticket, wait_for_result=False)
18+
print(f"Started workflow run: {ref.workflow_run_id}")
19+
20+
# Push a customer reply event (scoped to this ticket)
21+
print("Pushing customer reply event...")
22+
hatchet.event.push(
23+
REPLY_EVENT_KEY,
24+
{"message": "I cleared my cookies and it works now. Thanks!"},
25+
scope=ticket.ticket_id,
26+
)
27+
28+
# Wait for the workflow to complete
29+
result = ref.result()
30+
print(f"Workflow completed: {result}")
31+
32+
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import os
2+
from datetime import timedelta
3+
from typing import Any
4+
5+
from pydantic import BaseModel
6+
7+
from hatchet_sdk import (
8+
Context,
9+
DurableContext,
10+
Hatchet,
11+
SleepCondition,
12+
UserEventCondition,
13+
or_,
14+
)
15+
16+
hatchet = Hatchet()
17+
18+
REPLY_EVENT_KEY = "support:customer-reply"
19+
TIMEOUT_SECONDS = 5
20+
21+
22+
# > Models
23+
class SupportTicketInput(BaseModel):
24+
ticket_id: str
25+
customer_email: str
26+
subject: str
27+
body: str
28+
29+
30+
class TriageOutput(BaseModel):
31+
category: str
32+
priority: str
33+
34+
35+
class ReplyOutput(BaseModel):
36+
message: str
37+
38+
39+
class EscalationOutput(BaseModel):
40+
reason: str
41+
assigned_to: str
42+
43+
44+
45+
46+
# > Triage task
47+
@hatchet.task(input_validator=SupportTicketInput)
48+
async def triage_ticket(input: SupportTicketInput, ctx: Context) -> TriageOutput:
49+
"""Classify the ticket into a category and priority."""
50+
subject = input.subject.lower()
51+
body = input.body.lower()
52+
text = subject + " " + body
53+
54+
if any(word in text for word in ["bill", "charge", "payment", "invoice"]):
55+
category = "billing"
56+
elif any(word in text for word in ["login", "password", "auth", "access"]):
57+
category = "account"
58+
else:
59+
category = "technical"
60+
61+
if any(word in text for word in ["urgent", "critical", "down", "outage"]):
62+
priority = "high"
63+
elif any(word in text for word in ["twice", "broken", "error"]):
64+
priority = "medium"
65+
else:
66+
priority = "low"
67+
68+
return TriageOutput(category=category, priority=priority)
69+
70+
71+
72+
73+
# > Generate reply task
74+
@hatchet.task(input_validator=SupportTicketInput)
75+
async def generate_reply(input: SupportTicketInput, ctx: Context) -> ReplyOutput:
76+
"""Generate an initial support reply using Claude."""
77+
api_key = os.environ.get("ANTHROPIC_API_KEY")
78+
79+
if not api_key:
80+
return ReplyOutput(
81+
message=f"Thank you for contacting support about: {input.subject}. "
82+
"We are looking into this and will get back to you shortly."
83+
)
84+
85+
import anthropic
86+
87+
client = anthropic.AsyncAnthropic(api_key=api_key)
88+
89+
response = await client.messages.create(
90+
model="claude-sonnet-4-20250514",
91+
max_tokens=300,
92+
messages=[
93+
{
94+
"role": "user",
95+
"content": (
96+
f"You are a friendly support agent. Write a brief, helpful initial "
97+
f"reply to this support ticket.\n\n"
98+
f"Subject: {input.subject}\n"
99+
f"Message: {input.body}\n\n"
100+
f"Keep the reply under 3 sentences."
101+
),
102+
}
103+
],
104+
)
105+
106+
text = response.content[0].text
107+
return ReplyOutput(message=text)
108+
109+
110+
111+
112+
# > Escalate task
113+
@hatchet.task(input_validator=SupportTicketInput)
114+
async def escalate_ticket(input: SupportTicketInput, ctx: Context) -> EscalationOutput:
115+
"""Escalate an unresolved ticket to the human support team."""
116+
return EscalationOutput(
117+
reason=f"No customer reply within {TIMEOUT_SECONDS}s timeout",
118+
assigned_to="support-team@example.com",
119+
)
120+
121+
122+
123+
124+
# > Support agent workflow
125+
@hatchet.durable_task(input_validator=SupportTicketInput)
126+
async def support_agent(
127+
input: SupportTicketInput, ctx: DurableContext
128+
) -> dict[str, Any]:
129+
# Step 1: Triage the ticket
130+
triage = await triage_ticket.aio_run(input)
131+
132+
# Step 2: Generate an initial reply
133+
reply = await generate_reply.aio_run(input)
134+
135+
# Step 3: Wait for a customer reply or timeout
136+
now = await ctx.aio_now()
137+
consider_events_since = now - timedelta(minutes=5)
138+
139+
wait_result = await ctx.aio_wait_for(
140+
"await-customer-reply",
141+
or_(
142+
SleepCondition(timedelta(seconds=TIMEOUT_SECONDS)),
143+
UserEventCondition(
144+
event_key=REPLY_EVENT_KEY,
145+
scope=input.ticket_id,
146+
consider_events_since=consider_events_since,
147+
),
148+
),
149+
)
150+
151+
# The or-group result is {"CREATE": {"<condition_key>": ...}}.
152+
# Check whether the reply event condition was the one that resolved.
153+
resolved_key = list(wait_result["CREATE"].keys())[0]
154+
customer_replied = resolved_key == REPLY_EVENT_KEY
155+
156+
if not customer_replied:
157+
# Step 4a: Timeout -> escalate
158+
await escalate_ticket.aio_run(input)
159+
return {
160+
"ticket_id": input.ticket_id,
161+
"status": "escalated",
162+
"triage_category": triage.category,
163+
"triage_priority": triage.priority,
164+
"initial_reply": reply.message,
165+
}
166+
167+
# Step 4b: Customer replied -> resolve
168+
return {
169+
"ticket_id": input.ticket_id,
170+
"status": "resolved",
171+
"triage_category": triage.category,
172+
"triage_priority": triage.priority,
173+
"initial_reply": reply.message,
174+
}
175+
176+
177+
178+
179+
# > Worker registration
180+
def main() -> None:
181+
worker = hatchet.worker(
182+
"support-agent-worker",
183+
workflows=[support_agent, triage_ticket, generate_reply, escalate_ticket],
184+
)
185+
worker.start()
186+
187+
188+
if __name__ == "__main__":
189+
main()
190+
191+

examples/python/worker.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@
7676
from examples.run_details.worker import run_detail_test_workflow
7777
from examples.serde.worker import serde_workflow
7878
from examples.simple.worker import simple, simple_durable
79+
from examples.support_agent.worker import (
80+
escalate_ticket,
81+
generate_reply,
82+
support_agent,
83+
triage_ticket,
84+
)
7985
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
8086
from examples.webhook_with_scope.worker import (
8187
webhook_with_scope,
@@ -171,6 +177,10 @@ def main() -> None:
171177
otel_simple_task,
172178
otel_spawn_parent,
173179
otel_workflow,
180+
support_agent,
181+
triage_ticket,
182+
generate_reply,
183+
escalate_ticket,
174184
],
175185
lifespan=lifespan,
176186
)

frontend/docs/pages/cookbooks/_meta.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,9 @@ export default {
77
"webhooks-stripe": "Stripe",
88
"webhooks-github": "GitHub",
99
"webhooks-slack": "Slack",
10+
"--workflow-patterns": {
11+
title: "Workflow Patterns",
12+
type: "separator",
13+
},
14+
"workflow-support-agent": "Support Agent",
1015
};

frontend/docs/pages/cookbooks/index.mdx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,14 @@ Receive webhooks from external services and use them to trigger tasks in Hatchet
2323
Respond to Slack events like messages, reactions, and slash commands.
2424
</Cards.Card>
2525
</Cards>
26+
27+
## Workflow Patterns
28+
29+
Build practical end-to-end workflows with Hatchet’s durable execution, event handling, and task orchestration primitives.
30+
31+
<Cards>
32+
<Cards.Card title="Support Agent" href="/cookbooks/workflow-support-agent">
33+
Build a durable support workflow that triages tickets, waits for customer
34+
replies, and escalates on timeout.
35+
</Cards.Card>
36+
</Cards>
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { Steps } from "nextra/components";
2+
import { snippets } from "@/lib/generated/snippets";
3+
import { Snippet } from "@/components/code";
4+
5+
# How to Create a Support Agent Using Hatchet
6+
7+
Many real-world workflows become difficult to manage once they involve multiple steps, long waits, human replies, and escalation rules. Support is one example, but the same pattern also shows up in onboarding, approvals, incident response, and other operational flows. In this cookbook, we will build a simple support agent that triages a ticket, generates an initial reply, and then waits for either a customer response or a timeout. If the customer replies, the workflow resolves. If no reply arrives in time, the workflow escalates the ticket to a human support agent.
8+
9+
## What this example builds
10+
11+
This example implements the following durable support workflow:
12+
13+
```mermaid
14+
flowchart TD
15+
A[Support ticket received] --> B[Triage the ticket]
16+
B --> C[Generate initial reply]
17+
C --> D[Wait for reply or timeout]
18+
D --> E[Customer reply]
19+
D --> F[Timeout fires]
20+
E --> G[Resolve ticket]
21+
F --> H[Escalate to human support]
22+
```
23+
24+
Hatchet’s durable execution model helps keep the whole interaction in one workflow rather than scattering it across separate queue jobs and ad hoc timers.
25+
26+
## Setup
27+
28+
<Steps>
29+
30+
### Prepare your environment
31+
32+
To run this example, you will need:
33+
34+
- a working local Hatchet environment or access to [Hatchet Cloud](https://cloud.onhatchet.run)
35+
- the Python SDK example environment (see the [Quickstart](/v1/quickstart))
36+
- optionally, an `ANTHROPIC_API_KEY`
37+
38+
Without `ANTHROPIC_API_KEY`, the example still runs using a fixed fallback reply.
39+
40+
### Define the models
41+
42+
Start with a few small Pydantic models for the workflow input and outputs.
43+
44+
<Snippet src={snippets.python.support_agent.worker.models} />
45+
46+
The models keep the inputs and outputs for each task explicit, which makes the workflow easier to inspect and test.
47+
48+
### Add the workflow tasks
49+
50+
The durable workflow delegates its work to a few small [tasks](/v1/tasks).
51+
52+
First, add a task to classify the incoming ticket:
53+
54+
<Snippet src={snippets.python.support_agent.worker.triage_task} />
55+
56+
Next, add a task to generate the initial support reply. This task calls Claude when `ANTHROPIC_API_KEY` is present and falls back to a fixed response when it is not.
57+
58+
<Snippet src={snippets.python.support_agent.worker.generate_reply_task} />
59+
60+
Finally, add a task to represent escalation to the support team:
61+
62+
<Snippet src={snippets.python.support_agent.worker.escalate_task} />
63+
64+
Keeping triage, reply generation, and escalation as separate tasks keeps the workflow itself small and makes each piece easier to reason about.
65+
66+
### Build the durable workflow
67+
68+
Now tie everything together in a [durable Hatchet workflow](/v1/durable-execution). A durable workflow is a good fit here because this interaction may stay open for some time while waiting for a customer reply. Hatchet persists the workflow state and its wait conditions, so the workflow can survive long delays, worker restarts, or even a worker crash, then continue later on another worker. That gives you a straightforward way to model the whole interaction without adding custom recovery logic.
69+
70+
<Snippet src={snippets.python.support_agent.worker.support_agent_workflow} />
71+
72+
As you can see, the workflow runs triage first, generates an initial reply, and then waits for one of two things to happen: either a customer reply event arrives for that ticket, or the timeout fires. From there, the workflow either resolves the ticket or escalates it.
73+
74+
The detail that matters most here is [`consider_events_since`](/v1/durable-event-waits#lookback-windows) on the reply event condition. A customer reply could arrive while the workflow is still finishing triage or generating the first response. By using `consider_events_since`, the workflow can still pick up that reply once the wait becomes active instead of missing it because the event arrived slightly early.
75+
76+
### Register and start the worker
77+
78+
To run this workflow, register the workflow and its tasks on a Hatchet worker, then start it.
79+
80+
<Snippet src={snippets.python.support_agent.worker.worker_registration} />
81+
82+
With the worker running, you can trigger the workflow and observe either the resolved or escalated outcome.
83+
84+
### Trigger the workflow
85+
86+
The example also includes a small trigger script that starts the workflow, pushes a scoped reply event, and waits for the result.
87+
88+
<Snippet src={snippets.python.support_agent.trigger.trigger_the_workflow} />
89+
90+
Because the workflow uses `consider_events_since`, the trigger can push the reply event immediately after starting the support agent.
91+
92+
### Test it
93+
94+
This example includes two end-to-end tests against a live Hatchet instance:
95+
96+
- a resolved path, where the customer reply event arrives before the timeout
97+
- a timeout path, where no reply arrives and the workflow escalates
98+
99+
If you are running the Python SDK examples locally, you can run the support agent tests with:
100+
101+
```bash
102+
pytest examples/support_agent/test_support_agent.py
103+
```
104+
105+
Together, these tests validate both branches of the workflow and confirm that early reply events are handled safely without coordination sleeps.
106+
107+
</Steps>
108+
109+
## Why Hatchet fits this workflow
110+
111+
The interesting part of this example is not the LLM call. It is the combination of waiting, branching, and keeping the full interaction in one place. A support flow like this usually needs to preserve state across several steps, wait for human input, and react differently depending on whether a reply arrives before a deadline. Hatchet fits that pattern well because you can express the event wait and timeout branch directly in the workflow. That makes the control flow easier to inspect, easier to test, and easier to extend as the interaction becomes more complex.
112+
113+
## Next steps
114+
115+
A natural next step would be to connect this workflow to a real ticketing system and carry the conversation beyond a single reply. You could also make escalation depend on the content of the customer response instead of only on timeout. For this cookbook, though, the smaller version is enough to show the core pattern: start work immediately, wait safely for a reply, and escalate when the deadline passes.

0 commit comments

Comments
 (0)