loader(dm): update import-into external ID handling#12716
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the DM loader to run the import-into backend directly, allowing compatibility options to be passed while keeping the source directory intact. It also introduces progress tracking for import-into using atomic byte counters and removes the now-redundant StripS3ExternalID utility. Feedback on the changes highlights a potential database connection leak in runImportInto where the downstream database connection pool is not closed on successful execution, which should be resolved using a deferred close.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| importDB, err := conn.GetDownstreamDB(&l.cfg.To) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| l.importIntoTotalBytes.Store(0) | ||
| l.importIntoFinishedBytes.Store(0) | ||
| importer, err := importinto.NewImporter(ctx, cfg, importDB.DB, | ||
| importinto.WithProgressUpdater(&importIntoProgressUpdater{loader: l}), | ||
| importinto.WithStripS3ExternalIDForImportSQL(), | ||
| ) | ||
| if err != nil { | ||
| if closeErr := importDB.Close(); closeErr != nil { | ||
| l.logger.Warn("failed to close import-into DB connection", zap.Error(closeErr)) | ||
| } | ||
| return err | ||
| } | ||
| defer importer.Close() |
There was a problem hiding this comment.
The database connection pool importDB created by conn.GetDownstreamDB is only closed when importinto.NewImporter returns an error. If NewImporter succeeds, importDB is never closed, leading to a connection leak every time runImportInto is executed.
Using a deferred function right after successfully obtaining importDB ensures that the connection pool is always closed when runImportInto exits.
| importDB, err := conn.GetDownstreamDB(&l.cfg.To) | |
| if err != nil { | |
| return err | |
| } | |
| l.importIntoTotalBytes.Store(0) | |
| l.importIntoFinishedBytes.Store(0) | |
| importer, err := importinto.NewImporter(ctx, cfg, importDB.DB, | |
| importinto.WithProgressUpdater(&importIntoProgressUpdater{loader: l}), | |
| importinto.WithStripS3ExternalIDForImportSQL(), | |
| ) | |
| if err != nil { | |
| if closeErr := importDB.Close(); closeErr != nil { | |
| l.logger.Warn("failed to close import-into DB connection", zap.Error(closeErr)) | |
| } | |
| return err | |
| } | |
| defer importer.Close() | |
| importDB, err := conn.GetDownstreamDB(&l.cfg.To) | |
| if err != nil { | |
| return err | |
| } | |
| defer func() { | |
| if closeErr := importDB.Close(); closeErr != nil { | |
| l.logger.Warn("failed to close import-into DB connection", zap.Error(closeErr)) | |
| } | |
| }() | |
| l.importIntoTotalBytes.Store(0) | |
| l.importIntoFinishedBytes.Store(0) | |
| importer, err := importinto.NewImporter(ctx, cfg, importDB.DB, | |
| importinto.WithProgressUpdater(&importIntoProgressUpdater{loader: l}), | |
| importinto.WithStripS3ExternalIDForImportSQL(), | |
| ) | |
| if err != nil { | |
| return err | |
| } | |
| defer importer.Close() |
e87f862 to
3211cdf
Compare
|
/test pull-check |
3211cdf to
c31bac8
Compare
|
/retest |
Resolve conflict in dm/pkg/storage/utils_test.go: upstream PR pingcap#12716 (loader(dm): update import-into external ID handling) removed the StripS3ExternalID helper from dm/pkg/storage/utils.go in favor of the lightning TikvImporter.StripS3ExternalIDForImportSQL config flag. Drop the now-orphaned TestStripS3ExternalID test accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What problem does this PR solve?
Issue Number: close #12699
What is changed and how it works?
external-idfrom the LightningMydumper.SourceDir, so Dumpling and Lightning storage access still use the configured S3 external ID.TikvImporter.StripS3ExternalIDForImportSQLon the per-task Lightning config. TiDB then strips explicit S3 external IDs only from generated IMPORT INTO SQL resource parameters.RunOnceWithOptionspath instead of running the import-into importer directly.storage.StripS3ExternalIDhelper and its tests.Check List
Tests
go test ./dm/loader ./dm/pkg/storagemake dm_unit_test_pkg PKG=github.com/pingcap/tiflow/dm/loadermake fmtgit diff --checkQuestions
Will it cause performance regression or break compatibility?
No performance regression is expected. This keeps the original S3 URI for Dumpling and Lightning storage access, and only enables the TiDB Lightning import-into SQL compatibility path for DM import-into mode.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note