Skip to content

Commit 0b54ad8

Browse files
gruttmnafees
andauthored
feat: durable go (#3696)
* feat: go * chore: gen * add token.... * PR comments * fix dependabot yaml * fix e2e tests waiting * set RMQ vars * tls ci e2e * TLS off * fix test * some more fixes * count low * fix count --------- Co-authored-by: Mohammed Nafees <hello@mnafees.me>
1 parent 8c8d2c9 commit 0b54ad8

40 files changed

Lines changed: 5028 additions & 93 deletions

File tree

.github/dependabot.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ updates:
66
interval: "weekly"
77
day: "monday"
88
cooldown:
9-
semver-major-days: 30
9+
default-days: 30
1010
open-pull-requests-limit: 1
1111
groups:
1212
github-actions:

.github/workflows/test.yml

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ jobs:
149149
curl -sSf https://atlasgo.sh | sh
150150
151151
- name: Compose
152-
run: docker compose up -d
152+
run: docker compose up -d --wait
153153

154154
- name: Go deps
155155
run: go mod download
@@ -159,8 +159,6 @@ jobs:
159159
cat > .env <<EOF
160160
HATCHET_CLIENT_TENANT_ID=707d0855-80ab-4e1f-a156-f1c4546cbf52
161161
DATABASE_URL="postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet"
162-
HATCHET_CLIENT_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
163-
HATCHET_CLIENT_TLS_SERVER_NAME="cluster"
164162
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
165163
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
166164
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
@@ -181,6 +179,12 @@ jobs:
181179
SERVER_AUTH_COOKIE_DOMAIN=app.dev.hatchet-tools.com
182180
SERVER_AUTH_COOKIE_INSECURE=false
183181
SERVER_AUTH_SET_EMAIL_VERIFIED=true
182+
SERVER_MSGQUEUE_KIND=rabbitmq
183+
SERVER_MSGQUEUE_RABBITMQ_URL=amqp://user:password@127.0.0.1:5672/
184+
SERVER_GRPC_INSECURE=true
185+
SERVER_GRPC_BROADCAST_ADDRESS=127.0.0.1:7070
186+
SERVER_INTERNAL_CLIENT_BASE_STRATEGY=none
187+
SERVER_INTERNAL_CLIENT_BASE_INHERIT_BASE=false
184188
EOF
185189
186190
- name: Generate
@@ -195,11 +199,50 @@ jobs:
195199
. .env
196200
set +a
197201
202+
export SERVER_MSGQUEUE_KIND=rabbitmq
203+
export SERVER_MSGQUEUE_RABBITMQ_URL=amqp://user:password@127.0.0.1:5672/
204+
198205
go run ./cmd/hatchet-admin quickstart
199206
200-
go run ./cmd/hatchet-engine --config ./generated/ &
201-
go run ./cmd/hatchet-api --config ./generated/ &
202-
sleep 30
207+
go run ./cmd/hatchet-engine --config ./generated/ > /tmp/hatchet-engine.log 2>&1 &
208+
ENGINE_PID=$!
209+
go run ./cmd/hatchet-api --config ./generated/ > /tmp/hatchet-api.log 2>&1 &
210+
API_PID=$!
211+
212+
for i in {1..60}; do
213+
if (echo > /dev/tcp/127.0.0.1/7070) >/dev/null 2>&1 && (echo > /dev/tcp/127.0.0.1/8080) >/dev/null 2>&1; then
214+
echo "Hatchet engine and API are ready"
215+
break
216+
fi
217+
218+
if ! kill -0 "$ENGINE_PID" 2>/dev/null; then
219+
echo "Hatchet engine exited before becoming ready"
220+
cat /tmp/hatchet-engine.log
221+
exit 1
222+
fi
223+
224+
if ! kill -0 "$API_PID" 2>/dev/null; then
225+
echo "Hatchet API exited before becoming ready"
226+
cat /tmp/hatchet-api.log
227+
exit 1
228+
fi
229+
230+
sleep 2
231+
done
232+
233+
if ! (echo > /dev/tcp/127.0.0.1/7070) >/dev/null 2>&1 || ! (echo > /dev/tcp/127.0.0.1/8080) >/dev/null 2>&1; then
234+
echo "Timed out waiting for Hatchet engine/API to become ready"
235+
echo "=== Engine logs ==="
236+
cat /tmp/hatchet-engine.log
237+
echo "=== API logs ==="
238+
cat /tmp/hatchet-api.log
239+
exit 1
240+
fi
241+
242+
- name: Generate API token
243+
run: |
244+
echo "HATCHET_CLIENT_TOKEN=$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)" >> $GITHUB_ENV
245+
echo "HATCHET_CLIENT_TLS_STRATEGY=none" >> $GITHUB_ENV
203246
204247
- name: Test
205248
run: |
@@ -238,7 +281,7 @@ jobs:
238281
curl -sSf https://atlasgo.sh | sh
239282
240283
- name: Compose
241-
run: docker compose up -d
284+
run: docker compose up -d --wait
242285

243286
- name: Go deps
244287
run: go mod download
@@ -248,8 +291,6 @@ jobs:
248291
cat > .env <<EOF
249292
HATCHET_CLIENT_TENANT_ID=707d0855-80ab-4e1f-a156-f1c4546cbf52
250293
DATABASE_URL="postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet"
251-
HATCHET_CLIENT_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
252-
HATCHET_CLIENT_TLS_SERVER_NAME="cluster"
253294
SERVER_TLS_CERT_FILE=./hack/dev/certs/cluster.pem
254295
SERVER_TLS_KEY_FILE=./hack/dev/certs/cluster.key
255296
SERVER_TLS_ROOT_CA_FILE=./hack/dev/certs/ca.cert
@@ -272,6 +313,12 @@ jobs:
272313
SERVER_AUTH_SET_EMAIL_VERIFIED=true
273314
274315
SERVER_TASKQUEUE_KIND=postgres
316+
SERVER_MSGQUEUE_KIND=rabbitmq
317+
SERVER_MSGQUEUE_RABBITMQ_URL=amqp://user:password@127.0.0.1:5672/
318+
SERVER_GRPC_INSECURE=true
319+
SERVER_GRPC_BROADCAST_ADDRESS=127.0.0.1:7070
320+
SERVER_INTERNAL_CLIENT_BASE_STRATEGY=none
321+
SERVER_INTERNAL_CLIENT_BASE_INHERIT_BASE=false
275322
EOF
276323
277324
- name: Generate
@@ -286,11 +333,50 @@ jobs:
286333
. .env
287334
set +a
288335
336+
export SERVER_MSGQUEUE_KIND=rabbitmq
337+
export SERVER_MSGQUEUE_RABBITMQ_URL=amqp://user:password@127.0.0.1:5672/
338+
289339
go run ./cmd/hatchet-admin quickstart
290340
291-
go run ./cmd/hatchet-engine --config ./generated/ &
292-
go run ./cmd/hatchet-api --config ./generated/ &
293-
sleep 30
341+
go run ./cmd/hatchet-engine --config ./generated/ > /tmp/hatchet-engine.log 2>&1 &
342+
ENGINE_PID=$!
343+
go run ./cmd/hatchet-api --config ./generated/ > /tmp/hatchet-api.log 2>&1 &
344+
API_PID=$!
345+
346+
for i in {1..60}; do
347+
if (echo > /dev/tcp/127.0.0.1/7070) >/dev/null 2>&1 && (echo > /dev/tcp/127.0.0.1/8080) >/dev/null 2>&1; then
348+
echo "Hatchet engine and API are ready"
349+
break
350+
fi
351+
352+
if ! kill -0 "$ENGINE_PID" 2>/dev/null; then
353+
echo "Hatchet engine exited before becoming ready"
354+
cat /tmp/hatchet-engine.log
355+
exit 1
356+
fi
357+
358+
if ! kill -0 "$API_PID" 2>/dev/null; then
359+
echo "Hatchet API exited before becoming ready"
360+
cat /tmp/hatchet-api.log
361+
exit 1
362+
fi
363+
364+
sleep 2
365+
done
366+
367+
if ! (echo > /dev/tcp/127.0.0.1/7070) >/dev/null 2>&1 || ! (echo > /dev/tcp/127.0.0.1/8080) >/dev/null 2>&1; then
368+
echo "Timed out waiting for Hatchet engine/API to become ready"
369+
echo "=== Engine logs ==="
370+
cat /tmp/hatchet-engine.log
371+
echo "=== API logs ==="
372+
cat /tmp/hatchet-api.log
373+
exit 1
374+
fi
375+
376+
- name: Generate API token
377+
run: |
378+
echo "HATCHET_CLIENT_TOKEN=$(go run ./cmd/hatchet-admin token create --config ./generated/ --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52)" >> $GITHUB_ENV
379+
echo "HATCHET_CLIENT_TLS_STRATEGY=none" >> $GITHUB_ENV
294380
295381
- name: Test
296382
run: |

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,5 @@ frontend/docs/lib/generated/
103103
# Scripts
104104
hack/dev/psql-connect.sh
105105
CLAUDE.md
106+
107+
.cache/

examples/go/durable/event/main.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
type DurableInput struct {
1313
Message string `json:"message"`
14-
Delay int `json:"delay"` // seconds
1514
}
1615

1716
type DurableOutput struct {
@@ -26,14 +25,14 @@ func main() {
2625
}
2726

2827
// > Durable Event
29-
task := client.NewStandaloneDurableTask("long-running-task", func(ctx hatchet.DurableContext, input DurableInput) (DurableOutput, error) {
30-
log.Printf("Starting task, will sleep for %d seconds", input.Delay)
28+
task := client.NewStandaloneDurableTask("durable-event-task", func(ctx hatchet.DurableContext, input DurableInput) (DurableOutput, error) {
29+
log.Printf("Waiting for user:update event, message: %s", input.Message)
3130

32-
if _, err := ctx.WaitForEvent("user:updated", ""); err != nil {
31+
if _, err := ctx.WaitForEvent("user:update", ""); err != nil {
3332
return DurableOutput{}, err
3433
}
3534

36-
log.Printf("Finished waiting for event, processing message: %s", input.Message)
35+
log.Printf("Got event, processing message: %s", input.Message)
3736

3837
return DurableOutput{
3938
ProcessedAt: time.Now().Format(time.RFC3339),
@@ -43,14 +42,14 @@ func main() {
4342

4443
_ = func(ctx hatchet.DurableContext) (DurableOutput, error) {
4544
// > Durable Event With Filter
46-
if _, err := ctx.WaitForEvent("user:updated", "input.status_code == 200"); err != nil {
45+
if _, err := ctx.WaitForEvent("user:update", "input.user_id == '1234'"); err != nil {
4746
return DurableOutput{}, err
4847
}
4948

5049
return DurableOutput{}, nil
5150
}
5251

53-
worker, err := client.NewWorker("durable-worker",
52+
worker, err := client.NewWorker("durable-event-worker",
5453
hatchet.WithWorkflows(task),
5554
hatchet.WithDurableSlots(10),
5655
)
@@ -67,10 +66,8 @@ func main() {
6766
}
6867
}()
6968

70-
// Run the workflow with a 30-second delay
71-
_, err = client.Run(context.Background(), "durable-workflow", DurableInput{
69+
_, err = client.Run(context.Background(), "durable-event-task", DurableInput{
7270
Message: "Hello from durable task!",
73-
Delay: 30,
7471
})
7572
if err != nil {
7673
log.Fatalf("failed to run workflow: %v", err)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
9+
)
10+
11+
const eventKey = "durable-eviction:event"
12+
13+
func main() {
14+
client, err := hatchet.NewClient()
15+
if err != nil {
16+
log.Fatalf("failed to create hatchet client: %v", err)
17+
}
18+
19+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
20+
defer cancel()
21+
22+
if err := client.Events().Push(ctx, eventKey, map[string]any{}); err != nil {
23+
log.Fatalf("failed to push %s: %v", eventKey, err)
24+
}
25+
26+
log.Printf("pushed event %s", eventKey)
27+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
8+
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
9+
)
10+
11+
const (
12+
evictionTTLSeconds = 5
13+
longSleepSeconds = 15
14+
eventKey = "durable-eviction:event"
15+
)
16+
17+
type EmptyInput struct{}
18+
19+
type EvictionOutput struct {
20+
Status string `json:"status"`
21+
}
22+
23+
func main() {
24+
client, err := hatchet.NewClient()
25+
if err != nil {
26+
log.Fatalf("failed to create hatchet client: %v", err)
27+
}
28+
29+
// > Eviction Policy
30+
evictionPolicy := &hatchet.EvictionPolicy{
31+
TTL: evictionTTLSeconds * time.Second,
32+
AllowCapacityEviction: true,
33+
Priority: 0,
34+
}
35+
36+
// > Evictable Sleep
37+
evictableSleep := client.NewStandaloneDurableTask("evictable-sleep",
38+
func(ctx hatchet.DurableContext, input EmptyInput) (EvictionOutput, error) {
39+
if _, err := ctx.SleepFor(longSleepSeconds * time.Second); err != nil {
40+
return EvictionOutput{}, err
41+
}
42+
return EvictionOutput{Status: "completed"}, nil
43+
},
44+
hatchet.WithExecutionTimeout(5*time.Minute),
45+
hatchet.WithEvictionPolicy(evictionPolicy),
46+
)
47+
48+
// > Evictable Wait For Event
49+
evictableWaitForEvent := client.NewStandaloneDurableTask("evictable-wait-for-event",
50+
func(ctx hatchet.DurableContext, input EmptyInput) (EvictionOutput, error) {
51+
if _, err := ctx.WaitForEvent(eventKey, "true"); err != nil {
52+
return EvictionOutput{}, err
53+
}
54+
return EvictionOutput{Status: "completed"}, nil
55+
},
56+
hatchet.WithExecutionTimeout(5*time.Minute),
57+
hatchet.WithEvictionPolicy(evictionPolicy),
58+
)
59+
60+
// > Non Evictable Sleep
61+
nonEvictablePolicy := &hatchet.EvictionPolicy{
62+
AllowCapacityEviction: false,
63+
Priority: 0,
64+
}
65+
66+
nonEvictableSleep := client.NewStandaloneDurableTask("non-evictable-sleep",
67+
func(ctx hatchet.DurableContext, input EmptyInput) (EvictionOutput, error) {
68+
if _, err := ctx.SleepFor(10 * time.Second); err != nil {
69+
return EvictionOutput{}, err
70+
}
71+
return EvictionOutput{Status: "completed"}, nil
72+
},
73+
hatchet.WithExecutionTimeout(5*time.Minute),
74+
hatchet.WithEvictionPolicy(nonEvictablePolicy),
75+
)
76+
77+
// > Capacity Evictable Sleep
78+
capacityEvictionPolicy := &hatchet.EvictionPolicy{
79+
AllowCapacityEviction: true,
80+
Priority: 0,
81+
}
82+
83+
capacityEvictableSleep := client.NewStandaloneDurableTask("capacity-evictable-sleep",
84+
func(ctx hatchet.DurableContext, input EmptyInput) (EvictionOutput, error) {
85+
if _, err := ctx.SleepFor(20 * time.Second); err != nil {
86+
return EvictionOutput{}, err
87+
}
88+
return EvictionOutput{Status: "completed"}, nil
89+
},
90+
hatchet.WithExecutionTimeout(5*time.Minute),
91+
hatchet.WithEvictionPolicy(capacityEvictionPolicy),
92+
)
93+
94+
worker, err := client.NewWorker("eviction-worker",
95+
hatchet.WithWorkflows(
96+
evictableSleep,
97+
evictableWaitForEvent,
98+
nonEvictableSleep,
99+
capacityEvictableSleep,
100+
),
101+
hatchet.WithDurableSlots(10),
102+
)
103+
if err != nil {
104+
log.Fatalf("failed to create worker: %v", err)
105+
}
106+
107+
interruptCtx, cancel := cmdutils.NewInterruptContext()
108+
defer cancel()
109+
110+
if err := worker.StartBlocking(interruptCtx); err != nil {
111+
log.Fatalf("failed to start worker: %v", err)
112+
}
113+
}

0 commit comments

Comments
 (0)