Skip to content

Commit 83e0bd0

Browse files
SkyFan2002smallfish
authored andcommitted
fix(temp-table): update multi-insert fuse meta (#19778)
* fix(temp-table): update multi-insert fuse meta Generate and write a new FUSE snapshot for temporary table targets in multi-table insert instead of reusing stale table metadata. This keeps reads and system.temporary_tables statistics in sync after INSERT ALL. Fixes #19777 * fix(temp-table): defer multi-insert temp meta
1 parent 469bf8e commit 83e0bd0

2 files changed

Lines changed: 204 additions & 41 deletions

File tree

src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 163 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use databend_common_exception::ErrorCode;
2626
use databend_common_exception::Result;
2727
use databend_common_expression::BlockMetaInfoDowncast;
2828
use databend_common_expression::DataBlock;
29+
use databend_common_meta_app::schema::TableInfo;
30+
use databend_common_meta_app::schema::TableMeta;
2931
use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
3032
use databend_common_meta_app::schema::UpdateStreamMetaReq;
3133
use databend_common_meta_app::schema::UpdateTableMetaReq;
@@ -106,12 +108,17 @@ impl AsyncSink for CommitMultiTableInsert {
106108
snapshot_generator.set_conflict_resolve_context(commit_meta.conflict_resolve_context);
107109
let table = self.tables.get(&table_id).unwrap();
108110
if table.is_temp() {
109-
update_temp_tables.push(UpdateTempTableReq {
110-
table_id,
111-
new_table_meta: table.get_table_info().meta.clone(),
112-
copied_files: Default::default(),
113-
desc: table.get_table_info().desc.clone(),
114-
});
111+
update_temp_tables.push(
112+
build_update_temp_table_req(
113+
table.as_ref(),
114+
&snapshot_generator,
115+
self.ctx.txn_mgr(),
116+
*self.table_meta_timestampss.get(&table_id).unwrap(),
117+
&commit_meta.hll,
118+
insert_rows.get(&table_id).cloned().unwrap_or_default(),
119+
)
120+
.await?,
121+
);
115122
} else {
116123
update_table_metas.push((
117124
build_update_table_meta_req(
@@ -134,32 +141,42 @@ impl AsyncSink for CommitMultiTableInsert {
134141
let mut retries = 0;
135142

136143
loop {
137-
let update_multi_table_meta_req = UpdateMultiTableMetaReq {
138-
update_table_metas: update_table_metas.clone(),
139-
copied_files: vec![],
140-
update_stream_metas: self.update_stream_meta.clone(),
141-
deduplicated_labels: self.deduplicated_label.clone().into_iter().collect(),
142-
update_temp_tables: std::mem::take(&mut update_temp_tables),
143-
};
144+
let update_multi_table_meta_req = build_non_temp_update_multi_table_meta_req(
145+
update_table_metas.clone(),
146+
self.update_stream_meta.clone(),
147+
self.deduplicated_label.clone(),
148+
);
144149

145-
let update_meta_result = match self
146-
.catalog
147-
.retryable_update_multi_table_meta(update_multi_table_meta_req)
148-
.await
149-
{
150-
Ok(ret) => ret,
151-
Err(e) => {
152-
// other errors may occur, especially the version mismatch of streams,
153-
// let's log it here for the convenience of diagnostics
154-
error!(
155-
"Non-recoverable fault occurred during updating tables. {}",
156-
e
157-
);
158-
return Err(e);
150+
let update_meta_result = if update_multi_table_meta_req.is_empty() {
151+
Ok(Default::default())
152+
} else {
153+
match self
154+
.catalog
155+
.retryable_update_multi_table_meta(update_multi_table_meta_req)
156+
.await
157+
{
158+
Ok(ret) => ret,
159+
Err(e) => {
160+
// other errors may occur, especially the version mismatch of streams,
161+
// let's log it here for the convenience of diagnostics
162+
error!(
163+
"Non-recoverable fault occurred during updating tables. {}",
164+
e
165+
);
166+
return Err(e);
167+
}
159168
}
160169
};
161170

162171
let Err(update_failed_tbls) = update_meta_result else {
172+
if !update_temp_tables.is_empty() {
173+
self.catalog
174+
.update_multi_table_meta(build_temp_update_multi_table_meta_req(
175+
std::mem::take(&mut update_temp_tables),
176+
))
177+
.await?;
178+
}
179+
163180
let table_descriptions = self
164181
.tables
165182
.values()
@@ -262,6 +279,56 @@ impl AsyncSink for CommitMultiTableInsert {
262279
}
263280
}
264281

282+
fn build_non_temp_update_multi_table_meta_req(
283+
update_table_metas: Vec<(UpdateTableMetaReq, TableInfo)>,
284+
update_stream_metas: Vec<UpdateStreamMetaReq>,
285+
deduplicated_label: Option<String>,
286+
) -> UpdateMultiTableMetaReq {
287+
UpdateMultiTableMetaReq {
288+
update_table_metas,
289+
copied_files: vec![],
290+
update_stream_metas,
291+
deduplicated_labels: deduplicated_label.into_iter().collect(),
292+
update_temp_tables: vec![],
293+
}
294+
}
295+
296+
fn build_temp_update_multi_table_meta_req(
297+
update_temp_tables: Vec<UpdateTempTableReq>,
298+
) -> UpdateMultiTableMetaReq {
299+
UpdateMultiTableMetaReq {
300+
update_temp_tables,
301+
..Default::default()
302+
}
303+
}
304+
305+
async fn build_update_temp_table_req(
306+
table: &dyn Table,
307+
snapshot_generator: &AppendGenerator,
308+
txn_mgr: TxnManagerRef,
309+
table_meta_timestamps: TableMetaTimestamps,
310+
insert_hll: &BlockHLL,
311+
insert_rows: u64,
312+
) -> Result<UpdateTempTableReq> {
313+
let table_info = table.get_table_info();
314+
let new_table_meta = write_new_snapshot_and_build_table_meta(
315+
table,
316+
snapshot_generator,
317+
txn_mgr,
318+
table_meta_timestamps,
319+
insert_hll,
320+
insert_rows,
321+
)
322+
.await?;
323+
324+
Ok(UpdateTempTableReq {
325+
table_id: table_info.ident.table_id,
326+
new_table_meta,
327+
copied_files: Default::default(),
328+
desc: table_info.desc.clone(),
329+
})
330+
}
331+
265332
async fn build_update_table_meta_req(
266333
table: &dyn Table,
267334
snapshot_generator: &AppendGenerator,
@@ -270,6 +337,37 @@ async fn build_update_table_meta_req(
270337
insert_hll: &BlockHLL,
271338
insert_rows: u64,
272339
) -> Result<UpdateTableMetaReq> {
340+
let fuse_table = FuseTable::try_from_table(table)?;
341+
let new_table_meta = write_new_snapshot_and_build_table_meta(
342+
table,
343+
snapshot_generator,
344+
txn_mgr,
345+
table_meta_timestamps,
346+
insert_hll,
347+
insert_rows,
348+
)
349+
.await?;
350+
let table_id = fuse_table.table_info.ident.table_id;
351+
let table_version = fuse_table.table_info.ident.seq;
352+
353+
let req = UpdateTableMetaReq {
354+
table_id,
355+
seq: MatchSeq::Exact(table_version),
356+
new_table_meta,
357+
base_snapshot_location: fuse_table.snapshot_loc(),
358+
lvt_check: None,
359+
};
360+
Ok(req)
361+
}
362+
363+
async fn write_new_snapshot_and_build_table_meta(
364+
table: &dyn Table,
365+
snapshot_generator: &AppendGenerator,
366+
txn_mgr: TxnManagerRef,
367+
table_meta_timestamps: TableMetaTimestamps,
368+
insert_hll: &BlockHLL,
369+
insert_rows: u64,
370+
) -> Result<TableMeta> {
273371
let fuse_table = FuseTable::try_from_table(table)?;
274372
let previous = fuse_table.read_table_snapshot().await?;
275373
let table_stats_gen = fuse_table
@@ -292,25 +390,49 @@ async fn build_update_table_meta_req(
292390
&snapshot.summary,
293391
);
294392

295-
// write snapshot
296393
let dal = fuse_table.get_operator();
297394
let location_generator = &fuse_table.meta_location_generator;
298395
let location =
299396
location_generator.gen_snapshot_location(&snapshot.snapshot_id, TableSnapshot::VERSION)?;
300397
dal.write(&location, snapshot.to_bytes()?).await?;
301398

302-
// build new table meta
303-
let new_table_meta =
304-
FuseTable::build_new_table_meta(&fuse_table.table_info.meta, &location, &snapshot);
305-
let table_id = fuse_table.table_info.ident.table_id;
306-
let table_version = fuse_table.table_info.ident.seq;
399+
Ok(FuseTable::build_new_table_meta(
400+
&fuse_table.table_info.meta,
401+
&location,
402+
&snapshot,
403+
))
404+
}
307405

308-
let req = UpdateTableMetaReq {
309-
table_id,
310-
seq: MatchSeq::Exact(table_version),
311-
new_table_meta,
312-
base_snapshot_location: fuse_table.snapshot_loc(),
313-
lvt_check: None,
314-
};
315-
Ok(req)
406+
#[cfg(test)]
407+
mod tests {
408+
use super::*;
409+
410+
#[test]
411+
fn non_temp_update_req_does_not_carry_temp_table_updates() {
412+
let req =
413+
build_non_temp_update_multi_table_meta_req(vec![], vec![], Some("label".to_string()));
414+
415+
assert!(req.update_table_metas.is_empty());
416+
assert!(req.copied_files.is_empty());
417+
assert!(req.update_stream_metas.is_empty());
418+
assert_eq!(req.deduplicated_labels, vec!["label".to_string()]);
419+
assert!(req.update_temp_tables.is_empty());
420+
}
421+
422+
#[test]
423+
fn temp_update_req_only_carries_temp_table_updates() {
424+
let temp_req = UpdateTempTableReq {
425+
table_id: 1,
426+
desc: "default.tmp".to_string(),
427+
new_table_meta: TableMeta::default(),
428+
copied_files: Default::default(),
429+
};
430+
let req = build_temp_update_multi_table_meta_req(vec![temp_req]);
431+
432+
assert!(req.update_table_metas.is_empty());
433+
assert!(req.copied_files.is_empty());
434+
assert!(req.update_stream_metas.is_empty());
435+
assert!(req.deduplicated_labels.is_empty());
436+
assert_eq!(req.update_temp_tables.len(), 1);
437+
}
316438
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
statement ok
2+
create temp table multi_insert_temp_fuse(a UInt64) engine=fuse;
3+
4+
statement ok
5+
drop table if exists multi_insert_perm_fuse;
6+
7+
statement ok
8+
create table multi_insert_perm_fuse(a UInt64) engine=fuse;
9+
10+
statement ok
11+
insert all into multi_insert_temp_fuse into multi_insert_perm_fuse select * from numbers(3);
12+
13+
query I
14+
select count(*) from multi_insert_temp_fuse;
15+
----
16+
3
17+
18+
query I
19+
select a from multi_insert_temp_fuse order by a;
20+
----
21+
0
22+
1
23+
2
24+
25+
query I
26+
select num_rows from system.temporary_tables where is_current_session = true and name = 'multi_insert_temp_fuse';
27+
----
28+
3
29+
30+
query I
31+
select a from multi_insert_perm_fuse order by a;
32+
----
33+
0
34+
1
35+
2
36+
37+
statement ok
38+
drop table multi_insert_temp_fuse;
39+
40+
statement ok
41+
drop table multi_insert_perm_fuse;

0 commit comments

Comments
 (0)