dxf, domain: acquire cross-keyspace runtimes at manager boundaries #69023
D3Hunter wants to merge 2 commits into
dxf: route crossks runtime through task params tests: cover crossks runtime reclamation docs/design: record crossks runtime validation bazel change change change change test change test using mock
📝 Walkthrough
This PR refactors the SQL server runtime abstraction by introducing a unified sqlsvrapi.Runtime (Store(), SysSessionPool()), renaming session-pool accessors from SessPool() to SysSessionPool(), replacing TaskStore kv.Storage usage with TaskRuntime sqlsvrapi.Runtime across scheduler/executor/import/ddl flows, and adding runtime acquisition/release lifecycle and mocks/tests.
Changes
- Core runtime abstraction and domain updates
- Mock infrastructure generation and build
- DDL backfilling scheduler and executor
- DXF scheduler framework refactor
- DXF taskexecutor framework refactor
- Import INTO scheduler and executors
- dxfutil helper and tests
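To make the walkthrough concrete, here is a minimal sketch of what a unified runtime view with the two accessors named above could look like from a consumer's side. The `Storage` and `SessionPool` interfaces and all `fake*` types are hypothetical stand-ins, not the real `kv.Storage` or session pool types from the PR.

```go
package main

import "fmt"

// Storage is a hypothetical stand-in for kv.Storage; only GetKeyspace is modeled.
type Storage interface {
	GetKeyspace() string
}

// SessionPool is a hypothetical stand-in for the destroyable session pool type.
type SessionPool interface {
	Close()
}

// Runtime mirrors the two accessors named in the walkthrough.
type Runtime interface {
	Store() Storage
	SysSessionPool() SessionPool
}

type fakeStore struct{ keyspace string }

func (s fakeStore) GetKeyspace() string { return s.keyspace }

type fakePool struct{ closed bool }

func (p *fakePool) Close() { p.closed = true }

type fakeRuntime struct {
	store Storage
	pool  SessionPool
}

func (r fakeRuntime) Store() Storage              { return r.store }
func (r fakeRuntime) SysSessionPool() SessionPool { return r.pool }

// describe depends only on the Runtime view, the way a scheduler or
// executor would once it receives a TaskRuntime instead of a bare store.
func describe(rt Runtime) string {
	return fmt.Sprintf("keyspace=%s hasPool=%t", rt.Store().GetKeyspace(), rt.SysSessionPool() != nil)
}

func main() {
	rt := fakeRuntime{store: fakeStore{keyspace: "ks1"}, pool: &fakePool{}}
	fmt.Println(describe(rt))
}
```

Passing one interface rather than a store plus a separate pool keeps the two resources tied to the same keyspace, which is presumably the point of the refactor.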
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🧹 Nitpick comments (2)
pkg/dxf/framework/taskexecutor/task_executor.go (1)
259-265: ⚡ Quick win
Consider adding explicit nil checks for better error messages.
The keyspace validation at line 259 will panic if e.TaskRuntime is nil or if Store() returns nil, rather than producing a clear error message. While the manager is responsible for setting TaskRuntime before calling Init(), an explicit check would provide better diagnostics if that contract is violated.
🛡️ Suggested defensive check
 func (e *BaseTaskExecutor) Init(_ context.Context) error {
+	if e.TaskRuntime == nil {
+		return errors.New("TaskRuntime not initialized by manager")
+	}
 	storeKS := e.TaskRuntime.Store().GetKeyspace()
 	if storeKS != e.GetTaskBase().Keyspace {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/framework/taskexecutor/task_executor.go` around lines 259 - 265, The current keyspace validation can panic if e.TaskRuntime or e.TaskRuntime.Store() is nil; add explicit nil checks before calling Store() and before comparing keyspaces to produce clear errors. In the Init()/keyspace validation block, check if e.TaskRuntime == nil and return a traced fmt.Errorf describing "nil TaskRuntime", then check if sr := e.TaskRuntime.Store(); sr == nil and return a traced fmt.Errorf "nil Store from TaskRuntime"; only after those checks read sr.GetKeyspace() and compare with e.GetTaskBase().Keyspace to return the existing "store keyspace mismatch" error. Ensure you reference e.TaskRuntime, Store(), GetKeyspace(), and GetTaskBase().Keyspace in the updated checks.
pkg/dxf/importinto/task_executor_testkit_test.go (1)
162-162: Clarify the two “import test runtime” helpers to avoid mix-ups
- newImportTestRuntime is defined in pkg/dxf/importinto/scheduler_testkit_test.go (same package importinto_test), so task_executor_testkit_test.go can call it; passing nil intentionally results in SysSessionPool() returning a nil DestroyableSessionPool.
- newImportSchedulerTestRuntime is a separate helper in pkg/dxf/importinto/scheduler_test.go for scheduler tests using util.DestroyableSessionPool.
- Add a short comment (or rename/consolidate) to make the distinction between these helpers explicit.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/importinto/task_executor_testkit_test.go` at line 162, The test is confusing two similarly named helpers, newImportTestRuntime (used in task_executor_testkit_test.go and defined in scheduler_testkit_test.go) and newImportSchedulerTestRuntime (used in scheduler tests and returns a util.DestroyableSessionPool), so add a short clarifying comment next to the param.TaskRuntime = newImportTestRuntime(...) call (or rename one helper) stating that newImportTestRuntime intentionally returns a runtime whose SysSessionPool() yields a nil DestroyableSessionPool, whereas newImportSchedulerTestRuntime returns a runtime with util.DestroyableSessionPool for scheduler tests; reference the helper names newImportTestRuntime and newImportSchedulerTestRuntime in the comment.
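For the nil-check nitpick on task_executor.go, the ordering of checks can be sketched in isolation as follows. This is only an illustration under simplified assumptions: `Storage`, `Runtime`, `memStore`, `memRuntime`, and `validateRuntime` are hypothetical names, and the real `Init()` operates on `BaseTaskExecutor` fields rather than function arguments.

```go
package main

import (
	"errors"
	"fmt"
)

// Storage and Runtime are hypothetical, trimmed-down stand-ins for the
// interfaces discussed in the review comment.
type Storage interface{ GetKeyspace() string }

type Runtime interface{ Store() Storage }

type memStore struct{ ks string }

func (s memStore) GetKeyspace() string { return s.ks }

type memRuntime struct{ store Storage }

func (r memRuntime) Store() Storage { return r.store }

// validateRuntime checks for a nil runtime, then a nil store, and only then
// compares keyspaces, so each broken contract yields its own error.
func validateRuntime(rt Runtime, taskKeyspace string) error {
	if rt == nil {
		return errors.New("nil TaskRuntime")
	}
	store := rt.Store()
	if store == nil {
		return errors.New("nil Store from TaskRuntime")
	}
	if ks := store.GetKeyspace(); ks != taskKeyspace {
		return fmt.Errorf("store keyspace mismatch: store=%q task=%q", ks, taskKeyspace)
	}
	return nil
}

func main() {
	fmt.Println(validateRuntime(nil, "ks1"))
	fmt.Println(validateRuntime(memRuntime{}, "ks1"))
	fmt.Println(validateRuntime(memRuntime{store: memStore{ks: "ks2"}}, "ks1"))
	fmt.Println(validateRuntime(memRuntime{store: memStore{ks: "ks1"}}, "ks1"))
}
```

Note that a `== nil` comparison on an interface only catches a truly nil interface value; a typed nil pointer stored in the interface would pass the check and still fail later.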
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 8e26b2e5-cf02-4565-b04b-907f967ffc4e
📒 Files selected for processing (32)
- Makefile
- pkg/ddl/BUILD.bazel
- pkg/ddl/backfilling_dist_executor.go
- pkg/ddl/backfilling_dist_scheduler.go
- pkg/ddl/backfilling_dist_scheduler_test.go
- pkg/domain/crossks/cross_ks.go
- pkg/domain/crossks/cross_ks_internal_test.go
- pkg/domain/crossks/cross_ks_test.go
- pkg/domain/domain.go
- pkg/domain/sqlsvrapi/mock/BUILD.bazel
- pkg/domain/sqlsvrapi/mock/ksruntime_mock.go
- pkg/domain/sqlsvrapi/mock/runtime_mock.go
- pkg/domain/sqlsvrapi/mock/server_mock.go
- pkg/domain/sqlsvrapi/server.go
- pkg/dxf/framework/scheduler/BUILD.bazel
- pkg/dxf/framework/scheduler/interface.go
- pkg/dxf/framework/scheduler/scheduler.go
- pkg/dxf/framework/scheduler/scheduler_manager.go
- pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go
- pkg/dxf/framework/taskexecutor/BUILD.bazel
- pkg/dxf/framework/taskexecutor/manager.go
- pkg/dxf/framework/taskexecutor/manager_test.go
- pkg/dxf/framework/taskexecutor/task_executor.go
- pkg/dxf/framework/taskexecutor/task_executor_test.go
- pkg/dxf/framework/taskexecutor/task_executor_testkit_test.go
- pkg/dxf/importinto/BUILD.bazel
- pkg/dxf/importinto/scheduler.go
- pkg/dxf/importinto/scheduler_test.go
- pkg/dxf/importinto/scheduler_testkit_test.go
- pkg/dxf/importinto/task_executor.go
- pkg/dxf/importinto/task_executor_test.go
- pkg/dxf/importinto/task_executor_testkit_test.go
 	return generateGlobalSortIngestPlan(
 		ctx,
-		sch.TaskStore.(kv.StorageWithPD),
+		store.(kv.StorageWithPD),
Verify store type before casting to kv.StorageWithPD.
The unsafe type assertion store.(kv.StorageWithPD) will panic if the store does not implement the StorageWithPD interface. While this is only reached for global sort (which requires PD), consider a checked type assertion with an error return for safer failure handling.
🛡️ Proposed fix to add checked assertion
-	return generateGlobalSortIngestPlan(
-		ctx,
-		store.(kv.StorageWithPD),
-		taskHandle,
-		task,
-		backfillMeta.CloudStorageURI,
-		logger)
+	storePD, ok := store.(kv.StorageWithPD)
+	if !ok {
+		return nil, errors.Errorf("global sort requires StorageWithPD but got %T", store)
+	}
+	return generateGlobalSortIngestPlan(
+		ctx,
+		storePD,
+		taskHandle,
+		task,
+		backfillMeta.CloudStorageURI,
+		logger)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/ddl/backfilling_dist_scheduler.go` at line 179, Replace the unsafe type
assertion of store to kv.StorageWithPD with a checked assertion and handle the
failure path: in the function that calls store.(kv.StorageWithPD) (the
backfilling distribution scheduler code path used for global sort), perform v,
ok := store.(kv.StorageWithPD); if !ok return an error (or propagate an existing
error) explaining that PD-backed storage is required for global sort, otherwise
use v; ensure the error case is used wherever this code runs so it cannot panic
at runtime.
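The difference between the unchecked and the checked assertion can be shown with a small self-contained sketch. The interfaces and store types below are hypothetical simplifications; the real `kv.StorageWithPD` has a different method set.

```go
package main

import "fmt"

// Storage and StorageWithPD are hypothetical, simplified stand-ins.
type Storage interface{ Name() string }

type StorageWithPD interface {
	Storage
	PDAddr() string
}

// plainStore implements only Storage.
type plainStore struct{}

func (plainStore) Name() string { return "plain" }

// pdStore implements the wider StorageWithPD interface.
type pdStore struct{ addr string }

func (pdStore) Name() string     { return "pd" }
func (s pdStore) PDAddr() string { return s.addr }

// requirePD uses the two-value assertion form, which reports failure through
// ok instead of panicking like the single-value form store.(StorageWithPD).
func requirePD(store Storage) (StorageWithPD, error) {
	v, ok := store.(StorageWithPD)
	if !ok {
		return nil, fmt.Errorf("global sort requires StorageWithPD but got %T", store)
	}
	return v, nil
}

func main() {
	if _, err := requirePD(plainStore{}); err != nil {
		fmt.Println("error:", err)
	}
	if v, err := requirePD(pdStore{addr: "127.0.0.1:2379"}); err == nil {
		fmt.Println("pd address:", v.PDAddr())
	}
}
```

Whether the extra branch is worth it depends on how confident the authors are that global sort is only ever reached with a PD-backed store; the sketch only shows the mechanics.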
runtimeHandle := newTaskExecutorTestRuntimeHandle(ctrl, taskStore)
runtimeHandle.EXPECT().Release()
server := sqlsvrapimock.NewMockServer(ctrl)
server.EXPECT().AcquireKSRuntime(task.Keyspace, "DXF/executor/201").Return(runtimeHandle, nil)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for bookkeeper ID patterns in manager implementations
rg -n 'DXF/(executor|scheduler)' --type=go --glob='**/scheduler_manager.go' --glob='**/taskexecutor/manager.go' -C2
Repository: pingcap/tidb
Length of output: 982
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect the exact test line context
echo "=== manager_test.go around line 223 ==="
nl -ba pkg/dxf/framework/taskexecutor/manager_test.go | sed -n '200,260p'
# 2) Check if there are existing constants/docs for the bookkeeper naming
echo "=== Search for DXF/executor format constant or comment ==="
rg -n 'DXF/executor/' --type=go --glob='**/*.go'
echo "=== Search for DXF/scheduler format constant or comment ==="
rg -n 'DXF/scheduler/' --type=go --glob='**/*.go'
echo "=== Search for potential existing format const ==="
rg -n 'bookkeeperIDFormat|bookkeeper.*format|Bookkeeper.*ID|DXF/(executor|scheduler).*%d' --type=go --glob='**/*.go'
Repository: pingcap/tidb
Length of output: 157
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== manager_test.go around line 223 ==="
sed -n '210,240p' pkg/dxf/framework/taskexecutor/manager_test.go
echo
echo "=== manager_test.go (with line numbers) around line 223 ==="
cat -n pkg/dxf/framework/taskexecutor/manager_test.go | sed -n '210,240p'
echo
echo "=== Search for DXF/executor/%d occurrences ==="
rg -n 'DXF/executor/%d' --type=go --glob='**/*.go'
echo
echo "=== Search for DXF/scheduler/%d occurrences ==="
rg -n 'DXF/scheduler/%d' --type=go --glob='**/*.go'
echo
echo "=== Search for existing bookkeeper ID format constants/docs ==="
rg -n 'bookkeeper.*(id|ID)|Bookkeeper.*(id|ID)|ID.*format|DXF/(executor|scheduler).*%d' --type=go --glob='**/*.go'
Repository: pingcap/tidb
Length of output: 16861
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== All occurrences of DXF executor/scheduler bookkeeper IDs (string literals) in Go ==="
rg -n 'DXF/(executor|scheduler)/' --type=go --glob='**/*.go'
Repository: pingcap/tidb
Length of output: 1697
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== pkg/dxf/framework/taskexecutor/manager.go around bookkeeper fmt.Sprintf ==="
sed -n '300,380p' pkg/dxf/framework/taskexecutor/manager.go
echo
echo "=== pkg/dxf/framework/scheduler/scheduler_manager.go around bookkeeper fmt.Sprintf ==="
sed -n '330,400p' pkg/dxf/framework/scheduler/scheduler_manager.go
Repository: pingcap/tidb
Length of output: 5088
Document and centralize DXF bookkeeper ID naming formats used for KS runtime acquisition
- pkg/dxf/framework/taskexecutor/manager.go acquires KS runtime using fmt.Sprintf("DXF/executor/%d", task.ID), and pkg/dxf/framework/taskexecutor/manager_test.go hardcodes "DXF/executor/<taskID>" (e.g., "DXF/executor/201").
- pkg/dxf/framework/scheduler/scheduler_manager.go acquires KS runtime using fmt.Sprintf("DXF/scheduler/%d", task.ID).
Add a documented constant (or shared helper) and reuse it in both managers and tests to avoid drifting string literals, e.g.:
// Bookkeeper ID format for KS runtime handles.
// Executor-owned: "DXF/executor/<taskID>"
// Scheduler-owned: "DXF/scheduler/<taskID>"
const (
	executorBookkeeperIDFormat  = "DXF/executor/%d"
	schedulerBookkeeperIDFormat = "DXF/scheduler/%d"
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/framework/taskexecutor/manager_test.go` at line 223, Define and use a
shared, documented constant for the DXF bookkeeper ID formats instead of
hardcoded fmt.Sprintf strings: add constants like executorBookkeeperIDFormat =
"DXF/executor/%d" and schedulerBookkeeperIDFormat = "DXF/scheduler/%d" in a
package-visible location, then replace fmt.Sprintf("DXF/executor/%d", task.ID)
in pkg/dxf/framework/taskexecutor/manager.go, fmt.Sprintf("DXF/scheduler/%d",
task.ID) in pkg/dxf/framework/scheduler/scheduler_manager.go, and the hardcoded
literal in pkg/dxf/framework/taskexecutor/manager_test.go (the
server.EXPECT().AcquireKSRuntime call) to use fmt.Sprintf with the new constants
so all code and tests reference the same documented formats.
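One way to go a step beyond shared format constants is to wrap them in small helpers, so callers and tests never format the ID themselves. The sketch below assumes the constant names proposed in the comment; the helper functions `executorBookkeeperID` and `schedulerBookkeeperID` are hypothetical and do not exist in the PR.

```go
package main

import "fmt"

// Bookkeeper ID formats for KS runtime handles, as proposed in the review.
const (
	executorBookkeeperIDFormat  = "DXF/executor/%d"
	schedulerBookkeeperIDFormat = "DXF/scheduler/%d"
)

// executorBookkeeperID is a hypothetical helper returning the executor-owned ID.
func executorBookkeeperID(taskID int64) string {
	return fmt.Sprintf(executorBookkeeperIDFormat, taskID)
}

// schedulerBookkeeperID is a hypothetical helper returning the scheduler-owned ID.
func schedulerBookkeeperID(taskID int64) string {
	return fmt.Sprintf(schedulerBookkeeperIDFormat, taskID)
}

func main() {
	fmt.Println(executorBookkeeperID(201))
	fmt.Println(schedulerBookkeeperID(201))
}
```

A test could then expect `executorBookkeeperID(task.ID)` instead of a literal, at the cost of no longer pinning the exact string in the test itself.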
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #69023 +/- ##
================================================
+ Coverage 76.3214% 76.8118% +0.4903%
================================================
Files 2041 2051 +10
Lines 562686 567082 +4396
================================================
+ Hits 429450 435586 +6136
+ Misses 132323 129789 -2534
- Partials 913 1707 +794
🧹 Nitpick comments (1)
pkg/dxf/framework/scheduler/scheduler_nokit_test.go (1)
42-65: ⚖️ Poor tradeoff
Consider extracting shared test utilities to reduce duplication.
The helpers newSchedulerRuntimeSessionPool and newSchedulerMockRuntime are nearly identical to newCheckRuntimeSessionPool and newCheckRuntimeMockRuntime in pkg/dxf/framework/dxfutil/util_test.go. While package boundaries prevent direct sharing of unexported test helpers, this duplication could make maintenance harder if the mock setup needs to evolve.
♻️ Potential approach
If these helpers are expected to be reused across multiple scheduler or executor test packages, consider creating a shared test utility package (e.g., pkg/dxf/framework/testutil) with exported helper functions. However, given that these are simple wrappers and the duplication is isolated to two test files, keeping them separate is also reasonable.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/framework/scheduler/scheduler_nokit_test.go` around lines 42 - 65, The two helpers newSchedulerRuntimeSessionPool and newSchedulerMockRuntime duplicate newCheckRuntimeSessionPool and newCheckRuntimeMockRuntime; extract the common logic into a shared test helper package with exported functions (e.g., NewRuntimeSessionPool, NewMockRuntime) and update the tests to call those exported helpers; ensure the new helpers preserve the same signatures (accepting *testing.T, kv.Storage, *gomock.Controller, tidbutil.DestroyableSessionPool as needed) and keep the same behavior (creating utilmock.NewContext, setting Store, registering t.Cleanup for Close, and setting up mock expectations) so both scheduler and check tests can reuse them.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5dfb9546-da72-4d7d-b653-3d1ec9833b4d
📒 Files selected for processing (11)
- pkg/domain/domain.go
- pkg/domain/sqlsvrapi/server.go
- pkg/dxf/framework/dxfutil/BUILD.bazel
- pkg/dxf/framework/dxfutil/util.go
- pkg/dxf/framework/dxfutil/util_test.go
- pkg/dxf/framework/scheduler/BUILD.bazel
- pkg/dxf/framework/scheduler/scheduler.go
- pkg/dxf/framework/scheduler/scheduler_nokit_test.go
- pkg/dxf/framework/taskexecutor/BUILD.bazel
- pkg/dxf/framework/taskexecutor/task_executor.go
- pkg/dxf/framework/taskexecutor/task_executor_test.go
✅ Files skipped from review due to trivial changes (2)
- pkg/dxf/framework/dxfutil/BUILD.bazel
- pkg/dxf/framework/taskexecutor/BUILD.bazel
🚧 Files skipped from review as they are similar to previous changes (6)
- pkg/dxf/framework/scheduler/BUILD.bazel
- pkg/dxf/framework/scheduler/scheduler.go
- pkg/dxf/framework/taskexecutor/task_executor_test.go
- pkg/domain/sqlsvrapi/server.go
- pkg/dxf/framework/taskexecutor/task_executor.go
- pkg/domain/domain.go
What problem does this PR solve?
Issue Number: ref #68883
Problem Summary:
NextGen DXF work can run IMPORT and distributed DDL backfill tasks against user keyspaces from the SYSTEM keyspace. Those task paths need explicit ownership of cross-keyspace runtimes so stores, session pools, schema syncers, etcd clients, and background loops are retained only while the DXF scheduler or executor is actively using them.
What changed and how does it work?
This PR routes task-keyspace runtime access through the DXF manager boundaries:
- sqlsvrapi.Server with GetRuntime() and make acquired keyspace handles expose the shared Runtime view.
- DXF/scheduler/<taskID>
- DXF/executor/<taskID>
- TaskRuntime into schedulers and task executors, and release acquired handles on normal exit, init failure, missing executor factory, and acquisition failure paths.
- sqlsvrapi gomock targets and focused tests for runtime acquisition/release boundaries, failure cleanup, runtime keyspace validation, and IMPORT/DDL consumers.
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit