Skip to content

Commit 6612ad0

Browse files
authored
feat(query): Support fuse_block_statistics table function (#19737)
* feat(query): Support `fuse_block_statistics` table function * fix * fix * use variant type
1 parent 4e2558b commit 6612ad0

6 files changed

Lines changed: 273 additions & 1 deletion

File tree

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_exception::Result;
2222
use databend_common_storages_fuse::table_functions::ClusteringStatisticsFunc;
2323
use databend_common_storages_fuse::table_functions::FuseAmendTable;
2424
use databend_common_storages_fuse::table_functions::FuseBlockFunc;
25+
use databend_common_storages_fuse::table_functions::FuseBlockStatisticsFunc;
2526
use databend_common_storages_fuse::table_functions::FuseColumnFunc;
2627
use databend_common_storages_fuse::table_functions::FuseDumpSnapshotsFunc;
2728
use databend_common_storages_fuse::table_functions::FuseEncodingFunc;
@@ -205,6 +206,14 @@ impl TableFunctionFactory {
205206
),
206207
);
207208

209+
creators.insert(
210+
"fuse_block_statistics".to_string(),
211+
(
212+
next_id(),
213+
Arc::new(TableFunctionTemplate::<FuseBlockStatisticsFunc>::create),
214+
),
215+
);
216+
208217
creators.insert(
209218
"fuse_page".to_string(),
210219
(
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
use std::sync::Arc;
17+
18+
use databend_common_catalog::table::Table;
19+
use databend_common_exception::Result;
20+
use databend_common_expression::DataBlock;
21+
use databend_common_expression::FromData;
22+
use databend_common_expression::FunctionContext;
23+
use databend_common_expression::Scalar;
24+
use databend_common_expression::TableDataType;
25+
use databend_common_expression::TableField;
26+
use databend_common_expression::TableSchema;
27+
use databend_common_expression::TableSchemaRefExt;
28+
use databend_common_expression::types::NumberDataType;
29+
use databend_common_expression::types::NumberScalar;
30+
use databend_common_expression::types::StringType;
31+
use databend_common_expression::types::UInt64Type;
32+
use databend_common_expression::types::VariantType;
33+
use databend_common_expression::types::number::F64;
34+
use databend_common_expression::types::variant::cast_scalar_to_variant;
35+
use databend_storages_common_table_meta::meta::SegmentInfo;
36+
use databend_storages_common_table_meta::meta::SpatialStatistics;
37+
use databend_storages_common_table_meta::meta::TableSnapshot;
38+
39+
use crate::FuseTable;
40+
use crate::io::SegmentsIO;
41+
use crate::sessions::TableContext;
42+
use crate::table_functions::TableMetaFuncTemplate;
43+
use crate::table_functions::function_template::TableMetaFunc;
44+
45+
pub struct FuseBlockStatistics;
46+
pub type FuseBlockStatisticsFunc = TableMetaFuncTemplate<FuseBlockStatistics>;
47+
48+
#[async_trait::async_trait]
49+
impl TableMetaFunc for FuseBlockStatistics {
50+
fn schema() -> Arc<TableSchema> {
51+
TableSchemaRefExt::create(vec![
52+
TableField::new("block_location", TableDataType::String),
53+
TableField::new("column_id", TableDataType::Number(NumberDataType::UInt64)),
54+
TableField::new("column_name", TableDataType::String),
55+
TableField::new(
56+
"statistics",
57+
TableDataType::Nullable(Box::new(TableDataType::Variant)),
58+
),
59+
TableField::new(
60+
"spatial_statistics",
61+
TableDataType::Nullable(Box::new(TableDataType::Variant)),
62+
),
63+
])
64+
}
65+
66+
async fn apply(
67+
ctx: &Arc<dyn TableContext>,
68+
tbl: &FuseTable,
69+
snapshot: Arc<TableSnapshot>,
70+
limit: Option<usize>,
71+
) -> Result<DataBlock> {
72+
let limit = limit.unwrap_or(usize::MAX);
73+
let schema = tbl.schema();
74+
let func_ctx = ctx.get_function_context()?;
75+
let estimated_rows = std::cmp::min(
76+
snapshot.summary.block_count as usize * schema.num_fields().max(1),
77+
limit,
78+
);
79+
80+
let mut block_locations = Vec::with_capacity(estimated_rows);
81+
let mut column_ids = Vec::with_capacity(estimated_rows);
82+
let mut column_names = Vec::with_capacity(estimated_rows);
83+
let mut statistics = Vec::with_capacity(estimated_rows);
84+
let mut spatial_statistics = Vec::with_capacity(estimated_rows);
85+
86+
let segments_io = SegmentsIO::create(ctx.clone(), tbl.operator.clone(), schema.clone());
87+
88+
let mut num_rows = 0;
89+
let chunk_size = std::cmp::min(
90+
ctx.get_settings().get_max_threads()? as usize * 4,
91+
snapshot.summary.block_count as usize,
92+
)
93+
.max(1);
94+
'outer: for chunk in snapshot.segments.chunks(chunk_size) {
95+
let segments = segments_io
96+
.read_segments::<SegmentInfo>(chunk, true)
97+
.await?;
98+
for segment in segments {
99+
let segment = segment?;
100+
101+
for block in segment.blocks.iter() {
102+
let block = block.as_ref();
103+
let col_stats = block.col_stats.iter().collect::<BTreeMap<_, _>>();
104+
let spatial_stats = block
105+
.spatial_stats
106+
.as_ref()
107+
.map(|stats| stats.iter().collect::<BTreeMap<_, _>>());
108+
109+
for (column_id, column_stat) in col_stats {
110+
let Ok(field) = schema.field_of_column_id(*column_id) else {
111+
continue;
112+
};
113+
block_locations.push(block.location.0.clone());
114+
column_ids.push(*column_id as u64);
115+
column_names.push(field.name().to_string());
116+
let stat = build_column_statistics_variant(
117+
column_stat,
118+
field.data_type().remove_nullable(),
119+
&func_ctx,
120+
);
121+
statistics.push(Some(stat));
122+
spatial_statistics.push(None);
123+
124+
num_rows += 1;
125+
if num_rows >= limit {
126+
break 'outer;
127+
}
128+
}
129+
130+
if let Some(spatial_stats) = &spatial_stats {
131+
for (column_id, spatial_stat) in spatial_stats {
132+
let Ok(field) = schema.field_of_column_id(**column_id) else {
133+
continue;
134+
};
135+
block_locations.push(block.location.0.clone());
136+
column_ids.push(**column_id as u64);
137+
column_names.push(field.name().to_string());
138+
statistics.push(None);
139+
let stat = build_spatial_statistics_variant(spatial_stat, &func_ctx);
140+
spatial_statistics.push(Some(stat));
141+
142+
num_rows += 1;
143+
if num_rows >= limit {
144+
break 'outer;
145+
}
146+
}
147+
}
148+
}
149+
}
150+
}
151+
152+
Ok(DataBlock::new_from_columns(vec![
153+
StringType::from_data(block_locations),
154+
UInt64Type::from_data(column_ids),
155+
StringType::from_data(column_names),
156+
VariantType::from_opt_data(statistics),
157+
VariantType::from_opt_data(spatial_statistics),
158+
]))
159+
}
160+
}
161+
162+
fn build_column_statistics_variant(
163+
column_stat: &databend_storages_common_table_meta::meta::ColumnStatistics,
164+
field_type: TableDataType,
165+
func_ctx: &FunctionContext,
166+
) -> Vec<u8> {
167+
let scalar = Scalar::Tuple(vec![
168+
column_stat.min.clone(),
169+
column_stat.max.clone(),
170+
Scalar::Number(NumberScalar::UInt64(column_stat.null_count)),
171+
Scalar::Number(NumberScalar::UInt64(column_stat.in_memory_size)),
172+
column_stat
173+
.distinct_of_values
174+
.map(|value| Scalar::Number(NumberScalar::UInt64(value)))
175+
.unwrap_or(Scalar::Null),
176+
]);
177+
let data_type = TableDataType::Tuple {
178+
fields_name: vec![
179+
"min".to_string(),
180+
"max".to_string(),
181+
"null_count".to_string(),
182+
"in_memory_size".to_string(),
183+
"distinct_count".to_string(),
184+
],
185+
fields_type: vec![
186+
field_type.clone(),
187+
field_type,
188+
TableDataType::Number(NumberDataType::UInt64),
189+
TableDataType::Number(NumberDataType::UInt64),
190+
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
191+
],
192+
};
193+
194+
build_variant(scalar, &data_type, func_ctx)
195+
}
196+
197+
fn build_spatial_statistics_variant(
198+
spatial_stat: &SpatialStatistics,
199+
func_ctx: &FunctionContext,
200+
) -> Vec<u8> {
201+
let scalar = Scalar::Tuple(vec![
202+
Scalar::Number(NumberScalar::Float64(F64::from(spatial_stat.min_x.0))),
203+
Scalar::Number(NumberScalar::Float64(F64::from(spatial_stat.min_y.0))),
204+
Scalar::Number(NumberScalar::Float64(F64::from(spatial_stat.max_x.0))),
205+
Scalar::Number(NumberScalar::Float64(F64::from(spatial_stat.max_y.0))),
206+
Scalar::Number(NumberScalar::Int32(spatial_stat.srid)),
207+
Scalar::Boolean(spatial_stat.has_null),
208+
Scalar::Boolean(spatial_stat.has_empty_rect),
209+
Scalar::Boolean(spatial_stat.is_valid),
210+
]);
211+
let data_type = TableDataType::Tuple {
212+
fields_name: vec![
213+
"min_x".to_string(),
214+
"min_y".to_string(),
215+
"max_x".to_string(),
216+
"max_y".to_string(),
217+
"srid".to_string(),
218+
"has_null".to_string(),
219+
"has_empty_rect".to_string(),
220+
"is_valid".to_string(),
221+
],
222+
fields_type: vec![
223+
TableDataType::Number(NumberDataType::Float64),
224+
TableDataType::Number(NumberDataType::Float64),
225+
TableDataType::Number(NumberDataType::Float64),
226+
TableDataType::Number(NumberDataType::Float64),
227+
TableDataType::Number(NumberDataType::Int32),
228+
TableDataType::Boolean,
229+
TableDataType::Boolean,
230+
TableDataType::Boolean,
231+
],
232+
};
233+
234+
build_variant(scalar, &data_type, func_ctx)
235+
}
236+
237+
fn build_variant(scalar: Scalar, data_type: &TableDataType, func_ctx: &FunctionContext) -> Vec<u8> {
238+
let mut buf = Vec::new();
239+
cast_scalar_to_variant(scalar.as_ref(), &func_ctx.tz, &mut buf, Some(data_type));
240+
buf
241+
}

src/query/storages/fuse/src/table_functions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod clustering_statistics;
1717
mod function_template;
1818
mod fuse_amend;
1919
mod fuse_block;
20+
mod fuse_block_statistics;
2021
mod fuse_column;
2122
mod fuse_dump_snapshot;
2223
mod fuse_encoding;
@@ -41,6 +42,7 @@ pub use function_template::TableFunctionTemplate;
4142
pub use function_template::*;
4243
pub use fuse_amend::FuseAmendTable;
4344
pub use fuse_block::FuseBlockFunc;
45+
pub use fuse_block_statistics::FuseBlockStatisticsFunc;
4446
pub use fuse_column::FuseColumnFunc;
4547
pub use fuse_dump_snapshot::FuseDumpSnapshotsFunc;
4648
pub use fuse_encoding::FuseEncodingFunc;

tests/sqllogictests/suites/base/06_show/06_0014_show_table_functions.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SHOW TABLE_FUNCTIONS LIKE 'fuse%'
1010
----
1111
fuse_amend
1212
fuse_block
13+
fuse_block_statistics
1314
fuse_column
1415
fuse_dump_snapshots
1516
fuse_encoding

tests/sqllogictests/suites/query/index/10_spatial_index/10_0000_spatial_index_base.test

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,25 @@ REFRESH SPATIAL INDEX idx_refresh ON t
7070
----
7171
0
7272

73+
query ITTT
74+
SELECT column_id, column_name, statistics, spatial_statistics FROM fuse_block_statistics('test_spatial_index', 't');
75+
----
76+
0 id {"distinct_count":2,"in_memory_size":9,"max":2,"min":1,"null_count":0} NULL
77+
1 geom NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":11.0,"max_y":11.0,"min_x":10.0,"min_y":10.0,"srid":0}
78+
2 geom2 NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":101.0,"max_y":101.0,"min_x":100.0,"min_y":100.0,"srid":0}
79+
0 id {"distinct_count":2,"in_memory_size":9,"max":4,"min":3,"null_count":0} NULL
80+
1 geom NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":21.0,"max_y":21.0,"min_x":20.0,"min_y":20.0,"srid":0}
81+
2 geom2 NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":201.0,"max_y":201.0,"min_x":200.0,"min_y":200.0,"srid":0}
82+
0 id {"distinct_count":2,"in_memory_size":9,"max":6,"min":5,"null_count":0} NULL
83+
1 geom NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":40.0,"max_y":40.0,"min_x":30.0,"min_y":30.0,"srid":0}
84+
2 geom2 NULL {"has_empty_rect":false,"has_null":false,"is_valid":true,"max_x":400.0,"max_y":400.0,"min_x":300.0,"min_y":300.0,"srid":0}
85+
86+
query ITTT
87+
SELECT column_id, column_name, statistics, spatial_statistics FROM fuse_block_statistics('test_spatial_index', 't') where statistics:max > 2;
88+
----
89+
0 id {"distinct_count":2,"in_memory_size":9,"max":4,"min":3,"null_count":0} NULL
90+
0 id {"distinct_count":2,"in_memory_size":9,"max":6,"min":5,"null_count":0} NULL
91+
7392
statement ok
7493
CREATE TABLE spatial_stores (store_id INT, category String, store_name String, status String, location Geometry, SPATIAL INDEX idx (location)) row_per_block=2
7594

tests/sqllogictests/suites/stage/formats/parquet/parquet_metadata.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ parquet/ii/f2.parquet 3 0 3
77

88

99
query TIII
10-
select metadata$filename, c1, metadata$file_row_number, id from @data_s3/parquet/ii/ where metadata$file_row_number = 0 and id > 1;
10+
select metadata$filename, c1, metadata$file_row_number, id from @data_s3/parquet/ii/ where metadata$file_row_number = 0 and id > 1 order by metadata$filename;
1111
----
1212
parquet/ii/f2.parquet 3 0 3
1313
parquet/ii/f3.parquet 5 0 5

0 commit comments

Comments
 (0)