From 6e8f4f2b8d37847588e2654d88b71f7e875dcbaa Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Sat, 2 May 2026 12:23:25 +0200 Subject: [PATCH 1/3] hybrid: added support for segment pruning --- src/Storages/HybridSegmentPruner.cpp | 451 ++++++++++++++++++ src/Storages/HybridSegmentPruner.h | 37 ++ src/Storages/StorageDistributed.cpp | 324 ++++++++++--- src/Storages/StorageDistributed.h | 35 ++ .../tests/gtest_hybrid_segment_pruner.cpp | 91 ++++ .../03646_hybrid_segment_pruning.reference | 84 ++++ .../03646_hybrid_segment_pruning.sql | 127 +++++ 7 files changed, 1087 insertions(+), 62 deletions(-) create mode 100644 src/Storages/HybridSegmentPruner.cpp create mode 100644 src/Storages/HybridSegmentPruner.h create mode 100644 src/Storages/tests/gtest_hybrid_segment_pruner.cpp create mode 100644 tests/queries/0_stateless/03646_hybrid_segment_pruning.reference create mode 100644 tests/queries/0_stateless/03646_hybrid_segment_pruning.sql diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp new file mode 100644 index 000000000000..57b835df1774 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.cpp @@ -0,0 +1,451 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace +{ + +/// Bounded DNF defaults: expand at most this many multi-disjunct OR groups, +/// and at most this many total branches. Anything beyond returns Keep. +constexpr size_t MAX_OR_GROUPS = 2; +constexpr size_t MAX_TOTAL_BRANCHES = 4; + +enum class CompareOp : uint8_t +{ + Less, + LessOrEqual, + Greater, + GreaterOrEqual, + Equals, +}; + +/// Per-column domain accumulator. `range` is the typed interval (closed/open ends, possibly +/// unbounded) tightened by each comparison atom. `allowed`, when set, constrains the column +/// to a finite set of typed values (introduced by `=` or `IN` atoms). +struct ColumnDomain +{ + Range range = Range::createWholeUniverse(); + std::optional> allowed; + bool empty = false; +}; + +/// Trim `allowed` against the current range and check overall emptiness. Returns true if the +/// domain is still satisfiable; sets `empty` and returns false if it's been narrowed to nothing. +bool finalizeDomain(ColumnDomain & domain) +{ + if (domain.empty || domain.range.empty()) + return !(domain.empty = true); + if (domain.allowed) + { + for (auto it = domain.allowed->begin(); it != domain.allowed->end();) + { + /// Use intersectsRange(Range(point)) rather than Range::contains(FieldRef) because the + /// Core/Range.cpp implementation has a buggy (effectively always-false) semantics. + if (domain.range.intersectsRange(Range(*it))) + ++it; + else + it = domain.allowed->erase(it); + } + if (domain.allowed->empty()) + return !(domain.empty = true); + } + return true; +} + +/// One typed atom extracted from a supported AST shape. +struct TypedAtom +{ + String column; + /// For comparisons: op + single value. For IN: op == Equals + values populated. + CompareOp op = CompareOp::Equals; + Field value; + std::vector values; /// Used only for IN. + bool is_in = false; +}; + +/// Look up Hybrid column type. Returns nullptr if the column is not part of the Hybrid schema. +DataTypePtr findColumnType(const NamesAndTypesList & cols, const String & name) +{ + for (const auto & c : cols) + if (c.name == name) + return c.type; + return nullptr; +} + +/// Collect top-level conjuncts, flattening nested `and(and(...), ...)`. +void collectConjuncts(const ASTPtr & ast, std::vector & out) +{ + if (!ast) + return; + if (const auto * func = ast->as(); func && func->name == "and" && func->arguments) + { + for (const auto & child : func->arguments->children) + collectConjuncts(child, out); + return; + } + out.push_back(ast); +} + +void collectDisjuncts(const ASTPtr & ast, std::vector & out) +{ + if (!ast) + return; + if (const auto * func = ast->as(); func && func->name == "or" && func->arguments) + { + for (const auto & child : func->arguments->children) + collectDisjuncts(child, out); + return; + } + out.push_back(ast); +} + +/// Try to extract a Field of the given type from an arbitrary AST expression by constant-folding. +/// Returns nullopt if the expression is not foldable or the result cannot be coerced to `target_type`. +std::optional evalAndCoerce( + const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) +{ + auto evaluated = tryEvaluateConstantExpression(ast, context); + if (!evaluated) + return {}; + if (!evaluated->second) + return {}; + return convertFieldToTypeStrict(evaluated->first, *evaluated->second, target_type, FormatSettings{}); +} + +/// If `ast` is a tuple-of-literals or constant-folds to one, return the typed elements. +std::optional> evalTupleAndCoerce( + const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) +{ + auto evaluated = tryEvaluateConstantExpression(ast, context); + if (!evaluated) + return {}; + if (evaluated->first.getType() != Field::Types::Tuple) + return {}; + const auto & tup = evaluated->first.safeGet(); + std::vector out; + out.reserve(tup.size()); + for (const auto & f : tup) + { + /// Each element's source type is the tuple's element type. We pass nullptr as + /// the from-type hint here; convertFieldToTypeStrict will conservatively reject + /// imprecise/lossy coercions, which is what we want. + Field coerced = convertFieldToType(f, target_type, nullptr, FormatSettings{}); + if (coerced.isNull() && !f.isNull()) + return {}; + out.push_back(std::move(coerced)); + } + return out; +} + +/// Extract a typed atom from a comparison/IN AST. `negated` reflects an outer `NOT`. +/// Returns nullopt for unsupported shapes (caller treats branch as satisfiable). +std::optional extractAtom( + ASTPtr ast, + bool negated, + const NamesAndTypesList & hybrid_cols, + const ContextPtr & context) +{ + /// Peel off outer `not` once. + if (const auto * func = ast->as(); func && func->name == "not") + { + if (!func->arguments || func->arguments->children.size() != 1) + return {}; + return extractAtom(func->arguments->children[0], !negated, hybrid_cols, context); + } + + const auto * func = ast->as(); + if (!func || !func->arguments || func->arguments->children.size() != 2) + return {}; + + String fname = func->name; + /// Effective op after applying `negated`. + auto invert = [](CompareOp op) -> std::optional + { + switch (op) + { + case CompareOp::Less: return CompareOp::GreaterOrEqual; + case CompareOp::LessOrEqual: return CompareOp::Greater; + case CompareOp::Greater: return CompareOp::LessOrEqual; + case CompareOp::GreaterOrEqual: return CompareOp::Less; + case CompareOp::Equals: return std::nullopt; /// `!=` is deferred. + } + return std::nullopt; + }; + + if (fname == "in") + { + if (negated) + return {}; /// NOT IN deferred. + + const auto & lhs = func->arguments->children[0]; + const auto & rhs = func->arguments->children[1]; + + const auto * ident = lhs->as(); + if (!ident) + return {}; + /// Use the unqualified column name. The analyzer rewrites bare `ts` to + /// `__table1.ts`; ownership-by-table is enforced separately by the + /// caller (which skips pruning when the query has a JOIN). + auto col_type = findColumnType(hybrid_cols, ident->shortName()); + if (!col_type) + return {}; + + auto values = evalTupleAndCoerce(rhs, *col_type, context); + if (!values) + return {}; + + TypedAtom atom; + atom.column = ident->shortName(); + atom.is_in = true; + atom.values = std::move(*values); + return atom; + } + + CompareOp op; + if (fname == "less") op = CompareOp::Less; + else if (fname == "lessOrEquals") op = CompareOp::LessOrEqual; + else if (fname == "greater") op = CompareOp::Greater; + else if (fname == "greaterOrEquals") op = CompareOp::GreaterOrEqual; + else if (fname == "equals") op = CompareOp::Equals; + else + return {}; /// Unsupported function (notEquals, like, etc.) + + if (negated) + { + auto inv = invert(op); + if (!inv) + return {}; + op = *inv; + } + + /// Identify which side is the column and which is the constant. + ASTPtr col_ast = func->arguments->children[0]; + ASTPtr val_ast = func->arguments->children[1]; + bool flipped = false; + const auto * col_ident = col_ast->as(); + if (!col_ident) + { + col_ident = val_ast->as(); + if (!col_ident) + return {}; + std::swap(col_ast, val_ast); + flipped = true; + } + auto col_type = findColumnType(hybrid_cols, col_ident->shortName()); + if (!col_type) + return {}; + + /// If we swapped sides, the comparison is reversed: `5 < x` ≡ `x > 5`. + if (flipped) + { + switch (op) + { + case CompareOp::Less: op = CompareOp::Greater; break; + case CompareOp::LessOrEqual: op = CompareOp::GreaterOrEqual; break; + case CompareOp::Greater: op = CompareOp::Less; break; + case CompareOp::GreaterOrEqual: op = CompareOp::LessOrEqual; break; + case CompareOp::Equals: break; + } + } + + auto coerced = evalAndCoerce(val_ast, *col_type, context); + if (!coerced) + return {}; + + TypedAtom atom; + atom.column = col_ident->shortName(); + atom.op = op; + atom.value = std::move(*coerced); + return atom; +} + +/// Apply an atom to the per-column domain map. Returns true if the branch can still +/// be satisfiable; false if the atom proves the branch unsatisfiable. +bool applyAtomToDomains( + std::unordered_map & domains, const TypedAtom & atom) +{ + auto & domain = domains[atom.column]; + if (domain.empty) + return false; + + if (atom.is_in) + { + std::set incoming(atom.values.begin(), atom.values.end()); + if (incoming.empty()) + return !(domain.empty = true); + if (!domain.allowed) + domain.allowed = std::move(incoming); + else + { + std::set intersection; + for (const auto & f : *domain.allowed) + if (incoming.contains(f)) + intersection.insert(f); + domain.allowed = std::move(intersection); + } + return finalizeDomain(domain); + } + + Range atom_range = Range::createWholeUniverse(); + switch (atom.op) + { + case CompareOp::Less: atom_range = Range::createRightBounded(atom.value, /*included*/ false); break; + case CompareOp::LessOrEqual: atom_range = Range::createRightBounded(atom.value, /*included*/ true); break; + case CompareOp::Greater: atom_range = Range::createLeftBounded(atom.value, /*included*/ false); break; + case CompareOp::GreaterOrEqual: atom_range = Range::createLeftBounded(atom.value, /*included*/ true); break; + case CompareOp::Equals: atom_range = Range(atom.value); break; + } + + auto narrowed = domain.range.intersectWith(atom_range); + if (!narrowed) + return !(domain.empty = true); + domain.range = std::move(*narrowed); + + if (atom.op == CompareOp::Equals) + { + if (!domain.allowed) + domain.allowed = std::set{atom.value}; + else if (!domain.allowed->contains(atom.value)) + return !(domain.empty = true); + else + domain.allowed = std::set{atom.value}; + } + + return finalizeDomain(domain); +} + +/// True if every atom in `branch` extracts to a supported typed atom AND the +/// per-column intersection is empty. Unsupported atoms make the branch satisfiable +/// (keep), as required by the fail-open contract. +bool branchIsUnsatisfiable( + const std::vector & branch_atoms, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context) +{ + std::unordered_map domains; + for (const auto & ast : branch_atoms) + { + auto atom = extractAtom(ast, /*negated*/ false, hybrid_columns, context); + if (!atom) + return false; /// Unsupported atom → branch is "unknown" → treat as satisfiable. + if (!applyAtomToDomains(domains, *atom)) + return true; + } + return false; +} + +} + +bool canPruneHybridSegment( + const ASTPtr & prewhere, + const ASTPtr & where, + const ASTPtr & segment_predicate, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context) +try +{ + /// Step 1: split top-level AND of (prewhere, where, segment_predicate) into conjuncts. + std::vector conjuncts; + collectConjuncts(prewhere, conjuncts); + collectConjuncts(where, conjuncts); + collectConjuncts(segment_predicate, conjuncts); + if (conjuncts.empty()) + return false; + + /// Step 2: classify each conjunct as a mandatory atom (singleton) or a multi-disjunct + /// OR group (alternative). Atoms inside an OR alternative are themselves AND-flattened + /// so each disjunct can be a small conjunction such as `date = today() AND id IN (...)`. + std::vector mandatory; + std::vector>> or_groups; /// group → branch → atoms + + for (const auto & c : conjuncts) + { + std::vector disjuncts; + collectDisjuncts(c, disjuncts); + if (disjuncts.size() == 1) + { + mandatory.push_back(disjuncts.front()); + continue; + } + std::vector> branches; + branches.reserve(disjuncts.size()); + for (const auto & d : disjuncts) + { + std::vector b; + collectConjuncts(d, b); + branches.push_back(std::move(b)); + } + or_groups.push_back(std::move(branches)); + } + + /// Step 3: enforce bounded DNF. + if (or_groups.size() > MAX_OR_GROUPS) + return false; + size_t total = 1; + for (const auto & g : or_groups) + { + if (g.empty()) + return false; + total *= g.size(); + if (total > MAX_TOTAL_BRANCHES) + return false; + } + + /// Step 4: if there are no OR groups, evaluate the single mandatory branch directly. + if (or_groups.empty()) + return branchIsUnsatisfiable(mandatory, hybrid_columns, context); + + /// Step 5: cartesian product over OR groups; each combination ANDed with `mandatory` + /// forms a DNF branch. Prune iff every branch is provably unsatisfiable. + std::vector idx(or_groups.size(), 0); + while (true) + { + std::vector branch = mandatory; + for (size_t i = 0; i < or_groups.size(); ++i) + { + const auto & disjunct = or_groups[i][idx[i]]; + branch.insert(branch.end(), disjunct.begin(), disjunct.end()); + } + + if (!branchIsUnsatisfiable(branch, hybrid_columns, context)) + return false; + + /// Increment cartesian-product indices. + size_t k = 0; + for (; k < or_groups.size(); ++k) + { + if (++idx[k] < or_groups[k].size()) + break; + idx[k] = 0; + } + if (k == or_groups.size()) + break; + } + + return true; +} +catch (...) +{ + /// Fail-open: any unexpected exception in atom extraction, type coercion, or + /// constant evaluation must not prevent the segment from being scanned normally. + return false; +} + +} diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h new file mode 100644 index 000000000000..235dab6704f1 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/// Conservative satisfiability check for Hybrid segment pruning. +/// +/// Combines (PREWHERE AND WHERE AND segment_predicate), restricted to atoms over +/// columns of the Hybrid table, normalizes via top-level AND/OR walking, and tries +/// to prove the resulting condition unsatisfiable through bounded DNF expansion +/// plus per-column typed range/IN intersection. +/// +/// Returns true only when the conjunction is provably empty (the segment can be +/// pruned). Returns false in all other cases — including unsupported predicates, +/// constant-folding failures, type-coercion ambiguity, and exceptions — so the +/// caller can fall back to scanning the segment normally. +/// +/// Inputs: +/// - prewhere, where, segment_predicate: ASTs (any may be null). +/// The caller is responsible for removing JOIN-side predicates and for +/// substituting hybridParam(...) literals before invoking this function. +/// - hybrid_columns: column names and types from the Hybrid storage snapshot. +/// Atoms referencing columns not in this list are treated as unsupported and +/// keep the segment. +/// - context: used for constant-expression evaluation. +bool canPruneHybridSegment( + const ASTPtr & prewhere, + const ASTPtr & where, + const ASTPtr & segment_predicate, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context); + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index ff1e0025aa9b..60937ccac6cf 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -102,7 +104,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -536,6 +540,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( } } + /// Hybrid segment pruning: mirror the per-shard pruning above, but at the segment level. + /// When a segment's predicate is provably unsatisfiable with the user query, drop it from + /// the plan. The base segment is signalled to `read()` by emptying `optimized_cluster` — + /// the same idiom `optimize_skip_unused_shards` uses for empty shard sets — and `nodes` is + /// recomputed automatically from the empty cluster. The verdict is recomputed in `read()` + /// for per-segment skipping; both calls read the watermark snapshot frozen on + /// `storage_snapshot` (see `HybridSnapshotData`), so the two verdicts agree even under a + /// concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + { + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); + if (pruning_verdict.base_pruned) + { + query_info.optimized_cluster = cluster->getClusterWithMultipleShards({}); + cluster = query_info.optimized_cluster; + nodes = getClusterQueriedNodes(settings, cluster); + } + } + if (settings[Setting::distributed_group_by_no_merge]) { if (settings[Setting::distributed_group_by_no_merge] == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION) @@ -560,7 +584,13 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( // TODO: check logic if (!segments.empty()) - nodes += segments.size(); + { + size_t surviving_segments = segments.size(); + for (bool is_pruned : pruning_verdict.segments_pruned) + if (is_pruned && surviving_segments > 0) + --surviving_segments; + nodes += surviving_segments; + } /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. @@ -792,6 +822,17 @@ std::optional StorageDistributed::getOptimizedQueryP StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr) const { + /// For Hybrid tables, freeze the watermark snapshot at snapshot acquisition time so + /// every later phase (`getQueryProcessingStage()`, `read()`) operates on the same + /// values. A concurrent `ALTER MODIFY SETTING hybrid_watermark_*` cannot change what + /// this query sees, which keeps the pruning verdict — and therefore the chosen + /// processing stage — consistent with the planned segment set. + if (!segments.empty() || base_segment_predicate) + { + auto data = std::make_unique(); + data->watermark_snapshot = hybrid_watermark_params.get(); + return std::make_shared(*this, metadata_snapshot, std::move(data)); + } return std::make_shared(*this, metadata_snapshot); } @@ -1179,6 +1220,141 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, } +ASTPtr StorageDistributed::substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks) +{ + if (!predicate_ast) + return predicate_ast; + predicate_ast = predicate_ast->clone(); + + std::function replace_hybrid_params = [&](ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (!arg_list || arg_list->children.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() requires exactly 2 arguments: (name, type)"); + + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (!name_lit || name_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() first argument (name) must be a string literal"); + if (!type_lit || type_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() second argument (type) must be a string literal"); + + const auto & param_name = name_lit->value.safeGet(); + const auto & type_name = type_lit->value.safeGet(); + + if (!watermarks) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto it = watermarks->find(param_name); + if (it == watermarks->end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto col = data_type->createColumn(); + ReadBufferFromString buf(it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + return; + } + + for (auto & child : node->children) + replace_hybrid_params(child); + }; + replace_hybrid_params(predicate_ast); + return predicate_ast; +} + +StorageDistributed::HybridPruningVerdict StorageDistributed::computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const +{ + StorageDistributed::HybridPruningVerdict verdict; + verdict.segments_pruned.assign(segments.size(), false); + + if (segments.empty() && !base_segment_predicate) + return verdict; + + /// Reuse the watermark snapshot frozen at `getStorageSnapshot()` time. Both + /// `getQueryProcessingStage()` and `read()` call this function with the same + /// `storage_snapshot`, so the verdict is identical across the two calls regardless + /// of any concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. Falling back to a + /// fresh `MultiVersion::get()` only happens when `getStorageSnapshot()` did not + /// attach `HybridSnapshotData` (e.g. a code path that bypasses it); we keep the + /// fallback for defensiveness, but it is not exercised by the standard read path. + if (const auto * hybrid_data = storage_snapshot->data + ? dynamic_cast(storage_snapshot->data.get()) + : nullptr) + verdict.watermark_snapshot = hybrid_data->watermark_snapshot; + else + verdict.watermark_snapshot = hybrid_watermark_params.get(); + + /// Extract Hybrid-owned WHERE/PREWHERE from the user query. JOINs are conservatively + /// excluded — when a JOIN is present, leave both null so the pruner can never claim + /// unsatisfiability based on a joined table's predicate. + NamesAndTypesList hybrid_columns = storage_snapshot->metadata->getColumns().getAll(); + ASTPtr prunable_where; + ASTPtr prunable_prewhere; + ASTSelectQuery * select_for_pruning = nullptr; + if (query_info.query) + { + if (auto * select = query_info.query->as()) + select_for_pruning = select; + else if (auto * union_query = query_info.query->as(); + union_query && union_query->list_of_selects && !union_query->list_of_selects->children.empty()) + select_for_pruning = union_query->list_of_selects->children.front()->as(); + } + if (select_for_pruning) + { + bool has_join = false; + if (auto tables = select_for_pruning->tables()) + { + for (const auto & child : tables->children) + { + if (auto * elem = child->as(); elem && elem->table_join) + { + has_join = true; + break; + } + } + } + if (!has_join) + { + prunable_where = select_for_pruning->where(); + prunable_prewhere = select_for_pruning->prewhere(); + } + } + + auto check = [&](const ASTPtr & predicate_ast) -> bool + { + if (!predicate_ast) + return false; + ASTPtr substituted = substituteHybridWatermarks(predicate_ast, verdict.watermark_snapshot); + return canPruneHybridSegment( + prunable_prewhere, prunable_where, substituted, + hybrid_columns, local_context); + }; + + if (base_segment_predicate) + verdict.base_pruned = check(base_segment_predicate); + + for (size_t i = 0; i < segments.size(); ++i) + verdict.segments_pruned[i] = check(segments[i].predicate_ast); + + return verdict; +} + void StorageDistributed::read( QueryPlan & query_plan, const Names &, @@ -1229,61 +1405,30 @@ void StorageDistributed::read( LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); }; - auto watermark_snapshot = hybrid_watermark_params.get(); - - auto substitute_watermarks = [&](ASTPtr predicate_ast) -> ASTPtr - { - if (!predicate_ast) - return predicate_ast; - predicate_ast = predicate_ast->clone(); - - std::function replace_hybrid_params = [&](ASTPtr & node) - { - if (auto * func = node->as(); func && func->name == "hybridParam") - { - auto * arg_list = func->arguments ? func->arguments->as() : nullptr; - if (!arg_list || arg_list->children.size() != 2) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() requires exactly 2 arguments: (name, type)"); - - auto * name_lit = arg_list->children[0]->as(); - auto * type_lit = arg_list->children[1]->as(); - if (!name_lit || name_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() first argument (name) must be a string literal"); - if (!type_lit || type_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() second argument (type) must be a string literal"); - - const auto & param_name = name_lit->value.safeGet(); - const auto & type_name = type_lit->value.safeGet(); - - if (!watermark_snapshot) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", - param_name, param_name); + /// Recompute the Hybrid pruning verdict for per-segment skipping. The watermark snapshot + /// it depends on was frozen at `getStorageSnapshot()` time and is reused via + /// `HybridSnapshotData`, so this verdict matches the one `getQueryProcessingStage()` + /// produced — both the surviving-segment set and the substitution of `hybridParam(...)` + /// literals stay consistent with the chosen processing stage even under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); - auto it = watermark_snapshot->find(param_name); - if (it == watermark_snapshot->end()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", - param_name, param_name); - - auto data_type = DataTypeFactory::instance().get(type_name); - auto col = data_type->createColumn(); - ReadBufferFromString buf(it->second); - data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); - node = make_intrusive((*col)[0]); - return; - } + auto watermark_snapshot = pruning_verdict.watermark_snapshot + ? pruning_verdict.watermark_snapshot : hybrid_watermark_params.get(); - for (auto & child : node->children) - replace_hybrid_params(child); - }; - replace_hybrid_params(predicate_ast); - return predicate_ast; + auto try_prune_additional = [&](size_t segment_idx, const String & target) -> bool + { + if (segment_idx >= pruning_verdict.segments_pruned.size() || !pruning_verdict.segments_pruned[segment_idx]) + return false; + LOG_TRACE(log, "Hybrid segment pruned (target: {})", target); + return true; }; + if (pruning_verdict.base_pruned) + LOG_TRACE(log, "Hybrid segment pruned (target: {})", base_target); + if (settings[Setting::allow_experimental_analyzer]) { StorageID remote_storage_id = StorageID::createEmpty(); @@ -1294,7 +1439,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. @@ -1311,8 +1456,14 @@ void StorageDistributed::read( if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr substituted_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); + // Create a modified query info with the segment predicate SelectQueryInfo additional_query_info = query_info; @@ -1320,7 +1471,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), segment.storage_id ? nullptr : segment.table_function_ast, - substitute_watermarks(segment.predicate_ast)); + substituted_predicate); additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); additional_query_info.query_tree = std::move(additional_query_tree); @@ -1330,8 +1481,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) return; } else @@ -1341,16 +1495,20 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, remote_database, remote_table, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); log_rewritten_query(base_target, modified_query_info.query); if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr resolved_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); SelectQueryInfo additional_query_info = query_info; - ASTPtr resolved_predicate = substitute_watermarks(segment.predicate_ast); if (segment.storage_id) { additional_query_info.query = ClusterProxy::rewriteSelectQuery( @@ -1372,8 +1530,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1384,6 +1545,45 @@ void StorageDistributed::read( } } + /// Hybrid case 2: base pruned (cluster empty via `getQueryProcessingStage`'s empty + /// `optimized_cluster`) and at least one additional segment survives. The all-pruned + /// subcase is already handled by the existing empty-cluster early-returns above. We + /// can't call `ClusterProxy::executeQuery` with an empty cluster (its + /// `updateSettingsAndClientInfoForCluster` dereferences `getShardsAddresses().front()` + /// when `is_remote_function=true`), so build the local plans directly. The block below + /// is the same shape as the `additional_query_infos` block in `ClusterProxy::executeQuery` + /// — that block uses the original context (not `new_context`), so we don't depend on the + /// shared distributed-context setup. + if (modified_query_info.getCluster()->getShardsInfo().empty() && !additional_query_infos.empty()) + { + const Block & header_block = *header; + std::vector plans; + plans.reserve(additional_query_infos.size()); + for (const auto & additional_query_info : additional_query_infos) + { + plans.emplace_back(createLocalPlan( + additional_query_info.query, header_block, local_context, + processed_stage, /*shard_num=*/0, /*shard_count=*/1, /*has_missing_objects=*/false, "")); + } + + if (plans.size() == 1) + { + query_plan = std::move(*plans.front()); + } + else + { + SharedHeaders input_headers; + input_headers.reserve(plans.size()); + for (auto & plan : plans) + input_headers.emplace_back(plan->getCurrentHeader()); + + auto union_step = std::make_unique(std::move(input_headers)); + union_step->setStepDescription("Hybrid"); + query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + return; + } + if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty()) { ClusterProxy::SelectStreamFactory select_stream_factory = diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 138a49d9c992..1dd6690650fb 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -220,6 +220,41 @@ class StorageDistributed final : public IStorage, WithContext std::vector getDirectoryQueueStatuses() const; static IColumn::Selector createSelector(ClusterPtr cluster, const ColumnWithTypeAndName & result); + + /// Substitute hybridParam(name, type) calls in `predicate_ast` with literal values from + /// `watermarks`. Returns a fresh cloned AST. Pass-through for nullptr. + static ASTPtr substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks); + + /// Hybrid-specific snapshot-time state attached to `StorageSnapshot::data`. Populated + /// once in `StorageDistributed::getStorageSnapshot()` so the watermark values seen by + /// `getQueryProcessingStage()` and `read()` cannot diverge mid-query under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. Without this, two independent + /// `MultiVersion::get()` calls could observe different versions and produce inconsistent + /// pruning verdicts (e.g. a `Complete`-stage plan unioned without final merge). + struct HybridSnapshotData : public StorageSnapshot::Data + { + MultiVersion::Version watermark_snapshot; + }; + + /// Per-query Hybrid pruning verdict. Recomputed in both `getQueryProcessingStage()` + /// (to drive the stage decision and empty `optimized_cluster` when the base is pruned) + /// and `read()` (to skip planning of pruned additional segments). The verdict is + /// deterministic across both calls because the watermark snapshot it depends on is + /// taken once at `getStorageSnapshot()` time and reused via `HybridSnapshotData`. + struct HybridPruningVerdict + { + bool base_pruned = false; + std::vector segments_pruned; + MultiVersion::Version watermark_snapshot; + }; + + HybridPruningVerdict computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const; + /// Apply the following settings: /// - optimize_skip_unused_shards /// - force_optimize_skip_unused_shards diff --git a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp new file mode 100644 index 000000000000..58d55257a367 --- /dev/null +++ b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp @@ -0,0 +1,91 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include + +using namespace DB; + +namespace +{ + +ASTPtr parseExpression(const std::string & text) +{ + ParserExpression parser; + return parseQuery(parser, text, 4096, 1000, 1000000); +} + +NamesAndTypesList hybridColumnsForTests() +{ + return { + {"ts", std::make_shared()}, + {"date", std::make_shared()}, + {"customerid", std::make_shared()}, + {"x", std::make_shared()}, + {"y", std::make_shared()}, + }; +} + +class HybridSegmentPrunerTest : public ::testing::Test +{ +public: + static void SetUpTestSuite() + { + tryRegisterFunctions(); + } +}; + +} + +TEST_F(HybridSegmentPrunerTest, RangeContradictionPrunes) +{ + /// `ts > '2025-10-01' AND ts <= '2025-09-01'` → unsat → can prune. + auto where = parseExpression("ts > '2025-10-01'"); + auto seg = parseExpression("ts <= '2025-09-01'"); + EXPECT_TRUE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, OverlappingRangeKeeps) +{ + /// `ts > '2025-10-01' AND ts > '2025-08-01'` → satisfiable → keep. + auto where = parseExpression("ts > '2025-10-01'"); + auto seg = parseExpression("ts > '2025-08-01'"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, BoundedDnfWithConstantFolding) +{ + /// `(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))` + /// combined with `date < '2015-01-01'` is unsat in every DNF branch. + auto where = parseExpression( + "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))"); + auto seg = parseExpression("date < '2015-01-01'"); + EXPECT_TRUE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, OrAlternativeNotMandatoryConstraint) +{ + /// `(x < 0 OR y = 1) AND x > 5`: the `x < 0` branch is unsat, + /// but the `y = 1` branch is satisfiable → keep. + auto where = parseExpression("(x < 0 OR y = 1) AND x > 5"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, UnsupportedAtomInOrKeeps) +{ + /// An OR with an unsupported atom (e.g. `length(...)`) cannot be pruned. + auto where = parseExpression("(length(toString(x)) > 10 OR x = 1) AND x = 2"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); +} diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference new file mode 100644 index 000000000000..b9a8b083e394 --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference @@ -0,0 +1,84 @@ +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +SELECT count() FROM pruning_t; +4 +SELECT count() FROM pruning_t WHERE value > 0; +4 +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; +Expression ((Project names + Projection)) + MergingAggregated + Union (Hybrid) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) + MergingAggregated + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_cold) +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +0 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Filter ((WHERE + Change column names to column identifiers)) + ReadNothing (Read from NullSource) +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). +SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; +2 +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. +SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); +4 diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql new file mode 100644 index 000000000000..27cdee91bbab --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql @@ -0,0 +1,127 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: requires remote() table function + +SET allow_experimental_hybrid_table = 1; + +-- The EXPLAIN-based assertions below print plan shapes that some randomized session +-- settings perturb. Pin them for deterministic output. None of these affect pruning logic; +-- they just stabilize how the plan is rendered. +SET prefer_localhost_replica = 1; -- avoid ReadFromRemote vs ReadFromMergeTree flips +SET query_plan_join_swap_table = 'false'; -- pin JOIN side ordering +SET use_query_condition_cache = 0; -- consistent EXPLAIN across runs +SET optimize_trivial_count_query = 1; +SET parallel_replicas_local_plan = 0; + +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS dim SYNC; + +CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2); +INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4); + +CREATE TABLE pruning_t +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +SELECT count() FROM pruning_t; +SELECT count() FROM pruning_t WHERE value > 0; +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; + +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; + +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; + +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; + +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; + +-- {echoOff} + +-- Test 6: three-segment table; cold + middle pruned, only hot kept. +CREATE TABLE local_warm (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_warm VALUES ('2025-09-15', 5), ('2025-09-25', 6); + +CREATE TABLE pruning_t3 +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_warm'), + ts > hybridParam('hybrid_watermark_cold', 'DateTime') + AND ts <= hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_cold', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-10-01', hybrid_watermark_cold = '2025-09-01' +AS local_hot; + +CREATE TABLE pruning_or +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +CREATE TABLE dim (id UInt64, label String) ENGINE = MergeTree ORDER BY id; +INSERT INTO dim VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'); + +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; + +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; + +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). +SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; + +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. +SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); + +-- {echoOff} + +DROP TABLE IF EXISTS dim SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; From 6ec76f90808b8fd0f839cf256fc640c3a5652bf0 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Thu, 7 May 2026 16:23:09 +0200 Subject: [PATCH 2/3] hybrid segment pruning: first attempt of reducing the code size --- src/Storages/HybridSegmentPruner.cpp | 485 +++++------------- src/Storages/HybridSegmentPruner.h | 56 +- src/Storages/StorageDistributed.cpp | 53 +- .../tests/gtest_hybrid_segment_pruner.cpp | 65 ++- 4 files changed, 215 insertions(+), 444 deletions(-) diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp index 57b835df1774..a002af6142f1 100644 --- a/src/Storages/HybridSegmentPruner.cpp +++ b/src/Storages/HybridSegmentPruner.cpp @@ -7,13 +7,13 @@ #include #include #include +#include #include #include -#include #include +#include #include -#include #include namespace DB @@ -22,74 +22,63 @@ namespace DB namespace { -/// Bounded DNF defaults: expand at most this many multi-disjunct OR groups, -/// and at most this many total branches. Anything beyond returns Keep. -constexpr size_t MAX_OR_GROUPS = 2; -constexpr size_t MAX_TOTAL_BRANCHES = 4; - -enum class CompareOp : uint8_t +/// Build `tuple(col1, col2, ...)` to feed into KeyDescription::getKeyFromAST. +/// Mirrors the pattern in Paimon::PartitionPruner. +ASTPtr makeIdentityKeyAST(const Names & column_names) { - Less, - LessOrEqual, - Greater, - GreaterOrEqual, - Equals, -}; + auto key_ast = make_intrusive(); + key_ast->name = "tuple"; + key_ast->arguments = make_intrusive(); + key_ast->children.push_back(key_ast->arguments); + for (const auto & name : column_names) + key_ast->arguments->children.push_back(make_intrusive(name)); + return key_ast; +} -/// Per-column domain accumulator. `range` is the typed interval (closed/open ends, possibly -/// unbounded) tightened by each comparison atom. `allowed`, when set, constrains the column -/// to a finite set of typed values (introduced by `=` or `IN` atoms). -struct ColumnDomain +/// KeyDescription requires every key column to be comparable. Drop the rest; +/// segment predicates over filtered columns will fail-open in canBePruned(). +NamesAndTypesList filterComparable(const NamesAndTypesList & in) { - Range range = Range::createWholeUniverse(); - std::optional> allowed; - bool empty = false; -}; + NamesAndTypesList out; + for (const auto & c : in) + if (c.type && c.type->isComparable()) + out.push_back(c); + return out; +} -/// Trim `allowed` against the current range and check overall emptiness. Returns true if the -/// domain is still satisfiable; sets `empty` and returns false if it's been narrowed to nothing. -bool finalizeDomain(ColumnDomain & domain) +KeyDescription buildIdentityKey(const NamesAndTypesList & comparable_cols, ContextPtr context) { - if (domain.empty || domain.range.empty()) - return !(domain.empty = true); - if (domain.allowed) - { - for (auto it = domain.allowed->begin(); it != domain.allowed->end();) - { - /// Use intersectsRange(Range(point)) rather than Range::contains(FieldRef) because the - /// Core/Range.cpp implementation has a buggy (effectively always-false) semantics. - if (domain.range.intersectsRange(Range(*it))) - ++it; - else - it = domain.allowed->erase(it); - } - if (domain.allowed->empty()) - return !(domain.empty = true); - } - return true; + Names names; + names.reserve(comparable_cols.size()); + for (const auto & c : comparable_cols) + names.push_back(c.name); + return KeyDescription::getKeyFromAST( + makeIdentityKeyAST(names), + ColumnsDescription{comparable_cols}, + context); } -/// One typed atom extracted from a supported AST shape. -struct TypedAtom +/// Strictly recognize `<`, `<=`, `>`, `>=`, `=`. Returns the typed Range for `column op value`. +std::optional rangeForCompare(const String & op, const Field & value) { - String column; - /// For comparisons: op + single value. For IN: op == Equals + values populated. - CompareOp op = CompareOp::Equals; - Field value; - std::vector values; /// Used only for IN. - bool is_in = false; -}; + if (op == "less") return Range::createRightBounded(value, /*included*/ false); + if (op == "lessOrEquals") return Range::createRightBounded(value, /*included*/ true); + if (op == "greater") return Range::createLeftBounded(value, /*included*/ false); + if (op == "greaterOrEquals") return Range::createLeftBounded(value, /*included*/ true); + if (op == "equals") return Range(value); + return std::nullopt; +} -/// Look up Hybrid column type. Returns nullptr if the column is not part of the Hybrid schema. -DataTypePtr findColumnType(const NamesAndTypesList & cols, const String & name) +/// `5 < x` ≡ `x > 5`, etc. +String swapCompareOp(const String & op) { - for (const auto & c : cols) - if (c.name == name) - return c.type; - return nullptr; + if (op == "less") return "greater"; + if (op == "lessOrEquals") return "greaterOrEquals"; + if (op == "greater") return "less"; + if (op == "greaterOrEquals") return "lessOrEquals"; + return op; /// equals stays equals } -/// Collect top-level conjuncts, flattening nested `and(and(...), ...)`. void collectConjuncts(const ASTPtr & ast, std::vector & out) { if (!ast) @@ -103,348 +92,132 @@ void collectConjuncts(const ASTPtr & ast, std::vector & out) out.push_back(ast); } -void collectDisjuncts(const ASTPtr & ast, std::vector & out) -{ - if (!ast) - return; - if (const auto * func = ast->as(); func && func->name == "or" && func->arguments) - { - for (const auto & child : func->arguments->children) - collectDisjuncts(child, out); - return; - } - out.push_back(ast); -} - -/// Try to extract a Field of the given type from an arbitrary AST expression by constant-folding. -/// Returns nullopt if the expression is not foldable or the result cannot be coerced to `target_type`. -std::optional evalAndCoerce( - const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) -{ - auto evaluated = tryEvaluateConstantExpression(ast, context); - if (!evaluated) - return {}; - if (!evaluated->second) - return {}; - return convertFieldToTypeStrict(evaluated->first, *evaluated->second, target_type, FormatSettings{}); -} - -/// If `ast` is a tuple-of-literals or constant-folds to one, return the typed elements. -std::optional> evalTupleAndCoerce( - const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) +enum class ApplyOutcome { - auto evaluated = tryEvaluateConstantExpression(ast, context); - if (!evaluated) - return {}; - if (evaluated->first.getType() != Field::Types::Tuple) - return {}; - const auto & tup = evaluated->first.safeGet(); - std::vector out; - out.reserve(tup.size()); - for (const auto & f : tup) - { - /// Each element's source type is the tuple's element type. We pass nullptr as - /// the from-type hint here; convertFieldToTypeStrict will conservatively reject - /// imprecise/lossy coercions, which is what we want. - Field coerced = convertFieldToType(f, target_type, nullptr, FormatSettings{}); - if (coerced.isNull() && !f.isNull()) - return {}; - out.push_back(std::move(coerced)); - } - return out; -} + Recognized, /// atom narrowed the rect on its column + Unsupported, /// caller should fail open (keep segment) + EmptyIntersection, /// segment self-contradicts → segment is provably empty +}; -/// Extract a typed atom from a comparison/IN AST. `negated` reflects an outer `NOT`. -/// Returns nullopt for unsupported shapes (caller treats branch as satisfiable). -std::optional extractAtom( - ASTPtr ast, - bool negated, - const NamesAndTypesList & hybrid_cols, +/// Tighten `rect[idx]` for the column referenced by a single comparison conjunct. +/// `column_index_by_name` maps Hybrid column name → rect index (already restricted +/// to comparable columns). +ApplyOutcome applyConjunct( + const ASTPtr & ast, + const std::unordered_map & column_index_by_name, + const DataTypes & key_types, + Hyperrectangle & rect, const ContextPtr & context) { - /// Peel off outer `not` once. - if (const auto * func = ast->as(); func && func->name == "not") - { - if (!func->arguments || func->arguments->children.size() != 1) - return {}; - return extractAtom(func->arguments->children[0], !negated, hybrid_cols, context); - } - const auto * func = ast->as(); if (!func || !func->arguments || func->arguments->children.size() != 2) - return {}; + return ApplyOutcome::Unsupported; - String fname = func->name; - /// Effective op after applying `negated`. - auto invert = [](CompareOp op) -> std::optional - { - switch (op) - { - case CompareOp::Less: return CompareOp::GreaterOrEqual; - case CompareOp::LessOrEqual: return CompareOp::Greater; - case CompareOp::Greater: return CompareOp::LessOrEqual; - case CompareOp::GreaterOrEqual: return CompareOp::Less; - case CompareOp::Equals: return std::nullopt; /// `!=` is deferred. - } - return std::nullopt; - }; - - if (fname == "in") - { - if (negated) - return {}; /// NOT IN deferred. + const String & fname = func->name; + if (!rangeForCompare(fname, Field{})) + return ApplyOutcome::Unsupported; - const auto & lhs = func->arguments->children[0]; - const auto & rhs = func->arguments->children[1]; + ASTPtr lhs = func->arguments->children[0]; + ASTPtr rhs = func->arguments->children[1]; + const auto * lhs_id = lhs->as(); + const auto * rhs_id = rhs->as(); - const auto * ident = lhs->as(); - if (!ident) - return {}; - /// Use the unqualified column name. The analyzer rewrites bare `ts` to - /// `__table1.ts`; ownership-by-table is enforced separately by the - /// caller (which skips pruning when the query has a JOIN). - auto col_type = findColumnType(hybrid_cols, ident->shortName()); - if (!col_type) - return {}; - - auto values = evalTupleAndCoerce(rhs, *col_type, context); - if (!values) - return {}; - - TypedAtom atom; - atom.column = ident->shortName(); - atom.is_in = true; - atom.values = std::move(*values); - return atom; - } - - CompareOp op; - if (fname == "less") op = CompareOp::Less; - else if (fname == "lessOrEquals") op = CompareOp::LessOrEqual; - else if (fname == "greater") op = CompareOp::Greater; - else if (fname == "greaterOrEquals") op = CompareOp::GreaterOrEqual; - else if (fname == "equals") op = CompareOp::Equals; - else - return {}; /// Unsupported function (notEquals, like, etc.) - - if (negated) + String column_name; + ASTPtr value_ast; + String op = fname; + if (lhs_id && !rhs_id) { - auto inv = invert(op); - if (!inv) - return {}; - op = *inv; + column_name = lhs_id->shortName(); + value_ast = rhs; } - - /// Identify which side is the column and which is the constant. - ASTPtr col_ast = func->arguments->children[0]; - ASTPtr val_ast = func->arguments->children[1]; - bool flipped = false; - const auto * col_ident = col_ast->as(); - if (!col_ident) + else if (!lhs_id && rhs_id) { - col_ident = val_ast->as(); - if (!col_ident) - return {}; - std::swap(col_ast, val_ast); - flipped = true; + column_name = rhs_id->shortName(); + value_ast = lhs; + op = swapCompareOp(fname); } - auto col_type = findColumnType(hybrid_cols, col_ident->shortName()); - if (!col_type) - return {}; - - /// If we swapped sides, the comparison is reversed: `5 < x` ≡ `x > 5`. - if (flipped) + else { - switch (op) - { - case CompareOp::Less: op = CompareOp::Greater; break; - case CompareOp::LessOrEqual: op = CompareOp::GreaterOrEqual; break; - case CompareOp::Greater: op = CompareOp::Less; break; - case CompareOp::GreaterOrEqual: op = CompareOp::LessOrEqual; break; - case CompareOp::Equals: break; - } + return ApplyOutcome::Unsupported; } - auto coerced = evalAndCoerce(val_ast, *col_type, context); - if (!coerced) - return {}; + auto idx_it = column_index_by_name.find(column_name); + if (idx_it == column_index_by_name.end()) + return ApplyOutcome::Unsupported; + size_t idx = idx_it->second; - TypedAtom atom; - atom.column = col_ident->shortName(); - atom.op = op; - atom.value = std::move(*coerced); - return atom; -} + auto evaluated = tryEvaluateConstantExpression(value_ast, context); + if (!evaluated || !evaluated->second) + return ApplyOutcome::Unsupported; -/// Apply an atom to the per-column domain map. Returns true if the branch can still -/// be satisfiable; false if the atom proves the branch unsatisfiable. -bool applyAtomToDomains( - std::unordered_map & domains, const TypedAtom & atom) -{ - auto & domain = domains[atom.column]; - if (domain.empty) - return false; + auto coerced = convertFieldToTypeStrict(evaluated->first, *evaluated->second, *key_types[idx], FormatSettings{}); + if (!coerced || (coerced->isNull() && !evaluated->first.isNull())) + return ApplyOutcome::Unsupported; - if (atom.is_in) - { - std::set incoming(atom.values.begin(), atom.values.end()); - if (incoming.empty()) - return !(domain.empty = true); - if (!domain.allowed) - domain.allowed = std::move(incoming); - else - { - std::set intersection; - for (const auto & f : *domain.allowed) - if (incoming.contains(f)) - intersection.insert(f); - domain.allowed = std::move(intersection); - } - return finalizeDomain(domain); - } + auto atom_range = rangeForCompare(op, *coerced); + if (!atom_range) + return ApplyOutcome::Unsupported; - Range atom_range = Range::createWholeUniverse(); - switch (atom.op) - { - case CompareOp::Less: atom_range = Range::createRightBounded(atom.value, /*included*/ false); break; - case CompareOp::LessOrEqual: atom_range = Range::createRightBounded(atom.value, /*included*/ true); break; - case CompareOp::Greater: atom_range = Range::createLeftBounded(atom.value, /*included*/ false); break; - case CompareOp::GreaterOrEqual: atom_range = Range::createLeftBounded(atom.value, /*included*/ true); break; - case CompareOp::Equals: atom_range = Range(atom.value); break; - } - - auto narrowed = domain.range.intersectWith(atom_range); + auto narrowed = rect[idx].intersectWith(*atom_range); if (!narrowed) - return !(domain.empty = true); - domain.range = std::move(*narrowed); - - if (atom.op == CompareOp::Equals) - { - if (!domain.allowed) - domain.allowed = std::set{atom.value}; - else if (!domain.allowed->contains(atom.value)) - return !(domain.empty = true); - else - domain.allowed = std::set{atom.value}; - } - - return finalizeDomain(domain); + return ApplyOutcome::EmptyIntersection; + rect[idx] = std::move(*narrowed); + return ApplyOutcome::Recognized; } -/// True if every atom in `branch` extracts to a supported typed atom AND the -/// per-column intersection is empty. Unsupported atoms make the branch satisfiable -/// (keep), as required by the fail-open contract. -bool branchIsUnsatisfiable( - const std::vector & branch_atoms, - const NamesAndTypesList & hybrid_columns, - const ContextPtr & context) -{ - std::unordered_map domains; - for (const auto & ast : branch_atoms) - { - auto atom = extractAtom(ast, /*negated*/ false, hybrid_columns, context); - if (!atom) - return false; /// Unsupported atom → branch is "unknown" → treat as satisfiable. - if (!applyAtomToDomains(domains, *atom)) - return true; - } - return false; } +HybridSegmentPruner::HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context_) + : identity_key(buildIdentityKey(filterComparable(hybrid_columns), context_)) + , user_condition(filter_dag, context_, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false) + , context(std::move(context_)) +{ + useless = identity_key.column_names.empty() || user_condition.alwaysUnknownOrTrue(); } -bool canPruneHybridSegment( - const ASTPtr & prewhere, - const ASTPtr & where, - const ASTPtr & segment_predicate, - const NamesAndTypesList & hybrid_columns, - const ContextPtr & context) +bool HybridSegmentPruner::canBePruned(const ASTPtr & substituted_segment_predicate) const try { - /// Step 1: split top-level AND of (prewhere, where, segment_predicate) into conjuncts. + if (useless || !substituted_segment_predicate) + return false; + std::vector conjuncts; - collectConjuncts(prewhere, conjuncts); - collectConjuncts(where, conjuncts); - collectConjuncts(segment_predicate, conjuncts); + collectConjuncts(substituted_segment_predicate, conjuncts); if (conjuncts.empty()) return false; - /// Step 2: classify each conjunct as a mandatory atom (singleton) or a multi-disjunct - /// OR group (alternative). Atoms inside an OR alternative are themselves AND-flattened - /// so each disjunct can be a small conjunction such as `date = today() AND id IN (...)`. - std::vector mandatory; - std::vector>> or_groups; /// group → branch → atoms - - for (const auto & c : conjuncts) - { - std::vector disjuncts; - collectDisjuncts(c, disjuncts); - if (disjuncts.size() == 1) - { - mandatory.push_back(disjuncts.front()); - continue; - } - std::vector> branches; - branches.reserve(disjuncts.size()); - for (const auto & d : disjuncts) - { - std::vector b; - collectConjuncts(d, b); - branches.push_back(std::move(b)); - } - or_groups.push_back(std::move(branches)); - } - - /// Step 3: enforce bounded DNF. - if (or_groups.size() > MAX_OR_GROUPS) - return false; - size_t total = 1; - for (const auto & g : or_groups) - { - if (g.empty()) - return false; - total *= g.size(); - if (total > MAX_TOTAL_BRANCHES) - return false; - } + std::unordered_map idx_by_name; + idx_by_name.reserve(identity_key.column_names.size()); + for (size_t i = 0; i < identity_key.column_names.size(); ++i) + idx_by_name.emplace(identity_key.column_names[i], i); - /// Step 4: if there are no OR groups, evaluate the single mandatory branch directly. - if (or_groups.empty()) - return branchIsUnsatisfiable(mandatory, hybrid_columns, context); + Hyperrectangle rect(identity_key.column_names.size(), Range::createWholeUniverse()); - /// Step 5: cartesian product over OR groups; each combination ANDed with `mandatory` - /// forms a DNF branch. Prune iff every branch is provably unsatisfiable. - std::vector idx(or_groups.size(), 0); - while (true) + for (const auto & c : conjuncts) { - std::vector branch = mandatory; - for (size_t i = 0; i < or_groups.size(); ++i) - { - const auto & disjunct = or_groups[i][idx[i]]; - branch.insert(branch.end(), disjunct.begin(), disjunct.end()); - } - - if (!branchIsUnsatisfiable(branch, hybrid_columns, context)) - return false; - - /// Increment cartesian-product indices. - size_t k = 0; - for (; k < or_groups.size(); ++k) + switch (applyConjunct(c, idx_by_name, identity_key.data_types, rect, context)) { - if (++idx[k] < or_groups[k].size()) + case ApplyOutcome::Unsupported: + return false; + case ApplyOutcome::EmptyIntersection: + /// Segment predicate self-contradicts → segment is provably empty. + return true; + case ApplyOutcome::Recognized: break; - idx[k] = 0; } - if (k == or_groups.size()) - break; } - return true; + return !user_condition.checkInHyperrectangle(rect, identity_key.data_types).can_be_true; } catch (...) { - /// Fail-open: any unexpected exception in atom extraction, type coercion, or - /// constant evaluation must not prevent the segment from being scanned normally. + /// Fail-open: any unexpected exception in extraction, type coercion, or constant + /// evaluation must not prevent the segment from being scanned normally. return false; } diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h index 235dab6704f1..5f6edbdb0a8a 100644 --- a/src/Storages/HybridSegmentPruner.h +++ b/src/Storages/HybridSegmentPruner.h @@ -3,35 +3,43 @@ #include #include #include +#include +#include namespace DB { -/// Conservative satisfiability check for Hybrid segment pruning. +/// Hybrid-segment pruner, modeled after PartitionPruner / Iceberg::ManifestFilesPruner / +/// Paimon::PartitionPruner. /// -/// Combines (PREWHERE AND WHERE AND segment_predicate), restricted to atoms over -/// columns of the Hybrid table, normalizes via top-level AND/OR walking, and tries -/// to prove the resulting condition unsatisfiable through bounded DNF expansion -/// plus per-column typed range/IN intersection. +/// Build one KeyCondition over the user filter (PREWHERE+WHERE represented as an +/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, parse +/// its (already watermark-substituted) predicate AST into a Hyperrectangle and ask +/// `KeyCondition::checkInHyperrectangle(rect, types).can_be_true`. The segment can be +/// pruned iff the answer is false. /// -/// Returns true only when the conjunction is provably empty (the segment can be -/// pruned). Returns false in all other cases — including unsupported predicates, -/// constant-folding failures, type-coercion ambiguity, and exceptions — so the -/// caller can fall back to scanning the segment normally. -/// -/// Inputs: -/// - prewhere, where, segment_predicate: ASTs (any may be null). -/// The caller is responsible for removing JOIN-side predicates and for -/// substituting hybridParam(...) literals before invoking this function. -/// - hybrid_columns: column names and types from the Hybrid storage snapshot. -/// Atoms referencing columns not in this list are treated as unsupported and -/// keep the segment. -/// - context: used for constant-expression evaluation. -bool canPruneHybridSegment( - const ASTPtr & prewhere, - const ASTPtr & where, - const ASTPtr & segment_predicate, - const NamesAndTypesList & hybrid_columns, - const ContextPtr & context); +/// canBePruned() returns true only when (user_filter AND segment_predicate) is provably +/// empty. It returns false in all other cases — unsupported segment shapes, missing user +/// filter, exceptions — so the caller falls back to scanning the segment normally. +class HybridSegmentPruner +{ +public: + HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context); + + bool canBePruned(const ASTPtr & substituted_segment_predicate) const; + + /// True if the user filter is unrecognizable / always-true on the Hybrid key columns: + /// no segment can ever be pruned, so callers can short-circuit. + bool isUseless() const { return useless; } + +private: + KeyDescription identity_key; + KeyCondition user_condition; + ContextPtr context; + bool useless = false; +}; } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 60937ccac6cf..590b3e86830b 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -49,7 +49,6 @@ #include #include #include -#include #include #include #include @@ -1300,50 +1299,26 @@ StorageDistributed::HybridPruningVerdict StorageDistributed::computeHybridPrunin else verdict.watermark_snapshot = hybrid_watermark_params.get(); - /// Extract Hybrid-owned WHERE/PREWHERE from the user query. JOINs are conservatively - /// excluded — when a JOIN is present, leave both null so the pruner can never claim - /// unsatisfiability based on a joined table's predicate. + /// Without a materialized user filter (legacy non-analyzer path, or a query before + /// filter actions are computed) we can't prune. Fail open — same precedent as + /// `skipUnusedShardsWithAnalyzer()`. The DAG is per-table-expression, so JOIN-side + /// predicates are already excluded; no JOIN guard needed. + if (!query_info.filter_actions_dag) + return verdict; + NamesAndTypesList hybrid_columns = storage_snapshot->metadata->getColumns().getAll(); - ASTPtr prunable_where; - ASTPtr prunable_prewhere; - ASTSelectQuery * select_for_pruning = nullptr; - if (query_info.query) - { - if (auto * select = query_info.query->as()) - select_for_pruning = select; - else if (auto * union_query = query_info.query->as(); - union_query && union_query->list_of_selects && !union_query->list_of_selects->children.empty()) - select_for_pruning = union_query->list_of_selects->children.front()->as(); - } - if (select_for_pruning) - { - bool has_join = false; - if (auto tables = select_for_pruning->tables()) - { - for (const auto & child : tables->children) - { - if (auto * elem = child->as(); elem && elem->table_join) - { - has_join = true; - break; - } - } - } - if (!has_join) - { - prunable_where = select_for_pruning->where(); - prunable_prewhere = select_for_pruning->prewhere(); - } - } + ActionsDAGWithInversionPushDown inverted_dag( + query_info.filter_actions_dag->getOutputs().at(0), local_context); + HybridSegmentPruner pruner(inverted_dag, hybrid_columns, local_context); + if (pruner.isUseless()) + return verdict; auto check = [&](const ASTPtr & predicate_ast) -> bool { if (!predicate_ast) return false; - ASTPtr substituted = substituteHybridWatermarks(predicate_ast, verdict.watermark_snapshot); - return canPruneHybridSegment( - prunable_prewhere, prunable_where, substituted, - hybrid_columns, local_context); + return pruner.canBePruned( + substituteHybridWatermarks(predicate_ast, verdict.watermark_snapshot)); }; if (base_segment_predicate) diff --git a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp index 58d55257a367..02af38dd3865 100644 --- a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp +++ b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp @@ -5,8 +5,11 @@ #include #include #include +#include +#include #include #include +#include #include #include @@ -33,6 +36,28 @@ NamesAndTypesList hybridColumnsForTests() }; } +/// Build a HybridSegmentPruner over `where_text` and ask whether `segment_text` can be pruned. +/// The user-side ActionsDAG is built via the same `TreeRewriter + ExpressionAnalyzer` idiom +/// the planner uses to populate `query_info.filter_actions_dag`. +bool canPrune(const std::string & where_text, const std::string & segment_text) +{ + auto context = getContext().context; + auto cols = hybridColumnsForTests(); + + auto where_ast = parseExpression(where_text); + auto syntax_result = TreeRewriter(context).analyze(where_ast, cols); + /// `add_aliases=true` projects the DAG outputs to the predicate only, mirroring the shape of + /// the analyzer-built `query_info.filter_actions_dag` (one output = the filter expression). + /// With `add_aliases=false` the source columns are also kept as outputs, so `getOutputs().at(0)` + /// can point to an input column instead of the predicate. + auto dag = ExpressionAnalyzer(where_ast, syntax_result, context).getActionsDAG(true); + + ActionsDAGWithInversionPushDown inverted(dag.getOutputs().at(0), context); + HybridSegmentPruner pruner(inverted, cols, context); + + return pruner.canBePruned(parseExpression(segment_text)); +} + class HybridSegmentPrunerTest : public ::testing::Test { public: @@ -46,46 +71,36 @@ class HybridSegmentPrunerTest : public ::testing::Test TEST_F(HybridSegmentPrunerTest, RangeContradictionPrunes) { - /// `ts > '2025-10-01' AND ts <= '2025-09-01'` → unsat → can prune. - auto where = parseExpression("ts > '2025-10-01'"); - auto seg = parseExpression("ts <= '2025-09-01'"); - EXPECT_TRUE(canPruneHybridSegment( - /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); + /// `ts > '2025-10-01'` (user) ∧ `ts <= '2025-09-01'` (segment) is unsat → prune. + EXPECT_TRUE(canPrune("ts > '2025-10-01'", "ts <= '2025-09-01'")); } TEST_F(HybridSegmentPrunerTest, OverlappingRangeKeeps) { - /// `ts > '2025-10-01' AND ts > '2025-08-01'` → satisfiable → keep. - auto where = parseExpression("ts > '2025-10-01'"); - auto seg = parseExpression("ts > '2025-08-01'"); - EXPECT_FALSE(canPruneHybridSegment( - /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); + /// `ts > '2025-10-01'` (user) ∧ `ts > '2025-08-01'` (segment) is satisfiable → keep. + EXPECT_FALSE(canPrune("ts > '2025-10-01'", "ts > '2025-08-01'")); } TEST_F(HybridSegmentPrunerTest, BoundedDnfWithConstantFolding) { /// `(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))` - /// combined with `date < '2015-01-01'` is unsat in every DNF branch. - auto where = parseExpression( - "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))"); - auto seg = parseExpression("date < '2015-01-01'"); - EXPECT_TRUE(canPruneHybridSegment( - /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); + /// (user) ∧ `date < '2015-01-01'` (segment): KeyCondition handles the OR by itself; the segment + /// hyperrectangle on `date` is (-∞, '2015-01-01'), which excludes both yesterday() and today(). + EXPECT_TRUE(canPrune( + "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))", + "date < '2015-01-01'")); } TEST_F(HybridSegmentPrunerTest, OrAlternativeNotMandatoryConstraint) { - /// `(x < 0 OR y = 1) AND x > 5`: the `x < 0` branch is unsat, - /// but the `y = 1` branch is satisfiable → keep. - auto where = parseExpression("(x < 0 OR y = 1) AND x > 5"); - EXPECT_FALSE(canPruneHybridSegment( - /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); + /// `(x < 0 OR y = 1) AND x > 5` (user) ∧ `x > 0` (segment): the OR's `y = 1` branch is + /// satisfiable inside the segment hyperrectangle (e.g. x = 10, y = 1) → keep. + EXPECT_FALSE(canPrune("(x < 0 OR y = 1) AND x > 5", "x > 0")); } TEST_F(HybridSegmentPrunerTest, UnsupportedAtomInOrKeeps) { - /// An OR with an unsupported atom (e.g. `length(...)`) cannot be pruned. - auto where = parseExpression("(length(toString(x)) > 10 OR x = 1) AND x = 2"); - EXPECT_FALSE(canPruneHybridSegment( - /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); + /// The OR contains an atom KeyCondition can't analyze (`length(toString(x)) > 10`), + /// so it conservatively keeps the segment. + EXPECT_FALSE(canPrune("(length(toString(x)) > 10 OR x = 1) AND x = 2", "x > 0")); } From a6e604cfbf49ab34cb1f58e65172eccab26d6f75 Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Fri, 8 May 2026 12:44:13 +0200 Subject: [PATCH 3/3] hybrid segment pruning: second attempt of reducing the code size --- src/Storages/HybridSegmentPruner.cpp | 170 +++++------------------ src/Storages/HybridSegmentPruner.h | 6 +- src/Storages/MergeTree/KeyCondition.cpp | 175 +++++++++++++++--------- src/Storages/MergeTree/KeyCondition.h | 5 + 4 files changed, 151 insertions(+), 205 deletions(-) diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp index a002af6142f1..8633fcd4ec84 100644 --- a/src/Storages/HybridSegmentPruner.cpp +++ b/src/Storages/HybridSegmentPruner.cpp @@ -1,29 +1,21 @@ #include -#include #include #include -#include #include -#include -#include +#include +#include #include #include #include -#include #include -#include -#include - namespace DB { namespace { -/// Build `tuple(col1, col2, ...)` to feed into KeyDescription::getKeyFromAST. -/// Mirrors the pattern in Paimon::PartitionPruner. ASTPtr makeIdentityKeyAST(const Names & column_names) { auto key_ast = make_intrusive(); @@ -35,8 +27,6 @@ ASTPtr makeIdentityKeyAST(const Names & column_names) return key_ast; } -/// KeyDescription requires every key column to be comparable. Drop the rest; -/// segment predicates over filtered columns will fail-open in canBePruned(). NamesAndTypesList filterComparable(const NamesAndTypesList & in) { NamesAndTypesList out; @@ -58,111 +48,12 @@ KeyDescription buildIdentityKey(const NamesAndTypesList & comparable_cols, Conte context); } -/// Strictly recognize `<`, `<=`, `>`, `>=`, `=`. Returns the typed Range for `column op value`. -std::optional rangeForCompare(const String & op, const Field & value) +NamesAndTypesList namesAndTypesFromKey(const KeyDescription & key) { - if (op == "less") return Range::createRightBounded(value, /*included*/ false); - if (op == "lessOrEquals") return Range::createRightBounded(value, /*included*/ true); - if (op == "greater") return Range::createLeftBounded(value, /*included*/ false); - if (op == "greaterOrEquals") return Range::createLeftBounded(value, /*included*/ true); - if (op == "equals") return Range(value); - return std::nullopt; -} - -/// `5 < x` ≡ `x > 5`, etc. -String swapCompareOp(const String & op) -{ - if (op == "less") return "greater"; - if (op == "lessOrEquals") return "greaterOrEquals"; - if (op == "greater") return "less"; - if (op == "greaterOrEquals") return "lessOrEquals"; - return op; /// equals stays equals -} - -void collectConjuncts(const ASTPtr & ast, std::vector & out) -{ - if (!ast) - return; - if (const auto * func = ast->as(); func && func->name == "and" && func->arguments) - { - for (const auto & child : func->arguments->children) - collectConjuncts(child, out); - return; - } - out.push_back(ast); -} - -enum class ApplyOutcome -{ - Recognized, /// atom narrowed the rect on its column - Unsupported, /// caller should fail open (keep segment) - EmptyIntersection, /// segment self-contradicts → segment is provably empty -}; - -/// Tighten `rect[idx]` for the column referenced by a single comparison conjunct. -/// `column_index_by_name` maps Hybrid column name → rect index (already restricted -/// to comparable columns). -ApplyOutcome applyConjunct( - const ASTPtr & ast, - const std::unordered_map & column_index_by_name, - const DataTypes & key_types, - Hyperrectangle & rect, - const ContextPtr & context) -{ - const auto * func = ast->as(); - if (!func || !func->arguments || func->arguments->children.size() != 2) - return ApplyOutcome::Unsupported; - - const String & fname = func->name; - if (!rangeForCompare(fname, Field{})) - return ApplyOutcome::Unsupported; - - ASTPtr lhs = func->arguments->children[0]; - ASTPtr rhs = func->arguments->children[1]; - const auto * lhs_id = lhs->as(); - const auto * rhs_id = rhs->as(); - - String column_name; - ASTPtr value_ast; - String op = fname; - if (lhs_id && !rhs_id) - { - column_name = lhs_id->shortName(); - value_ast = rhs; - } - else if (!lhs_id && rhs_id) - { - column_name = rhs_id->shortName(); - value_ast = lhs; - op = swapCompareOp(fname); - } - else - { - return ApplyOutcome::Unsupported; - } - - auto idx_it = column_index_by_name.find(column_name); - if (idx_it == column_index_by_name.end()) - return ApplyOutcome::Unsupported; - size_t idx = idx_it->second; - - auto evaluated = tryEvaluateConstantExpression(value_ast, context); - if (!evaluated || !evaluated->second) - return ApplyOutcome::Unsupported; - - auto coerced = convertFieldToTypeStrict(evaluated->first, *evaluated->second, *key_types[idx], FormatSettings{}); - if (!coerced || (coerced->isNull() && !evaluated->first.isNull())) - return ApplyOutcome::Unsupported; - - auto atom_range = rangeForCompare(op, *coerced); - if (!atom_range) - return ApplyOutcome::Unsupported; - - auto narrowed = rect[idx].intersectWith(*atom_range); - if (!narrowed) - return ApplyOutcome::EmptyIntersection; - rect[idx] = std::move(*narrowed); - return ApplyOutcome::Recognized; + NamesAndTypesList out; + for (size_t i = 0; i < key.column_names.size(); ++i) + out.emplace_back(key.column_names[i], key.data_types[i]); + return out; } } @@ -186,38 +77,45 @@ try if (useless || !substituted_segment_predicate) return false; - std::vector conjuncts; - collectConjuncts(substituted_segment_predicate, conjuncts); - if (conjuncts.empty()) - return false; + auto segment_ast = substituted_segment_predicate->clone(); + auto sample = namesAndTypesFromKey(identity_key); + auto syntax_result = TreeRewriter(context).analyze(segment_ast, sample); + auto segment_dag = ExpressionAnalyzer(segment_ast, syntax_result, context).getActionsDAG(true); + ActionsDAGWithInversionPushDown segment_filter(segment_dag.getOutputs().at(0), context); - std::unordered_map idx_by_name; - idx_by_name.reserve(identity_key.column_names.size()); - for (size_t i = 0; i < identity_key.column_names.size(); ++i) - idx_by_name.emplace(identity_key.column_names[i], i); + KeyCondition segment_condition( + segment_filter, context, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false); - Hyperrectangle rect(identity_key.column_names.size(), Range::createWholeUniverse()); + Hyperrectangle rect; + rect.reserve(identity_key.column_names.size()); - for (const auto & c : conjuncts) + for (size_t i = 0; i < identity_key.column_names.size(); ++i) { - switch (applyConjunct(c, idx_by_name, identity_key.data_types, rect, context)) + Ranges col_ranges; + if (!segment_condition.extractPlainRangesForColumn(i, col_ranges)) + { + rect.push_back(Range::createWholeUniverse()); + continue; + } + + if (col_ranges.empty()) + return true; + + if (col_ranges.size() != 1) { - case ApplyOutcome::Unsupported: - return false; - case ApplyOutcome::EmptyIntersection: - /// Segment predicate self-contradicts → segment is provably empty. - return true; - case ApplyOutcome::Recognized: - break; + rect.push_back(Range::createWholeUniverse()); + continue; } + + rect.push_back(col_ranges.front()); } return !user_condition.checkInHyperrectangle(rect, identity_key.data_types).can_be_true; } catch (...) { - /// Fail-open: any unexpected exception in extraction, type coercion, or constant - /// evaluation must not prevent the segment from being scanned normally. return false; } diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h index 5f6edbdb0a8a..7b6db05c9871 100644 --- a/src/Storages/HybridSegmentPruner.h +++ b/src/Storages/HybridSegmentPruner.h @@ -13,8 +13,10 @@ namespace DB /// Paimon::PartitionPruner. /// /// Build one KeyCondition over the user filter (PREWHERE+WHERE represented as an -/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, parse -/// its (already watermark-substituted) predicate AST into a Hyperrectangle and ask +/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, build +/// a second KeyCondition from its (already watermark-substituted) predicate AST and +/// use `KeyCondition::extractPlainRangesForColumn` to obtain a Hyperrectangle (fail-open +/// to whole-universe per column when extraction is ambiguous). Then ask /// `KeyCondition::checkInHyperrectangle(rect, types).can_be_true`. The segment can be /// pruned iff the answer is false. /// diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 58005311865b..da92b200aaaf 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -3226,12 +3226,109 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const if (key_columns.size() != 1) return false; + return extractPlainRangesForColumn(0, ranges); +} + +bool KeyCondition::extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const +{ + if (column_index >= key_columns.size()) + return false; + if (hasMonotonicFunctionsChain()) return false; - /// All Ranges in rpn_stack is plain. + /// All Ranges in rpn_stack are plain. std::stack rpn_stack; + auto push_range_atom = [&](const RPNElement & element, bool negate) + { + if (element.getKeyColumn() != column_index) + rpn_stack.push(PlainRanges::makeUniverse()); + else if (negate) + rpn_stack.push(PlainRanges(element.range.invertRange())); + else + rpn_stack.push(PlainRanges(element.range)); + }; + + auto find_tuple_index_for_column = [&](const RPNElement & element) -> std::optional + { + chassert(element.set_index); + for (const auto & mapping : element.set_index->getIndexesMapping()) + if (mapping.key_index == column_index) + return mapping.tuple_index; + return std::nullopt; + }; + + auto try_extract_set_ranges = [&](const RPNElement & element, bool negate, PlainRanges & out) -> bool + { + auto tuple_index = find_tuple_index_for_column(element); + if (!tuple_index) + { + out = PlainRanges::makeUniverse(); + return true; + } + + if (element.set_index->hasMonotonicFunctionsChain()) + return false; + + if (element.set_index->size() == 0) + { + out = negate ? PlainRanges::makeUniverse() : PlainRanges::makeBlank(); + return true; + } + + const auto & values = element.set_index->getOrderedSet(); + if (*tuple_index >= values.size()) + return false; + + const auto & column_values = *values[*tuple_index]; + const size_t values_size = element.set_index->size(); + Ranges points_range; + + if (!negate) + { + /// values in set_index are ordered and no duplication + for (size_t i = 0; i < values_size; ++i) + { + FieldRef value; + column_values.get(i, value); + if (value.isNull()) + return false; + points_range.push_back({value}); + } + } + else + { + std::optional previous; + for (size_t i = 0; i < values_size; ++i) + { + FieldRef current; + column_values.get(i, current); + if (current.isNull()) + return false; + + if (previous) + { + Range between(*previous, false, current, false); + /// skip blank range + if (!(between.left > between.right || (between.left == between.right && !between.left_included && !between.right_included))) + points_range.push_back(between); + } + else + { + points_range.push_back(Range::createRightBounded(current, false)); + } + + previous = current; + } + + points_range.push_back(Range::createLeftBounded(*previous, false)); + } + + out = PlainRanges(points_range); + return true; + }; + for (const auto & element : rpn) { if (element.function == RPNElement::FUNCTION_AND) @@ -3279,76 +3376,20 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const else /// atom relational expression or constants { if (element.function == RPNElement::FUNCTION_IN_RANGE) - { - rpn_stack.push(PlainRanges(element.range)); - } + push_range_atom(element, /*negate=*/false); else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE) { - rpn_stack.push(PlainRanges(element.range.invertRange())); - } - else if (element.function == RPNElement::FUNCTION_IN_SET) - { - if (element.set_index->hasMonotonicFunctionsChain()) - return false; - - if (element.set_index->size() == 0) - { - rpn_stack.push(PlainRanges::makeBlank()); /// skip blank range - continue; - } - - const auto & values = element.set_index->getOrderedSet(); - Ranges points_range; - - /// values in set_index are ordered and no duplication - for (size_t i = 0; i < element.set_index->size(); i++) - { - FieldRef f; - values[0]->get(i, f); - if (f.isNull()) - return false; - points_range.push_back({f}); - } - rpn_stack.push(PlainRanges(points_range)); + push_range_atom(element, /*negate=*/true); } - else if (element.function == RPNElement::FUNCTION_NOT_IN_SET) + else if (element.function == RPNElement::FUNCTION_IN_SET || element.function == RPNElement::FUNCTION_NOT_IN_SET) { - if (element.set_index->hasMonotonicFunctionsChain()) + PlainRanges set_ranges = PlainRanges::makeUniverse(); + if (!try_extract_set_ranges( + element, + /*negate=*/element.function == RPNElement::FUNCTION_NOT_IN_SET, + set_ranges)) return false; - - if (element.set_index->size() == 0) - { - rpn_stack.push(PlainRanges::makeUniverse()); - continue; - } - - const auto & values = element.set_index->getOrderedSet(); - Ranges points_range; - - std::optional pre; - for (size_t i=0; isize(); i++) - { - FieldRef cur; - values[0]->get(i, cur); - - if (cur.isNull()) - return false; - if (pre) - { - Range r(*pre, false, cur, false); - /// skip blank range - if (!(r.left > r.right || (r.left == r.right && !r.left_included && !r.right_included))) - points_range.push_back(r); - } - else - { - points_range.push_back(Range::createRightBounded(cur, false)); - } - pre = cur; - } - - points_range.push_back(Range::createLeftBounded(*pre, false)); - rpn_stack.push(PlainRanges(points_range)); + rpn_stack.push(std::move(set_ranges)); } else if (element.function == RPNElement::ALWAYS_FALSE) { @@ -3379,7 +3420,7 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const } if (rpn_stack.size() != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRanges"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRangesForColumn"); ranges = std::move(rpn_stack.top().ranges); return true; diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index b733920df792..b1f9099e87bd 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -199,6 +199,11 @@ class KeyCondition /// TODO handle the cases when generate RPN. bool extractPlainRanges(Ranges & ranges) const; + /// Same stack algorithm as extractPlainRanges, but for a multi-column key: logical ops apply + /// as usual, while atoms that constrain other key columns become the universe for `column_index`. + /// Returns false if the RPN contains unsupported atoms for this extraction (same as extractPlainRanges). + bool extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const; + /// The expression is stored as Reverse Polish Notation. struct RPNElement {