Skip to content

Commit 0d93b15

Browse files
committed
fix
1 parent 546d06b commit 0d93b15

3 files changed

Lines changed: 49 additions & 46 deletions

File tree

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ pub async fn do_vacuum2(
170170
// Step 2: classify branch history and select gc roots.
171171
// Phase A (serial): construct branch tables and classify beyond-retention.
172172
// Phase B (parallel): select gc roots for retainable candidates.
173-
let mut beyond_retention_branches: Vec<Box<FuseTable>> = Vec::new();
173+
let mut beyond_retention_branches: Vec<(Box<FuseTable>, String)> = Vec::new();
174174
let mut gc_root_candidates: Vec<Box<FuseTable>> = Vec::new();
175175
for branch in history_branches {
176176
if ctx.check_aborting().is_err() {
@@ -180,6 +180,7 @@ pub async fn do_vacuum2(
180180
let branch_id = branch.branch_id.table_id;
181181
let expire_at = branch.expire_at;
182182
let drop_on = branch.branch_meta.data.drop_on;
183+
let branch_name = branch.branch_name.clone();
183184
let branch_table = fuse_table.branch_table_from_meta(branch, &s3_storage_class)?;
184185

185186
let storage_prefix = format!("{}/", branch_table.meta_location_generator().prefix());
@@ -189,7 +190,7 @@ pub async fn do_vacuum2(
189190
let effective_drop_time =
190191
drop_on.or_else(|| expire_at.filter(|expire_at| *expire_at <= now));
191192
if effective_drop_time.is_some_and(|drop_time| drop_time < retention_time) {
192-
beyond_retention_branches.push(branch_table);
193+
beyond_retention_branches.push((branch_table, branch_name));
193194
continue;
194195
}
195196

@@ -320,7 +321,7 @@ pub async fn do_vacuum2(
320321
.map(|s| s.table_id)
321322
.into_iter()
322323
.chain(cleanup_branches.iter().map(|branch| branch.state.table_id))
323-
.chain(beyond_retention_branches.iter().map(|b| b.get_id()))
324+
.chain(beyond_retention_branches.iter().map(|(b, _)| b.get_id()))
324325
.collect();
325326
if tables_at_risk.is_empty() {
326327
info!(
@@ -421,61 +422,62 @@ pub async fn do_vacuum2(
421422
// whether the branch can be final-GC'd or must stay gc-pending because it still owns
422423
// protected snapshot/segment/block data.
423424
let beyond_retention_results =
424-
futures::stream::iter(beyond_retention_branches.into_iter().map(|branch_table| {
425-
let ctx = ctx.clone();
426-
let external_head_snapshots = &external_head_snapshots;
427-
let protected_segments_by_table = &protected_segments_by_table;
428-
let protected_blocks_by_table = &protected_blocks_by_table;
429-
async move {
430-
let bid = branch_table.get_id();
431-
let mut snapshots_to_gc = branch_table
432-
.list_files_for_gc(
433-
branch_table
434-
.meta_location_generator()
435-
.snapshot_location_prefix(),
436-
None,
437-
)
438-
.await?;
439-
let snapshot_count = snapshots_to_gc.len();
440-
snapshots_to_gc.retain(|path| !external_head_snapshots.contains(path));
441-
let has_protected_snapshot = snapshots_to_gc.len() != snapshot_count;
442-
if !snapshots_to_gc.is_empty() {
443-
branch_table
444-
.cleanup_snapshot_files(&ctx, &snapshots_to_gc, false)
425+
futures::stream::iter(beyond_retention_branches.into_iter().map(
426+
|(branch_table, branch_name)| {
427+
let ctx = ctx.clone();
428+
let external_head_snapshots = &external_head_snapshots;
429+
let protected_segments_by_table = &protected_segments_by_table;
430+
let protected_blocks_by_table = &protected_blocks_by_table;
431+
async move {
432+
let bid = branch_table.get_id();
433+
let mut snapshots_to_gc = branch_table
434+
.list_files_for_gc(
435+
branch_table
436+
.meta_location_generator()
437+
.snapshot_location_prefix(),
438+
None,
439+
)
445440
.await?;
446-
}
441+
let snapshot_count = snapshots_to_gc.len();
442+
snapshots_to_gc.retain(|path| !external_head_snapshots.contains(path));
443+
let has_protected_snapshot = snapshots_to_gc.len() != snapshot_count;
444+
if !snapshots_to_gc.is_empty() {
445+
branch_table
446+
.cleanup_snapshot_files(&ctx, &snapshots_to_gc, false)
447+
.await?;
448+
}
447449

448-
let has_protected = has_protected_snapshot
449-
|| protected_segments_by_table
450-
.get(&bid)
451-
.is_some_and(|s| !s.is_empty())
452-
|| protected_blocks_by_table
453-
.get(&bid)
454-
.is_some_and(|b| !b.is_empty());
455-
Ok::<_, ErrorCode>((branch_table, has_protected, snapshots_to_gc))
456-
}
457-
}))
450+
let has_protected = has_protected_snapshot
451+
|| protected_segments_by_table
452+
.get(&bid)
453+
.is_some_and(|s| !s.is_empty())
454+
|| protected_blocks_by_table
455+
.get(&bid)
456+
.is_some_and(|b| !b.is_empty());
457+
Ok::<_, ErrorCode>((branch_table, branch_name, has_protected, snapshots_to_gc))
458+
}
459+
},
460+
))
458461
.buffer_unordered(concurrency)
459462
.collect::<Vec<_>>()
460463
.await;
461464

462465
let mut gc_pending_branches = Vec::new();
463466
let mut final_gc_branches = Vec::new();
464467
for result in beyond_retention_results {
465-
let (branch_table, has_protected, snapshots_to_gc) = result?;
468+
let (branch_table, branch_name, has_protected, snapshots_to_gc) = result?;
466469
files_to_gc.extend(snapshots_to_gc);
467470
if has_protected {
468471
gc_pending_branches.push(branch_table);
469472
} else {
470-
final_gc_branches.push(branch_table);
473+
final_gc_branches.push((branch_table, branch_name));
471474
}
472475
}
473-
let final_gc_results =
474-
futures::stream::iter(final_gc_branches.into_iter().map(|branch_table| {
476+
let final_gc_results = futures::stream::iter(final_gc_branches.into_iter().map(
477+
|(branch_table, branch_name)| {
475478
let ctx = ctx.clone();
476479
let meta_api = meta_api.clone();
477480
async move {
478-
let branch_name = branch_table.get_table_info().name.clone();
479481
final_gc_branch(
480482
&ctx,
481483
&meta_api,
@@ -485,10 +487,11 @@ pub async fn do_vacuum2(
485487
)
486488
.await
487489
}
488-
}))
489-
.buffer_unordered(concurrency)
490-
.collect::<Vec<_>>()
491-
.await;
490+
},
491+
))
492+
.buffer_unordered(concurrency)
493+
.collect::<Vec<_>>()
494+
.await;
492495
for result in final_gc_results {
493496
files_to_gc.extend(result?);
494497
}

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ impl Catalog for MutableCatalog {
680680
};
681681
let info = self.ctx.meta.get_table_branch(req).await?;
682682
let mut info = info;
683-
info.name = table_name.to_string();
683+
info.name = format!("{table_name}/{branch_name}");
684684
info.desc = format!("'{}'.'{}'/'{}'", db_name, table_name, branch_name);
685685
self.get_table_by_info(&info)
686686
}

src/query/storages/fuse/src/operations/vacuum.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl FuseTable {
136136
seq: branch.branch_meta.seq,
137137
},
138138
desc: format!("{}/'{}'", self.table_info.desc, branch.branch_name),
139-
name: branch.branch_name,
139+
name: format!("{}/{}", self.table_info.name, branch.branch_name),
140140
meta: branch.branch_meta.data,
141141
db_type: DatabaseType::NormalDB,
142142
// Internal synthetic branch tables must inherit the source table's catalog identity,

0 commit comments

Comments
 (0)