Skip to content

Commit 6e9016a

Browse files
committed
feat: wiring, dual read using the hash sometimes
1 parent e7e543c commit 6e9016a

3 files changed

Lines changed: 108 additions & 5 deletions

File tree

pkg/repository/sqlcv1/workers.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,19 @@ WHERE
258258
AND w.id = ANY(@workerIds::UUID[])
259259
;
260260

261+
-- name: GetWorkerActionsByWorkerActionHash :many
262+
SELECT
263+
w."id" AS "workerId",
264+
a."actionId" AS actionId,
265+
w."actionHash" AS actionHash
266+
FROM "Worker" w
267+
JOIN "_ActionToWorker" aw ON w.id = aw."B"
268+
JOIN "Action" a ON aw."A" = a.id
269+
WHERE
270+
a."tenantId" = @tenantId::UUID
271+
AND w.actionHash = ANY(@actionHashes::BYTEA[])
272+
;
273+
261274
-- name: GetWorkerWorkflowsByWorkerId :many
262275
SELECT wf.*
263276
FROM "Worker" w

pkg/repository/sqlcv1/workers.sql.go

Lines changed: 44 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/worker.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,41 @@ func (w *workerRepository) CountActiveWorkersPerTenant(ctx context.Context) (map
277277
}
278278

279279
func (w *workerRepository) GetWorkerActionsForWorkers(ctx context.Context, tenantId uuid.UUID, workers []sqlcv1.Worker) (map[string][]string, error) {
280-
actionHashToWorkerId := make(map[uuid.UUID][]byte)
280+
actionHashSet := make(map[string]struct{})
281+
workerIds := make([]uuid.UUID, len(workers))
282+
actionHashToWorkerIds := make(map[string][]uuid.UUID)
281283

282284
for _, worker := range workers {
283-
actionHashToWorkerId[worker.ID] = worker.ActionsHash
285+
if len(worker.ActionHash) == 0 {
286+
// if the worker has no action has, we have no choice but to look
287+
// it up by its id
288+
workerIds = append(workerIds, worker.ID)
289+
continue
290+
}
291+
292+
actionHashToWorkerIds[string(worker.ActionHash)] = append(actionHashToWorkerIds[string(worker.ActionHash)], worker.ID)
293+
294+
if _, ok := actionHashSet[string(worker.ActionHash)]; !ok {
295+
actionHashSet[string(worker.ActionHash)] = struct{}{}
296+
}
284297
}
285298

286-
records, err := w.queries.GetWorkerActionsByWorkerId(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerIdParams{
299+
actionHashes := make([][]byte, 0, len(actionHashSet))
300+
301+
for actionHash := range actionHashSet {
302+
actionHashes = append(actionHashes, []byte(actionHash))
303+
}
304+
305+
recordsFromActionHashes, err := w.queries.GetWorkerActionsByWorkerActionHash(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerActionHashParams{
306+
Actionhashes: actionHashes,
307+
Tenantid: tenantId,
308+
})
309+
310+
if err != nil {
311+
return nil, err
312+
}
313+
314+
recordsFromWorkerIds, err := w.queries.GetWorkerActionsByWorkerId(ctx, w.pool, sqlcv1.GetWorkerActionsByWorkerIdParams{
287315
Workerids: workerIds,
288316
Tenantid: tenantId,
289317
})
@@ -294,7 +322,7 @@ func (w *workerRepository) GetWorkerActionsForWorkers(ctx context.Context, tenan
294322

295323
workerIdToActionIds := make(map[string][]string)
296324

297-
for _, record := range records {
325+
for _, record := range recordsFromWorkerIds {
298326
workerId := record.WorkerId.String()
299327

300328
if _, ok := workerIdToActionIds[workerId]; !ok {
@@ -304,6 +332,24 @@ func (w *workerRepository) GetWorkerActionsForWorkers(ctx context.Context, tenan
304332
workerIdToActionIds[workerId] = append(workerIdToActionIds[workerId], record.Actionid)
305333
}
306334

335+
for _, record := range recordsFromActionHashes {
336+
workerIds, ok := actionHashToWorkerIds[string(record.Actionhash)]
337+
338+
if !ok {
339+
continue
340+
}
341+
342+
for _, workerIdUuid := range workerIds {
343+
workerId := workerIdUuid.String()
344+
if _, ok := workerIdToActionIds[workerId]; !ok {
345+
workerIdToActionIds[workerId] = make([]string, 0)
346+
}
347+
348+
workerIdToActionIds[workerId] = append(workerIdToActionIds[workerId], record.Actionid)
349+
}
350+
351+
}
352+
307353
return workerIdToActionIds, nil
308354
}
309355

@@ -445,7 +491,7 @@ func (w *workerRepository) CreateNewWorker(ctx context.Context, tenantId uuid.UU
445491
Tenantid: tenantId,
446492
Dispatcherid: opts.DispatcherId,
447493
Name: opts.Name,
448-
Actionshash: hashActions(opts.Actions),
494+
Actionhash: hashActions(opts.Actions),
449495
}
450496

451497
// Default to self hosted

0 commit comments

Comments
 (0)