Skip to content

Commit a928781

Browse files
authored
Ignore invalid step IDs when inserting or replaying tasks (#3735)
* ignore with warning invalid step IDs * early return when no tasks remain after filtration
1 parent 80b2174 commit a928781

1 file changed

Lines changed: 34 additions & 4 deletions

File tree

pkg/repository/task.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1786,7 +1786,21 @@ func (r *sharedRepository) createTasks(
17861786
stepIdsToConfig[step.ID] = step
17871787
}
17881788

1789-
return r.insertTasks(ctx, tx, tenantId, tasks, stepIdsToConfig)
1789+
filteredTasks := make([]CreateTaskOpts, 0, len(tasks))
1790+
1791+
for _, task := range tasks {
1792+
if _, ok := stepIdsToConfig[task.StepId]; !ok {
1793+
r.l.Warn().Ctx(ctx).Str("step_id", task.StepId.String()).Str("external_id", task.ExternalId.String()).Msg("skipping task: step not found (may have been deleted)")
1794+
continue
1795+
}
1796+
filteredTasks = append(filteredTasks, task)
1797+
}
1798+
1799+
if len(filteredTasks) == 0 {
1800+
return []*V1TaskWithPayload{}, nil
1801+
}
1802+
1803+
return r.insertTasks(ctx, tx, tenantId, filteredTasks, stepIdsToConfig)
17901804
}
17911805

17921806
// insertTasks inserts new tasks into the database. note that we're using Postgres rules to automatically insert the created
@@ -2369,10 +2383,28 @@ func (r *sharedRepository) replayTasks(
23692383
stepIdsToConfig[step.ID] = step
23702384
}
23712385

2386+
filteredTasks := make([]ReplayTaskOpts, 0, len(tasks))
2387+
2388+
for _, task := range tasks {
2389+
if _, ok := stepIdsToConfig[task.StepId]; !ok {
2390+
r.l.Warn().Ctx(ctx).Str("step_id", task.StepId.String()).Str("external_id", task.ExternalId.String()).Msg("skipping replay task: step not found (may have been deleted)")
2391+
continue
2392+
}
2393+
filteredTasks = append(filteredTasks, task)
2394+
}
2395+
2396+
res := make([]*V1TaskWithPayload, 0)
2397+
2398+
if len(filteredTasks) == 0 {
2399+
return res, nil
2400+
}
2401+
2402+
tasks = filteredTasks
2403+
23722404
concurrencyStrats, err := r.getConcurrencyExpressions(ctx, tx, tenantId, stepIdsToConfig)
23732405

23742406
if err != nil {
2375-
return nil, fmt.Errorf("failed to get step expressions: %w", err)
2407+
return nil, fmt.Errorf("failed to get concurrency expressions: %w", err)
23762408
}
23772409

23782410
taskIds := make([]int64, len(tasks))
@@ -2549,8 +2581,6 @@ func (r *sharedRepository) replayTasks(
25492581
stepIdsToStorePayloadOpts[task.StepId] = append(stepIdsToStorePayloadOpts[task.StepId], storePayloadOpts)
25502582
}
25512583

2552-
res := make([]*V1TaskWithPayload, 0)
2553-
25542584
// for any initial states which are not queued, create a finalizing task event
25552585
eventTaskIdRetryCounts := make([]TaskIdInsertedAtRetryCount, 0)
25562586
eventTaskExternalIds := make([]uuid.UUID, 0)

0 commit comments

Comments
 (0)