Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1/server/handlers/workers/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func (t *WorkerService) workerGetV1(ctx echo.Context, tenant *sqlcv1.Tenant, req
return nil, err
}

workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsByWorkerId(
workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsForWorkers(
reqCtx,
worker.Worker.TenantId,
[]uuid.UUID{worker.Worker.ID},
[]sqlcv1.Worker{worker.Worker},
)

if err != nil {
Expand Down
22 changes: 14 additions & 8 deletions api/v1/server/handlers/workers/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,27 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re
telemetry.AttributeKV{Key: "tenant.id", Value: tenant.ID},
)

workers, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts)
workerRows, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts)

if err != nil {
listSpan.RecordError(err)
return nil, err
}

workers := make([]sqlcv1.Worker, len(workerRows))

for i, workerRow := range workerRows {
workers[i] = workerRow.Worker
}

telemetry.WithAttributes(listSpan,
telemetry.AttributeKV{Key: "workers.count", Value: len(workers)},
)

workerIdSet := make(map[uuid.UUID]struct{})

for _, worker := range workers {
workerIdSet[worker.Worker.ID] = struct{}{}
workerIdSet[worker.ID] = struct{}{}
}

workerIds := make([]uuid.UUID, 0, len(workerIdSet))
Expand All @@ -128,10 +134,10 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re
telemetry.AttributeKV{Key: "workers.unique_ids.count", Value: len(workerIds)},
)

workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsByWorkerId(
workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsForWorkers(
listCtx,
tenant.ID,
workerIds,
workers,
)

if err != nil {
Expand Down Expand Up @@ -160,11 +166,11 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re

for i, worker := range workers {
workerCp := worker
actions := workerIdToActionIds[workerCp.Worker.ID.String()]
slotConfig := workerSlotConfig[workerCp.Worker.ID]
labels := workerIdToLabels[workerCp.Worker.ID]
actions := workerIdToActionIds[workerCp.ID.String()]
slotConfig := workerSlotConfig[workerCp.ID]
labels := workerIdToLabels[workerCp.ID]

rows[i] = *transformersv1.ToWorkerSqlc(&workerCp.Worker, slotConfig, actions, nil, labels)
rows[i] = *transformersv1.ToWorkerSqlc(&workerCp, slotConfig, actions, nil, labels)
}

return gen.WorkerList200JSONResponse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE "Worker" ADD COLUMN "actionHash" BYTEA;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE "Worker" DROP COLUMN "actionHash";
-- +goose StatementEnd
11 changes: 11 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260428213451_v1_0_104.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- +goose no transaction

-- +goose Up
-- +goose StatementBegin
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Worker_tenantId_actionHash_idx" ON "Worker" ("tenantId", "actionHash");
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP INDEX CONCURRENTLY IF EXISTS "Worker_tenantId_actionHash_idx";
-- +goose StatementEnd
1 change: 1 addition & 0 deletions pkg/repository/sqlcv1/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 20 additions & 9 deletions pkg/repository/sqlcv1/workers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,28 @@ GROUP BY "tenantId"
;

-- name: GetWorkerActionsByWorkerId :many
WITH inputs AS (
SELECT UNNEST(@workerIds::UUID[]) AS "workerId"
)

SELECT
w."id" AS "workerId",
a."actionId" AS actionId
FROM "Worker" w
JOIN inputs i ON w."id" = i."workerId"
LEFT JOIN "_ActionToWorker" aw ON w.id = aw."B"
LEFT JOIN "Action" a ON aw."A" = a.id
JOIN "_ActionToWorker" aw ON w.id = aw."B"
JOIN "Action" a ON aw."A" = a.id
WHERE
a."tenantId" = @tenantId::UUID
AND w.id = ANY(@workerIds::UUID[])
;

-- name: GetWorkerActionsByWorkerActionHash :many
SELECT DISTINCT ON (w."actionHash")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goal here is we'll just move to this completely in a little bit, once all the workers have hashes

Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This SELECT DISTINCT ON (w."actionHash") query only returns one actionId per actionHash (the most recent by createdAt), but GetWorkerActionsForWorkers expects to build the full list of actions for each hash. Remove the DISTINCT ON or change it to de-duplicate at the (actionHash, actionId) level so all actions are returned.

Suggested change
SELECT DISTINCT ON (w."actionHash")
SELECT

Copilot uses AI. Check for mistakes.
w."actionHash" AS "actionHash",
a."actionId" AS actionId
FROM "Worker" w
JOIN "_ActionToWorker" aw ON w.id = aw."B"
JOIN "Action" a ON aw."A" = a.id
WHERE
a."tenantId" = @tenantId::UUID
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetWorkerActionsByWorkerActionHash doesn’t constrain w."tenantId", so the new (tenantId, actionHash) index won’t be used and (if any cross-tenant links ever exist) results could bleed across tenants. Add AND w."tenantId" = @tenantId::UUID to the WHERE clause to match the index and keep the join tenant-scoped.

Suggested change
a."tenantId" = @tenantId::UUID
a."tenantId" = @tenantId::UUID
AND w."tenantId" = @tenantId::UUID

Copilot uses AI. Check for mistakes.
AND w."actionHash" = ANY(@actionHashes::BYTEA[])
ORDER BY w."actionHash", a."createdAt" DESC
;

-- name: GetWorkerWorkflowsByWorkerId :many
Expand Down Expand Up @@ -438,7 +447,8 @@ INSERT INTO "Worker" (
"language",
"languageVersion",
"os",
"runtimeExtra"
"runtimeExtra",
"actionHash"
) VALUES (
gen_random_uuid(),
CURRENT_TIMESTAMP,
Expand All @@ -451,7 +461,8 @@ INSERT INTO "Worker" (
sqlc.narg('language')::"WorkerSDKS",
sqlc.narg('languageVersion')::text,
sqlc.narg('os')::text,
sqlc.narg('runtimeExtra')::text
sqlc.narg('runtimeExtra')::text,
@actionHash::bytea
) RETURNING *;

-- name: LinkServicesToWorker :exec
Expand Down
Loading
Loading