Skip to content

Commit b362a9f

Browse files
authored
Fix: do nothing on conflict, and diff out existing rows (#3752)
* fix: on conflict, diffing * debug: try unpinning * fix: oss migration
1 parent 0017dd4 commit b362a9f

2 files changed

Lines changed: 66 additions & 76 deletions

File tree

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
2121

2222
- name: Install Protoc
23-
uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0
23+
uses: arduino/setup-protoc
2424
with:
2525
version: "29.3"
2626
repo-token: ${{ secrets.GITHUB_TOKEN }}

cmd/hatchet-migrate/migrate/migrations/20260424190713_v1_0_99.go

Lines changed: 65 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -139,27 +139,22 @@ const v1RunsOlapBackfill = `INSERT INTO v1_runs_olap_new (
139139
parent_task_external_id
140140
)
141141
SELECT
142-
tenant_id,
143-
id,
144-
inserted_at,
145-
external_id,
146-
readable_status,
147-
kind,
148-
workflow_id,
149-
workflow_version_id,
150-
additional_metadata,
151-
parent_task_external_id
152-
FROM v1_runs_olap
153-
ON CONFLICT (inserted_at, id) DO UPDATE
154-
SET
155-
readable_status = CASE
156-
WHEN v1_status_to_priority(v1_runs_olap_new.readable_status) > v1_status_to_priority(EXCLUDED.readable_status) THEN v1_runs_olap_new.readable_status
157-
ELSE EXCLUDED.readable_status
158-
END,
159-
kind = CASE
160-
WHEN v1_status_to_priority(v1_runs_olap_new.readable_status) > v1_status_to_priority(EXCLUDED.readable_status) THEN v1_runs_olap_new.kind
161-
ELSE EXCLUDED.kind
162-
END
142+
src.tenant_id,
143+
src.id,
144+
src.inserted_at,
145+
src.external_id,
146+
src.readable_status,
147+
src.kind,
148+
src.workflow_id,
149+
src.workflow_version_id,
150+
src.additional_metadata,
151+
src.parent_task_external_id
152+
FROM v1_runs_olap src
153+
WHERE NOT EXISTS (
154+
SELECT 1 FROM v1_runs_olap_new n
155+
WHERE n.id = src.id AND n.inserted_at = src.inserted_at
156+
)
157+
ON CONFLICT DO NOTHING
163158
`
164159

165160
const v1TasksOlapNewColDefs = `
@@ -340,41 +335,37 @@ const v1TasksOlapBackfill = `INSERT INTO v1_tasks_olap_new (
340335
is_durable
341336
)
342337
SELECT
343-
tenant_id,
344-
id,
345-
inserted_at,
346-
external_id,
347-
queue,
348-
action_id,
349-
step_id,
350-
workflow_id,
351-
workflow_version_id,
352-
workflow_run_id,
353-
schedule_timeout,
354-
step_timeout,
355-
priority,
356-
sticky,
357-
desired_worker_id,
358-
display_name,
359-
input,
360-
additional_metadata,
361-
readable_status,
362-
latest_retry_count,
363-
latest_worker_id,
364-
dag_id,
365-
dag_inserted_at,
366-
parent_task_external_id,
367-
is_durable
368-
FROM v1_tasks_olap
369-
ON CONFLICT (inserted_at, id) DO UPDATE
370-
SET
371-
readable_status = CASE
372-
WHEN
373-
v1_status_to_priority(v1_tasks_olap_new.readable_status) > v1_status_to_priority(EXCLUDED.readable_status)
374-
OR v1_tasks_olap_new.latest_retry_count > EXCLUDED.latest_retry_count
375-
THEN v1_tasks_olap_new.readable_status
376-
ELSE EXCLUDED.readable_status
377-
END
338+
src.tenant_id,
339+
src.id,
340+
src.inserted_at,
341+
src.external_id,
342+
src.queue,
343+
src.action_id,
344+
src.step_id,
345+
src.workflow_id,
346+
src.workflow_version_id,
347+
src.workflow_run_id,
348+
src.schedule_timeout,
349+
src.step_timeout,
350+
src.priority,
351+
src.sticky,
352+
src.desired_worker_id,
353+
src.display_name,
354+
src.input,
355+
src.additional_metadata,
356+
src.readable_status,
357+
src.latest_retry_count,
358+
src.latest_worker_id,
359+
src.dag_id,
360+
src.dag_inserted_at,
361+
src.parent_task_external_id,
362+
src.is_durable
363+
FROM v1_tasks_olap src
364+
WHERE NOT EXISTS (
365+
SELECT 1 FROM v1_tasks_olap_new n
366+
WHERE n.id = src.id AND n.inserted_at = src.inserted_at
367+
)
368+
ON CONFLICT DO NOTHING
378369
`
379370

380371
const v1DagsOlapNewColDefs = `
@@ -477,25 +468,24 @@ const v1DagsOlapBackfill = `INSERT INTO v1_dags_olap_new (
477468
total_tasks
478469
)
479470
SELECT
480-
id,
481-
inserted_at,
482-
tenant_id,
483-
external_id,
484-
display_name,
485-
workflow_id,
486-
workflow_version_id,
487-
readable_status,
488-
input,
489-
additional_metadata,
490-
parent_task_external_id,
491-
total_tasks
492-
FROM v1_dags_olap
493-
ON CONFLICT (inserted_at, id) DO UPDATE
494-
SET
495-
readable_status = CASE
496-
WHEN v1_status_to_priority(v1_dags_olap_new.readable_status) > v1_status_to_priority(EXCLUDED.readable_status) THEN v1_dags_olap_new.readable_status
497-
ELSE EXCLUDED.readable_status
498-
END
471+
src.id,
472+
src.inserted_at,
473+
src.tenant_id,
474+
src.external_id,
475+
src.display_name,
476+
src.workflow_id,
477+
src.workflow_version_id,
478+
src.readable_status,
479+
src.input,
480+
src.additional_metadata,
481+
src.parent_task_external_id,
482+
src.total_tasks
483+
FROM v1_dags_olap src
484+
WHERE NOT EXISTS (
485+
SELECT 1 FROM v1_dags_olap_new n
486+
WHERE n.id = src.id AND n.inserted_at = src.inserted_at
487+
)
488+
ON CONFLICT DO NOTHING
499489
`
500490

501491
func up20260424190713(ctx context.Context, db *sql.DB) error {

0 commit comments

Comments
 (0)