Skip to content

Commit 3e1d53a

Browse files
authored
Cleanup old workers via daily gocron (#3663)
* cleanup zombie workers and query use index * cleanup job rewrite * retention period * no advisory lock
1 parent f04fd94 commit 3e1d53a

7 files changed

Lines changed: 117 additions & 84 deletions

File tree

internal/services/controllers/retention/controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,23 @@ func (rc *RetentionControllerImpl) Start() (func() error, error) {
175175
}
176176
}
177177

178+
if rc.workerRetention {
179+
workerInterval := 24 * time.Hour
180+
181+
_, err := rc.s.NewJob(
182+
gocron.DurationJob(workerInterval),
183+
gocron.NewTask(
184+
rc.runCleanupOldWorkers(ctx),
185+
),
186+
gocron.WithSingletonMode(gocron.LimitModeReschedule),
187+
)
188+
189+
if err != nil {
190+
cancel()
191+
return nil, fmt.Errorf("could not set up runCleanupOldWorkers: %w", err)
192+
}
193+
}
194+
178195
rc.s.Start()
179196

180197
cleanup := func() error {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package retention
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
9+
"github.com/hatchet-dev/hatchet/pkg/telemetry"
10+
)
11+
12+
func (rc *RetentionControllerImpl) runCleanupOldWorkers(ctx context.Context) func() {
13+
return func() {
14+
rc.l.Debug().Ctx(ctx).Msg("retention controller: cleaning up old workers")
15+
16+
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
17+
defer cancel()
18+
19+
if err := rc.ForTenants(ctx, rc.cleanupOldWorkersForTenant); err != nil {
20+
rc.l.Err(err).Ctx(ctx).Msg("could not cleanup old workers")
21+
}
22+
}
23+
}
24+
25+
func (rc *RetentionControllerImpl) cleanupOldWorkersForTenant(ctx context.Context, tenant sqlcv1.Tenant) error {
26+
ctx, span := telemetry.NewSpan(ctx, "cleanup-old-workers-tenant")
27+
defer span.End()
28+
29+
cutoff, err := GetDataRetentionExpiredTime(tenant.DataRetentionPeriod)
30+
if err != nil {
31+
return fmt.Errorf("could not get cutoff for tenant %s: %w", tenant.ID.String(), err)
32+
}
33+
34+
shouldContinue := true
35+
36+
for shouldContinue {
37+
if ctx.Err() != nil {
38+
return ctx.Err()
39+
}
40+
41+
shouldContinue, err = rc.repo.Workers().CleanupOldWorkers(ctx, tenant.ID, cutoff)
42+
if err != nil {
43+
return fmt.Errorf("could not cleanup old workers for tenant %s: %w", tenant.ID.String(), err)
44+
}
45+
}
46+
47+
return nil
48+
}

