diff --git a/api/v1/server/handlers/workers/get.go b/api/v1/server/handlers/workers/get.go index cd55e20b15..0d34973ba0 100644 --- a/api/v1/server/handlers/workers/get.go +++ b/api/v1/server/handlers/workers/get.go @@ -27,10 +27,10 @@ func (t *WorkerService) workerGetV1(ctx echo.Context, tenant *sqlcv1.Tenant, req return nil, err } - workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsByWorkerId( + workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsForWorkers( reqCtx, worker.Worker.TenantId, - []uuid.UUID{worker.Worker.ID}, + []sqlcv1.Worker{worker.Worker}, ) if err != nil { diff --git a/api/v1/server/handlers/workers/list.go b/api/v1/server/handlers/workers/list.go index ba7157d3f6..39920f79c3 100644 --- a/api/v1/server/handlers/workers/list.go +++ b/api/v1/server/handlers/workers/list.go @@ -99,13 +99,19 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re telemetry.AttributeKV{Key: "tenant.id", Value: tenant.ID}, ) - workers, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts) + workerRows, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts) if err != nil { listSpan.RecordError(err) return nil, err } + workers := make([]sqlcv1.Worker, len(workerRows)) + + for i, workerRow := range workerRows { + workers[i] = workerRow.Worker + } + telemetry.WithAttributes(listSpan, telemetry.AttributeKV{Key: "workers.count", Value: len(workers)}, ) @@ -113,7 +119,7 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re workerIdSet := make(map[uuid.UUID]struct{}) for _, worker := range workers { - workerIdSet[worker.Worker.ID] = struct{}{} + workerIdSet[worker.ID] = struct{}{} } workerIds := make([]uuid.UUID, 0, len(workerIdSet)) @@ -128,10 +134,10 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re telemetry.AttributeKV{Key: "workers.unique_ids.count", Value: len(workerIds)}, ) - workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsByWorkerId( + workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsForWorkers( listCtx, tenant.ID, - workerIds, + workers, ) if err != nil { @@ -160,11 +166,11 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re for i, worker := range workers { workerCp := worker - actions := workerIdToActionIds[workerCp.Worker.ID.String()] - slotConfig := workerSlotConfig[workerCp.Worker.ID] - labels := workerIdToLabels[workerCp.Worker.ID] + actions := workerIdToActionIds[workerCp.ID.String()] + slotConfig := workerSlotConfig[workerCp.ID] + labels := workerIdToLabels[workerCp.ID] - rows[i] = *transformersv1.ToWorkerSqlc(&workerCp.Worker, slotConfig, actions, nil, labels) + rows[i] = *transformersv1.ToWorkerSqlc(&workerCp, slotConfig, actions, nil, labels) } return gen.WorkerList200JSONResponse( diff --git a/cmd/hatchet-migrate/migrate/migrations/20260428204451_v1_0_103.sql b/cmd/hatchet-migrate/migrate/migrations/20260428204451_v1_0_103.sql new file mode 100644 index 0000000000..b730446dba --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20260428204451_v1_0_103.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE "Worker" ADD COLUMN "actionHash" BYTEA; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE "Worker" DROP COLUMN "actionHash"; +-- +goose StatementEnd diff --git a/cmd/hatchet-migrate/migrate/migrations/20260428213451_v1_0_104.sql b/cmd/hatchet-migrate/migrate/migrations/20260428213451_v1_0_104.sql new file mode 100644 index 0000000000..b7e2f179a4 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20260428213451_v1_0_104.sql @@ -0,0 +1,11 @@ +-- +goose no transaction + +-- +goose Up +-- +goose StatementBegin +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Worker_tenantId_actionHash_idx" ON "Worker" ("tenantId", "actionHash"); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX CONCURRENTLY IF EXISTS "Worker_tenantId_actionHash_idx"; +-- +goose StatementEnd diff --git a/pkg/repository/sqlcv1/models.go b/pkg/repository/sqlcv1/models.go index 3e881b85d6..e905057ae7 100644 --- a/pkg/repository/sqlcv1/models.go +++ b/pkg/repository/sqlcv1/models.go @@ -3814,6 +3814,7 @@ type Worker struct { RuntimeExtra pgtype.Text `json:"runtimeExtra"` SdkVersion pgtype.Text `json:"sdkVersion"` DurableTaskDispatcherId *uuid.UUID `json:"durableTaskDispatcherId"` + ActionHash []byte `json:"actionHash"` } type WorkerAssignEvent struct { diff --git a/pkg/repository/sqlcv1/workers.sql b/pkg/repository/sqlcv1/workers.sql index 4ce08abbc2..6804ec1f8a 100644 --- a/pkg/repository/sqlcv1/workers.sql +++ b/pkg/repository/sqlcv1/workers.sql @@ -247,19 +247,28 @@ GROUP BY "tenantId" ; -- name: GetWorkerActionsByWorkerId :many -WITH inputs AS ( - SELECT UNNEST(@workerIds::UUID[]) AS "workerId" -) - SELECT w."id" AS "workerId", a."actionId" AS actionId FROM "Worker" w -JOIN inputs i ON w."id" = i."workerId" -LEFT JOIN "_ActionToWorker" aw ON w.id = aw."B" -LEFT JOIN "Action" a ON aw."A" = a.id +JOIN "_ActionToWorker" aw ON w.id = aw."B" +JOIN "Action" a ON aw."A" = a.id +WHERE + a."tenantId" = @tenantId::UUID + AND w.id = ANY(@workerIds::UUID[]) +; + +-- name: GetWorkerActionsByWorkerActionHash :many +SELECT DISTINCT ON (w."actionHash") + w."actionHash" AS "actionHash", + a."actionId" AS actionId +FROM "Worker" w +JOIN "_ActionToWorker" aw ON w.id = aw."B" +JOIN "Action" a ON aw."A" = a.id WHERE a."tenantId" = @tenantId::UUID + AND w."actionHash" = ANY(@actionHashes::BYTEA[]) +ORDER BY w."actionHash", a."createdAt" DESC ; -- name: GetWorkerWorkflowsByWorkerId :many @@ -438,7 +447,8 @@ INSERT INTO "Worker" ( "language", "languageVersion", "os", - "runtimeExtra" + "runtimeExtra", + "actionHash" ) VALUES ( gen_random_uuid(), CURRENT_TIMESTAMP, @@ -451,7 +461,8 @@ INSERT INTO "Worker" ( sqlc.narg('language')::"WorkerSDKS", sqlc.narg('languageVersion')::text, sqlc.narg('os')::text, - sqlc.narg('runtimeExtra')::text + sqlc.narg('runtimeExtra')::text, + @actionHash::bytea ) RETURNING *; -- name: LinkServicesToWorker :exec diff --git a/pkg/repository/sqlcv1/workers.sql.go b/pkg/repository/sqlcv1/workers.sql.go index 288fe1309f..326e7f0e1d 100644 --- a/pkg/repository/sqlcv1/workers.sql.go +++ b/pkg/repository/sqlcv1/workers.sql.go @@ -51,7 +51,8 @@ INSERT INTO "Worker" ( "language", "languageVersion", "os", - "runtimeExtra" + "runtimeExtra", + "actionHash" ) VALUES ( gen_random_uuid(), CURRENT_TIMESTAMP, @@ -64,8 +65,9 @@ INSERT INTO "Worker" ( $6::"WorkerSDKS", $7::text, $8::text, - $9::text -) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" + $9::text, + $10::bytea +) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` type CreateWorkerParams struct { @@ -78,6 +80,7 @@ type CreateWorkerParams struct { LanguageVersion pgtype.Text `json:"languageVersion"` Os pgtype.Text `json:"os"` RuntimeExtra pgtype.Text `json:"runtimeExtra"` + Actionhash []byte `json:"actionhash"` } func (q *Queries) CreateWorker(ctx context.Context, db DBTX, arg CreateWorkerParams) (*Worker, error) { @@ -91,6 +94,7 @@ func (q *Queries) CreateWorker(ctx context.Context, db DBTX, arg CreateWorkerPar arg.LanguageVersion, arg.Os, arg.RuntimeExtra, + arg.Actionhash, ) var i Worker err := row.Scan( @@ -114,6 +118,7 @@ func (q *Queries) CreateWorker(ctx context.Context, db DBTX, arg CreateWorkerPar &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } @@ -162,7 +167,7 @@ DELETE FROM "Worker" WHERE "id" = $1::uuid -RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` func (q *Queries) DeleteWorker(ctx context.Context, db DBTX, id uuid.UUID) (*Worker, error) { @@ -189,13 +194,14 @@ func (q *Queries) DeleteWorker(ctx context.Context, db DBTX, id uuid.UUID) (*Wor &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } const getActiveWorkerById = `-- name: GetActiveWorkerById :one SELECT - w.id, w."createdAt", w."updatedAt", w."deletedAt", w."tenantId", w."lastHeartbeatAt", w.name, w."dispatcherId", w."maxRuns", w."isActive", w."lastListenerEstablished", w."isPaused", w.type, w."webhookId", w.language, w."languageVersion", w.os, w."runtimeExtra", w."sdkVersion", w."durableTaskDispatcherId", + w.id, w."createdAt", w."updatedAt", w."deletedAt", w."tenantId", w."lastHeartbeatAt", w.name, w."dispatcherId", w."maxRuns", w."isActive", w."lastListenerEstablished", w."isPaused", w.type, w."webhookId", w.language, w."languageVersion", w.os, w."runtimeExtra", w."sdkVersion", w."durableTaskDispatcherId", w."actionHash", ww."url" AS "webhookUrl", w."maxRuns" - ( SELECT COUNT(*) @@ -252,26 +258,66 @@ func (q *Queries) GetActiveWorkerById(ctx context.Context, db DBTX, arg GetActiv &i.Worker.RuntimeExtra, &i.Worker.SdkVersion, &i.Worker.DurableTaskDispatcherId, + &i.Worker.ActionHash, &i.WebhookUrl, &i.RemainingSlots, ) return &i, err } -const getWorkerActionsByWorkerId = `-- name: GetWorkerActionsByWorkerId :many -WITH inputs AS ( - SELECT UNNEST($2::UUID[]) AS "workerId" -) +const getWorkerActionsByWorkerActionHash = `-- name: GetWorkerActionsByWorkerActionHash :many +SELECT DISTINCT ON (w."actionHash") + w."actionHash" AS "actionHash", + a."actionId" AS actionId +FROM "Worker" w +JOIN "_ActionToWorker" aw ON w.id = aw."B" +JOIN "Action" a ON aw."A" = a.id +WHERE + a."tenantId" = $1::UUID + AND w."actionHash" = ANY($2::BYTEA[]) +ORDER BY w."actionHash", a."createdAt" DESC +` + +type GetWorkerActionsByWorkerActionHashParams struct { + Tenantid uuid.UUID `json:"tenantid"` + Actionhashes [][]byte `json:"actionhashes"` +} + +type GetWorkerActionsByWorkerActionHashRow struct { + ActionHash []byte `json:"actionHash"` + Actionid string `json:"actionid"` +} + +func (q *Queries) GetWorkerActionsByWorkerActionHash(ctx context.Context, db DBTX, arg GetWorkerActionsByWorkerActionHashParams) ([]*GetWorkerActionsByWorkerActionHashRow, error) { + rows, err := db.Query(ctx, getWorkerActionsByWorkerActionHash, arg.Tenantid, arg.Actionhashes) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*GetWorkerActionsByWorkerActionHashRow + for rows.Next() { + var i GetWorkerActionsByWorkerActionHashRow + if err := rows.Scan(&i.ActionHash, &i.Actionid); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} +const getWorkerActionsByWorkerId = `-- name: GetWorkerActionsByWorkerId :many SELECT w."id" AS "workerId", a."actionId" AS actionId FROM "Worker" w -JOIN inputs i ON w."id" = i."workerId" -LEFT JOIN "_ActionToWorker" aw ON w.id = aw."B" -LEFT JOIN "Action" a ON aw."A" = a.id +JOIN "_ActionToWorker" aw ON w.id = aw."B" +JOIN "Action" a ON aw."A" = a.id WHERE a."tenantId" = $1::UUID + AND w.id = ANY($2::UUID[]) ` type GetWorkerActionsByWorkerIdParams struct { @@ -280,8 +326,8 @@ type GetWorkerActionsByWorkerIdParams struct { } type GetWorkerActionsByWorkerIdRow struct { - WorkerId uuid.UUID `json:"workerId"` - Actionid pgtype.Text `json:"actionid"` + WorkerId uuid.UUID `json:"workerId"` + Actionid string `json:"actionid"` } func (q *Queries) GetWorkerActionsByWorkerId(ctx context.Context, db DBTX, arg GetWorkerActionsByWorkerIdParams) ([]*GetWorkerActionsByWorkerIdRow, error) { @@ -306,7 +352,7 @@ func (q *Queries) GetWorkerActionsByWorkerId(ctx context.Context, db DBTX, arg G const getWorkerById = `-- name: GetWorkerById :one SELECT - w.id, w."createdAt", w."updatedAt", w."deletedAt", w."tenantId", w."lastHeartbeatAt", w.name, w."dispatcherId", w."maxRuns", w."isActive", w."lastListenerEstablished", w."isPaused", w.type, w."webhookId", w.language, w."languageVersion", w.os, w."runtimeExtra", w."sdkVersion", w."durableTaskDispatcherId" + w.id, w."createdAt", w."updatedAt", w."deletedAt", w."tenantId", w."lastHeartbeatAt", w.name, w."dispatcherId", w."maxRuns", w."isActive", w."lastListenerEstablished", w."isPaused", w.type, w."webhookId", w.language, w."languageVersion", w.os, w."runtimeExtra", w."sdkVersion", w."durableTaskDispatcherId", w."actionHash" FROM "Worker" w WHERE @@ -341,6 +387,7 @@ func (q *Queries) GetWorkerById(ctx context.Context, db DBTX, id uuid.UUID) (*Ge &i.Worker.RuntimeExtra, &i.Worker.SdkVersion, &i.Worker.DurableTaskDispatcherId, + &i.Worker.ActionHash, ) return &i, err } @@ -1179,7 +1226,7 @@ func (q *Queries) ListWorkerSlotConfigs(ctx context.Context, db DBTX, arg ListWo const listWorkers = `-- name: ListWorkers :many SELECT - workers.id, workers."createdAt", workers."updatedAt", workers."deletedAt", workers."tenantId", workers."lastHeartbeatAt", workers.name, workers."dispatcherId", workers."maxRuns", workers."isActive", workers."lastListenerEstablished", workers."isPaused", workers.type, workers."webhookId", workers.language, workers."languageVersion", workers.os, workers."runtimeExtra", workers."sdkVersion", workers."durableTaskDispatcherId" + workers.id, workers."createdAt", workers."updatedAt", workers."deletedAt", workers."tenantId", workers."lastHeartbeatAt", workers.name, workers."dispatcherId", workers."maxRuns", workers."isActive", workers."lastListenerEstablished", workers."isPaused", workers.type, workers."webhookId", workers.language, workers."languageVersion", workers.os, workers."runtimeExtra", workers."sdkVersion", workers."durableTaskDispatcherId", workers."actionHash" FROM "Worker" workers WHERE @@ -1259,6 +1306,7 @@ func (q *Queries) ListWorkers(ctx context.Context, db DBTX, arg ListWorkersParam &i.Worker.RuntimeExtra, &i.Worker.SdkVersion, &i.Worker.DurableTaskDispatcherId, + &i.Worker.ActionHash, ); err != nil { return nil, err } @@ -1281,7 +1329,7 @@ SET "isPaused" = coalesce($4::boolean, "isPaused") WHERE "id" = $5::uuid -RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` type UpdateWorkerParams struct { @@ -1322,6 +1370,7 @@ func (q *Queries) UpdateWorker(ctx context.Context, db DBTX, arg UpdateWorkerPar &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } @@ -1337,7 +1386,7 @@ WHERE "lastListenerEstablished" IS NULL OR "lastListenerEstablished" <= $2::timestamp ) -RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` type UpdateWorkerActiveStatusParams struct { @@ -1370,6 +1419,7 @@ func (q *Queries) UpdateWorkerActiveStatus(ctx context.Context, db DBTX, arg Upd &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } @@ -1382,7 +1432,7 @@ SET WHERE "id" = $2::uuid AND "tenantId" = $3::uuid -RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` type UpdateWorkerDurableTaskDispatcherIdParams struct { @@ -1415,6 +1465,7 @@ func (q *Queries) UpdateWorkerDurableTaskDispatcherId(ctx context.Context, db DB &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } @@ -1427,7 +1478,7 @@ SET "lastHeartbeatAt" = $1::timestamp WHERE "id" = $2::uuid -RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "lastHeartbeatAt", name, "dispatcherId", "maxRuns", "isActive", "lastListenerEstablished", "isPaused", type, "webhookId", language, "languageVersion", os, "runtimeExtra", "sdkVersion", "durableTaskDispatcherId", "actionHash" ` type UpdateWorkerHeartbeatParams struct { @@ -1459,6 +1510,7 @@ func (q *Queries) UpdateWorkerHeartbeat(ctx context.Context, db DBTX, arg Update &i.RuntimeExtra, &i.SdkVersion, &i.DurableTaskDispatcherId, + &i.ActionHash, ) return &i, err } diff --git a/pkg/repository/worker.go b/pkg/repository/worker.go index 6b50a37165..c52c45035c 100644 --- a/pkg/repository/worker.go +++ b/pkg/repository/worker.go @@ -2,6 +2,7 @@ package repository import ( "context" + "crypto/sha256" "errors" "fmt" "time" @@ -88,7 +89,7 @@ type WorkerRepository interface { ListActiveSDKsPerTenant(ctx context.Context) (map[TenantIdSDKTuple]int64, error) // GetWorkerActionsByWorkerId returns a list of actions for a worker - GetWorkerActionsByWorkerId(ctx context.Context, tenantId uuid.UUID, workerId []uuid.UUID) (map[string][]string, error) + GetWorkerActionsForWorkers(ctx context.Context, tenantId uuid.UUID, workers []sqlcv1.Worker) (map[string][]string, error) // GetWorkerWorkflowsByWorkerId returns a list of workflows for a worker GetWorkerWorkflowsByWorkerId(ctx context.Context, tenantId uuid.UUID, workerId uuid.UUID) ([]*sqlcv1.Workflow, error) @@ -275,8 +276,42 @@ func (w *workerRepository) CountActiveWorkersPerTenant(ctx context.Context) (map return tenantToWorkers, nil } -func (w *workerRepository) GetWorkerActionsByWorkerId(ctx context.Context, tenantId uuid.UUID, workerIds []uuid.UUID) (map[string][]string, error) { - records, err := w.queries.GetWorkerActionsByWorkerId(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerIdParams{ +func (w *workerRepository) GetWorkerActionsForWorkers(ctx context.Context, tenantId uuid.UUID, workers []sqlcv1.Worker) (map[string][]string, error) { + actionHashSet := make(map[string]struct{}) + workerIds := make([]uuid.UUID, len(workers)) + actionHashToWorkerIds := make(map[string][]uuid.UUID) + + for _, worker := range workers { + if len(worker.ActionHash) == 0 { + // if the worker has no action hash, we have no choice but to look + // it up by its id + workerIds = append(workerIds, worker.ID) + continue + } + + actionHashToWorkerIds[string(worker.ActionHash)] = append(actionHashToWorkerIds[string(worker.ActionHash)], worker.ID) + + if _, ok := actionHashSet[string(worker.ActionHash)]; !ok { + actionHashSet[string(worker.ActionHash)] = struct{}{} + } + } + + actionHashes := make([][]byte, 0, len(actionHashSet)) + + for actionHash := range actionHashSet { + actionHashes = append(actionHashes, []byte(actionHash)) + } + + recordsFromActionHashes, err := w.queries.GetWorkerActionsByWorkerActionHash(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerActionHashParams{ + Actionhashes: actionHashes, + Tenantid: tenantId, + }) + + if err != nil { + return nil, err + } + + recordsFromWorkerIds, err := w.queries.GetWorkerActionsByWorkerId(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerIdParams{ Workerids: workerIds, Tenantid: tenantId, }) @@ -287,15 +322,32 @@ func (w *workerRepository) GetWorkerActionsByWorkerId(ctx context.Context, tenan workerIdToActionIds := make(map[string][]string) - for _, record := range records { + for _, record := range recordsFromWorkerIds { workerId := record.WorkerId.String() - actionId := record.Actionid.String if _, ok := workerIdToActionIds[workerId]; !ok { workerIdToActionIds[workerId] = make([]string, 0) } - workerIdToActionIds[workerId] = append(workerIdToActionIds[workerId], actionId) + workerIdToActionIds[workerId] = append(workerIdToActionIds[workerId], record.Actionid) + } + + for _, record := range recordsFromActionHashes { + workerIds, ok := actionHashToWorkerIds[string(record.ActionHash)] + + if !ok { + continue + } + + for _, workerIdUuid := range workerIds { + workerId := workerIdUuid.String() + if _, ok := workerIdToActionIds[workerId]; !ok { + workerIdToActionIds[workerId] = make([]string, 0) + } + + workerIdToActionIds[workerId] = append(workerIdToActionIds[workerId], record.Actionid) + } + } return workerIdToActionIds, nil @@ -393,6 +445,16 @@ func (w *workerRepository) GetWorkerForEngine(ctx context.Context, tenantId uuid }) } +func hashActions(actions []string) []byte { + h := sha256.New() + + for _, action := range actions { + h.Write([]byte(action)) + } + + return h.Sum(nil) +} + func (w *workerRepository) CreateNewWorker(ctx context.Context, tenantId uuid.UUID, opts *CreateWorkerOpts) (*sqlcv1.Worker, error) { preWorker, postWorker := w.m.Meter(ctx, sqlcv1.LimitResourceWORKER, tenantId, 1) @@ -429,6 +491,7 @@ func (w *workerRepository) CreateNewWorker(ctx context.Context, tenantId uuid.UU Tenantid: tenantId, Dispatcherid: opts.DispatcherId, Name: opts.Name, + Actionhash: hashActions(opts.Actions), } // Default to self hosted diff --git a/sql/schema/v0.sql b/sql/schema/v0.sql index 4b2d657fb4..789b66e1f2 100644 --- a/sql/schema/v0.sql +++ b/sql/schema/v0.sql @@ -864,6 +864,7 @@ CREATE TABLE "Worker" ( "runtimeExtra" TEXT, "sdkVersion" TEXT, "durableTaskDispatcherId" UUID, + "actionHash" BYTEA, CONSTRAINT "Worker_pkey" PRIMARY KEY ("id") ); @@ -1441,6 +1442,9 @@ CREATE UNIQUE INDEX "Worker_webhookId_key" ON "Worker" ("webhookId" ASC); CREATE INDEX "Worker_tenantId_lastHeartbeatAt_idx" ON "Worker" ("tenantId", "lastHeartbeatAt"); +-- CreateIndex +CREATE INDEX "Worker_tenantId_actionHash_idx" ON "Worker" ("tenantId", "actionHash"); + -- CreateIndex CREATE INDEX "WorkerAssignEvent_workerId_id_idx" ON "WorkerAssignEvent" ("workerId" ASC, "id" ASC);