syncer(dm): prevent checkpoint flush after BeginTx error #12644
Code Review
This pull request centralizes downstream execution error handling by introducing the isDownstreamExecutionError helper function, which now includes terror.ErrDBExecuteFailedBegin in its checks. This ensures that checkpoint flushes are correctly skipped when a downstream transaction fails to begin, preventing potential data inconsistency. The changes also include a new unit test to verify this behavior and a safety check in dml_worker.go to ensure key-not-found logic only executes when no other errors are present. I have no feedback to provide as there were no review comments to assess.
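The gate described in this summary can be sketched roughly as below. This is a minimal illustration, not the code in the PR: `flushTask`, `checkpoint`, `handle`, and the sentinel `errExecuteFailedBegin` are hypothetical stand-ins, and `errors.Is` replaces the terror-code comparison the real helper performs.

```go
package main

import (
	"errors"
	"fmt"
)

// errExecuteFailedBegin is a hypothetical stand-in for terror.ErrDBExecuteFailedBegin.
var errExecuteFailedBegin = errors.New("begin failed")

// isDownstreamExecutionError is reduced to a single error class for brevity.
func isDownstreamExecutionError(err error) bool {
	return errors.Is(err, errExecuteFailedBegin)
}

// flushTask is a simplified, hypothetical message handed to the flush worker.
type flushTask struct {
	execErr error
}

// checkpoint counts flushes so the effect of the gate is observable.
type checkpoint struct{ flushed int }

func (c *checkpoint) flush() { c.flushed++ }

// handle flushes the checkpoint unless the task carries a downstream
// execution error, in which case the flush is skipped.
func handle(cp *checkpoint, task flushTask) (skipped bool) {
	if isDownstreamExecutionError(task.execErr) {
		return true
	}
	cp.flush()
	return false
}

func main() {
	cp := &checkpoint{}
	failed := flushTask{execErr: fmt.Errorf("begin tx: %w", errExecuteFailedBegin)}
	fmt.Println(handle(cp, failed), cp.flushed)      // skipped, no flush recorded
	fmt.Println(handle(cp, flushTask{}), cp.flushed) // not skipped, one flush recorded
}
```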
ingress-bot left a comment
This review was generated by AI and should be verified by a human reviewer.
Manual follow-up is recommended before merge.
Summary
- Total findings: 4
- Inline comments: 2
- Summary-only findings (no inline anchor): 1
Findings (highest risk first)
🟡 [Minor] (3)
- New test hand-rolls full CheckPoint mock instead of reusing the package's interface-embedding pattern (dm/syncer/checkpoint_flush_worker_test.go:70, dm/syncer/status_test.go:72, dm/syncer/safe_mode_test.go:30)
- Resume-safety predicate is only partially covered: no negative case and three of four call sites untested (dm/syncer/execution_error.go:19, dm/syncer/checkpoint_flush_worker_test.go:60, dm/syncer/syncer.go:1977)
- isDownstreamExecutionError name over-promises generality and lacks intent comment for its safety-critical allowlist (dm/syncer/execution_error.go:19, dm/syncer/syncer.go:1294, dm/syncer/checkpoint_flush_worker.go:79)
ℹ️ [Info] (1)
- Stale TODO comment claims any error blocks checkpoint flush, contradicting the selective gate (dm/syncer/syncer.go:1290, dm/syncer/syncer.go:1326, dm/syncer/checkpoint_flush_worker.go:75)
Unanchored findings
ℹ️ [Info] (1)
- Stale TODO comment claims any error blocks checkpoint flush, contradicting the selective gate
- Request: Reword the comment to say checkpoint flush is skipped only for specific downstream execution failures (the isDownstreamExecutionError set), not for every error, so the stated limitation matches actual behavior.
	require.Zero(t, cp.flushCount, "checkpoint flush must be skipped after downstream BeginTx failure; flushing here can persist a checkpoint past non-durable DML")
}

type checkpointFlushWorkerTestCheckpoint struct {
🟡 [Minor] New test hand-rolls full CheckPoint mock instead of reusing the package's interface-embedding pattern
Impact
Adds about 100 lines of boilerplate and a third divergent CheckPoint mock in the same package.
This one must be manually edited every time the CheckPoint interface grows a method, while the embedded-interface mocks do not, creating ongoing maintenance drift and compile-break risk for unrelated changes.
Scope
- dm/syncer/checkpoint_flush_worker_test.go:70 (checkpointFlushWorkerTestCheckpoint)
- dm/syncer/status_test.go:72 (mockCheckpoint)
- dm/syncer/safe_mode_test.go:30 (mockCheckpointForSafeMode)
Evidence
mockCheckpoint and mockCheckpointForSafeMode embed the CheckPoint interface and override only the methods they exercise. checkpointFlushWorkerTestCheckpoint instead implements every CheckPoint method explicitly, yet the test's worker returns early via isDownstreamExecutionError so FlushPointsExcept (and every other method) is never actually called.
Change request
Follow the existing pattern: declare type checkpointFlushWorkerTestCheckpoint struct { CheckPoint; flushCount int } and keep only the FlushPointsExcept override that increments flushCount, dropping the ~25 unused method stubs.
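A minimal sketch of the interface-embedding pattern this request refers to. The `CheckPoint` interface here is a trimmed, hypothetical stand-in (the real one in dm/syncer has many more methods), and `flushCountingCheckpoint` is an illustrative name.

```go
package main

import "fmt"

// CheckPoint is a trimmed stand-in for the real dm/syncer interface.
type CheckPoint interface {
	FlushPointsExcept(exceptTables []string) error
	Rollback()
	String() string
}

// flushCountingCheckpoint embeds the interface and overrides only the
// method the test observes. The embedded value is left nil, so calling a
// method that is not overridden panics; that is acceptable when the test
// expects those methods never to be reached.
type flushCountingCheckpoint struct {
	CheckPoint
	flushCount int
}

// FlushPointsExcept records that a flush was attempted.
func (c *flushCountingCheckpoint) FlushPointsExcept(_ []string) error {
	c.flushCount++
	return nil
}

func main() {
	cp := &flushCountingCheckpoint{}
	var _ CheckPoint = cp // the mock still satisfies the full interface
	_ = cp.FlushPointsExcept(nil)
	fmt.Println("flushCount:", cp.flushCount)
}
```

Because the struct embeds the interface, adding a method to `CheckPoint` later does not break compilation of the mock, which is the maintenance benefit the finding points at.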
import "github.com/pingcap/tiflow/dm/pkg/terror"

func isDownstreamExecutionError(err error) bool {
2 findings on this line:
🟡 [Minor] Resume-safety predicate is only partially covered: no negative case and three of four call sites untested
Impact
The new test asserts checkpoint flush is skipped for one error class (ErrDBExecuteFailedBegin) at one of four call sites, and never asserts that a non-downstream error still flushes.
A degenerate predicate that returned true for every non-nil error (e.g. wrongly matching context.Canceled) would still pass this test, yet would silently stall checkpoint progress; conversely, dropping ErrDBExecuteFailed/ErrDBUnExpect or breaking the inverted safe-mode-exit branch at syncer.go:1977 would reintroduce the checkpoint-advances-past-uncommitted-DML data loss on resume without failing CI.
Scope
- dm/syncer/execution_error.go:19 (isDownstreamExecutionError)
- dm/syncer/checkpoint_flush_worker_test.go:60 (TestCheckpointFlushWorkerSkipsCheckpointOnBeginError)
- dm/syncer/syncer.go:1977 (Syncer.Run)
Evidence
isDownstreamExecutionError guards four resume-critical branches with inverted meaning: it skips checkpoint flush in checkpointFlushWorker.Run, flushCheckPoints, and flushCheckPointsAsync, but triggers FlushSafeModeExitPoint at syncer.go:1977. The single test only exercises the worker skip path with ErrDBExecuteFailedBegin; ErrDBExecuteFailed, ErrDBUnExpect, the negative (non-matching) case, and the safe-mode-exit branch are unverified.
Change request
Add a focused unit test for isDownstreamExecutionError covering all three positive error classes plus at least one negative (e.g. a generic/context.Canceled error returning false), so the predicate contract that all four sites depend on is locked down independent of any one call site.
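One possible shape for such a table of cases, sketched as a standalone program rather than a real `_test.go` file. The three sentinel errors are hypothetical stand-ins for the terror codes, and `errors.Is` is a substitution for the terror-code comparison the real helper uses.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical sentinels standing in for terror.ErrDBExecuteFailed,
// terror.ErrDBExecuteFailedBegin and terror.ErrDBUnExpect.
var (
	errExecuteFailed      = errors.New("execute failed")
	errExecuteFailedBegin = errors.New("begin failed")
	errUnExpect           = errors.New("unexpected db error")
)

// isDownstreamExecutionError mirrors the shape of the helper under review.
func isDownstreamExecutionError(err error) bool {
	return errors.Is(err, errExecuteFailed) ||
		errors.Is(err, errExecuteFailedBegin) ||
		errors.Is(err, errUnExpect)
}

type predicateCase struct {
	name string
	err  error
	want bool
}

// predicateCases lists all three positive classes plus negative cases.
func predicateCases() []predicateCase {
	return []predicateCase{
		{"execute failed", errExecuteFailed, true},
		{"begin failed, wrapped", fmt.Errorf("begin tx: %w", errExecuteFailedBegin), true},
		{"unexpected", errUnExpect, true},
		{"context canceled", context.Canceled, false},
		{"nil", nil, false},
	}
}

func main() {
	for _, tc := range predicateCases() {
		got := isDownstreamExecutionError(tc.err)
		fmt.Printf("%-22s got=%v want=%v\n", tc.name, got, tc.want)
	}
}
```

The negative rows are what would catch a degenerate predicate that returns true for every non-nil error.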
🟡 [Minor] isDownstreamExecutionError name over-promises generality and lacks intent comment for its safety-critical allowlist
Impact
The name reads as "any error from executing against downstream", but the function is a curated allowlist of exactly three terror codes that gate whether a checkpoint may advance past possibly non-durable DML.
A maintainer trusting the name could add or drop an error type, or assume other downstream DB failures are covered, and silently change checkpoint-durability behavior across all four call sites.
Scope
- dm/syncer/execution_error.go:19 (isDownstreamExecutionError)
- dm/syncer/syncer.go:1294 (flushCheckPoints)
- dm/syncer/checkpoint_flush_worker.go:79 (checkpointFlushWorker.Run)
Evidence
isDownstreamExecutionError matches only ErrDBExecuteFailed, ErrDBExecuteFailedBegin, and ErrDBUnExpect; any other downstream DB error returns false. The helper is now the single source of truth for the "skip checkpoint flush" decision but carries no doc comment explaining why these specific codes (and notably the newly added ErrDBExecuteFailedBegin) are the durability-relevant set.
Change request
Add a doc comment on isDownstreamExecutionError stating the invariant it encodes (these errors mean downstream DML may not be durable, so the checkpoint must not advance) and why ErrDBExecuteFailedBegin belongs here. Consider a more intent-revealing name (for example one that signals it is a checkpoint-skip predicate rather than a generic "any downstream execution error" check).
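As a rough illustration of what the requested doc comment and a more intent-revealing name might look like. The name `mustSkipCheckpointFlush` and the sentinel errors are hypothetical, and the comparison via `errors.Is` is an assumption standing in for the terror-code checks.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the three terror codes in the allowlist.
var (
	errExecuteFailed      = errors.New("execute failed")
	errExecuteFailedBegin = errors.New("begin failed")
	errUnExpect           = errors.New("unexpected db error")
)

// mustSkipCheckpointFlush reports whether err means that downstream DML may
// not be durable, in which case the checkpoint must not advance past it.
//
// This is a deliberate allowlist, not a generic "any downstream error"
// check. A failed BeginTx belongs here because the transaction never
// started, so none of the DML in that batch was committed. Any error
// outside the allowlist returns false and does not block the flush.
func mustSkipCheckpointFlush(err error) bool {
	return errors.Is(err, errExecuteFailed) ||
		errors.Is(err, errExecuteFailedBegin) ||
		errors.Is(err, errUnExpect)
}

func main() {
	fmt.Println(mustSkipCheckpointFlush(fmt.Errorf("begin tx: %w", errExecuteFailedBegin)))
	fmt.Println(mustSkipCheckpointFlush(errors.New("unrelated failure")))
}
```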
What problem does this PR solve?
Issue Number: close #12626
What is changed and how it works?
This PR fixes a DM correctness issue where checkpoint flush could still advance after a downstream transaction BeginTx failure.
Changes:
- ErrDBExecuteFailedBegin together with existing ErrDBExecuteFailed and ErrDBUnExpect.
- ExecuteSQL already returned an execution error.
- ErrDBExecuteFailedBegin(sql.ErrConnDone) to ensure checkpoint flush is skipped.
Check List
Tests
Also ran:
Questions
Will it cause performance regression or break compatibility?
No. The change only broadens existing checkpoint-blocking error classification to include downstream transaction begin failures and suppresses a misleading diagnostic after failed execution.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note