pkg/repository/sqlcv1/tasks.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ WITH tasks_on_inactive_workers AS (
425425
v1_task_runtime runtime ON w."id" = runtime.worker_id
426426
WHERE
427427
w."tenantId" = @tenantId::uuid
428+
AND w."tenantId" = runtime.tenant_id
428429
AND w."lastHeartbeatAt" < NOW() - INTERVAL '30 seconds'
429430
-- evicted tasks are not eligible for re-assignment
430431
AND runtime.evicted_at IS NULL

pkg/repository/sqlcv1/tasks.sql.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/sqlcv1/workers.sql

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -379,36 +379,16 @@ SET
379379
"strValue" = sqlc.narg('strValue')::text
380380
RETURNING *;
381381

382-
-- name: DeleteOldWorkers :one
383-
WITH for_delete AS (
384-
SELECT
385-
"id"
386-
FROM "Worker" w
387-
WHERE
388-
w."tenantId" = @tenantId::uuid AND
389-
w."lastHeartbeatAt" < @lastHeartbeatBefore::timestamp
390-
LIMIT sqlc.arg('limit') + 1
391-
), expired_with_limit AS (
392-
SELECT
393-
for_delete."id" as "id"
394-
FROM for_delete
395-
LIMIT sqlc.arg('limit')
396-
), has_more AS (
397-
SELECT
398-
CASE
399-
WHEN COUNT(*) > sqlc.arg('limit') THEN TRUE
400-
ELSE FALSE
401-
END as has_more
402-
FROM for_delete
403-
), delete_events AS (
404-
DELETE FROM "WorkerAssignEvent" wae
405-
WHERE wae."workerId" IN (SELECT "id" FROM expired_with_limit)
406-
RETURNING wae."id"
382+
-- name: CleanupOldWorkers :execresult
383+
WITH old_workers AS (
384+
SELECT "id"
385+
FROM "Worker"
386+
WHERE "tenantId" = @tenantId::uuid
387+
AND "lastHeartbeatAt" < @lastHeartbeatBefore::timestamp
388+
LIMIT @batchSize::int
407389
)
408-
DELETE FROM "Worker" w
409-
WHERE w."id" IN (SELECT "id" FROM expired_with_limit)
410-
RETURNING
411-
(SELECT has_more FROM has_more) as has_more;
390+
DELETE FROM "Worker"
391+
WHERE "id" IN (SELECT "id" FROM old_workers);
412392

413393
-- name: ListDispatcherIdsForWorkers :many
414394
SELECT

pkg/repository/sqlcv1/workers.sql.go

Lines changed: 23 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/worker.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type WorkerRepository interface {
124124

125125
UpsertWorkerLabels(ctx context.Context, workerId uuid.UUID, opts []UpsertWorkerLabelOpts) ([]*sqlcv1.WorkerLabel, error)
126126

127-
DeleteOldWorkers(ctx context.Context, tenantId uuid.UUID, lastHeartbeatBefore time.Time) (bool, error)
127+
CleanupOldWorkers(ctx context.Context, tenantId uuid.UUID, lastHeartbeatBefore time.Time) (bool, error)
128128

129129
GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []uuid.UUID) (map[uuid.UUID]uuid.UUID, map[uuid.UUID]struct{}, error)
130130

@@ -723,22 +723,30 @@ func (w *workerRepository) UpsertWorkerLabels(ctx context.Context, workerId uuid
723723
return affinities, nil
724724
}
725725

726-
func (w *workerRepository) DeleteOldWorkers(ctx context.Context, tenantId uuid.UUID, lastHeartbeatBefore time.Time) (bool, error) {
727-
hasMore, err := w.queries.DeleteOldWorkers(ctx, w.pool, sqlcv1.DeleteOldWorkersParams{
726+
func (w *workerRepository) CleanupOldWorkers(ctx context.Context, tenantId uuid.UUID, lastHeartbeatBefore time.Time) (bool, error) {
727+
const timeout = 1000 * 60 * 3 // 3 minutes
728+
const batchSize int32 = 10000
729+
730+
tx, commit, rollback, err := sqlchelpers.PrepareTxWithStatementTimeout(ctx, w.pool, w.l, timeout)
731+
if err != nil {
732+
return false, fmt.Errorf("error beginning transaction: %w", err)
733+
}
734+
defer rollback()
735+
736+
result, err := w.queries.CleanupOldWorkers(ctx, tx, sqlcv1.CleanupOldWorkersParams{
728737
Tenantid: tenantId,
729738
Lastheartbeatbefore: sqlchelpers.TimestampFromTime(lastHeartbeatBefore),
730-
Limit: 20,
739+
Batchsize: batchSize,
731740
})
732-
733741
if err != nil {
734-
if errors.Is(err, pgx.ErrNoRows) {
735-
return false, nil
736-
}
742+
return false, fmt.Errorf("error cleaning up old workers: %w", err)
743+
}
737744

738-
return false, err
745+
if err := commit(ctx); err != nil {
746+
return false, fmt.Errorf("error committing transaction: %w", err)
739747
}
740748

741-
return hasMore, nil
749+
return result.RowsAffected() == int64(batchSize), nil
742750
}
743751

744752
func (w *workerRepository) GetDispatcherIdsForWorkers(ctx context.Context, tenantId uuid.UUID, workerIds []uuid.UUID) (map[uuid.UUID]uuid.UUID, map[uuid.UUID]struct{}, error) {

0 commit comments

Comments
 (0)