Merged
32 changes: 23 additions & 9 deletions README.md
@@ -4,7 +4,7 @@ A DataFusion extension that integrates [USearch](https://github.com/unum-cloud/u

Queries matching the `ORDER BY distance_fn(col, query) LIMIT k` pattern are **transparently rewritten** by an optimizer rule into a native USearch index call — no query rewrite needed from the caller. `WHERE` clause filters are handled adaptively: high-selectivity filters use USearch's in-graph predicate API; low-selectivity filters bypass HNSW entirely and scan the data directly.

**DataFusion:** 52.2   **USearch:** 2.24
**DataFusion:** 53   **USearch:** 2.24

---

@@ -230,20 +230,33 @@ tests/

### Optimizer rewrite

The rule (`rule.rs`) matches two logical plan shapes:
The rule (`rule.rs`) matches a `Sort(fetch=k)` over a `TableScan`, with an
optional `Projection` between them and an optional `Filter` directly above the
scan:

```
Sort(fetch=k, ORDER BY dist ASC)
Projection([..., distance_fn(col, lit) AS dist, ...])
TableScan(name)

Sort(fetch=k, ORDER BY dist ASC)
Projection([..., distance_fn(col, lit) AS dist, ...])
Filter(predicate)
[ Projection([..., distance_fn(col, lit) AS dist, ...]) ] ← optional
[ Filter(predicate) ] ← optional
TableScan(name)
```

Preconditions: sort is `ASC`, distance UDF matches index metric, table is registered, query vector is a literal. When the rule fires, it replaces the inner nodes with a `USearchNode` leaf carrying: table name, vector column, query vector, k, distance type, and absorbed filter predicates. The `Sort` node is preserved above for final ordering.
DataFusion omits the `Projection` for `SELECT *` (and for any SELECT whose
columns come straight from the scan), so the `Sort` can sit directly on the
`TableScan`.
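
The shape described above can be sketched with a toy plan type. This is a minimal illustration, not the code in `rule.rs`: the `Plan` enum, the `Matched` struct, and `match_shape` are hypothetical stand-ins for DataFusion's `LogicalPlan` and the rule's internals, and expressions are reduced to plain strings.

```rust
// Toy stand-ins for DataFusion's LogicalPlan variants. Every name in this
// block is hypothetical and exists only to illustrate the shape matching.
#[derive(Debug)]
enum Plan {
    Sort { fetch: Option<usize>, input: Box<Plan> },
    Projection { exprs: Vec<String>, input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
    TableScan { name: String },
}

/// What the matcher extracts from a supported plan shape.
#[derive(Debug, PartialEq)]
struct Matched {
    k: usize,
    proj_exprs: Vec<String>,
    filter: Option<String>,
    table: String,
}

/// Match Sort(fetch=k) -> [Projection] -> [Filter] -> TableScan.
/// Any other shape returns None, which models falling back to exact execution.
fn match_shape(plan: &Plan) -> Option<Matched> {
    let Plan::Sort { fetch, input } = plan else {
        return None;
    };
    // A Sort without an embedded fetch limit is not a top-k query.
    let k = (*fetch)?;

    // The Projection is optional: peel it off if present.
    let (proj_exprs, below) = match input.as_ref() {
        Plan::Projection { exprs, input } => (exprs.clone(), input.as_ref()),
        other => (Vec::new(), other),
    };

    // At most one Filter may sit directly above the scan.
    let (filter, scan) = match below {
        Plan::Filter { predicate, input } => (Some(predicate.clone()), input.as_ref()),
        other => (None, other),
    };

    match scan {
        Plan::TableScan { name } => Some(Matched {
            k,
            proj_exprs,
            filter,
            table: name.clone(),
        }),
        // Stacked filters or any other node: decline to rewrite.
        _ => None,
    }
}
```

In this sketch a `Sort` sitting directly on a `TableScan` yields an empty `proj_exprs`, which corresponds to the no-`Projection` case, while a `Filter` over another `Filter` yields `None`.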

Preconditions: sort is `ASC`, distance UDF matches index metric, table is
registered, query vector is a literal. When the rule fires, it replaces the inner
nodes with a `USearchNode` leaf carrying: table name, vector column, query
vector, k, distance type, and absorbed filter predicates. The `Sort` node is
preserved above for final ordering.

**Schema preservation:** an optimizer rule must not change the plan's output
schema. The `USearchNode` produces only what the `lookup_provider` can fetch by
key (addressing key + non-vector columns) plus `_distance` — it cannot produce
the indexed vector column. If the matched `Sort`'s output would include the
vector column (e.g. `SELECT *`), the rule declines and the query falls back to
exact execution rather than emitting a schema-incompatible plan.
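
As a rough sketch of that decision, assuming columns are compared by name only: the helper names `node_columns` and `can_rewrite`, and the column names `id`, `title`, and `emb` used in the note below, are illustrative and do not come from the crate.

```rust
/// Columns a hypothetical index node can emit: whatever the lookup provider
/// stores (addressing key + non-vector columns) plus the computed `_distance`.
fn node_columns(lookup_cols: &[&str]) -> Vec<String> {
    let mut cols: Vec<String> = lookup_cols.iter().map(|c| c.to_string()).collect();
    cols.push("_distance".to_string());
    cols
}

/// The rewrite is schema-preserving only if every column the query outputs
/// can be produced by the node. Otherwise the rule should decline.
fn can_rewrite(output_cols: &[&str], lookup_cols: &[&str]) -> bool {
    let node = node_columns(lookup_cols);
    output_cols
        .iter()
        .all(|c| node.iter().any(|n| n.as_str() == *c))
}
```

With a lookup provider that stores `id` and `title`, an output of `id, title` passes the check, while `id, emb` (where `emb` is the indexed vector column) does not, so that query would fall back to exact execution.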

Physical planning (`planner.rs`) translates `USearchNode` into `USearchExec`, a physical plan node that executes the actual search.

@@ -305,6 +318,7 @@ Tests cover optimizer rule matching/rejection, end-to-end execution through both

| Limitation | Notes |
|---|---|
| Projecting the indexed vector column | A k-NN query whose output includes the vector column itself (e.g. `SELECT *`, or `SELECT id, vector`) falls back to exact execution. The `lookup_provider` does not store the vector column (see [registration](#register-providers-and-set-up-the-sessioncontext)), so the rewrite cannot reproduce it. Project the metadata columns and the distance instead. |
| Stacked `Filter` nodes | Only one `Filter -> TableScan` layer is absorbed. `Filter -> Filter -> TableScan` falls back to exact execution. DataFusion typically combines multiple WHERE conditions into a single Filter, so this rarely occurs. |
| Runtime query vectors | The query vector must be a compile-time literal (`ARRAY[0.1, ...]`). Column references or subquery results are not rewritten. Use `vector_search_vector` for explicit ANN queries. |
| `ef_search` per-query | `expansion_search` is global to the index instance. Per-query adjustment is not supported. |
114 changes: 92 additions & 22 deletions src/rule.rs
@@ -48,21 +48,66 @@ impl USearchRule {
}

fn try_match(&self, plan: &LogicalPlan) -> Option<LogicalPlan> {
match plan {
// Anchor on the Sort itself. The projection (if any) sits *below* the
// Sort and supplies its output columns; SELECT * omits it entirely.
LogicalPlan::Sort(sort) => {
let (proj_exprs_slice, after_sort): (&[Expr], &LogicalPlan) =
match sort.input.as_ref() {
LogicalPlan::Projection(p) => (p.expr.as_slice(), p.input.as_ref()),
other => (&[], other),
};
self.build_rewrite(sort, proj_exprs_slice, after_sort)
}

// Output-aware passthrough. When a Projection sits directly over a
// k-NN Sort that rests on the scan (no projection between them), drive
// the rewrite with the OUTER projection's columns — i.e. the query's
// real output. The rewrite can only produce the index node's columns
// (addressing key + non-vector columns + _distance), never the indexed
// vector itself. Routing the output columns through `build_rewrite`
// lets it fire when they're all producible (e.g. `SELECT id … ORDER BY
// l2_distance(emb, …)`) and decline — falling back to exact search —
// when the output needs the vector (`SELECT *`, `SELECT id, emb`),
// rather than emitting a schema the consumer can't satisfy (issue #508).
//
// ALTERNATIVE (not taken): teach USearchExec to reconstruct the vector
// column for the result keys via `index.get(key)`, so even
// vector-returning queries stay on the index. Rejected to keep a single
// source of truth for returned vectors — the index would otherwise be a
// second source that must byte-match the parquet (breaks under F16
// quantization, and relies on USearch never transforming stored vectors).
// See the README "Limitations" entry and runtimedb issue #508.
LogicalPlan::Projection(outer) => {
let LogicalPlan::Sort(sort) = outer.input.as_ref() else {
return None;
};
// Only the passthrough shape; the remap shape (projection *below*
// the Sort) is handled when we visit the Sort above.
if !matches!(
sort.input.as_ref(),
LogicalPlan::TableScan(_) | LogicalPlan::Filter(_)
) {
return None;
}
self.build_rewrite(sort, &outer.expr, sort.input.as_ref())
}

_ => None,
}
}

fn build_rewrite(
&self,
sort: &datafusion::logical_expr::logical_plan::Sort,
proj_exprs_slice: &[Expr],
after_sort: &LogicalPlan,
) -> Option<LogicalPlan> {
use datafusion::logical_expr::logical_plan::TableScan;

// Require Sort with embedded fetch limit.
let sort = match plan {
LogicalPlan::Sort(s) => s,
_ => return None,
};
let k = sort.fetch?;

// Projection is optional — DataFusion 51 omits it for SELECT * queries.
let (proj_exprs_slice, after_sort): (&[Expr], &LogicalPlan) = match sort.input.as_ref() {
LogicalPlan::Projection(p) => (p.expr.as_slice(), p.input.as_ref()),
other => (&[], other),
};

// Accept TableScan directly, or Filter(TableScan) for WHERE clauses.
// Deeper nesting (Filter→Filter→…) is not absorbed — the rule does
// not fire and DataFusion falls back to exact execution.
@@ -155,7 +200,14 @@ impl USearchRule {
// Build the final user-visible projection over USearchNode output.
let dist_alias_str = dist_alias.as_deref().unwrap_or("_distance");
let final_proj_exprs = if proj_exprs_slice.is_empty() {
passthrough_projection(&vsn_df_schema, &table_ref)
// No explicit Projection node (e.g. SELECT *, or a SELECT whose
// columns come straight from the scan, so the Sort sits directly on
// the TableScan). The rewrite must reproduce the original output
// columns; if any isn't producible from the node — the indexed
// vector column is never stored in the fetch path — bail so the
// query falls back to exact brute-force search, like the other
// unsupported shapes (DESC, metric mismatch, stacked filters).
passthrough_projection(after_sort.schema().as_ref(), &vsn_df_schema, &table_ref)?
} else {
remap_projections(proj_exprs_slice, dist_alias_str, &table_ref)
};
@@ -375,21 +427,39 @@ fn build_outer_projection(exprs: &[Expr]) -> Vec<Expr> {
.collect()
}

/// Build a passthrough Projection for SELECT * queries (no original Projection node).
/// Projects only the original table columns (not `_distance`) so the output schema
/// matches the original Sort schema. The Sort re-evaluates the distance UDF expression
/// on the k result rows returned by USearchExec (O(k × dim), negligible for small k).
fn passthrough_projection(schema: &DFSchema, table_ref: &TableReference) -> Vec<Expr> {
schema
/// Build a passthrough Projection for queries with no explicit Projection node
/// (e.g. `SELECT *`, or a SELECT whose columns come straight from the scan so the
/// Sort sits directly on the TableScan).
///
/// The projection must reproduce the *original* output columns (`original_schema`,
/// the Sort's input). The `USearchNode` can only produce the columns in
/// `node_schema` — the fetch path's addressing key + non-vector columns +
/// `_distance`; the indexed vector column is never stored there (see
/// `PointLookupProvider`). If the original output needs a column the node can't
/// produce (the vector column), return `None` so the rule declines to rewrite and
/// the query falls back to exact brute-force search. The Sort re-evaluates the
/// distance UDF on the k result rows returned by USearchExec (O(k × dim)).
fn passthrough_projection(
original_schema: &DFSchema,
node_schema: &DFSchema,
table_ref: &TableReference,
) -> Option<Vec<Expr>> {
original_schema
.inner()
.fields()
.iter()
.filter(|f| f.name() != "_distance")
.map(|f| {
Expr::Column(datafusion::common::Column::new(
Some(table_ref.clone()),
f.name().as_str(),
))
let producible = node_schema
.inner()
.fields()
.iter()
.any(|nf| nf.name() == f.name());
producible.then(|| {
Expr::Column(datafusion::common::Column::new(
Some(table_ref.clone()),
f.name().as_str(),
))
})
})
.collect()
}
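
The new `passthrough_projection` relies on collecting an iterator of `Option` values into `Option<Vec<_>>`, which yields `None` as soon as one element is `None`. A stdlib-only sketch of that idiom follows, with plain string column names standing in for `DFSchema` fields and `Expr::Column`. The function name `passthrough` and its signature are illustrative, and keeping the `_distance` filter is an assumption about the final code.

```rust
/// Map each original output column to itself if the node can produce it.
/// Collecting into Option<Vec<_>> returns None when any column is missing,
/// which models the rule declining to rewrite.
fn passthrough(original: &[&str], node: &[&str]) -> Option<Vec<String>> {
    original
        .iter()
        // Assumption: `_distance` is still excluded from the passthrough output.
        .filter(|name| **name != "_distance")
        // `bool::then` gives Some(column) when producible, None otherwise.
        .map(|name| node.contains(name).then(|| name.to_string()))
        .collect()
}
```

Because a single `None` makes the whole result `None`, the caller can use `?` to bail out of the rewrite, as the diff does at the `passthrough_projection(...)?` call site.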