From 5f2b12a479568df5be9a0c638206056fb7f03236 Mon Sep 17 00:00:00 2001 From: kould Date: Sat, 6 Jun 2026 01:17:22 +0800 Subject: [PATCH 1/3] Reduce runtime metadata overhead --- src/binder/alter_table.rs | 4 +-- src/binder/analyze.rs | 4 +-- src/binder/copy.rs | 4 +-- src/binder/create_index.rs | 4 +-- src/binder/create_table.rs | 6 ++--- src/binder/create_view.rs | 7 +++-- src/binder/delete.rs | 4 +-- src/binder/describe.rs | 4 +-- src/binder/drop_table.rs | 4 +-- src/binder/insert.rs | 5 ++-- src/binder/select.rs | 13 +++------ src/binder/truncate.rs | 4 +-- src/binder/update.rs | 4 +-- src/db.rs | 10 +++++++ src/errors.rs | 4 +-- src/execution/dml/analyze.rs | 3 +-- src/execution/dql/aggregate/mod.rs | 2 +- src/execution/dql/join/nested_loop_join.rs | 12 ++++----- src/execution/dql/sort.rs | 12 ++++----- src/execution/dql/top_k.rs | 13 +++++---- src/expression/evaluator.rs | 5 +--- src/expression/function/table.rs | 4 +-- src/expression/mod.rs | 19 +++++-------- src/expression/range_detacher.rs | 16 +++++++---- src/function/numbers.rs | 6 ++--- src/macros/mod.rs | 23 +++++++++------- .../rule/normalization/agg_elimination.rs | 8 +++--- .../rule/normalization/pushdown_predicates.rs | 2 +- src/serdes/column.rs | 13 +++++---- src/storage/table_codec.rs | 5 ++-- src/types/evaluator/binary.rs | 22 +++++++-------- src/types/evaluator/boolean.rs | 11 ++++---- src/types/evaluator/cast.rs | 13 +++++---- src/types/evaluator/decimal.rs | 23 ++++++++-------- src/types/evaluator/float32.rs | 27 +++++++++---------- src/types/evaluator/float64.rs | 27 +++++++++---------- src/types/evaluator/null.rs | 7 +++-- src/types/evaluator/time32.rs | 17 ++++++------ src/types/evaluator/time64.rs | 13 +++++---- src/types/evaluator/tuple.rs | 15 +++++------ src/types/evaluator/unary.rs | 4 +-- src/types/evaluator/utf8.rs | 19 +++++++------ tests/macros-test/src/main.rs | 3 ++- 43 files changed, 206 insertions(+), 219 deletions(-) diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs index 93b9bc2e..301b3a14 100644 --- a/src/binder/alter_table.rs +++ b/src/binder/alter_table.rs @@ -15,10 +15,10 @@ use sqlparser::ast::{AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName}; use std::borrow::Cow; -use std::sync::Arc; use super::{attach_span_if_absent, is_valid_identifier, Binder}; use crate::binder::lower_case_name; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; use crate::planner::operator::alter_table::add_column::AddColumnOperator; @@ -83,7 +83,7 @@ impl> Binder<'_, '_, T, A> name: &ObjectName, operation: &AlterTableOperation, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); let table = self .context .table(table_name.clone())? diff --git a/src/binder/analyze.rs b/src/binder/analyze.rs index 4f355fce..adf4d0da 100644 --- a/src/binder/analyze.rs +++ b/src/binder/analyze.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder, Source}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::analyze::AnalyzeOperator; use crate::planner::operator::table_scan::TableScanOperator; @@ -21,11 +22,10 @@ use crate::planner::{Childrens, LogicalPlan}; use crate::storage::Transaction; use crate::types::value::DataValue; use sqlparser::ast::ObjectName; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); let table = self .context diff --git a/src/binder/copy.rs b/src/binder/copy.rs index c690c64d..667242c8 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -14,9 +14,9 @@ use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; use super::*; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::copy_from_file::CopyFromFileOperator; use crate::planner::operator::copy_to_file::CopyToFileOperator; @@ -110,7 +110,7 @@ impl> Binder<'_, '_, T, A> )); } }; - let table_name: Arc = lower_case_name(&table_name)?.into(); + let table_name: TableName = lower_case_name(&table_name)?.into(); if let Some(table) = self.context.table(table_name.clone())? { let schema_ref = table.schema_ref().clone(); diff --git a/src/binder/create_index.rs b/src/binder/create_index.rs index be6594ef..bc8d30af 100644 --- a/src/binder/create_index.rs +++ b/src/binder/create_index.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder, Source}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; use crate::planner::operator::create_index::CreateIndexOperator; @@ -23,7 +24,6 @@ use crate::storage::Transaction; use crate::types::index::IndexType; use crate::types::value::DataValue; use sqlparser::ast::{IndexColumn, ObjectName}; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_create_index( @@ -34,7 +34,7 @@ impl> Binder<'_, '_, T, A> if_not_exists: bool, is_unique: bool, ) -> Result { - let table_name: Arc = lower_case_name(table_name)?.into(); + let table_name: TableName = lower_case_name(table_name)?.into(); let index_name = name .ok_or(DatabaseError::InvalidIndex) .and_then(lower_case_name)?; diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 6ff89a0c..8a4a58ce 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -14,7 +14,7 @@ use super::{attach_span_if_absent, is_valid_identifier, Binder}; use crate::binder::lower_case_name; -use crate::catalog::{ColumnCatalog, ColumnDesc}; +use crate::catalog::{ColumnCatalog, ColumnDesc, TableName}; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; use crate::planner::operator::create_table::CreateTableOperator; @@ -27,7 +27,6 @@ use itertools::Itertools; use sqlparser::ast::{ColumnDef, ColumnOption, Expr, IndexColumn, ObjectName, TableConstraint}; use std::borrow::Cow; use std::collections::HashSet; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { // TODO: TableConstraint @@ -38,7 +37,7 @@ impl> Binder<'_, '_, T, A> constraints: &[TableConstraint], if_not_exists: bool, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); if !is_valid_identifier(&table_name) { return Err(attach_span_if_absent( @@ -192,6 +191,7 @@ mod tests { use crate::utils::lru::SharedLruCache; use std::hash::RandomState; use std::sync::atomic::AtomicUsize; + use std::sync::Arc; use tempfile::TempDir; #[test] diff --git a/src/binder/create_view.rs b/src/binder/create_view.rs index 9dada889..82b35ae9 100644 --- a/src/binder/create_view.rs +++ b/src/binder/create_view.rs @@ -14,7 +14,7 @@ use crate::binder::{lower_case_name, lower_ident, Binder}; use crate::catalog::view::View; -use crate::catalog::{ColumnCatalog, ColumnRef}; +use crate::catalog::{ColumnCatalog, ColumnRef, TableName}; use crate::errors::DatabaseError; use crate::expression::{AliasType, ScalarExpression}; use crate::planner::operator::create_view::CreateViewOperator; @@ -24,7 +24,6 @@ use crate::storage::Transaction; use crate::types::value::DataValue; use itertools::Itertools; use sqlparser::ast::{ObjectName, Query, ViewColumnDef}; -use std::sync::Arc; use ulid::Ulid; impl> Binder<'_, '_, T, A> { @@ -36,7 +35,7 @@ impl> Binder<'_, '_, T, A> query: &Query, ) -> Result { fn projection_exprs( - view_name: &Arc, + view_name: &TableName, mapping_schema: &[ColumnRef], column_names: impl Iterator, ) -> Vec { @@ -62,7 +61,7 @@ impl> Binder<'_, '_, T, A> .collect_vec() } - let view_name: Arc = lower_case_name(name)?.into(); + let view_name: TableName = lower_case_name(name)?.into(); let mut plan = self.bind_query(query)?; let mapping_schema = plan.output_schema(); diff --git a/src/binder/delete.rs b/src/binder/delete.rs index 49026f89..48993a90 100644 --- a/src/binder/delete.rs +++ b/src/binder/delete.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::delete::DeleteOperator; use crate::planner::operator::Operator; @@ -21,7 +22,6 @@ use crate::storage::Transaction; use crate::types::value::DataValue; use itertools::Itertools; use sqlparser::ast::{Expr, TableFactor, TableWithJoins}; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_delete( @@ -30,7 +30,7 @@ impl> Binder<'_, '_, T, A> selection: &Option, ) -> Result { if let TableFactor::Table { name, .. } = &from.relation { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); let table = self .context .table(table_name.clone())? diff --git a/src/binder/describe.rs b/src/binder/describe.rs index efc80594..aee151ff 100644 --- a/src/binder/describe.rs +++ b/src/binder/describe.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::describe::DescribeOperator; use crate::planner::operator::Operator; @@ -20,14 +21,13 @@ use crate::planner::{Childrens, LogicalPlan}; use crate::storage::Transaction; use crate::types::value::DataValue; use sqlparser::ast::ObjectName; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_describe( &mut self, name: &ObjectName, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); Ok(LogicalPlan::new( Operator::Describe(DescribeOperator { table_name }), diff --git a/src/binder/drop_table.rs b/src/binder/drop_table.rs index ff0cbfa6..24b58f62 100644 --- a/src/binder/drop_table.rs +++ b/src/binder/drop_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::drop_table::DropTableOperator; use crate::planner::operator::Operator; @@ -20,7 +21,6 @@ use crate::planner::{Childrens, LogicalPlan}; use crate::storage::Transaction; use crate::types::value::DataValue; use sqlparser::ast::ObjectName; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_drop_table( @@ -28,7 +28,7 @@ impl> Binder<'_, '_, T, A> name: &ObjectName, if_exists: &bool, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); Ok(LogicalPlan::new( Operator::DropTable(DropTableOperator { diff --git a/src/binder/insert.rs b/src/binder/insert.rs index fe23a716..6e258335 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -16,6 +16,7 @@ use crate::binder::{ attach_span_from_sqlparser_span_if_absent, attach_span_if_absent, lower_case_name, lower_ident, Binder, }; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::simplify::ConstantCalculator; use crate::expression::visitor_mut::VisitorMut; @@ -44,7 +45,7 @@ impl> Binder<'_, '_, T, A> ) -> Result { // FIXME: Make it better to detect the current BindStep self.context.allow_default = true; - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); let source = self .context @@ -145,7 +146,7 @@ impl> Binder<'_, '_, T, A> query: &Query, is_overwrite: bool, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); let table_schema = { let source = self .context diff --git a/src/binder/select.rs b/src/binder/select.rs index 22a1a2d0..a6618dc1 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -586,7 +586,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' ColumnDesc::new(typ, None, false, None)?, ); column_ref.set_ref_table(value_name.clone(), ColumnId::default(), true); - Ok(ColumnRef(Arc::new(column_ref))) + Ok(ColumnRef::from(column_ref)) }) .collect::>()?; @@ -856,7 +856,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' }) = alias { let source_name = self.context.temp_table(); - let table_alias: Arc = name.value.to_lowercase().into(); + let table_alias: TableName = name.value.to_lowercase().into(); plan = self.bind_alias( plan, @@ -903,7 +903,6 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' if let ScalarExpression::TableFunction(function) = self.bind_expr(expr)? { let mut table_alias = None; let table_name: TableName = function.summary().name.clone(); - let table = function.table(); let mut plan = FunctionScanOperator::build(function); if let Some(TableAlias { @@ -922,11 +921,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' )?; } - let source = if table_alias.is_some() { - Source::Schema(plan.output_schema().clone()) - } else { - Source::Table(table) - }; + let source = Source::Schema(plan.output_schema().clone()); self.context .add_bound_source(table_name, table_alias, joint_type, source); plan @@ -1115,7 +1110,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' } } SelectItem::QualifiedWildcard(table_name, _) => { - let table_name: Arc = match table_name { + let table_name: TableName = match table_name { SelectItemQualifiedWildcardKind::ObjectName(name) => { lower_case_name(name)?.into() } diff --git a/src/binder/truncate.rs b/src/binder/truncate.rs index 222f8305..fb0ef677 100644 --- a/src/binder/truncate.rs +++ b/src/binder/truncate.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::binder::{lower_case_name, Binder}; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::planner::operator::truncate::TruncateOperator; use crate::planner::operator::Operator; @@ -20,14 +21,13 @@ use crate::planner::{Childrens, LogicalPlan}; use crate::storage::Transaction; use crate::types::value::DataValue; use sqlparser::ast::ObjectName; -use std::sync::Arc; impl> Binder<'_, '_, T, A> { pub(crate) fn bind_truncate( &mut self, name: &ObjectName, ) -> Result { - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); Ok(LogicalPlan::new( Operator::Truncate(TruncateOperator { table_name }), diff --git a/src/binder/update.rs b/src/binder/update.rs index 3b99fb5e..94991cb1 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -15,6 +15,7 @@ use crate::binder::{ attach_span_from_sqlparser_span_if_absent, attach_span_if_absent, lower_case_name, Binder, }; +use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::visitor_mut::VisitorMut; use crate::expression::ScalarExpression; @@ -29,7 +30,6 @@ use sqlparser::ast::{ }; use std::borrow::Cow; use std::slice; -use std::sync::Arc; struct UpdateExprTargetRemapper<'a> { target_schema: &'a [crate::catalog::ColumnRef], @@ -78,7 +78,7 @@ impl> Binder<'_, '_, T, A> self.context.allow_default = true; if let TableFactor::Table { name, .. } = &to.relation { let is_joined_update = !to.joins.is_empty(); - let table_name: Arc = lower_case_name(name)?.into(); + let table_name: TableName = lower_case_name(name)?.into(); self.with_pk(table_name.clone()); let mut plan = self.bind_table_ref(to)?; diff --git a/src/db.rs b/src/db.rs index 2fd8138e..2f024c09 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1128,6 +1128,16 @@ pub(crate) mod test { use std::time::Duration; use tempfile::TempDir; + fn assert_send_sync() {} + + #[test] + fn database_handles_are_send_sync() { + #[cfg(feature = "rocksdb")] + assert_send_sync::>(); + #[cfg(feature = "lmdb")] + assert_send_sync::>(); + } + pub(crate) fn build_table( table_cache: &TableCache, transaction: &mut T, diff --git a/src/errors.rs b/src/errors.rs index a1dadcaa..837d7f8c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::expression::{BinaryOperator, ScalarExpression, UnaryOperator}; +use crate::expression::{BinaryOperator, UnaryOperator}; use crate::types::tuple::TupleId; use crate::types::LogicalType; use chrono::ParseError; @@ -247,8 +247,6 @@ pub enum DatabaseError { TupleIdNotFound(TupleId), #[error("there are more buckets: {0} than elements: {1}")] TooManyBuckets(usize, usize), - #[error("this scalar expression: '{0}' unbind position")] - UnbindExpressionPosition(ScalarExpression), #[error("unsupported unary operator: {0} cannot support {1} for calculations")] UnsupportedUnaryOperator(LogicalType, UnaryOperator), #[error("unsupported binary operator: {0} cannot support {1} for calculations")] diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 62370858..b512f746 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -27,7 +27,6 @@ use crate::types::value::{DataValue, Utf8Type}; use crate::types::CharLengthUnits; use itertools::Itertools; use std::fmt::{self, Formatter}; -use std::sync::Arc; const DEFAULT_NUM_OF_BUCKETS: usize = 100; @@ -110,7 +109,7 @@ impl Analyze { if values.len() == 1 { builder.append(&values[0])?; } else { - builder.append(&Arc::new(DataValue::Tuple(values, false)))?; + builder.append(&DataValue::Tuple(values, false))?; } } } diff --git a/src/execution/dql/aggregate/mod.rs b/src/execution/dql/aggregate/mod.rs index 78d6a409..44a9bcb1 100644 --- a/src/execution/dql/aggregate/mod.rs +++ b/src/execution/dql/aggregate/mod.rs @@ -34,7 +34,7 @@ use std::borrow::Cow; /// Tips: Idea for sqlrs /// An accumulator represents a stateful object that lives throughout the evaluation of multiple /// rows and generically accumulates values. -pub trait Accumulator: Send + Sync { +pub trait Accumulator { /// updates the accumulator's state from a vector of arrays. fn update_value(&mut self, value: &DataValue) -> Result<(), DatabaseError>; diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index 50eaa5be..788ba9b3 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -462,10 +462,10 @@ mod test { use crate::planner::Childrens; use crate::storage::rocksdb::{RocksStorage, RocksTransaction}; use crate::storage::Storage; - use crate::types::evaluator::int32::Int32GtBinaryEvaluator; - use crate::types::evaluator::BinaryEvaluatorBox; + use crate::types::evaluator::binary_create; use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; + use std::borrow::Cow; use std::collections::HashSet; use std::hash::RandomState; use std::sync::Arc; @@ -598,11 +598,9 @@ mod test { ColumnRef::from(ColumnCatalog::new("c4".to_owned(), true, desc.clone())), 3, )), - evaluator: Some(BinaryEvaluatorBox::new( - Arc::new(Int32GtBinaryEvaluator), - LogicalType::Integer, - BinaryOperator::Gt, - )), + evaluator: Some( + binary_create(Cow::Owned(LogicalType::Integer), BinaryOperator::Gt).unwrap(), + ), ty: LogicalType::Boolean, }; diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 0c80a6e9..94983d53 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -374,11 +374,11 @@ mod test { let fn_sort_fields = |asc: bool, nulls_first: bool| { vec![SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 0, }, asc, @@ -525,11 +525,11 @@ mod test { vec![ SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 0, }, asc: asc_1, @@ -537,11 +537,11 @@ mod test { }, SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 1, }, asc: asc_2, diff --git a/src/execution/dql/top_k.rs b/src/execution/dql/top_k.rs index d6d8e47e..93b55f8c 100644 --- a/src/execution/dql/top_k.rs +++ b/src/execution/dql/top_k.rs @@ -173,18 +173,17 @@ mod test { use crate::types::LogicalType; use bumpalo::Bump; use std::collections::BTreeSet; - use std::sync::Arc; #[test] fn test_top_k_sort() -> Result<(), DatabaseError> { let fn_sort_fields = |asc: bool, nulls_first: bool| { vec![SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 0, }, asc, @@ -354,11 +353,11 @@ mod test { vec![ SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 0, }, asc: asc_1, @@ -366,11 +365,11 @@ mod test { }, SortField { expr: ScalarExpression::ColumnRef { - column: ColumnRef(Arc::new(ColumnCatalog::new( + column: ColumnRef::from(ColumnCatalog::new( String::new(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - ))), + )), position: 1, }, asc: asc_2, diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index e05e855c..045dfa96 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -48,10 +48,7 @@ impl ScalarExpression { return Ok(DataValue::Null); }; if let AliasType::Expr(inner_expr) = alias { - match inner_expr.eval(Some(tuple)) { - Err(DatabaseError::UnbindExpressionPosition(_)) => expr.eval(Some(tuple)), - res => res, - } + inner_expr.eval(Some(tuple)) } else { expr.eval(Some(tuple)) } diff --git a/src/expression/function/table.rs b/src/expression/function/table.rs index 3c991a68..308ee308 100644 --- a/src/expression/function/table.rs +++ b/src/expression/function/table.rs @@ -63,7 +63,7 @@ pub trait TableFunctionImpl: Debug + Send + Sync { fn output_schema(&self) -> &SchemaRef; - fn table(&self) -> &'static TableCatalog; + fn table(&self) -> &TableCatalog; } impl TableFunction { @@ -75,7 +75,7 @@ impl TableFunction { self.inner.output_schema() } - pub fn table(&self) -> &'static TableCatalog { + pub fn table(&self) -> &TableCatalog { self.inner.table() } } diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 984e33f2..67ecf183 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -875,9 +875,7 @@ mod test { use crate::serdes::{ReferenceDecodeContext, ReferenceSerialization, ReferenceTables}; use crate::storage::rocksdb::{RocksStorage, RocksTransaction}; use crate::storage::{Storage, Transaction}; - use crate::types::evaluator::boolean::BooleanNotUnaryEvaluator; - use crate::types::evaluator::int32::Int32PlusBinaryEvaluator; - use crate::types::evaluator::{cast_create, BinaryEvaluatorBox, UnaryEvaluatorBox}; + use crate::types::evaluator::{binary_create, cast_create, unary_create}; use crate::types::value::{DataValue, Utf8Type}; use crate::types::CharLengthUnits; use crate::types::LogicalType; @@ -1071,11 +1069,10 @@ mod test { ScalarExpression::Unary { op: UnaryOperator::Plus, expr: Box::new(ScalarExpression::Empty), - evaluator: Some(UnaryEvaluatorBox::new( - Arc::new(BooleanNotUnaryEvaluator), - LogicalType::Boolean, + evaluator: Some(unary_create( + Cow::Owned(LogicalType::Boolean), UnaryOperator::Not, - )), + )?), ty: LogicalType::Integer, }, Some(&context), @@ -1098,11 +1095,9 @@ mod test { op: BinaryOperator::Plus, left_expr: Box::new(ScalarExpression::Empty), right_expr: Box::new(ScalarExpression::Empty), - evaluator: Some(BinaryEvaluatorBox::new( - Arc::new(Int32PlusBinaryEvaluator), - LogicalType::Integer, - BinaryOperator::Plus, - )), + evaluator: Some( + binary_create(Cow::Owned(LogicalType::Integer), BinaryOperator::Plus).unwrap(), + ), ty: LogicalType::Integer, }, Some(&context), diff --git a/src/expression/range_detacher.rs b/src/expression/range_detacher.rs index 3265e50b..c43f5774 100644 --- a/src/expression/range_detacher.rs +++ b/src/expression/range_detacher.rs @@ -806,6 +806,7 @@ mod test { use crate::binder::test::build_t1_table; use crate::errors::DatabaseError; use crate::expression::range_detacher::{Range, RangeDetacher}; + use crate::expression::BinaryOperator; use crate::optimizer::heuristic::batch::HepBatchStrategy; use crate::optimizer::heuristic::optimizer::HepOptimizerPipeline; use crate::optimizer::rule::normalization::NormalizationRuleImpl; @@ -813,9 +814,9 @@ mod test { use crate::planner::operator::Operator; use crate::planner::LogicalPlan; use crate::storage::rocksdb::RocksTransaction; - use crate::types::evaluator::tuple::TupleLtBinaryEvaluator; - use crate::types::evaluator::BinaryEvaluator; + use crate::types::evaluator::binary_create; use crate::types::value::DataValue; + use crate::types::LogicalType; use std::ops::Bound; fn plan_filter(plan: LogicalPlan) -> Result, DatabaseError> { @@ -1975,7 +1976,7 @@ mod test { } #[test] - fn test_to_tuple_range_none() { + fn test_to_tuple_range_none() -> Result<(), DatabaseError> { let range = Range::Scope { min: Bound::Included(DataValue::Int32(2)), max: Bound::Unbounded, @@ -2009,8 +2010,13 @@ mod test { unreachable!() }; assert_eq!( - TupleLtBinaryEvaluator.binary_eval(&min, &max).unwrap(), + binary_create( + std::borrow::Cow::Owned(LogicalType::Tuple(vec![])), + BinaryOperator::Lt + )? + .binary_eval(&min, &max)?, DataValue::Boolean(true) - ) + ); + Ok(()) } } diff --git a/src/function/numbers.rs b/src/function/numbers.rs index b5ad1cdf..4445d109 100644 --- a/src/function/numbers.rs +++ b/src/function/numbers.rs @@ -23,8 +23,6 @@ use crate::types::tuple::SchemaRef; use crate::types::tuple::Tuple; use crate::types::value::DataValue; use crate::types::LogicalType; -use serde::Deserialize; -use serde::Serialize; use std::sync::Arc; use std::sync::LazyLock; @@ -40,7 +38,7 @@ static NUMBERS: LazyLock = LazyLock::new(|| { .unwrap() }); -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub(crate) struct Numbers { summary: FunctionSummary, } @@ -85,7 +83,7 @@ impl TableFunctionImpl for Numbers { &self.summary } - fn table(&self) -> &'static TableCatalog { + fn table(&self) -> &TableCatalog { &NUMBERS } } diff --git a/src/macros/mod.rs b/src/macros/mod.rs index bf373a27..aee17e8a 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -80,14 +80,14 @@ macro_rules! from_tuple { #[macro_export] macro_rules! scala_function { ($struct_name:ident::$function_name:ident($($arg_ty:expr),*) -> $return_ty:expr => $closure:expr) => { - #[derive(Debug, ::serde::Serialize, ::serde::Deserialize)] + #[derive(Debug)] pub(crate) struct $struct_name { summary: ::kite_sql::expression::function::FunctionSummary } impl $struct_name { #[allow(unused_mut)] - pub(crate) fn new() -> Arc { + pub(crate) fn new() -> ::std::sync::Arc { let function_name = stringify!($function_name).to_lowercase(); let mut arg_types = Vec::new(); @@ -95,7 +95,7 @@ macro_rules! scala_function { arg_types.push($arg_ty); })* - Arc::new(Self { + ::std::sync::Arc::new(Self { summary: ::kite_sql::expression::function::FunctionSummary { name: function_name.into(), arg_types @@ -159,17 +159,18 @@ macro_rules! table_function { $({ columns.push(::kite_sql::catalog::column::ColumnCatalog::new(stringify!($output_name).to_lowercase(), true, ::kite_sql::catalog::column::ColumnDesc::new($output_ty, None, false, None).unwrap())); })* + ::kite_sql::catalog::table::TableCatalog::new(stringify!($function_name).to_lowercase().into(), columns).unwrap() }); - #[derive(Debug, ::serde::Serialize, ::serde::Deserialize)] + #[derive(Debug)] pub(crate) struct $struct_name { - summary: ::kite_sql::expression::function::FunctionSummary + summary: ::kite_sql::expression::function::FunctionSummary, } impl $struct_name { #[allow(unused_mut)] - pub(crate) fn new() -> Arc { + pub(crate) fn new() -> ::std::sync::Arc { let function_name = stringify!($function_name).to_lowercase(); let mut arg_types = Vec::new(); @@ -177,14 +178,16 @@ macro_rules! table_function { arg_types.push($arg_ty); })* - Arc::new(Self { + ::std::sync::Arc::new(Self { summary: ::kite_sql::expression::function::FunctionSummary { name: function_name.into(), arg_types - } + }, }) } - } impl ::kite_sql::expression::function::table::TableFunctionImpl for $struct_name { + } + + impl ::kite_sql::expression::function::table::TableFunctionImpl for $struct_name { #[allow(unused_variables, clippy::redundant_closure_call)] fn eval(&self, args: &[::kite_sql::expression::ScalarExpression]) -> Result>>, ::kite_sql::errors::DatabaseError> { let mut _index = 0; @@ -206,7 +209,7 @@ macro_rules! table_function { &self.summary } - fn table(&self) -> &'static ::kite_sql::catalog::table::TableCatalog { + fn table(&self) -> &::kite_sql::catalog::table::TableCatalog { &$function_name } } diff --git a/src/optimizer/rule/normalization/agg_elimination.rs b/src/optimizer/rule/normalization/agg_elimination.rs index 6c08ac05..52e7719f 100644 --- a/src/optimizer/rule/normalization/agg_elimination.rs +++ b/src/optimizer/rule/normalization/agg_elimination.rs @@ -406,7 +406,7 @@ mod tests { fields: index_fields, ignore_prefix_len, }; - let table_name: TableName = Arc::from("t1"); + let table_name: TableName = ::std::sync::Arc::from("t1"); let meta = Arc::new(IndexMeta { id: 1, column_ids: (0..len).map(|_| Ulid::new()).collect(), @@ -431,7 +431,7 @@ mod tests { } fn build_distinct_scan_plan() -> (LogicalPlan, SortOption) { - let table_name: TableName = Arc::from("t1"); + let table_name: TableName = ::std::sync::Arc::from("t1"); let c1 = ColumnRef::from(ColumnCatalog::new_dummy("c1".to_string())); let c1_id = Ulid::new(); let mut columns = BTreeMap::new(); @@ -570,7 +570,7 @@ mod tests { let mut columns = BTreeMap::new(); columns.insert(0, column); - let table_name: TableName = Arc::from("t"); + let table_name: TableName = ::std::sync::Arc::from("t"); let table_scan = LogicalPlan::new( Operator::TableScan(TableScanOperator { table_name: table_name.clone(), @@ -693,7 +693,7 @@ mod tests { let mut scan_plan = LogicalPlan::new( Operator::TableScan(TableScanOperator { - table_name: Arc::from("t"), + table_name: ::std::sync::Arc::from("t"), primary_keys: vec![], columns, limit: (None, None), diff --git a/src/optimizer/rule/normalization/pushdown_predicates.rs b/src/optimizer/rule/normalization/pushdown_predicates.rs index 5dd477ab..e20e39c9 100644 --- a/src/optimizer/rule/normalization/pushdown_predicates.rs +++ b/src/optimizer/rule/normalization/pushdown_predicates.rs @@ -550,7 +550,7 @@ mod tests { #[test] fn test_cover_mapping_matches_scan_order() -> Result<(), DatabaseError> { - let table_name: TableName = Arc::from("mock_table"); + let table_name: TableName = ::std::sync::Arc::from("mock_table"); let c1_id = Ulid::new(); let c2_id = Ulid::new(); let c3_id = Ulid::new(); diff --git a/src/serdes/column.rs b/src/serdes/column.rs index cd214cc5..d2d66685 100644 --- a/src/serdes/column.rs +++ b/src/serdes/column.rs @@ -18,7 +18,6 @@ use crate::serdes::{ReferenceDecodeContext, ReferenceSerialization, ReferenceTab use crate::storage::Transaction; use crate::types::ColumnId; use std::io::{Read, Write}; -use std::sync::Arc; impl ReferenceSerialization for ColumnRef { fn encode( @@ -85,9 +84,9 @@ impl ReferenceSerialization for ColumnRef { nullable = nullable_for_join; } - Ok(Self(Arc::new(ColumnCatalog::direct_new( + Ok(Self::from(ColumnCatalog::direct_new( summary, nullable, desc, in_join, - )))) + ))) } } } @@ -196,7 +195,7 @@ pub(crate) mod test { }; { - let ref_column = ColumnRef(Arc::new(ColumnCatalog::direct_new( + let ref_column = ColumnRef::from(ColumnCatalog::direct_new( ColumnSummary { name: "c3".to_string(), relation: ColumnRelation::Table { @@ -208,7 +207,7 @@ pub(crate) mod test { false, ColumnDesc::new(LogicalType::Integer, None, false, None)?, false, - ))); + )); ref_column.encode(&mut cursor, false, &mut reference_tables)?; cursor.seek(SeekFrom::Start(0))?; @@ -237,7 +236,7 @@ pub(crate) mod test { cursor.seek(SeekFrom::Start(0))?; } { - let not_ref_column = ColumnRef(Arc::new(ColumnCatalog::direct_new( + let not_ref_column = ColumnRef::from(ColumnCatalog::direct_new( ColumnSummary { name: "c3".to_string(), relation: ColumnRelation::None, @@ -250,7 +249,7 @@ pub(crate) mod test { Some(ScalarExpression::Constant(DataValue::UInt64(42))), )?, false, - ))); + )); not_ref_column.encode(&mut cursor, false, &mut reference_tables)?; cursor.seek(SeekFrom::Start(0))?; diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index fd948373..1a78fa4a 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -47,7 +47,6 @@ const STATISTICS_BUCKET_ORD_LEN: usize = 4; static ROOT_BYTES: LazyLock> = LazyLock::new(|| b"Root".to_vec()); static VIEW_BYTES: LazyLock> = LazyLock::new(|| b"View".to_vec()); static HASH_BYTES: LazyLock> = LazyLock::new(|| b"Hash".to_vec()); -static EMPTY_REFERENCE_TABLES: LazyLock = LazyLock::new(ReferenceTables::new); pub type Bytes = Vec; pub type BumpBytes<'bump> = bumpalo::collections::Vec<'bump, u8>; @@ -681,7 +680,7 @@ impl TableCodec { } pub fn decode_index_meta(bytes: &[u8]) -> Result { - IndexMeta::decode::(&mut Cursor::new(bytes), None, &EMPTY_REFERENCE_TABLES) + IndexMeta::decode::(&mut Cursor::new(bytes), None, &ReferenceTables::new()) } pub fn decode_index_key( @@ -848,7 +847,7 @@ impl TableCodec { pub fn decode_root_table(bytes: &[u8]) -> Result { let mut bytes = Cursor::new(bytes); - TableMeta::decode::(&mut bytes, None, &EMPTY_REFERENCE_TABLES) + TableMeta::decode::(&mut bytes, None, &ReferenceTables::new()) } } diff --git a/src/types/evaluator/binary.rs b/src/types/evaluator/binary.rs index f3fc416e..e16eec9d 100644 --- a/src/types/evaluator/binary.rs +++ b/src/types/evaluator/binary.rs @@ -155,27 +155,27 @@ pub fn binary_create( macro_rules! numeric_binary_evaluator_definition { ($value_type:ident, $compute_type:path) => { paste::paste! { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type PlusBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type MinusBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type MultiplyBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type DivideBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type GtBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type GtEqBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type LtBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type LtEqBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type EqBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type NotEqBinaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type ModBinaryEvaluator>]; impl $crate::types::evaluator::BinaryEvaluator for [<$value_type PlusBinaryEvaluator>] { fn binary_eval( &self, diff --git a/src/types/evaluator/boolean.rs b/src/types/evaluator/boolean.rs index 31a530e9..9f54dc6f 100644 --- a/src/types/evaluator/boolean.rs +++ b/src/types/evaluator/boolean.rs @@ -18,18 +18,17 @@ use crate::types::evaluator::DataValue; use crate::types::evaluator::{BinaryEvaluator, UnaryEvaluator}; use crate::types::CharLengthUnits; use ordered_float::OrderedFloat; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct BooleanNotUnaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct BooleanAndBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct BooleanOrBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct BooleanEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct BooleanNotEqBinaryEvaluator; impl UnaryEvaluator for BooleanNotUnaryEvaluator { fn unary_eval(&self, value: &DataValue) -> DataValue { diff --git a/src/types/evaluator/cast.rs b/src/types/evaluator/cast.rs index 0f337cc7..1b8a8485 100644 --- a/src/types/evaluator/cast.rs +++ b/src/types/evaluator/cast.rs @@ -37,7 +37,6 @@ use crate::types::value::{DataValue, Utf8Type}; use crate::types::CharLengthUnits; use crate::types::LogicalType; use paste::paste; -use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::sync::Arc; @@ -140,7 +139,7 @@ macro_rules! decimal_to_int_cast { #[macro_export] macro_rules! define_cast_evaluator { ($name:ident, $pattern:pat => $body:block) => { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct $name; impl $crate::types::evaluator::CastEvaluator for $name { fn eval_cast( @@ -156,7 +155,7 @@ macro_rules! define_cast_evaluator { } }; ($name:ident, $pattern:pat => |$this:ident| $body:expr) => { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct $name; impl $crate::types::evaluator::CastEvaluator for $name { fn eval_cast( @@ -175,7 +174,7 @@ macro_rules! define_cast_evaluator { } }; ($name:ident, $pattern:pat => |$this:ident| $body:block) => { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct $name; impl $crate::types::evaluator::CastEvaluator for $name { fn eval_cast( @@ -194,7 +193,7 @@ macro_rules! define_cast_evaluator { } }; ($name:ident { $($field:ident : $field_ty:ty),+ $(,)? }, $pattern:pat => |$this:ident| $body:expr) => { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct $name { $(pub $field: $field_ty),+ } @@ -215,7 +214,7 @@ macro_rules! define_cast_evaluator { } }; ($name:ident { $($field:ident : $field_ty:ty),+ $(,)? }, $pattern:pat => $body:block) => { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct $name { $(pub $field: $field_ty),+ } @@ -372,7 +371,7 @@ macro_rules! define_float_cast_evaluators { }; } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct IdentityCastEvaluator; impl CastEvaluator for IdentityCastEvaluator { fn eval_cast(&self, value: &DataValue) -> Result { diff --git a/src/types/evaluator/decimal.rs b/src/types/evaluator/decimal.rs index 0ba2f514..ab2d5ec9 100644 --- a/src/types/evaluator/decimal.rs +++ b/src/types/evaluator/decimal.rs @@ -19,30 +19,29 @@ use crate::types::evaluator::DataValue; use crate::types::CharLengthUnits; use ordered_float::OrderedFloat; use rust_decimal::prelude::ToPrimitive; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalPlusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalMinusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalMultiplyBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalDivideBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalGtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalGtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalLtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalLtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalNotEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct DecimalModBinaryEvaluator; impl BinaryEvaluator for DecimalPlusBinaryEvaluator { fn binary_eval(&self, left: &DataValue, right: &DataValue) -> Result { diff --git a/src/types/evaluator/float32.rs b/src/types/evaluator/float32.rs index d8d9be17..3133e724 100644 --- a/src/types/evaluator/float32.rs +++ b/src/types/evaluator/float32.rs @@ -17,12 +17,11 @@ use crate::types::evaluator::DataValue; use crate::types::evaluator::{BinaryEvaluator, UnaryEvaluator}; use crate::types::LogicalType; use rust_decimal::prelude::FromPrimitive; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32PlusUnaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32MinusUnaryEvaluator; impl UnaryEvaluator for Float32PlusUnaryEvaluator { fn unary_eval(&self, value: &DataValue) -> DataValue { @@ -39,27 +38,27 @@ impl UnaryEvaluator for Float32MinusUnaryEvaluator { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32PlusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32MinusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32MultiplyBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32DivideBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32GtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32GtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32LtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32LtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32EqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32NotEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float32ModBinaryEvaluator; impl BinaryEvaluator for Float32PlusBinaryEvaluator { fn binary_eval(&self, left: &DataValue, right: &DataValue) -> Result { diff --git a/src/types/evaluator/float64.rs b/src/types/evaluator/float64.rs index 54eb8d06..c8358c74 100644 --- a/src/types/evaluator/float64.rs +++ b/src/types/evaluator/float64.rs @@ -20,12 +20,11 @@ use crate::types::CharLengthUnits; use crate::types::LogicalType; use rust_decimal::prelude::FromPrimitive; use rust_decimal::Decimal; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64PlusUnaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64MinusUnaryEvaluator; impl UnaryEvaluator for Float64PlusUnaryEvaluator { fn unary_eval(&self, value: &DataValue) -> DataValue { @@ -42,27 +41,27 @@ impl UnaryEvaluator for Float64MinusUnaryEvaluator { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64PlusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64MinusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64MultiplyBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64DivideBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64GtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64GtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64LtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64LtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64EqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64NotEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Float64ModBinaryEvaluator; impl BinaryEvaluator for Float64PlusBinaryEvaluator { fn binary_eval(&self, left: &DataValue, right: &DataValue) -> Result { diff --git a/src/types/evaluator/null.rs b/src/types/evaluator/null.rs index 563e74cf..e53918c9 100644 --- a/src/types/evaluator/null.rs +++ b/src/types/evaluator/null.rs @@ -15,11 +15,10 @@ use crate::errors::DatabaseError; use crate::types::evaluator::DataValue; use crate::types::evaluator::{BinaryEvaluator, CastEvaluator}; -use serde::{Deserialize, Serialize}; /// Tips: /// - Null values operate as null values -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct NullBinaryEvaluator; impl BinaryEvaluator for NullBinaryEvaluator { fn binary_eval(&self, _: &DataValue, _: &DataValue) -> Result { @@ -27,7 +26,7 @@ impl BinaryEvaluator for NullBinaryEvaluator { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct ToSqlNullCastEvaluator; impl CastEvaluator for ToSqlNullCastEvaluator { fn eval_cast(&self, _value: &DataValue) -> Result { @@ -35,7 +34,7 @@ impl CastEvaluator for ToSqlNullCastEvaluator { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct NullCastEvaluator; impl CastEvaluator for NullCastEvaluator { fn eval_cast(&self, _value: &DataValue) -> Result { diff --git a/src/types/evaluator/time32.rs b/src/types/evaluator/time32.rs index 574f694b..18866ffc 100644 --- a/src/types/evaluator/time32.rs +++ b/src/types/evaluator/time32.rs @@ -19,24 +19,23 @@ use crate::types::evaluator::DataValue; use crate::types::value::{ONE_DAY_TO_SEC, ONE_SEC_TO_NANO}; use crate::types::CharLengthUnits; use crate::types::LogicalType; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimePlusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeMinusBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeGtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeGtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeLtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeLtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TimeNotEqBinaryEvaluator; impl BinaryEvaluator for TimePlusBinaryEvaluator { fn binary_eval(&self, left: &DataValue, right: &DataValue) -> Result { diff --git a/src/types/evaluator/time64.rs b/src/types/evaluator/time64.rs index cafdfa0a..3468f40c 100644 --- a/src/types/evaluator/time64.rs +++ b/src/types/evaluator/time64.rs @@ -19,20 +19,19 @@ use crate::types::evaluator::DataValue; use crate::types::CharLengthUnits; use crate::types::LogicalType; use chrono::{Datelike, Timelike}; -use serde::{Deserialize, Serialize}; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64GtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64GtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64LtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64LtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64EqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Time64NotEqBinaryEvaluator; impl BinaryEvaluator for Time64GtBinaryEvaluator { fn binary_eval(&self, left: &DataValue, right: &DataValue) -> Result { diff --git a/src/types/evaluator/tuple.rs b/src/types/evaluator/tuple.rs index bcbcdfe1..4dd843be 100644 --- a/src/types/evaluator/tuple.rs +++ b/src/types/evaluator/tuple.rs @@ -15,21 +15,20 @@ use crate::errors::DatabaseError; use crate::types::evaluator::DataValue; use crate::types::evaluator::{BinaryEvaluator, CastEvaluator, CastEvaluatorBox}; -use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::hint; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleNotEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleGtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleGtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleLtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct TupleLtEqBinaryEvaluator; fn tuple_cmp( @@ -85,7 +84,7 @@ impl BinaryEvaluator for TupleNotEqBinaryEvaluator { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash)] +#[derive(Debug)] pub struct TupleCastEvaluator { pub element_evaluators: Vec, } diff --git a/src/types/evaluator/unary.rs b/src/types/evaluator/unary.rs index b4645558..1bea7e42 100644 --- a/src/types/evaluator/unary.rs +++ b/src/types/evaluator/unary.rs @@ -73,9 +73,9 @@ pub fn unary_create( macro_rules! numeric_unary_evaluator_definition { ($value_type:ident, $compute_type:path) => { paste::paste! { - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type PlusUnaryEvaluator>]; - #[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)] + #[derive(Debug)] pub struct [<$value_type MinusUnaryEvaluator>]; impl $crate::types::evaluator::UnaryEvaluator for [<$value_type PlusUnaryEvaluator>] { fn unary_eval(&self, value: &$crate::types::value::DataValue) -> $crate::types::value::DataValue { value.clone() diff --git a/src/types/evaluator/utf8.rs b/src/types/evaluator/utf8.rs index 7eb4ac85..09450917 100644 --- a/src/types/evaluator/utf8.rs +++ b/src/types/evaluator/utf8.rs @@ -22,29 +22,28 @@ use crate::types::LogicalType; use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike}; use ordered_float::OrderedFloat; use rust_decimal::Decimal; -use serde::{Deserialize, Serialize}; use std::hint; use std::str::FromStr; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8GtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8GtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8LtBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8LtEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8EqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8NotEqBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8StringConcatBinaryEvaluator; -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8LikeBinaryEvaluator { pub(crate) escape_char: Option, } -#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +#[derive(Debug)] pub struct Utf8NotLikeBinaryEvaluator { pub(crate) escape_char: Option, } diff --git a/tests/macros-test/src/main.rs b/tests/macros-test/src/main.rs index 141eb921..5e24997a 100644 --- a/tests/macros-test/src/main.rs +++ b/tests/macros-test/src/main.rs @@ -17,6 +17,7 @@ fn main() {} #[cfg(test)] mod test { use kite_sql::catalog::column::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation}; + use kite_sql::catalog::table::TableName; use kite_sql::db::{DataBaseBuilder, Database, ResultIter}; use kite_sql::errors::DatabaseError; use kite_sql::expression::function::scala::ScalarFunctionImpl; @@ -2069,7 +2070,7 @@ mod test { assert!(numbers.next().is_none()); let function_schema = function.output_schema(); - let table_name: Arc = "test_numbers".to_string().into(); + let table_name: TableName = "test_numbers".to_string().into(); let mut c1 = ColumnCatalog::new( "c1".to_string(), true, From d7a9f9031fdf31bcf0c5d9d520c6786b40845e54 Mon Sep 17 00:00:00 2001 From: kould Date: Sat, 6 Jun 2026 02:50:57 +0800 Subject: [PATCH 2/3] Optimize column pruning outcome reuse --- .../rule/normalization/column_pruning.rs | 385 ++++++++++-------- 1 file changed, 208 insertions(+), 177 deletions(-) diff --git a/src/optimizer/rule/normalization/column_pruning.rs b/src/optimizer/rule/normalization/column_pruning.rs index e68a85a2..0e65dbc0 100644 --- a/src/optimizer/rule/normalization/column_pruning.rs +++ b/src/optimizer/rule/normalization/column_pruning.rs @@ -25,54 +25,26 @@ use crate::planner::{Childrens, LogicalPlan}; use crate::types::value::{DataValue, Utf8Type}; use crate::types::CharLengthUnits; use crate::types::LogicalType; -use bumpalo::Bump; use std::collections::HashSet; #[derive(Clone)] pub struct ColumnPruning; -type BumpUsizeVec<'bump> = bumpalo::collections::Vec<'bump, usize>; - -struct ApplyOutcome<'bump> { - changed: bool, - removed_positions: BumpUsizeVec<'bump>, -} - -impl<'bump> ApplyOutcome<'bump> { - fn new(arena: &'bump Bump) -> Self { - Self { - changed: false, - removed_positions: BumpUsizeVec::new_in(arena), - } - } -} - -struct JoinChildrenOutcome<'bump> { +struct ApplyOutcome { changed: bool, - left_removed_positions: BumpUsizeVec<'bump>, - right_removed_positions: BumpUsizeVec<'bump>, + removed_positions: Vec, } -impl<'bump> JoinChildrenOutcome<'bump> { - fn new(arena: &'bump Bump) -> Self { +impl ApplyOutcome { + fn new() -> Self { Self { changed: false, - left_removed_positions: BumpUsizeVec::new_in(arena), - right_removed_positions: BumpUsizeVec::new_in(arena), + removed_positions: Vec::new(), } } } impl ColumnPruning { - fn copy_removed_positions<'bump>( - removed_positions: &[usize], - arena: &'bump Bump, - ) -> BumpUsizeVec<'bump> { - let mut copied = BumpUsizeVec::with_capacity_in(removed_positions.len(), arena); - copied.extend_from_slice(removed_positions); - copied - } - fn extend_operator_referenced_columns<'a>( operator: &'a Operator, referenced_columns: &mut HashSet<&'a ColumnSummary>, @@ -210,9 +182,10 @@ impl ColumnPruning { fn clear_exprs( column_references: &HashSet<&ColumnSummary>, exprs: &mut Vec, - removed_positions: &mut BumpUsizeVec<'_>, + removed_positions: &mut Vec, + output_start: usize, ) { - removed_positions.clear(); + removed_positions.truncate(output_start); let mut position = 0; exprs.retain(|expr| { let keep = Self::output_column_is_required(expr, column_references); @@ -309,94 +282,77 @@ impl ColumnPruning { remap_exprs_positions(exprs, removed_positions) } - fn apply_only_child<'bump>( + fn apply_only_child( referenced_columns: HashSet<&ColumnSummary>, all_referenced: bool, childrens: &mut Childrens, - arena: &'bump Bump, - ) -> Result, DatabaseError> { + outcome: &mut ApplyOutcome, + output_start: usize, + ) -> Result { let Childrens::Only(child) = childrens else { - return Ok(ApplyOutcome::new(arena)); - }; - Self::_apply(referenced_columns, all_referenced, child.as_mut(), arena) - } - - fn apply_join_children<'bump>( - referenced_columns: HashSet<&ColumnSummary>, - all_referenced: bool, - childrens: &mut Childrens, - arena: &'bump Bump, - ) -> Result, DatabaseError> { - let Childrens::Twins { left, right } = childrens else { - return Ok(JoinChildrenOutcome::new(arena)); + outcome.changed = false; + outcome.removed_positions.truncate(output_start); + return Ok(false); }; - let left_outcome = Self::_apply( - referenced_columns.clone(), - all_referenced, - left.as_mut(), - arena, - )?; - let right_outcome = - Self::_apply(referenced_columns, all_referenced, right.as_mut(), arena)?; - - Ok(JoinChildrenOutcome { - changed: left_outcome.changed || right_outcome.changed, - left_removed_positions: left_outcome.removed_positions, - right_removed_positions: right_outcome.removed_positions, - }) + Self::_apply_appending(referenced_columns, all_referenced, child.as_mut(), outcome)?; + Ok(outcome.changed) } #[allow(clippy::needless_lifetimes)] - fn apply_twins<'bump>( + fn apply_twins( referenced_columns: HashSet<&ColumnSummary>, all_referenced: bool, childrens: &mut Childrens, - arena: &'bump Bump, + outcome: &mut ApplyOutcome, + output_start: usize, ) -> Result { let Childrens::Twins { left, right } = childrens else { + outcome.changed = false; + outcome.removed_positions.truncate(output_start); return Ok(false); }; - let left_changed = Self::_apply( + Self::_apply_appending( referenced_columns.clone(), all_referenced, left.as_mut(), - arena, - )? - .changed; - let right_changed = - Self::_apply(referenced_columns, all_referenced, right.as_mut(), arena)?.changed; + outcome, + )?; + let left_changed = outcome.changed; + outcome.removed_positions.truncate(output_start); + + Self::_apply_appending(referenced_columns, all_referenced, right.as_mut(), outcome)?; + let right_changed = outcome.changed; + outcome.removed_positions.truncate(output_start); - Ok(left_changed || right_changed) + outcome.changed = left_changed || right_changed; + Ok(outcome.changed) } - fn merge_removed_positions<'bump>( - left_removed_positions: &[usize], - right_removed_positions: &[usize], - right_offset: usize, - arena: &'bump Bump, - ) -> BumpUsizeVec<'bump> { - let mut removed_positions = BumpUsizeVec::with_capacity_in( - left_removed_positions.len() + right_removed_positions.len(), - arena, - ); - removed_positions.extend_from_slice(left_removed_positions); - removed_positions.extend( - right_removed_positions - .iter() - .map(|position| position + right_offset), - ); - removed_positions + fn offset_removed_positions(removed_positions: &mut [usize], offset: usize) { + for position in removed_positions { + *position += offset; + } } - fn _apply<'bump>( + fn _apply( required_columns: HashSet<&ColumnSummary>, all_referenced: bool, plan: &mut LogicalPlan, - arena: &'bump Bump, - ) -> Result, DatabaseError> { + ) -> Result { + let mut outcome = ApplyOutcome::new(); + Self::_apply_appending(required_columns, all_referenced, plan, &mut outcome)?; + Ok(outcome) + } + + fn _apply_appending( + required_columns: HashSet<&ColumnSummary>, + all_referenced: bool, + plan: &mut LogicalPlan, + outcome: &mut ApplyOutcome, + ) -> Result<(), DatabaseError> { let mut changed = false; - let mut output_removed_positions = BumpUsizeVec::new_in(arena); + let output_start = outcome.removed_positions.len(); let (operator, childrens) = (&mut plan.operator, plan.childrens.as_mut()); match operator { @@ -405,9 +361,10 @@ impl ColumnPruning { Self::clear_exprs( &required_columns, &mut op.agg_calls, - &mut output_removed_positions, + &mut outcome.removed_positions, + output_start, ); - if !output_removed_positions.is_empty() { + if outcome.removed_positions.len() > output_start { changed = true; } @@ -427,8 +384,12 @@ impl ColumnPruning { }); changed = true; } + } else { + outcome.removed_positions.truncate(output_start); } - let child_outcome = { + + let child_start = outcome.removed_positions.len(); + let child_changed = { let mut child_required = if op.is_distinct { required_columns } else { @@ -439,15 +400,16 @@ impl ColumnPruning { &mut child_required, ); - Self::apply_only_child(child_required, false, childrens, arena)? + Self::apply_only_child(child_required, false, childrens, outcome, child_start)? }; - if child_outcome.changed { + if child_changed { Self::remap_operator_after_child_change( operator, - &child_outcome.removed_positions, + &outcome.removed_positions[child_start..], )?; changed = true; } + outcome.removed_positions.truncate(child_start); } Operator::Project(op) => { let mut has_count_star = HasCountStar::default(); @@ -459,40 +421,56 @@ impl ColumnPruning { Self::clear_exprs( &required_columns, &mut op.exprs, - &mut output_removed_positions, + &mut outcome.removed_positions, + output_start, ); - if !output_removed_positions.is_empty() { + if outcome.removed_positions.len() > output_start { changed = true; } + } else { + outcome.removed_positions.truncate(output_start); } - let child_outcome = { + + let child_start = outcome.removed_positions.len(); + let child_changed = { let mut child_required = HashSet::new(); Self::extend_expr_referenced_columns(op.exprs.iter(), &mut child_required); - Self::apply_only_child(child_required, false, childrens, arena)? + Self::apply_only_child( + child_required, + false, + childrens, + outcome, + child_start, + )? }; - if child_outcome.changed { + if child_changed { Self::remap_operator_after_child_change( operator, - &child_outcome.removed_positions, + &outcome.removed_positions[child_start..], )?; changed = true; } + outcome.removed_positions.truncate(child_start); + } else { + outcome.removed_positions.truncate(output_start); } } Operator::TableScan(op) => { if !all_referenced { - output_removed_positions.clear(); + outcome.removed_positions.truncate(output_start); op.columns.retain(|position, column| { let keep = required_columns.contains(column.summary()); if !keep { - output_removed_positions.push(*position); + outcome.removed_positions.push(*position); } keep }); - if !output_removed_positions.is_empty() { + if outcome.removed_positions.len() > output_start { changed = true; } + } else { + outcome.removed_positions.truncate(output_start); } } Operator::Sort(_) @@ -509,128 +487,182 @@ impl ColumnPruning { operator, Operator::ScalarApply(_) | Operator::MarkApply(_) | Operator::Join(_) ) { - let (child_outcome, old_left_outputs_len) = { - let mut child_required = required_columns.clone(); + let (old_left_outputs_len, left_removed_start, right_removed_start) = { + let mut child_required = required_columns; Self::extend_operator_referenced_columns(operator, &mut child_required); let old_left_outputs_len = match childrens { Childrens::Twins { left, .. } => left.output_schema().len(), _ => 0, }; - let child_outcome = Self::apply_join_children( + let Childrens::Twins { left, right } = childrens else { + outcome.changed = false; + outcome.removed_positions.truncate(output_start); + return Ok(()); + }; + + let left_removed_start = outcome.removed_positions.len(); + Self::_apply_appending( + child_required.clone(), + all_referenced, + left.as_mut(), + outcome, + )?; + let left_changed = outcome.changed; + let right_removed_start = outcome.removed_positions.len(); + Self::_apply_appending( child_required, all_referenced, - childrens, - arena, + right.as_mut(), + outcome, )?; - (child_outcome, old_left_outputs_len) + changed = left_changed || outcome.changed; + ( + old_left_outputs_len, + left_removed_start, + right_removed_start, + ) }; - if child_outcome.changed { - let JoinChildrenOutcome { - changed: _, - left_removed_positions, - right_removed_positions, - } = child_outcome; + if changed { + let right_removed_end = outcome.removed_positions.len(); + let left_removed_len = right_removed_start - left_removed_start; if let Operator::Join(op) = operator { match &mut op.on { JoinCondition::On { on, filter } => { - for (left_expr, right_expr) in on { - remap_expr_positions(left_expr, &left_removed_positions)?; - remap_expr_positions(right_expr, &right_removed_positions)?; + { + let (left_removed_positions, right_removed_positions) = + outcome.removed_positions + [left_removed_start..right_removed_end] + .split_at(left_removed_len); + for (left_expr, right_expr) in on { + remap_expr_positions( + left_expr, + left_removed_positions, + )?; + remap_expr_positions( + right_expr, + right_removed_positions, + )?; + } } + Self::offset_removed_positions( + &mut outcome.removed_positions + [right_removed_start..right_removed_end], + old_left_outputs_len, + ); if let Some(filter) = filter { - if !left_removed_positions.is_empty() - || !right_removed_positions.is_empty() - { - let removed_positions = Self::merge_removed_positions( - &left_removed_positions, - &right_removed_positions, - old_left_outputs_len, - arena, - ); - remap_expr_positions(filter, &removed_positions)?; + let removed_positions = &outcome.removed_positions + [left_removed_start..right_removed_end]; + if !removed_positions.is_empty() { + remap_expr_positions(filter, removed_positions)?; } } } - JoinCondition::None => {} + JoinCondition::None => { + Self::offset_removed_positions( + &mut outcome.removed_positions + [right_removed_start..right_removed_end], + old_left_outputs_len, + ); + } } - output_removed_positions = Self::merge_removed_positions( - &left_removed_positions, - &right_removed_positions, - old_left_outputs_len, - arena, - ); } else if let Operator::MarkApply(op) = operator { - let removed_positions = Self::merge_removed_positions( - &left_removed_positions, - &right_removed_positions, + Self::offset_removed_positions( + &mut outcome.removed_positions + [right_removed_start..right_removed_end], old_left_outputs_len, - arena, ); + let removed_positions = + &outcome.removed_positions[left_removed_start..right_removed_end]; Self::remap_exprs_after_child_change( op.predicates_mut().iter_mut(), - &removed_positions, + removed_positions, )?; - output_removed_positions = - Self::copy_removed_positions(&left_removed_positions, arena); + outcome.removed_positions.truncate(right_removed_start); } else { - output_removed_positions = Self::merge_removed_positions( - &left_removed_positions, - &right_removed_positions, + Self::offset_removed_positions( + &mut outcome.removed_positions + [right_removed_start..right_removed_end], old_left_outputs_len, - arena, ); } - changed = true; + } else { + outcome.removed_positions.truncate(output_start); } } else if matches!(operator, Operator::Union(_) | Operator::SetMembership(_)) { let mut child_required = required_columns; Self::extend_operator_referenced_columns(operator, &mut child_required); - changed |= Self::apply_twins(child_required, all_referenced, childrens, arena)?; + changed |= Self::apply_twins( + child_required, + all_referenced, + childrens, + outcome, + output_start, + )?; + outcome.removed_positions.truncate(output_start); } else { - let child_outcome = { + let child_start = outcome.removed_positions.len(); + let child_changed = { let mut child_required = required_columns; Self::extend_operator_referenced_columns(operator, &mut child_required); - Self::apply_only_child(child_required, all_referenced, childrens, arena)? + Self::apply_only_child( + child_required, + all_referenced, + childrens, + outcome, + child_start, + )? }; - if child_outcome.changed { - let removed_positions = child_outcome.removed_positions; - Self::remap_operator_after_child_change(operator, &removed_positions)?; - output_removed_positions = removed_positions; + if child_changed { + Self::remap_operator_after_child_change( + operator, + &outcome.removed_positions[child_start..], + )?; changed = true; } } } // Last Operator - Operator::Dummy | Operator::Values(_) | Operator::FunctionScan(_) => (), + Operator::Dummy | Operator::Values(_) | Operator::FunctionScan(_) => { + outcome.removed_positions.truncate(output_start); + } Operator::Explain => { - let child_outcome = - Self::apply_only_child(required_columns, true, childrens, arena)?; - if child_outcome.changed { + let child_start = outcome.removed_positions.len(); + let child_changed = Self::apply_only_child( + required_columns, + true, + childrens, + outcome, + child_start, + )?; + if child_changed { Self::remap_operator_after_child_change( operator, - &child_outcome.removed_positions, + &outcome.removed_positions[child_start..], )?; changed = true; } + outcome.removed_positions.truncate(output_start); } // DDL Based on Other Plan Operator::Insert(_) | Operator::Update(_) | Operator::Delete(_) | Operator::Analyze(_) => { - let child_outcome = { + let child_start = outcome.removed_positions.len(); + let child_changed = { let mut child_required = HashSet::new(); Self::extend_operator_referenced_columns(operator, &mut child_required); - Self::apply_only_child(child_required, true, childrens, arena)? + Self::apply_only_child(child_required, true, childrens, outcome, child_start)? }; - if child_outcome.changed { + if child_changed { Self::remap_operator_after_child_change( operator, - &child_outcome.removed_positions, + &outcome.removed_positions[child_start..], )?; changed = true; } + outcome.removed_positions.truncate(output_start); } // DDL Single Plan Operator::CreateTable(_) @@ -647,20 +679,19 @@ impl ColumnPruning { | Operator::AddColumn(_) | Operator::ChangeColumn(_) | Operator::DropColumn(_) - | Operator::Describe(_) => (), + | Operator::Describe(_) => { + outcome.removed_positions.truncate(output_start); + } } - Ok(ApplyOutcome { - changed, - removed_positions: output_removed_positions, - }) + outcome.changed = changed; + Ok(()) } } impl NormalizationRule for ColumnPruning { fn apply(&self, plan: &mut LogicalPlan) -> Result { - let arena = Bump::new(); - let outcome = Self::_apply(HashSet::<&ColumnSummary>::new(), true, plan, &arena)?; + let outcome = Self::_apply(HashSet::<&ColumnSummary>::new(), true, plan)?; Ok(outcome.changed) } } From 71e5d63e8285520df9ed0bc8fb880af297901b9f Mon Sep 17 00:00:00 2001 From: kould Date: Sat, 6 Jun 2026 05:42:40 +0800 Subject: [PATCH 3/3] Reduce runtime allocation churn --- Makefile | 14 ++- src/binder/alter_table.rs | 39 ++++---- src/binder/create_index.rs | 2 +- src/binder/create_table.rs | 8 +- src/binder/create_view.rs | 4 +- src/binder/drop_index.rs | 2 +- src/binder/expr.rs | 39 ++++---- src/binder/insert.rs | 2 +- src/binder/mod.rs | 15 ++- src/binder/select.rs | 12 +-- src/binder/update.rs | 2 +- src/catalog/table.rs | 22 +++++ src/execution/dml/insert.rs | 29 +++--- src/execution/dml/update.rs | 18 ++-- src/optimizer/core/histogram.rs | 42 +++++++- src/optimizer/heuristic/optimizer.rs | 48 ++++++++-- .../rule/implementation/dql/table_scan.rs | 2 +- .../rule/normalization/agg_elimination.rs | 17 +--- .../rule/normalization/column_pruning.rs | 11 ++- .../rule/normalization/pushdown_predicates.rs | 9 +- src/planner/mod.rs | 2 +- src/planner/operator/mod.rs | 4 +- src/planner/operator/table_scan.rs | 20 +--- src/storage/memory.rs | 14 ++- src/storage/mod.rs | 95 ++++++++++--------- src/storage/rocksdb.rs | 26 ++--- 26 files changed, 285 insertions(+), 213 deletions(-) diff --git a/Makefile b/Makefile index a2e5e467..9616bfb2 100644 --- a/Makefile +++ b/Makefile @@ -6,9 +6,11 @@ PYO3_PYTHON ?= /usr/bin/python3.12 TPCC_MEASURE_TIME ?= 15 TPCC_NUM_WARE ?= 1 TPCC_PPROF_OUTPUT ?= /tmp/tpcc_lmdb.svg +TPCC_HEAPTRACK_MEASURE_TIME ?= 300 +TPCC_HEAPTRACK_OUTPUT ?= /tmp/tpcc_lmdb_heaptrack TPCC_SQLITE_PROFILE ?= balanced -.PHONY: test test-python test-wasm test-slt test-all wasm-build check tpcc tpcc-kitesql-rocksdb tpcc-kitesql-lmdb tpcc-lmdb-flamegraph tpcc-sqlite tpcc-sqlite-practical tpcc-sqlite-balanced tpcc-dual cargo-check build wasm-examples native-examples fmt clippy +.PHONY: test test-python test-wasm test-slt test-all wasm-build check tpcc tpcc-kitesql-rocksdb tpcc-kitesql-lmdb tpcc-lmdb-flamegraph tpcc-lmdb-heaptrack tpcc-sqlite tpcc-sqlite-practical tpcc-sqlite-balanced tpcc-dual cargo-check build wasm-examples native-examples fmt clippy ## Run default Rust tests in the current environment (non-WASM). test: @@ -66,6 +68,16 @@ tpcc-kitesql-lmdb: tpcc-lmdb-flamegraph: CARGO_PROFILE_RELEASE_DEBUG=true $(CARGO) run -p tpcc --release --features pprof -- --backend kitesql-lmdb --measure-time $(TPCC_MEASURE_TIME) --num-ware $(TPCC_NUM_WARE) --pprof-output $(TPCC_PPROF_OUTPUT) +## Execute TPCC on LMDB under heaptrack and emit a heap profile. +tpcc-lmdb-heaptrack: + @command -v heaptrack >/dev/null || { echo "heaptrack is not installed"; exit 1; } + $(CARGO) build -p tpcc --release + @mkdir -p $(dir $(TPCC_HEAPTRACK_OUTPUT)) + heaptrack -o $(TPCC_HEAPTRACK_OUTPUT) ./target/release/tpcc --backend kitesql-lmdb --measure-time $(TPCC_HEAPTRACK_MEASURE_TIME) --num-ware $(TPCC_NUM_WARE) + @echo "heaptrack output:" + @ls -1 $(TPCC_HEAPTRACK_OUTPUT)* + @echo "open gui: heaptrack_gui $$(ls -1 $(TPCC_HEAPTRACK_OUTPUT)* | tail -n 1)" + ## Execute the TPCC workload on SQLite with the practical profile. tpcc-sqlite: $(CARGO) run -p tpcc --release -- --backend sqlite --sqlite-profile $(TPCC_SQLITE_PROFILE) --path kite_sql_tpcc.sqlite diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs index 301b3a14..03e3deef 100644 --- a/src/binder/alter_table.rs +++ b/src/binder/alter_table.rs @@ -17,7 +17,7 @@ use sqlparser::ast::{AlterColumnOperation, AlterTableOperation, ColumnOption, Ob use std::borrow::Cow; use super::{attach_span_if_absent, is_valid_identifier, Binder}; -use crate::binder::lower_case_name; +use crate::binder::{lower_case_name, lower_ident}; use crate::catalog::TableName; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; @@ -137,11 +137,11 @@ impl> Binder<'_, '_, T, A> old_column_name, new_column_name, } => { - let old_column_name = old_column_name.value.to_lowercase(); - let new_column_name = new_column_name.value.to_lowercase(); + let old_column_name = lower_ident(old_column_name); + let new_column_name = lower_ident(new_column_name).into_owned(); let old_column = table - .get_column_by_name(&old_column_name) - .ok_or_else(|| DatabaseError::column_not_found(old_column_name.clone()))?; + .get_column_by_name(old_column_name.as_ref()) + .ok_or_else(|| DatabaseError::column_not_found(old_column_name.to_string()))?; if !is_valid_identifier(&new_column_name) { return Err(DatabaseError::invalid_column( @@ -152,7 +152,7 @@ impl> Binder<'_, '_, T, A> LogicalPlan::new( Operator::ChangeColumn(ChangeColumnOperator { table_name, - old_column_name, + old_column_name: old_column_name.into_owned(), new_column_name, data_type: old_column.datatype().clone(), default_change: DefaultChange::NoChange, @@ -162,10 +162,10 @@ impl> Binder<'_, '_, T, A> ) } AlterTableOperation::AlterColumn { column_name, op } => { - let old_column_name = column_name.value.to_lowercase(); + let old_column_name = lower_ident(column_name); let old_column = table - .get_column_by_name(&old_column_name) - .ok_or_else(|| DatabaseError::column_not_found(old_column_name.clone()))?; + .get_column_by_name(old_column_name.as_ref()) + .ok_or_else(|| DatabaseError::column_not_found(old_column_name.to_string()))?; let old_data_type = old_column.datatype().clone(); let (data_type, default_change, not_null_change) = match op { @@ -213,8 +213,8 @@ impl> Binder<'_, '_, T, A> LogicalPlan::new( Operator::ChangeColumn(ChangeColumnOperator { table_name, - new_column_name: old_column_name.clone(), - old_column_name, + new_column_name: old_column_name.to_string(), + old_column_name: old_column_name.into_owned(), data_type, default_change, not_null_change, @@ -233,10 +233,11 @@ impl> Binder<'_, '_, T, A> "MODIFY COLUMN does not currently support column positions".to_string(), )); } - let old_column_name = col_name.value.to_lowercase(); + let old_column_name = lower_ident(col_name); let _ = table - .get_column_by_name(&old_column_name) - .ok_or_else(|| DatabaseError::column_not_found(old_column_name.clone()))?; + .get_column_by_name(old_column_name.as_ref()) + .ok_or_else(|| DatabaseError::column_not_found(old_column_name.to_string()))?; + let old_column_name = old_column_name.into_owned(); let data_type = LogicalType::try_from(data_type.clone())?; let (default_change, not_null_change) = self.bind_change_column_options(options, &data_type)?; @@ -265,11 +266,11 @@ impl> Binder<'_, '_, T, A> "CHANGE COLUMN does not currently support column positions".to_string(), )); } - let old_column_name = old_name.value.to_lowercase(); - let new_column_name = new_name.value.to_lowercase(); + let old_column_name = lower_ident(old_name); + let new_column_name = lower_ident(new_name).into_owned(); let _ = table - .get_column_by_name(&old_column_name) - .ok_or_else(|| DatabaseError::column_not_found(old_column_name.clone()))?; + .get_column_by_name(old_column_name.as_ref()) + .ok_or_else(|| DatabaseError::column_not_found(old_column_name.to_string()))?; if !is_valid_identifier(&new_column_name) { return Err(DatabaseError::invalid_column( @@ -283,7 +284,7 @@ impl> Binder<'_, '_, T, A> LogicalPlan::new( Operator::ChangeColumn(ChangeColumnOperator { table_name, - old_column_name, + old_column_name: old_column_name.into_owned(), new_column_name, data_type, default_change, diff --git a/src/binder/create_index.rs b/src/binder/create_index.rs index bc8d30af..7aa3f259 100644 --- a/src/binder/create_index.rs +++ b/src/binder/create_index.rs @@ -77,7 +77,7 @@ impl> Binder<'_, '_, T, A> Operator::CreateIndex(CreateIndexOperator { table_name, columns, - index_name, + index_name: index_name.into_owned(), if_not_exists, ty, }), diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 8a4a58ce..9a043955 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -13,7 +13,7 @@ // limitations under the License. use super::{attach_span_if_absent, is_valid_identifier, Binder}; -use crate::binder::lower_case_name; +use crate::binder::{lower_case_name, lower_ident}; use crate::catalog::{ColumnCatalog, ColumnDesc, TableName}; use crate::errors::DatabaseError; use crate::expression::ScalarExpression; @@ -116,11 +116,11 @@ impl> Binder<'_, '_, T, A> "only identifier columns are supported in `PRIMARY KEY/UNIQUE`".to_string(), )); }; - let column_name = ident.value.to_lowercase(); + let column_name = lower_ident(ident); if let Some(column) = table_columns .iter_mut() - .find(|column| column.name() == column_name) + .find(|column| column.name() == column_name.as_ref()) { fn_constraint(i, column.desc_mut()) } @@ -133,7 +133,7 @@ impl> Binder<'_, '_, T, A> column_def: &ColumnDef, column_index: Option, ) -> Result { - let column_name = column_def.name.value.to_lowercase(); + let column_name = lower_ident(&column_def.name).into_owned(); let mut column_desc = ColumnDesc::new( LogicalType::try_from(column_def.data_type.clone())?, None, diff --git a/src/binder/create_view.rs b/src/binder/create_view.rs index 82b35ae9..09c2bbe0 100644 --- a/src/binder/create_view.rs +++ b/src/binder/create_view.rs @@ -78,7 +78,9 @@ impl> Binder<'_, '_, T, A> projection_exprs( &view_name, mapping_schema, - columns.iter().map(|column| lower_ident(&column.name)), + columns + .iter() + .map(|column| lower_ident(&column.name).into_owned()), ) }; plan = self.bind_project(plan, exprs)?; diff --git a/src/binder/drop_index.rs b/src/binder/drop_index.rs index 5c6bd1e9..51e2c219 100644 --- a/src/binder/drop_index.rs +++ b/src/binder/drop_index.rs @@ -38,7 +38,7 @@ impl> Binder<'_, '_, T, A> Ok(LogicalPlan::new( Operator::DropIndex(DropIndexOperator { table_name, - index_name, + index_name: index_name.into_owned(), if_exists: *if_exists, }), Childrens::None, diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 85ef96d1..56bc48df 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -41,20 +41,9 @@ use crate::types::tuple::SchemaRef; use crate::types::value::{DataValue, Utf8Type}; use crate::types::{CharLengthUnits, ColumnId, LogicalType}; -macro_rules! try_alias { - ($context:expr, $full_name:expr) => { - if let Some(expr) = $context.expr_aliases.get(&$full_name) { - return Ok(ScalarExpression::Alias { - expr: Box::new(expr.clone()), - alias: AliasType::Name($full_name.1), - }); - } - }; -} - macro_rules! try_default { ($table_name:expr, $column_name:expr) => { - if let (None, "default") = ($table_name, $column_name.as_str()) { + if let (None, "default") = ($table_name, $column_name.as_ref()) { return Ok(ScalarExpression::Empty); } }; @@ -559,7 +548,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T pub fn bind_column_ref_from_identifiers( &mut self, idents: &[Ident], - bind_table_name: Option, + bind_table_name: Option<&str>, ) -> Result { let full_name = match idents { [column] => (None, lower_ident(column)), @@ -578,12 +567,22 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T } }; if full_name.0.is_none() { - try_alias!(self.context, full_name); + if let Some((_, expr)) = self + .context + .expr_aliases + .iter() + .find(|((table, column), _)| table.is_none() && column == full_name.1.as_ref()) + { + return Ok(ScalarExpression::Alias { + expr: Box::new(expr.clone()), + alias: AliasType::Name(full_name.1.into_owned()), + }); + } } if self.context.allow_default { try_default!(&full_name.0, full_name.1); } - if let Some(table) = full_name.0.or(bind_table_name) { + if let Some(table) = full_name.0.as_deref().or(bind_table_name) { let (schema_ref, position_offset) = match Self::resolve_source_columns_in_scope( &self.context, &mut self.table_schema_buf, @@ -614,8 +613,8 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T } } }; - let (position, column) = Self::find_column_in_schema(&schema_ref, full_name.1.as_str()) - .ok_or_else(|| Self::column_not_found_with_span(idents, full_name.1.as_str()))?; + let (position, column) = Self::find_column_in_schema(&schema_ref, full_name.1.as_ref()) + .ok_or_else(|| Self::column_not_found_with_span(idents, full_name.1.as_ref()))?; Ok(ScalarExpression::column_expr( column.clone(), @@ -627,14 +626,14 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T |context: &BinderContext<'a, T>| -> Result, DatabaseError> { Ok(context .using - .get(full_name.1.as_str()) + .get(full_name.1.as_ref()) .map(|using_column| using_column.visible_expr()) .transpose()? .or_else(|| { Self::find_column_in_scope( context, &mut self.table_schema_buf, - full_name.1.as_str(), + full_name.1.as_ref(), ) })) }; @@ -649,7 +648,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T Some(column) => Ok(column), None => Err(Self::column_not_found_with_span( idents, - full_name.1.as_str(), + full_name.1.as_ref(), )), } } diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 6e258335..bfc507e7 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -69,7 +69,7 @@ impl> Binder<'_, '_, T, A> for ident in idents { match self.bind_column_ref_from_identifiers( slice::from_ref(ident), - Some(table_name.to_string()), + Some(table_name.as_ref()), )? { ScalarExpression::ColumnRef { column, .. } => columns.push(column), _ => return Err(DatabaseError::UnsupportedStmt(ident.to_string())), diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 78ffdd09..292f9a08 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -39,6 +39,7 @@ use sqlparser::ast::{ Statement, TableObject, }; use sqlparser::tokenizer::Span; +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -783,18 +784,24 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, ' } } -fn lower_ident(ident: &Ident) -> String { - ident.value.to_lowercase() +fn lower_ident(ident: &Ident) -> Cow<'_, str> { + let value = &ident.value; + + if value.chars().any(char::is_uppercase) { + Cow::Owned(value.to_lowercase()) + } else { + Cow::Borrowed(value) + } } -fn lower_name_part(part: &ObjectNamePart) -> Result { +fn lower_name_part(part: &ObjectNamePart) -> Result, DatabaseError> { part.as_ident() .map(lower_ident) .ok_or_else(|| attach_span_if_absent(DatabaseError::invalid_table(part.to_string()), part)) } /// Convert an object name into lower case -fn lower_case_name(name: &ObjectName) -> Result { +fn lower_case_name(name: &ObjectName) -> Result, DatabaseError> { if name.0.len() == 1 { return lower_name_part(&name.0[0]); } diff --git a/src/binder/select.rs b/src/binder/select.rs index a6618dc1..40d7b21b 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -856,7 +856,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' }) = alias { let source_name = self.context.temp_table(); - let table_alias: TableName = name.value.to_lowercase().into(); + let table_alias: TableName = lower_ident(name).into(); plan = self.bind_alias( plan, @@ -911,7 +911,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' .. }) = alias { - table_alias = Some(name.value.to_lowercase().into()); + table_alias = Some(lower_ident(name).into()); plan = self.bind_alias( plan, @@ -955,7 +955,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' } else { alias_column .iter() - .map(|column| lower_ident(&column.name)) + .map(|column| lower_ident(&column.name).into_owned()) .zip(input_schema.iter().cloned()) .collect_vec() }; @@ -1027,7 +1027,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' let mut alias_idents = None; if let Some(TableAlias { name, columns, .. }) = alias { - table_alias = Some(name.value.to_lowercase().into()); + table_alias = Some(lower_ident(name).into()); alias_idents = Some(columns); } @@ -1075,7 +1075,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' SelectItem::UnnamedExpr(expr) => select_items.push(self.bind_expr(expr)?), SelectItem::ExprWithAlias { expr, alias } => { let expr = self.bind_expr(expr)?; - let alias_name = alias.value.to_lowercase(); + let alias_name = lower_ident(alias).into_owned(); self.context .add_alias(None, alias_name.clone(), expr.clone()); @@ -1909,7 +1909,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' )); }; self.context.add_using( - name.clone(), + name.clone().into_owned(), join_type, left_column, left_position, diff --git a/src/binder/update.rs b/src/binder/update.rs index 94991cb1..83d62f53 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -113,7 +113,7 @@ impl> Binder<'_, '_, T, A> for ident in idents { match self.bind_column_ref_from_identifiers( slice::from_ref(ident), - Some(table_name.to_string()), + Some(table_name.as_ref()), )? { ScalarExpression::ColumnRef { column, .. } => { let mut expr = if matches!(expression, ScalarExpression::Empty) { diff --git a/src/catalog/table.rs b/src/catalog/table.rs index d1045e7b..7210907b 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -14,6 +14,7 @@ use crate::catalog::{ColumnCatalog, ColumnRef, ColumnRelation}; use crate::errors::DatabaseError; +use crate::expression::ScalarExpression; use crate::types::index::{IndexMeta, IndexMetaRef, IndexType}; use crate::types::tuple::SchemaRef; use crate::types::{ColumnId, LogicalType}; @@ -41,6 +42,13 @@ pub struct TableCatalog { primary_key_type: LogicalType, } +pub(crate) struct DmlTableSnapshot { + pub(crate) schema_ref: SchemaRef, + pub(crate) primary_key_indices: PrimaryKeyIndices, + pub(crate) columns_len: usize, + pub(crate) index_metas: Vec<(IndexMetaRef, Vec)>, +} + //TODO: can add some like Table description and other information as attributes #[derive(Debug, Clone, PartialEq, ReferenceSerialization)] pub struct TableMeta { @@ -107,6 +115,20 @@ impl TableCatalog { &self.primary_key_indices } + pub(crate) fn dml_snapshot(&self) -> Result { + let index_metas = self + .indexes() + .map(|index_meta| Ok((index_meta.clone(), index_meta.column_exprs(self)?))) + .collect::, DatabaseError>>()?; + + Ok(DmlTableSnapshot { + schema_ref: self.schema_ref.clone(), + primary_key_indices: self.primary_key_indices.clone(), + columns_len: self.columns_len(), + index_metas, + }) + } + /// Add a column to the table catalog. pub(crate) fn add_column( &mut self, diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 20a4d23c..9b89f6b7 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -102,26 +102,21 @@ impl Insert { return Ok(()); }; - if let Some(table_catalog) = arena - .transaction_mut() + if let Some(table_snapshot) = arena + .transaction() .table(arena.table_cache(), self.table_name.clone())? - .cloned() + .map(|table| table.dml_snapshot()) + .transpose()? { - if table_catalog.primary_keys().is_empty() { + if table_snapshot.primary_key_indices.is_empty() { return Err(DatabaseError::not_null()); } - let mut index_metas = Vec::new(); - for index_meta in table_catalog.indexes() { - let exprs = index_meta.column_exprs(&table_catalog)?; - index_metas.push((index_meta, exprs)); - } - - let serializers = table_catalog - .columns() + let serializers = table_snapshot + .schema_ref + .iter() .map(|column| column.datatype().serializable()) .collect_vec(); - let pk_indices = table_catalog.primary_keys_indices(); let mut inserted_count = 0; while arena.next_tuple(input)? { @@ -131,9 +126,9 @@ impl Insert { for (i, value) in values.into_iter().enumerate() { tuple_map.insert(self.input_schema[i].key(self.is_mapping_by_name), value); } - let mut values = Vec::with_capacity(table_catalog.columns_len()); + let mut values = Vec::with_capacity(table_snapshot.columns_len); - for col in table_catalog.columns() { + for col in table_snapshot.schema_ref.iter() { let mut value = { let mut value = tuple_map.remove(&col.key(self.is_mapping_by_name)); @@ -149,10 +144,10 @@ impl Insert { } values.push(value) } - let pk = Tuple::primary_projection(pk_indices, &values); + let pk = Tuple::primary_projection(&table_snapshot.primary_key_indices, &values); let tuple = Tuple::new(Some(pk), values); - for (index_meta, exprs) in index_metas.iter() { + for (index_meta, exprs) in table_snapshot.index_metas.iter() { let values = Projection::projection(&tuple, exprs)?; let Some(value) = DataValue::values_to_tuple(values) else { continue; diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index 45824c0d..519b91a9 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -88,21 +88,17 @@ impl Update { exprs_map.insert(column.id(), expr); } - if let Some(table_catalog) = arena - .transaction_mut() + if let Some(table_snapshot) = arena + .transaction() .table(arena.table_cache(), self.table_name.clone())? - .cloned() + .map(|table| table.dml_snapshot()) + .transpose()? { let serializers = self .input_schema .iter() .map(|column| column.datatype().serializable()) .collect_vec(); - let mut index_metas = Vec::new(); - for index_meta in table_catalog.indexes() { - let exprs = index_meta.column_exprs(&table_catalog)?; - index_metas.push((index_meta, exprs)); - } let mut updated_count = 0; @@ -113,7 +109,7 @@ impl Update { let Some(old_pk) = tuple.pk.clone() else { continue; }; - for (index_meta, exprs) in index_metas.iter() { + for (index_meta, exprs) in table_snapshot.index_metas.iter() { let values = Projection::projection(&tuple, exprs)?; let Some(value) = DataValue::values_to_tuple(values) else { continue; @@ -131,7 +127,7 @@ impl Update { } tuple.pk = Some(Tuple::primary_projection( - table_catalog.primary_keys_indices(), + &table_snapshot.primary_key_indices, &tuple.values, )); let new_pk = tuple.pk.as_ref().ok_or(DatabaseError::PrimaryKeyNotFound)?; @@ -142,7 +138,7 @@ impl Update { .remove_tuple(&self.table_name, &old_pk)?; is_overwrite = false; } - for (index_meta, exprs) in index_metas.iter() { + for (index_meta, exprs) in table_snapshot.index_metas.iter() { let values = Projection::projection(&tuple, exprs)?; let Some(value) = DataValue::values_to_tuple(values) else { continue; diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index f2a737db..cef0ac4a 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -27,6 +27,7 @@ use kite_sql_serde_macros::ReferenceSerialization; use ordered_float::OrderedFloat; use std::borrow::Cow; use std::collections::Bound; +use std::sync::OnceLock; use std::{cmp, mem}; pub struct HistogramBuilder { @@ -42,6 +43,7 @@ pub struct HistogramBuilder { value_index: usize, } +#[derive(Debug)] struct BoundComparator { lt: BinaryEvaluatorBox, lte: BinaryEvaluatorBox, @@ -63,10 +65,27 @@ pub struct HistogramMeta { } // Equal depth histogram -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug)] pub struct Histogram { meta: HistogramMeta, buckets: Vec, + comparator: OnceLock, +} + +impl Clone for Histogram { + fn clone(&self) -> Self { + Self { + meta: self.meta.clone(), + buckets: self.buckets.clone(), + comparator: OnceLock::new(), + } + } +} + +impl PartialEq for Histogram { + fn eq(&self, other: &Self) -> bool { + self.meta == other.meta && self.buckets == other.buckets + } } #[derive(Debug, Clone, PartialEq, ReferenceSerialization)] @@ -212,6 +231,7 @@ impl HistogramBuilder { correlation: Self::calc_correlation(corr_xy_sum, values_len), }, buckets, + comparator: OnceLock::new(), }, sketch, )) @@ -320,13 +340,29 @@ impl Histogram { ))); } - Ok(Self { meta, buckets }) + Ok(Self { + meta, + buckets, + comparator: OnceLock::new(), + }) } pub fn into_parts(self) -> (HistogramMeta, Vec) { (self.meta, self.buckets) } + fn comparator(&self) -> Result<&BoundComparator, DatabaseError> { + if let Some(comparator) = self.comparator.get() { + return Ok(comparator); + } + + let comparator = BoundComparator::new(self.buckets[0].upper.logical_type())?; + let _ = self.comparator.set(comparator); + self.comparator + .get() + .ok_or(DatabaseError::EvaluatorNotFound) + } + pub fn meta(&self) -> &HistogramMeta { &self.meta } @@ -363,7 +399,7 @@ impl Histogram { if self.buckets.is_empty() || ranges.is_empty() { return Ok(0); } - let comparator = BoundComparator::new(self.buckets[0].upper.logical_type())?; + let comparator = self.comparator()?; let mut count = 0; let mut binary_i = 0; diff --git a/src/optimizer/heuristic/optimizer.rs b/src/optimizer/heuristic/optimizer.rs index d29c445a..9af51686 100644 --- a/src/optimizer/heuristic/optimizer.rs +++ b/src/optimizer/heuristic/optimizer.rs @@ -39,6 +39,7 @@ pub struct HepOptimizer<'a> { before_batches: &'a [HepBatch], after_batches: &'a [HepBatch], implementation_index: &'a ImplementationRuleIndex, + max_local_rules_len: usize, plan: LogicalPlan, } @@ -48,11 +49,13 @@ impl<'a> HepOptimizer<'a> { before_batches: &'a [HepBatch], after_batches: &'a [HepBatch], implementation_index: &'a ImplementationRuleIndex, + max_local_rules_len: usize, ) -> Self { Self { before_batches, after_batches, implementation_index, + max_local_rules_len, plan, } } @@ -61,7 +64,8 @@ impl<'a> HepOptimizer<'a> { mut self, loader: Option<&StatisticMetaLoader<'_, T>>, ) -> Result { - Self::apply_batches(&mut self.plan, self.before_batches)?; + let mut applied_rules = Vec::with_capacity(self.max_local_rules_len); + Self::apply_batches(&mut self.plan, self.before_batches, &mut applied_rules)?; if let Some(loader) = loader { if self.implementation_index.is_empty().not() { @@ -76,30 +80,40 @@ impl<'a> HepOptimizer<'a> { )?; } } - Self::apply_batches(&mut self.plan, self.after_batches)?; + Self::apply_batches(&mut self.plan, self.after_batches, &mut applied_rules)?; Ok(self.plan) } #[inline] - fn apply_batches(plan: &mut LogicalPlan, batches: &[HepBatch]) -> Result<(), DatabaseError> { + fn apply_batches( + plan: &mut LogicalPlan, + batches: &[HepBatch], + applied_rules: &mut Vec, + ) -> Result<(), DatabaseError> { for batch in batches { match batch.strategy { HepBatchStrategy::MaxTimes(max_iteration) => { for _ in 0..max_iteration { - if !Self::apply_batch(plan, batch)? { + if !Self::apply_batch(plan, batch, applied_rules)? { break; } } } - HepBatchStrategy::LoopIfApplied => while Self::apply_batch(plan, batch)? {}, + HepBatchStrategy::LoopIfApplied => { + while Self::apply_batch(plan, batch, applied_rules)? {} + } } } Ok(()) } #[inline] - fn apply_batch(plan: &mut LogicalPlan, batch: &HepBatch) -> Result { + fn apply_batch( + plan: &mut LogicalPlan, + batch: &HepBatch, + applied_rules: &mut Vec, + ) -> Result { let mut applied = false; for step in &batch.steps { match step { @@ -110,7 +124,7 @@ impl<'a> HepOptimizer<'a> { } } HepBatchStep::LocalRewrite(rules) => { - if Self::apply_local_rules(plan, rules)? { + if Self::apply_local_rules(plan, rules, applied_rules)? { applied = true; } } @@ -349,9 +363,11 @@ impl<'a> HepOptimizer<'a> { fn apply_local_rules( plan: &mut LogicalPlan, rules: &HepLocalRewriteBatch, + applied_rules: &mut Vec, ) -> Result { - let mut applied_rules = vec![false; rules.len()]; - Self::apply_local_rules_inner(plan, rules, &mut applied_rules) + applied_rules.clear(); + applied_rules.resize(rules.len(), false); + Self::apply_local_rules_inner(plan, rules, applied_rules) } fn apply_local_rules_inner( @@ -706,6 +722,7 @@ pub struct HepOptimizerPipeline { before_batches: Vec, after_batches: Vec, implementation_index: ImplementationRuleIndex, + max_local_rules_len: usize, } impl HepOptimizerPipeline { @@ -722,10 +739,22 @@ impl HepOptimizerPipeline { after_batches: Vec, implementations: Vec, ) -> Self { + let max_local_rules_len = before_batches + .iter() + .chain(after_batches.iter()) + .flat_map(|batch| batch.steps.iter()) + .filter_map(|step| match step { + HepBatchStep::LocalRewrite(rules) => Some(rules.len()), + HepBatchStep::WholeTree(_) => None, + }) + .max() + .unwrap_or(0); + Self { before_batches, after_batches, implementation_index: ImplementationRuleIndex::new(implementations), + max_local_rules_len, } } @@ -735,6 +764,7 @@ impl HepOptimizerPipeline { &self.before_batches, &self.after_batches, &self.implementation_index, + self.max_local_rules_len, ) } } diff --git a/src/optimizer/rule/implementation/dql/table_scan.rs b/src/optimizer/rule/implementation/dql/table_scan.rs index bfb0f75c..cdaf4baf 100644 --- a/src/optimizer/rule/implementation/dql/table_scan.rs +++ b/src/optimizer/rule/implementation/dql/table_scan.rs @@ -46,7 +46,7 @@ impl ImplementationRule for SeqScanImplementation { let cost = scan_op .index_infos .iter() - .find(|index_info| index_info.meta.column_ids == scan_op.primary_keys) + .find(|index_info| matches!(index_info.meta.ty, IndexType::PrimaryKey { .. })) .map(|index_info| loader.load(&scan_op.table_name, index_info.meta.id)) .transpose()? .flatten() diff --git a/src/optimizer/rule/normalization/agg_elimination.rs b/src/optimizer/rule/normalization/agg_elimination.rs index 52e7719f..a4a93435 100644 --- a/src/optimizer/rule/normalization/agg_elimination.rs +++ b/src/optimizer/rule/normalization/agg_elimination.rs @@ -117,7 +117,7 @@ pub(crate) fn apply_scan_order_hint( field.expr.all_referenced_columns(true, |column| { scan_op .columns - .values() + .iter() .any(|table_column| table_column == column) }) }), @@ -125,7 +125,7 @@ pub(crate) fn apply_scan_order_hint( expr.all_referenced_columns(true, |column| { scan_op .columns - .values() + .iter() .any(|table_column| table_column == column) }) }), @@ -351,7 +351,6 @@ mod tests { use crate::types::index::{IndexInfo, IndexLookup, IndexMeta, IndexType}; use crate::types::value::DataValue; use crate::types::LogicalType; - use std::collections::BTreeMap; use std::ops::Bound; use std::sync::Arc; use ulid::Ulid; @@ -434,8 +433,7 @@ mod tests { let table_name: TableName = ::std::sync::Arc::from("t1"); let c1 = ColumnRef::from(ColumnCatalog::new_dummy("c1".to_string())); let c1_id = Ulid::new(); - let mut columns = BTreeMap::new(); - columns.insert(0, c1.clone()); + let columns = vec![c1.clone()]; let sort_fields = vec![SortField::new( ScalarExpression::column_expr(c1.clone(), 0), @@ -467,7 +465,6 @@ mod tests { let scan = LogicalPlan::new( Operator::TableScan(TableScanOperator { table_name, - primary_keys: vec![c1_id], columns, limit: (None, None), index_infos: vec![index_info], @@ -568,13 +565,11 @@ mod tests { ); let (index_info, _) = build_index_info(vec![sort_field.clone()], 0); - let mut columns = BTreeMap::new(); - columns.insert(0, column); + let columns = vec![column]; let table_name: TableName = ::std::sync::Arc::from("t"); let table_scan = LogicalPlan::new( Operator::TableScan(TableScanOperator { table_name: table_name.clone(), - primary_keys: vec![], columns, limit: (None, None), index_infos: vec![index_info], @@ -688,13 +683,11 @@ mod tests { max: Bound::Unbounded, })); - let mut columns = BTreeMap::new(); - columns.insert(0, column); + let columns = vec![column]; let mut scan_plan = LogicalPlan::new( Operator::TableScan(TableScanOperator { table_name: ::std::sync::Arc::from("t"), - primary_keys: vec![], columns, limit: (None, None), index_infos: vec![index_info], diff --git a/src/optimizer/rule/normalization/column_pruning.rs b/src/optimizer/rule/normalization/column_pruning.rs index 0e65dbc0..e778197a 100644 --- a/src/optimizer/rule/normalization/column_pruning.rs +++ b/src/optimizer/rule/normalization/column_pruning.rs @@ -80,7 +80,7 @@ impl ColumnPruning { referenced_columns.insert(op.output_column().summary()); } Operator::TableScan(op) => { - referenced_columns.extend(op.columns.values().map(|column| column.summary())); + referenced_columns.extend(op.columns.iter().map(|column| column.summary())); } Operator::FunctionScan(op) => { Self::extend_expr_referenced_columns( @@ -459,10 +459,13 @@ impl ColumnPruning { Operator::TableScan(op) => { if !all_referenced { outcome.removed_positions.truncate(output_start); - op.columns.retain(|position, column| { + let mut position = 0; + op.columns.retain(|column| { + let current_position = position; + position += 1; let keep = required_columns.contains(column.summary()); if !keep { - outcome.removed_positions.push(*position); + outcome.removed_positions.push(current_position); } keep }); @@ -736,7 +739,7 @@ mod tests { if op.table_name.to_string() == table_name { scans.push( op.columns - .values() + .iter() .map(|column| column.name().to_string()) .collect(), ); diff --git a/src/optimizer/rule/normalization/pushdown_predicates.rs b/src/optimizer/rule/normalization/pushdown_predicates.rs index e20e39c9..b851c273 100644 --- a/src/optimizer/rule/normalization/pushdown_predicates.rs +++ b/src/optimizer/rule/normalization/pushdown_predicates.rs @@ -289,7 +289,7 @@ impl NormalizationRule for PushPredicateIntoScan { for (idx, column_id) in meta.column_ids.iter().enumerate() { if let Some((scan_idx, column)) = scan_op .columns - .values() + .iter() .enumerate() .find(|(_, column)| column.id().map(|id| id == *column_id).unwrap_or(false)) { @@ -495,7 +495,7 @@ mod tests { use crate::types::index::{IndexInfo, IndexLookup, IndexMeta, IndexType}; use crate::types::value::DataValue; use crate::types::LogicalType; - use std::collections::{BTreeMap, Bound}; + use std::collections::Bound; use std::sync::Arc; use ulid::Ulid; @@ -578,9 +578,7 @@ mod tests { ); c3.set_ref_table(table_name.clone(), c3_id, false); - let mut columns = BTreeMap::new(); - columns.insert(0, c1_ref.clone()); - columns.insert(1, c2_ref.clone()); + let columns = vec![c1_ref.clone(), c2_ref.clone()]; let index_meta_reordered = Arc::new(IndexMeta { id: 0, @@ -608,7 +606,6 @@ mod tests { let scan_plan = LogicalPlan::new( Operator::TableScan(TableScanOperator { table_name: table_name.clone(), - primary_keys: vec![c1_id], columns, limit: (None, None), index_infos: vec![ diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 3a7d5310..1d3660dd 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -205,7 +205,7 @@ impl LogicalPlan { .collect_vec(), ), Operator::TableScan(op) => { - SchemaOutput::Schema(op.columns.values().cloned().collect_vec()) + SchemaOutput::Schema(op.columns.iter().cloned().collect_vec()) } Operator::FunctionScan(op) => { SchemaOutput::SchemaRef(op.table_function.output_schema().clone()) diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 5ea733d3..b16e0f24 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -208,7 +208,7 @@ impl Operator { } Operator::TableScan(op) => { output_exprs.clear(); - output_exprs.extend(op.columns.values().enumerate().map(|(position, column)| { + output_exprs.extend(op.columns.iter().enumerate().map(|(position, column)| { ScalarExpression::column_expr(column.clone(), position) })); true @@ -308,7 +308,7 @@ impl Operator { .iter() .all(|expr| expr.visit_referenced_columns(only_column_ref, f)), Operator::ScalarSubquery(_) => true, - Operator::TableScan(op) => op.columns.values().all(f), + Operator::TableScan(op) => op.columns.iter().all(f), Operator::FunctionScan(op) => op .table_function .args diff --git a/src/planner/operator/table_scan.rs b/src/planner/operator/table_scan.rs index c31709a4..637ca8a3 100644 --- a/src/planner/operator/table_scan.rs +++ b/src/planner/operator/table_scan.rs @@ -20,19 +20,15 @@ use crate::planner::operator::sort::SortField; use crate::planner::{Childrens, LogicalPlan}; use crate::storage::Bounds; use crate::types::index::IndexInfo; -use crate::types::ColumnId; use itertools::Itertools; use kite_sql_serde_macros::ReferenceSerialization; -use std::collections::BTreeMap; use std::fmt; use std::fmt::Formatter; #[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)] pub struct TableScanOperator { pub(crate) table_name: TableName, - pub(crate) primary_keys: Vec, - #[rustfmt::skip] - pub(crate) columns: BTreeMap::, + pub(crate) columns: Vec, // Support push down limit. pub(crate) limit: Bounds, @@ -48,17 +44,8 @@ impl TableScanOperator { table_catalog: &TableCatalog, with_pk: bool, ) -> Result { - let primary_keys = table_catalog - .primary_keys() - .iter() - .filter_map(|(_, column)| column.id()) - .collect_vec(); // Fill all Columns in TableCatalog by default - let columns = table_catalog - .columns() - .enumerate() - .map(|(i, column)| (i, column.clone())) - .collect(); + let columns = table_catalog.columns().cloned().collect(); let mut index_infos = Vec::with_capacity(table_catalog.indexes.len()); for index_meta in table_catalog.indexes.iter() { @@ -92,7 +79,6 @@ impl TableScanOperator { Operator::TableScan(TableScanOperator { index_infos, table_name, - primary_keys, columns, limit: (None, None), with_pk, @@ -106,7 +92,7 @@ impl fmt::Display for TableScanOperator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let projection_columns = self .columns - .values() + .iter() .map(|column| column.name().to_string()) .join(", "); let (offset, limit) = self.limit; diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 5fe9d583..6e37a02c 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -168,7 +168,7 @@ mod wasm_tests { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use itertools::Itertools; - use std::collections::{BTreeMap, Bound}; + use std::collections::Bound; use std::hash::RandomState; use std::sync::Arc; use wasm_bindgen_test::*; @@ -227,8 +227,7 @@ mod wasm_tests { false, )?; - let mut read_columns = BTreeMap::new(); - read_columns.insert(0, columns[0].clone()); + let read_columns = vec![columns[0].clone()]; let mut iter = transaction.read( &table_cache, @@ -268,7 +267,7 @@ mod wasm_tests { kite_sql.state.table_cache(), table_name, (Some(0), None), - table.columns().cloned().enumerate().collect(), + table.columns().cloned().collect(), pk_index, vec![Range::Scope { min: Bound::Excluded(DataValue::Int32(0)), @@ -301,7 +300,7 @@ mod native_tests { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use itertools::Itertools; - use std::collections::{BTreeMap, Bound}; + use std::collections::Bound; use std::hash::RandomState; use std::sync::Arc; @@ -359,8 +358,7 @@ mod native_tests { false, )?; - let mut read_columns = BTreeMap::new(); - read_columns.insert(0, columns[0].clone()); + let read_columns = vec![columns[0].clone()]; let mut iter = transaction.read( &table_cache, @@ -400,7 +398,7 @@ mod native_tests { kite_sql.state.table_cache(), table_name, (Some(0), None), - table.columns().cloned().enumerate().collect(), + table.columns().cloned().collect(), pk_index, vec![Range::Scope { min: Bound::Excluded(DataValue::Int32(0)), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 43ae10f0..1bb70d1b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -42,7 +42,7 @@ use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; use itertools::Itertools; use std::borrow::Cow; -use std::collections::{BTreeMap, Bound}; +use std::collections::Bound; use std::fmt::{self, Display, Formatter}; use std::io::Cursor; use std::mem; @@ -182,20 +182,13 @@ pub trait Transaction: Sized { table_cache: &'a TableCache, table_name: TableName, bounds: Bounds, - mut columns: BTreeMap, + columns: Vec, with_pk: bool, ) -> Result, DatabaseError> { - debug_assert!(columns.keys().all_unique()); - let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; - if with_pk { - for (i, column) in table.primary_keys() { - columns.insert(*i, column.clone()); - } - } - let deserializers = Self::create_deserializers(&columns, table); + let deserializers = Self::create_deserializers(&columns, table, with_pk); let pk_ty = with_pk.then(|| table.primary_keys_type().clone()); let offset = bounds.0.unwrap_or(0); @@ -218,7 +211,7 @@ pub trait Transaction: Sized { table_cache: &'a TableCache, table_name: TableName, (offset_option, limit_option): Bounds, - mut columns: BTreeMap, + columns: Vec, index_meta: IndexMetaRef, ranges: R, with_pk: bool, @@ -228,18 +221,12 @@ pub trait Transaction: Sized { where R: Into, { - debug_assert!(columns.keys().all_unique()); let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; let table_name = table.name.as_ref(); let offset = offset_option.unwrap_or(0); - if with_pk { - for (i, column) in table.primary_keys() { - columns.insert(*i, column.clone()); - } - } let is_primary_index = matches!(index_meta.ty, IndexType::PrimaryKey { .. }); let (inner, deserializers, cover_mapping) = match ( covered_deserializers, @@ -260,7 +247,7 @@ pub trait Transaction: Sized { ) } _ => { - let deserializers = Self::create_deserializers(&columns, table); + let deserializers = Self::create_deserializers(&columns, table, with_pk); (IndexImplEnum::instance(index_meta.ty), deserializers, None) } }; @@ -285,25 +272,42 @@ pub trait Transaction: Sized { } fn create_deserializers( - columns: &BTreeMap, + columns: &[ColumnRef], table: &TableCatalog, + with_pk: bool, ) -> Vec { - let mut deserializers = Vec::with_capacity(columns.len()); - let mut last_projection = None; - for (projection, column) in columns.iter() { - let (start, end) = last_projection - .map(|last_projection| { - let start = last_projection + 1; - let len = projection - start; - (start, start + len) - }) - .unwrap_or((0, *projection)); - for skip_column in table.schema_ref()[start..end].iter() { - deserializers.push(skip_column.datatype().skip_serializable()); + let mut pk_len = if with_pk { + table.primary_keys().len() + } else { + 0 + }; + let mut deserializers = Vec::with_capacity(table.columns_len()); + let mut columns = columns.iter().peekable(); + + for table_column in table.columns() { + if columns.peek().is_none() && pk_len == 0 { + break; + } + + let is_primary_key = with_pk && table_column.desc().primary().is_some(); + if columns + .peek() + .is_some_and(|column| same_projection_column(column, table_column)) + { + deserializers.push(table_column.datatype().serializable()); + columns.next(); + if is_primary_key { + pk_len -= 1; + } + } else if is_primary_key { + deserializers.push(table_column.datatype().serializable()); + pk_len -= 1; + } else { + deserializers.push(table_column.datatype().skip_serializable()); } - deserializers.push(column.datatype().serializable()); - last_projection = Some(*projection); } + debug_assert!(columns.next().is_none()); + debug_assert_eq!(pk_len, 0); deserializers } @@ -1142,6 +1146,13 @@ pub(crate) fn reuse_bound_as_excluded(bound: &mut Bound, key: &[u8]) { *bound = Bound::Excluded(bytes); } +fn same_projection_column(left: &ColumnRef, right: &ColumnRef) -> bool { + match (left.id(), right.id()) { + (Some(left), Some(right)) => left == right, + _ => left.name() == right.name(), + } +} + fn bytes_bound_as_slice(bound: &Bound) -> Bound<&[u8]> { match bound { Bound::Included(bytes) => Bound::Included(bytes.as_slice()), @@ -1961,39 +1972,29 @@ mod test { use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; - use std::collections::{BTreeMap, Bound}; + use std::collections::Bound; use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; - fn full_columns() -> BTreeMap { - let mut columns = BTreeMap::new(); - - columns.insert( - 0, + fn full_columns() -> Vec { + vec![ ColumnRef::from(ColumnCatalog::new( "c1".to_string(), false, ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), )), - ); - columns.insert( - 1, ColumnRef::from(ColumnCatalog::new( "c2".to_string(), false, ColumnDesc::new(LogicalType::Boolean, None, false, None).unwrap(), )), - ); - columns.insert( - 2, ColumnRef::from(ColumnCatalog::new( "c3".to_string(), false, ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), )), - ); - columns + ] } fn build_tuples() -> Vec { diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 9a8f5e82..579b9ac6 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -801,7 +801,7 @@ mod test { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use itertools::Itertools; - use std::collections::{BTreeMap, Bound}; + use std::collections::Bound; use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; @@ -896,8 +896,7 @@ mod test { false, )?; - let mut read_columns = BTreeMap::new(); - read_columns.insert(0, columns[0].clone()); + let read_columns = vec![columns[0].clone()]; let mut iter = transaction.read( &table_cache, @@ -1010,7 +1009,7 @@ mod test { kite_sql.state.table_cache(), "t1".to_string().into(), (Some(0), Some(1)), - table.columns().cloned().enumerate().collect(), + table.columns().cloned().collect(), table.indexes[0].clone(), vec![Range::Scope { min: Bound::Excluded(DataValue::Int32(0)), @@ -1029,8 +1028,8 @@ mod test { } // projection { - let mut columns: BTreeMap<_, _> = table.columns().cloned().enumerate().collect(); - let _ = columns.pop_last(); + let mut columns: Vec<_> = table.columns().cloned().collect(); + let _ = columns.pop(); let mut iter = transaction .read_by_index( @@ -1092,14 +1091,12 @@ mod test { .find(|index| matches!(index.ty, IndexType::Unique)) .unwrap() .clone(); - let (b_pos, b_column) = table + let b_column = table .columns() .cloned() - .enumerate() - .find(|(_, column)| column.name() == "b") + .find(|column| column.name() == "b") .unwrap(); - let mut columns = BTreeMap::new(); - columns.insert(b_pos, b_column.clone()); + let columns = vec![b_column.clone()]; let covered_deserializers = vec![b_column.datatype().serializable()]; // ensure cover mapping can reorder index values to match scan order @@ -1109,9 +1106,7 @@ mod test { .find(|index| index.name == "idx_b_a") .unwrap() .clone(); - let mut reordered_columns = BTreeMap::new(); - reordered_columns.insert(0, a_cover_column.clone()); - reordered_columns.insert(1, b_cover_column.clone()); + let reordered_columns = vec![a_cover_column.clone(), b_cover_column.clone()]; let reordered_deserializers = vec![ a_cover_column.datatype().serializable(), b_cover_column.datatype().serializable(), @@ -1184,8 +1179,7 @@ mod test { .find(|index| index.name == "pk_index") .unwrap() .clone(); - let mut pk_columns = BTreeMap::new(); - pk_columns.insert(0, a_cover_column.clone()); + let pk_columns = vec![a_cover_column.clone()]; let pk_deserializers = vec![a_cover_column.datatype().serializable()]; let mut iter = transaction.read_by_index( kite_sql.state.table_cache(),