@@ -26,6 +26,8 @@ use databend_common_exception::ErrorCode;
2626use databend_common_exception:: Result ;
2727use databend_common_expression:: BlockMetaInfoDowncast ;
2828use databend_common_expression:: DataBlock ;
29+ use databend_common_meta_app:: schema:: TableInfo ;
30+ use databend_common_meta_app:: schema:: TableMeta ;
2931use databend_common_meta_app:: schema:: UpdateMultiTableMetaReq ;
3032use databend_common_meta_app:: schema:: UpdateStreamMetaReq ;
3133use databend_common_meta_app:: schema:: UpdateTableMetaReq ;
@@ -106,12 +108,17 @@ impl AsyncSink for CommitMultiTableInsert {
106108 snapshot_generator. set_conflict_resolve_context ( commit_meta. conflict_resolve_context ) ;
107109 let table = self . tables . get ( & table_id) . unwrap ( ) ;
108110 if table. is_temp ( ) {
109- update_temp_tables. push ( UpdateTempTableReq {
110- table_id,
111- new_table_meta : table. get_table_info ( ) . meta . clone ( ) ,
112- copied_files : Default :: default ( ) ,
113- desc : table. get_table_info ( ) . desc . clone ( ) ,
114- } ) ;
111+ update_temp_tables. push (
112+ build_update_temp_table_req (
113+ table. as_ref ( ) ,
114+ & snapshot_generator,
115+ self . ctx . txn_mgr ( ) ,
116+ * self . table_meta_timestampss . get ( & table_id) . unwrap ( ) ,
117+ & commit_meta. hll ,
118+ insert_rows. get ( & table_id) . cloned ( ) . unwrap_or_default ( ) ,
119+ )
120+ . await ?,
121+ ) ;
115122 } else {
116123 update_table_metas. push ( (
117124 build_update_table_meta_req (
@@ -134,32 +141,42 @@ impl AsyncSink for CommitMultiTableInsert {
134141 let mut retries = 0 ;
135142
136143 loop {
137- let update_multi_table_meta_req = UpdateMultiTableMetaReq {
138- update_table_metas : update_table_metas. clone ( ) ,
139- copied_files : vec ! [ ] ,
140- update_stream_metas : self . update_stream_meta . clone ( ) ,
141- deduplicated_labels : self . deduplicated_label . clone ( ) . into_iter ( ) . collect ( ) ,
142- update_temp_tables : std:: mem:: take ( & mut update_temp_tables) ,
143- } ;
144+ let update_multi_table_meta_req = build_non_temp_update_multi_table_meta_req (
145+ update_table_metas. clone ( ) ,
146+ self . update_stream_meta . clone ( ) ,
147+ self . deduplicated_label . clone ( ) ,
148+ ) ;
144149
145- let update_meta_result = match self
146- . catalog
147- . retryable_update_multi_table_meta ( update_multi_table_meta_req)
148- . await
149- {
150- Ok ( ret) => ret,
151- Err ( e) => {
152- // other errors may occur, especially the version mismatch of streams,
153- // let's log it here for the convenience of diagnostics
154- error ! (
155- "Non-recoverable fault occurred during updating tables. {}" ,
156- e
157- ) ;
158- return Err ( e) ;
150+ let update_meta_result = if update_multi_table_meta_req. is_empty ( ) {
151+ Ok ( Default :: default ( ) )
152+ } else {
153+ match self
154+ . catalog
155+ . retryable_update_multi_table_meta ( update_multi_table_meta_req)
156+ . await
157+ {
158+ Ok ( ret) => ret,
159+ Err ( e) => {
160+ // other errors may occur, especially the version mismatch of streams,
161+ // let's log it here for the convenience of diagnostics
162+ error ! (
163+ "Non-recoverable fault occurred during updating tables. {}" ,
164+ e
165+ ) ;
166+ return Err ( e) ;
167+ }
159168 }
160169 } ;
161170
162171 let Err ( update_failed_tbls) = update_meta_result else {
172+ if !update_temp_tables. is_empty ( ) {
173+ self . catalog
174+ . update_multi_table_meta ( build_temp_update_multi_table_meta_req (
175+ std:: mem:: take ( & mut update_temp_tables) ,
176+ ) )
177+ . await ?;
178+ }
179+
163180 let table_descriptions = self
164181 . tables
165182 . values ( )
@@ -262,6 +279,56 @@ impl AsyncSink for CommitMultiTableInsert {
262279 }
263280}
264281
282+ fn build_non_temp_update_multi_table_meta_req (
283+ update_table_metas : Vec < ( UpdateTableMetaReq , TableInfo ) > ,
284+ update_stream_metas : Vec < UpdateStreamMetaReq > ,
285+ deduplicated_label : Option < String > ,
286+ ) -> UpdateMultiTableMetaReq {
287+ UpdateMultiTableMetaReq {
288+ update_table_metas,
289+ copied_files : vec ! [ ] ,
290+ update_stream_metas,
291+ deduplicated_labels : deduplicated_label. into_iter ( ) . collect ( ) ,
292+ update_temp_tables : vec ! [ ] ,
293+ }
294+ }
295+
296+ fn build_temp_update_multi_table_meta_req (
297+ update_temp_tables : Vec < UpdateTempTableReq > ,
298+ ) -> UpdateMultiTableMetaReq {
299+ UpdateMultiTableMetaReq {
300+ update_temp_tables,
301+ ..Default :: default ( )
302+ }
303+ }
304+
305+ async fn build_update_temp_table_req (
306+ table : & dyn Table ,
307+ snapshot_generator : & AppendGenerator ,
308+ txn_mgr : TxnManagerRef ,
309+ table_meta_timestamps : TableMetaTimestamps ,
310+ insert_hll : & BlockHLL ,
311+ insert_rows : u64 ,
312+ ) -> Result < UpdateTempTableReq > {
313+ let table_info = table. get_table_info ( ) ;
314+ let new_table_meta = write_new_snapshot_and_build_table_meta (
315+ table,
316+ snapshot_generator,
317+ txn_mgr,
318+ table_meta_timestamps,
319+ insert_hll,
320+ insert_rows,
321+ )
322+ . await ?;
323+
324+ Ok ( UpdateTempTableReq {
325+ table_id : table_info. ident . table_id ,
326+ new_table_meta,
327+ copied_files : Default :: default ( ) ,
328+ desc : table_info. desc . clone ( ) ,
329+ } )
330+ }
331+
265332async fn build_update_table_meta_req (
266333 table : & dyn Table ,
267334 snapshot_generator : & AppendGenerator ,
@@ -270,6 +337,37 @@ async fn build_update_table_meta_req(
270337 insert_hll : & BlockHLL ,
271338 insert_rows : u64 ,
272339) -> Result < UpdateTableMetaReq > {
340+ let fuse_table = FuseTable :: try_from_table ( table) ?;
341+ let new_table_meta = write_new_snapshot_and_build_table_meta (
342+ table,
343+ snapshot_generator,
344+ txn_mgr,
345+ table_meta_timestamps,
346+ insert_hll,
347+ insert_rows,
348+ )
349+ . await ?;
350+ let table_id = fuse_table. table_info . ident . table_id ;
351+ let table_version = fuse_table. table_info . ident . seq ;
352+
353+ let req = UpdateTableMetaReq {
354+ table_id,
355+ seq : MatchSeq :: Exact ( table_version) ,
356+ new_table_meta,
357+ base_snapshot_location : fuse_table. snapshot_loc ( ) ,
358+ lvt_check : None ,
359+ } ;
360+ Ok ( req)
361+ }
362+
363+ async fn write_new_snapshot_and_build_table_meta (
364+ table : & dyn Table ,
365+ snapshot_generator : & AppendGenerator ,
366+ txn_mgr : TxnManagerRef ,
367+ table_meta_timestamps : TableMetaTimestamps ,
368+ insert_hll : & BlockHLL ,
369+ insert_rows : u64 ,
370+ ) -> Result < TableMeta > {
273371 let fuse_table = FuseTable :: try_from_table ( table) ?;
274372 let previous = fuse_table. read_table_snapshot ( ) . await ?;
275373 let table_stats_gen = fuse_table
@@ -292,25 +390,49 @@ async fn build_update_table_meta_req(
292390 & snapshot. summary ,
293391 ) ;
294392
295- // write snapshot
296393 let dal = fuse_table. get_operator ( ) ;
297394 let location_generator = & fuse_table. meta_location_generator ;
298395 let location =
299396 location_generator. gen_snapshot_location ( & snapshot. snapshot_id , TableSnapshot :: VERSION ) ?;
300397 dal. write ( & location, snapshot. to_bytes ( ) ?) . await ?;
301398
302- // build new table meta
303- let new_table_meta =
304- FuseTable :: build_new_table_meta ( & fuse_table. table_info . meta , & location, & snapshot) ;
305- let table_id = fuse_table. table_info . ident . table_id ;
306- let table_version = fuse_table. table_info . ident . seq ;
399+ Ok ( FuseTable :: build_new_table_meta (
400+ & fuse_table. table_info . meta ,
401+ & location,
402+ & snapshot,
403+ ) )
404+ }
307405
308- let req = UpdateTableMetaReq {
309- table_id,
310- seq : MatchSeq :: Exact ( table_version) ,
311- new_table_meta,
312- base_snapshot_location : fuse_table. snapshot_loc ( ) ,
313- lvt_check : None ,
314- } ;
315- Ok ( req)
406+ #[ cfg( test) ]
407+ mod tests {
408+ use super :: * ;
409+
410+ #[ test]
411+ fn non_temp_update_req_does_not_carry_temp_table_updates ( ) {
412+ let req =
413+ build_non_temp_update_multi_table_meta_req ( vec ! [ ] , vec ! [ ] , Some ( "label" . to_string ( ) ) ) ;
414+
415+ assert ! ( req. update_table_metas. is_empty( ) ) ;
416+ assert ! ( req. copied_files. is_empty( ) ) ;
417+ assert ! ( req. update_stream_metas. is_empty( ) ) ;
418+ assert_eq ! ( req. deduplicated_labels, vec![ "label" . to_string( ) ] ) ;
419+ assert ! ( req. update_temp_tables. is_empty( ) ) ;
420+ }
421+
422+ #[ test]
423+ fn temp_update_req_only_carries_temp_table_updates ( ) {
424+ let temp_req = UpdateTempTableReq {
425+ table_id : 1 ,
426+ desc : "default.tmp" . to_string ( ) ,
427+ new_table_meta : TableMeta :: default ( ) ,
428+ copied_files : Default :: default ( ) ,
429+ } ;
430+ let req = build_temp_update_multi_table_meta_req ( vec ! [ temp_req] ) ;
431+
432+ assert ! ( req. update_table_metas. is_empty( ) ) ;
433+ assert ! ( req. copied_files. is_empty( ) ) ;
434+ assert ! ( req. update_stream_metas. is_empty( ) ) ;
435+ assert ! ( req. deduplicated_labels. is_empty( ) ) ;
436+ assert_eq ! ( req. update_temp_tables. len( ) , 1 ) ;
437+ }
316438}
0 commit comments