Skip to content

Commit b39548c

Browse files
authored
Feat: Durable execution frontend work and API improvements (#3639)
* feat: initial work on durable event log ui * feat: use the logging component to show the event log * remove evicted crescent since the hover state was broken and it looked awkward * fix: filter out memo events, change copy * fix: durable event log height * feat: add helper hints for run triggers too * feat: rework wait data * feat: bulk run grouping * fix: or groups, more grouping on fe * feat: simplify api more * fix: get rid of cursor-impl mini map click behavior * feat: start consolidating durable and non-durable event logs * fix: make log level an enum for log component * feat: improve color of different logs, etc * fix: show task display names for dags * refactor: move durable event list to durable events repo, update api to return external ids and display names consistently * fix: swap order of replay / cancel buttons * feat: tabs cleanup for side panel * fix: input, output, additional metadata tab sizes in the side panel * chore: lint * feat: add link to log line * fix: improve copy * chore: lint and gen * feat: label in ts * chore: gen protos * fix: default behavior * chore: versions, changelogs * chore: rm some comments * chore: docs * fix: tests * fix: one more satisfied-now fix * fix: another copilot suggestion * fix: more copilot * fix: pagination * fix: start fixing link clicks * fix: log click overflow logic * Fix: Misc frontend issues (#3633) * fix: event column truncation * feat: add popover for worker labels * fix: memoize table cols * fix: simple table scrolling and padding * fix: lint * fix: dag view * fix: populator type assert * fix: remove duration string helper * fix: durable task ids
1 parent 73de35e commit b39548c

80 files changed

Lines changed: 4098 additions & 980 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api-contracts/openapi/components/schemas/_index.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,12 @@ V1BranchDurableTaskRequest:
384384
$ref: "./v1/workflow_run.yaml#/V1BranchDurableTaskRequest"
385385
V1BranchDurableTaskResponse:
386386
$ref: "./v1/workflow_run.yaml#/V1BranchDurableTaskResponse"
387+
V1DurableEventLogKind:
388+
$ref: "./v1/workflow_run.yaml#/V1DurableEventLogKind"
389+
V1DurableEventLogEntry:
390+
$ref: "./v1/workflow_run.yaml#/V1DurableEventLogEntry"
391+
V1DurableEventLogList:
392+
$ref: "./v1/workflow_run.yaml#/V1DurableEventLogList"
387393
V1LogLine:
388394
$ref: "./v1/logs.yaml#/V1LogLine"
389395
V1LogLineLevel:

api-contracts/openapi/components/schemas/v1/task.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ V1TaskSummary:
3535
duration:
3636
type: integer
3737
description: The duration of the task run, in milliseconds.
38+
isDurable:
39+
type: boolean
40+
description: Whether this task was created as a durable task.
3841
errorMessage:
3942
type: string
4043
description: The error message of the task run (for the latest run)

api-contracts/openapi/components/schemas/v1/workflow_run.yaml

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,110 @@ V1TriggerWorkflowRunRequest:
145145
- workflowName
146146
- input
147147

148+
V1DurableEventLogKind:
149+
type: string
150+
enum:
151+
- RUN
152+
- WAIT_FOR
153+
- MEMO
154+
155+
V1DurableWaitConditionKind:
156+
type: string
157+
enum:
158+
- SLEEP
159+
- USER_EVENT
160+
- CHILD_WORKFLOW
161+
162+
V1DurableWaitCondition:
163+
type: object
164+
properties:
165+
kind:
166+
$ref: "#/V1DurableWaitConditionKind"
167+
sleepDurationMs:
168+
type: integer
169+
format: int64
170+
eventKey:
171+
type: string
172+
workflowName:
173+
type: string
174+
required:
175+
- kind
176+
177+
V1WaitItem:
178+
type: object
179+
properties:
180+
kind:
181+
$ref: "#/V1DurableWaitConditionKind"
182+
sleepDurationMs:
183+
type: integer
184+
format: int64
185+
eventKey:
186+
type: string
187+
workflowName:
188+
type: string
189+
or:
190+
type: array
191+
items:
192+
$ref: "#/V1DurableWaitCondition"
193+
194+
V1WaitData:
195+
type: array
196+
items:
197+
$ref: "#/V1WaitItem"
198+
199+
V1DurableEventLogEntry:
200+
type: object
201+
properties:
202+
nodeId:
203+
type: integer
204+
format: int64
205+
description: The monotonically increasing node id in the event log.
206+
branchId:
207+
type: integer
208+
format: int64
209+
description: The branch id when this entry was first seen.
210+
kind:
211+
$ref: "#/V1DurableEventLogKind"
212+
waitData:
213+
$ref: "#/V1WaitData"
214+
isSatisfied:
215+
type: boolean
216+
description: Whether this entry has been satisfied.
217+
satisfiedAt:
218+
type: string
219+
format: date-time
220+
description: When this entry was satisfied, if it has been satisfied.
221+
insertedAt:
222+
type: string
223+
format: date-time
224+
description: When this entry was inserted.
225+
userMessage:
226+
type: string
227+
description: A user-provided message or label, sent when establishing a durable wait.
228+
taskExternalId:
229+
type: string
230+
format: uuid
231+
minLength: 36
232+
maxLength: 36
233+
description: The external id of the durable task this event log entry is associated with.
234+
taskDisplayName:
235+
type: string
236+
description: The display name of the durable task this event log entry is associated with.
237+
required:
238+
- nodeId
239+
- branchId
240+
- kind
241+
- isSatisfied
242+
- insertedAt
243+
- taskExternalId
244+
- taskDisplayName
245+
246+
V1DurableEventLogList:
247+
type: array
248+
items:
249+
$ref: "#/V1DurableEventLogEntry"
250+
251+
148252
V1BranchDurableTaskRequest:
149253
properties:
150254
taskExternalId:

api-contracts/openapi/openapi.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ paths:
5151
$ref: "./paths/v1/workflow-runs/workflow_run.yaml#/trigger"
5252
/api/v1/stable/tenants/{tenant}/durable-tasks/branch:
5353
$ref: "./paths/v1/workflow-runs/workflow_run.yaml#/branchDurableTask"
54+
/api/v1/stable/durable-tasks/{durable-task}:
55+
$ref: "./paths/v1/workflow-runs/workflow_run.yaml#/listDurableEventLog"
5456
/api/v1/stable/workflow-runs/{v1-workflow-run}:
5557
$ref: "./paths/v1/workflow-runs/workflow_run.yaml#/getWorkflowRunDetails"
5658
/api/v1/stable/workflow-runs/{v1-workflow-run}/status:

api-contracts/openapi/paths/v1/workflow-runs/workflow_run.yaml

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,3 +581,61 @@ getTimings:
581581
summary: List timings for a workflow run
582582
tags:
583583
- Workflow Runs
584+
585+
listDurableEventLog:
586+
get:
587+
x-resources: ["durable-task"]
588+
description: Lists all event log entries for a durable task.
589+
operationId: v1-durable-task:event-log:list
590+
parameters:
591+
- description: The durable task external id
592+
in: path
593+
name: durable-task
594+
required: true
595+
schema:
596+
type: string
597+
format: uuid
598+
minLength: 36
599+
maxLength: 36
600+
- description: The number of event log entries to skip
601+
in: query
602+
name: offset
603+
required: false
604+
schema:
605+
type: integer
606+
format: int64
607+
- description: The number of event log entries to limit by
608+
in: query
609+
name: limit
610+
required: false
611+
schema:
612+
type: integer
613+
format: int64
614+
responses:
615+
"200":
616+
content:
617+
application/json:
618+
schema:
619+
$ref: "../../../components/schemas/_index.yaml#/V1DurableEventLogList"
620+
description: Successfully listed event log entries
621+
"400":
622+
content:
623+
application/json:
624+
schema:
625+
$ref: "../../../components/schemas/_index.yaml#/APIErrors"
626+
description: A malformed or bad request
627+
"403":
628+
content:
629+
application/json:
630+
schema:
631+
$ref: "../../../components/schemas/_index.yaml#/APIErrors"
632+
description: Forbidden
633+
"404":
634+
content:
635+
application/json:
636+
schema:
637+
$ref: "../../../components/schemas/_index.yaml#/APIErrors"
638+
description: Not found
639+
summary: List durable event log
640+
tags:
641+
- Durable Tasks

api-contracts/v1/dispatcher.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ message DurableTaskWaitForRequest {
132132

133133
// Fields for DURABLE_TASK_TRIGGER_KIND_WAIT_FOR
134134
optional DurableEventListenerConditions wait_for_conditions = 3;
135+
136+
// An optional human-readable label for this wait, displayed in the dashboard.
137+
// Example: "Waiting for payment confirmation"
138+
optional string label = 4;
135139
}
136140

137141
message DurableTaskRequest {

api/v1/server/authz/rbac.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,4 @@ roles:
146146
- V1TenantLogLineList
147147
- V1TenantLogLineGetPointMetrics
148148
- TenantFeatureFlagEvaluate
149+
- V1DurableTaskEventLogList
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package durabletasks
2+
3+
import (
4+
"encoding/json"
5+
"time"
6+
7+
"github.com/labstack/echo/v4"
8+
9+
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
10+
"github.com/hatchet-dev/hatchet/pkg/repository"
11+
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
12+
)
13+
14+
func (t *DurableTasksService) V1DurableTaskEventLogList(ctx echo.Context, request gen.V1DurableTaskEventLogListRequestObject) (gen.V1DurableTaskEventLogListResponseObject, error) {
15+
task := ctx.Get("durable-task").(*sqlcv1.V1TasksOlap)
16+
17+
limit := int64(1000)
18+
offset := int64(0)
19+
20+
if request.Params.Limit != nil {
21+
limit = *request.Params.Limit
22+
}
23+
24+
if request.Params.Offset != nil {
25+
offset = *request.Params.Offset
26+
}
27+
28+
entries, err := t.config.V1.DurableEvents().ListDurableEventLog(
29+
ctx.Request().Context(),
30+
task.TenantID,
31+
task.InsertedAt,
32+
task.ID,
33+
limit,
34+
offset,
35+
)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
return gen.V1DurableTaskEventLogList200JSONResponse(toDurableEventLogEntries(entries)), nil
41+
}
42+
43+
func toDurableEventLogEntries(entries []*sqlcv1.ListDurableEventLogForTaskRow) []gen.V1DurableEventLogEntry {
44+
result := make([]gen.V1DurableEventLogEntry, 0, len(entries))
45+
46+
for _, e := range entries {
47+
var insertedAt time.Time
48+
if e.InsertedAt.Valid {
49+
insertedAt = e.InsertedAt.Time
50+
}
51+
52+
entry := gen.V1DurableEventLogEntry{
53+
NodeId: e.NodeID,
54+
BranchId: e.BranchID,
55+
Kind: gen.V1DurableEventLogKind(e.Kind),
56+
IsSatisfied: e.IsSatisfied,
57+
InsertedAt: insertedAt,
58+
TaskExternalId: e.DurableTaskExternalID,
59+
TaskDisplayName: e.DurableTaskDisplayName,
60+
}
61+
62+
if e.UserMessage.Valid {
63+
entry.UserMessage = &e.UserMessage.String
64+
}
65+
66+
if e.SatisfiedAt.Valid {
67+
entry.SatisfiedAt = &e.SatisfiedAt.Time
68+
}
69+
70+
if len(e.WaitData) > 0 {
71+
entry.WaitData = toGenWaitData(e.WaitData)
72+
}
73+
74+
result = append(result, entry)
75+
}
76+
77+
return result
78+
}
79+
80+
func toGenWaitData(raw []byte) *gen.V1WaitData {
81+
var wd repository.WaitData
82+
if err := json.Unmarshal(raw, &wd); err != nil {
83+
return nil
84+
}
85+
86+
var items []gen.V1WaitItem
87+
88+
for _, c := range wd.Conditions {
89+
kind := gen.V1DurableWaitConditionKind(c.Kind)
90+
items = append(items, gen.V1WaitItem{
91+
Kind: &kind,
92+
SleepDurationMs: c.SleepDurationMs,
93+
EventKey: c.EventKey,
94+
WorkflowName: c.WorkflowName,
95+
})
96+
}
97+
98+
for _, g := range wd.OrGroups {
99+
if len(g.Conditions) == 1 {
100+
// normalize legacy single-condition OR groups
101+
kind := gen.V1DurableWaitConditionKind(g.Conditions[0].Kind)
102+
items = append(items, gen.V1WaitItem{
103+
Kind: &kind,
104+
SleepDurationMs: g.Conditions[0].SleepDurationMs,
105+
EventKey: g.Conditions[0].EventKey,
106+
WorkflowName: g.Conditions[0].WorkflowName,
107+
})
108+
continue
109+
}
110+
genConds := make([]gen.V1DurableWaitCondition, 0, len(g.Conditions))
111+
for _, c := range g.Conditions {
112+
kind := gen.V1DurableWaitConditionKind(c.Kind)
113+
genConds = append(genConds, gen.V1DurableWaitCondition{
114+
Kind: kind,
115+
SleepDurationMs: c.SleepDurationMs,
116+
EventKey: c.EventKey,
117+
WorkflowName: c.WorkflowName,
118+
})
119+
}
120+
items = append(items, gen.V1WaitItem{Or: &genConds})
121+
}
122+
123+
if len(items) == 0 {
124+
return nil
125+
}
126+
127+
result := gen.V1WaitData(items)
128+
return &result
129+
}

0 commit comments

Comments
 (0)