@@ -33,6 +33,7 @@ use databend_common_expression::TableDataType;
3333use databend_common_expression:: TableSchema ;
3434use databend_common_expression:: VIRTUAL_COLUMNS_LIMIT ;
3535use databend_common_expression:: VirtualDataSchema ;
36+ use databend_common_meta_api:: GarbageCollectionApi ;
3637use databend_common_meta_app:: schema:: ListHistoryTableBranchesReq ;
3738use databend_common_metrics:: storage:: metrics_inc_block_virtual_column_write_bytes;
3839use databend_common_metrics:: storage:: metrics_inc_block_virtual_column_write_milliseconds;
@@ -60,6 +61,7 @@ use databend_common_storages_fuse::operations::MutationLogEntry;
6061use databend_common_storages_fuse:: operations:: MutationLogs ;
6162use databend_common_storages_fuse:: operations:: TableMutationAggregator ;
6263use databend_common_storages_fuse:: operations:: VirtualSchemaMode ;
64+ use databend_common_users:: UserApiProvider ;
6365use databend_enterprise_virtual_column:: VirtualColumnRefreshResult ;
6466use databend_query:: pipelines:: PipelineBuildResult ;
6567use databend_query:: pipelines:: executor:: ExecutorSettings ;
@@ -473,6 +475,16 @@ async fn vacuum_virtual_column_orphans(
473475 . await ?;
474476 let retention_boundary =
475477 Utc :: now ( ) - Duration :: days ( ctx. get_settings ( ) . get_data_retention_time_in_days ( ) ? as i64 ) ;
478+ let meta_api = UserApiProvider :: instance ( ) . get_meta_store_client ( ) ;
479+ meta_api
480+ . fetch_set_vacuum_timestamp ( & ctx. get_tenant ( ) , retention_boundary)
481+ . await
482+ . map_err ( |e| {
483+ ErrorCode :: MetaStorageError ( format ! (
484+ "Failed to set vacuum watermark before vacuum virtual columns: {}. Vacuum aborted to prevent data inconsistency." ,
485+ e
486+ ) )
487+ } ) ?;
476488 let retain_branches = catalog
477489 . list_history_table_branches ( ListHistoryTableBranchesReq {
478490 table_id : fuse_table. get_id ( ) ,
0 commit comments