From f3c4038492220400e8d13558e2d523541d8e579d Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Fri, 29 May 2026 15:56:50 +0530 Subject: [PATCH 1/3] feat(sqlite): add streaming SqliteSidecarBuilder keyed by a column open_or_build opens parquet files and synthesises monotonic 0..N keys. Add SqliteSidecarBuilder: an incremental begin/push_batch/finish builder that consumes RecordBatches and reads each row's key from a designated column, so a caller can build the sidecar from any batch source (e.g. a storage engine's native rowid) in a single bounded-memory pass without materialising an intermediate parquet file. Shares CREATE/INSERT DDL and row-param construction with the parquet path via new ddl()/row_to_params() helpers; build_table behaviour is unchanged. One transaction wraps the build; an abandoned builder rolls back on drop. Keys are stored as INTEGER PRIMARY KEY, preserving the B-tree point-lookup performance. --- src/lib.rs | 2 +- src/sqlite_provider.rs | 240 +++++++++++++++++++++++++++++++--- tests/sqlite_provider_test.rs | 84 +++++++++++- 3 files changed, 302 insertions(+), 24 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 702e10a..a5f9eab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,7 +85,7 @@ pub use udtf::VectorSearchVectorUDTF; #[cfg(feature = "parquet-provider")] pub use parquet_provider::ParquetLookupProvider; #[cfg(feature = "sqlite-provider")] -pub use sqlite_provider::SqliteLookupProvider; +pub use sqlite_provider::{SqliteLookupProvider, SqliteSidecarBuilder}; use std::sync::Arc; diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 996b7eb..de35b76 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -272,6 +272,144 @@ impl SqliteLookupProvider { } } +// ── Streaming sidecar builder ─────────────────────────────────────────────── + +/// Incremental builder for a [`SqliteLookupProvider`] backed by a stream of +/// [`RecordBatch`]es instead of parquet files. 
+/// +/// [`SqliteLookupProvider::open_or_build`] opens parquet files itself and +/// synthesises monotonic `0..N` keys. This builder instead takes batches one at +/// a time and reads each row's key from a designated column (e.g. a storage +/// engine's native `rowid`). That lets a caller drive a single pass over a +/// source — fanning each batch out to both a USearch index and this sidecar — +/// without first materialising an intermediate parquet file. +/// +/// Memory is bounded: [`push_batch`](Self::push_batch) inserts a batch's rows +/// and returns, accumulating nothing across batches. All inserts share one +/// transaction (opened in [`begin`](Self::begin), committed in +/// [`finish`](Self::finish)); dropping the builder before `finish` rolls the +/// transaction back, so a half-built table is never persisted. +/// +/// The first field of `schema` is the key column, created as +/// `INTEGER PRIMARY KEY` (the rowid-alias B-tree), matching `open_or_build`. +pub struct SqliteSidecarBuilder { + conn: Connection, + db_path: String, + table_name: String, + schema: SchemaRef, + pool_size: usize, + insert_sql: String, + key_col_index: usize, + value_col_indices: Vec, +} + +impl SqliteSidecarBuilder { + /// Begin a build at `db_path`: open the connection, start the transaction, + /// and create the table. + /// + /// `schema` is the output SQLite schema — field 0 is the key column + /// (`INTEGER PRIMARY KEY`), fields 1.. are the stored value columns. + /// `key_col_index` and `value_col_indices` index into the *input* batches + /// passed to [`push_batch`](Self::push_batch): `key_col_index` is the + /// column holding the row key, and `value_col_indices` map input columns to + /// schema fields 1.. in order. 
+ pub fn begin( + db_path: &str, + table_name: &str, + pool_size: usize, + schema: SchemaRef, + key_col_index: usize, + value_col_indices: Vec, + ) -> DFResult { + if pool_size == 0 { + return Err(DataFusionError::Execution( + "pool_size must be at least 1".into(), + )); + } + if schema.fields().len() != value_col_indices.len() + 1 { + return Err(DataFusionError::Execution(format!( + "schema has {} fields but expected 1 key column + {} value columns", + schema.fields().len(), + value_col_indices.len() + ))); + } + let (create_sql, insert_sql) = ddl(table_name, &schema); + let conn = open_conn(db_path)?; + // Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the + // transaction can span many push_batch calls without a self-referential + // borrow. An uncommitted transaction is rolled back when `conn` is + // dropped, so an abandoned build leaves no half-written table. + conn.execute_batch("BEGIN;") + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + conn.execute_batch(&create_sql) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + Ok(Self { + conn, + db_path: db_path.to_string(), + table_name: table_name.to_string(), + schema, + pool_size, + insert_sql, + key_col_index, + value_col_indices, + }) + } + + /// Insert every row of `batch`. The key is read from `key_col_index`; the + /// value columns from `value_col_indices`. Rows are inserted as they are + /// read — nothing is buffered, so peak memory is O(one batch). 
+ pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> { + let ncols = batch.num_columns(); + if self.key_col_index >= ncols { + return Err(DataFusionError::Execution(format!( + "key_col_index {} out of range for batch with {ncols} columns", + self.key_col_index + ))); + } + let key_col = batch.column(self.key_col_index); + let mut stmt = self + .conn + .prepare_cached(&self.insert_sql) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + for row in 0..batch.num_rows() { + let key = extract_key(key_col, row)?; + let params = row_to_params(key, batch, &self.value_col_indices, row); + stmt.execute(rusqlite::params_from_iter(params.iter())) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + } + Ok(()) + } + + /// Commit the build, checkpoint the WAL, and open the read connection pool. + pub fn finish(self) -> DFResult { + self.conn + .execute_batch("COMMIT;") + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + // Flush WAL to the main db so the data survives process exit (matches + // open_or_build). 
+ self.conn + .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);") + .map_err(|e| DataFusionError::Execution(format!("WAL checkpoint failed: {e}")))?; + tracing::info!( + "SQLite table '{}' built from stream and committed.", + self.table_name + ); + + let key_col = self.schema.field(0).name().clone(); + let mut conns = vec![self.conn]; + for _ in 1..self.pool_size { + conns.push(open_conn(&self.db_path)?); + } + Ok(SqliteLookupProvider { + schema: self.schema, + table_name: self.table_name, + key_col, + pool: Arc::new(Mutex::new(conns)), + sem: Arc::new(Semaphore::new(self.pool_size)), + }) + } +} + // ── PointLookupProvider ─────────────────────────────────────────────────────── #[async_trait] @@ -682,14 +820,11 @@ fn build_scan_batch(schema: &SchemaRef, col_bufs: Vec>) -> DFResul // ── Build helpers ───────────────────────────────────────────────────────────── -fn build_table( - conn: &Connection, - table_name: &str, - parquet_files: &[String], - schema: &SchemaRef, - parquet_col_indices: &[usize], -) -> DFResult<()> { - // The first field is the key column (INTEGER PRIMARY KEY). +/// Build the `CREATE TABLE` and `INSERT` SQL for a sidecar table. The first +/// schema field is the key column (`INTEGER PRIMARY KEY`, i.e. the rowid-alias +/// B-tree); the rest are typed from their Arrow type. Shared by the parquet +/// build path ([`build_table`]) and the streaming [`SqliteSidecarBuilder`]. 
+fn ddl(table_name: &str, schema: &SchemaRef) -> (String, String) {
     let key_col_name = schema.field(0).name();
     let col_defs = schema
         .fields()
@@ -698,12 +833,16 @@ fn build_table(
             if f.name() == key_col_name {
                 format!("{} INTEGER PRIMARY KEY", quote_ident(f.name()))
             } else {
-                let sql_type = arrow_type_to_sql(f.data_type());
-                format!("{} {}", quote_ident(f.name()), sql_type)
+                format!(
+                    "{} {}",
+                    quote_ident(f.name()),
+                    arrow_type_to_sql(f.data_type())
+                )
             }
         })
         .collect::<Vec<_>>()
         .join(", ");
+    let create_sql = format!("CREATE TABLE {} ({col_defs});", quote_ident(table_name));
 
     let placeholders = schema
         .fields()
@@ -715,6 +854,73 @@ fn build_table(
         "INSERT INTO {} VALUES ({placeholders})",
         quote_ident(table_name)
     );
+    (create_sql, insert_sql)
+}
+
+/// Build the INSERT parameter row: the key (as SQLite INTEGER) first, then each
+/// value column in `value_col_indices` order.
+fn row_to_params(
+    key: i64,
+    batch: &RecordBatch,
+    value_col_indices: &[usize],
+    row: usize,
+) -> Vec<SqlValue> {
+    let mut params: Vec<SqlValue> = Vec::with_capacity(value_col_indices.len() + 1);
+    params.push(SqlValue::Integer(key));
+    for &ci in value_col_indices {
+        params.push(arrow_cell_to_sql(batch.column(ci), row));
+    }
+    params
+}
+
+/// Read the row key from `col` at `row` as the `i64` that is bound to the
+/// `INTEGER PRIMARY KEY` column.
+///
+/// Supports the 32/64-bit signed and unsigned integer types.
+///
+/// # Errors
+/// Returns [`DataFusionError::Execution`] when the key is null, when the
+/// column is not one of the supported integer types, or when a `UInt64` key
+/// exceeds `i64::MAX` (a plain `as i64` cast would silently wrap it to a
+/// negative rowid instead of reporting the problem).
+fn extract_key(col: &ArrayRef, row: usize) -> DFResult<i64> {
+    if col.is_null(row) {
+        return Err(DataFusionError::Execution(
+            "key column has a null value; row keys must be non-null".into(),
+        ));
+    }
+    // Each arm is selected by the column's own data type, which the unwraps rely on.
+    let array = col.as_any();
+    let key = match col.data_type() {
+        DataType::Int64 => array.downcast_ref::<Int64Array>().unwrap().value(row),
+        DataType::Int32 => i64::from(array.downcast_ref::<Int32Array>().unwrap().value(row)),
+        DataType::UInt32 => i64::from(array.downcast_ref::<UInt32Array>().unwrap().value(row)),
+        DataType::UInt64 => {
+            // Checked conversion: `raw as i64` would wrap keys above i64::MAX.
+            let raw = array.downcast_ref::<UInt64Array>().unwrap().value(row);
+            i64::try_from(raw).map_err(|_| {
+                DataFusionError::Execution(format!(
+                    "key {raw} exceeds i64::MAX and cannot be stored as a SQLite INTEGER"
+                ))
+            })?
+        }
+        other => {
+            return Err(DataFusionError::Execution(format!(
+                "unsupported key column type {other:?}; expected an integer type"
+            )));
+        }
+    };
+    Ok(key)
+}
+
+fn build_table(
+    conn: &Connection,
+    table_name: &str,
+    parquet_files: &[String],
+    schema: &SchemaRef,
+    parquet_col_indices: &[usize],
+) -> DFResult<()> {
+    let (create_sql, insert_sql) = ddl(table_name, schema);
 
     // CREATE TABLE and all INSERTs share one transaction so a mid-build crash
     // leaves no half-built table.
If the table exists with zero rows on the @@ -724,11 +930,8 @@ fn build_table( .unchecked_transaction() .map_err(|e| DataFusionError::Execution(e.to_string()))?; { - tx.execute_batch(&format!( - "CREATE TABLE {} ({col_defs});", - quote_ident(table_name) - )) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; + tx.execute_batch(&create_sql) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; let mut stmt = tx .prepare(&insert_sql) @@ -754,12 +957,7 @@ fn build_table( let key = global_row_idx; global_row_idx += 1; - let mut params: Vec = Vec::with_capacity(schema.fields().len()); - params.push(SqlValue::Integer(key as i64)); - - for &ci in parquet_col_indices { - params.push(arrow_cell_to_sql(batch.column(ci), row_i)); - } + let params = row_to_params(key as i64, &batch, parquet_col_indices, row_i); stmt.execute(rusqlite::params_from_iter(params.iter())) .map_err(|e| DataFusionError::Execution(e.to_string()))?; diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index 77ee09b..8438d7b 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -2,11 +2,13 @@ use std::sync::Arc; -use arrow_array::{Array, RecordBatch, StringArray, UInt64Array}; +use arrow_array::{Array, Int64Array, RecordBatch, StringArray, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion::catalog::TableProvider; use datafusion::prelude::SessionContext; -use datafusion_vector_search_ext::{PointLookupProvider, SqliteLookupProvider}; +use datafusion_vector_search_ext::{ + PointLookupProvider, SqliteLookupProvider, SqliteSidecarBuilder, +}; use parquet::arrow::ArrowWriter; use tempfile::tempdir; @@ -81,6 +83,84 @@ async fn test_fetch_existing_keys() { assert!(!names.contains(&"bob".to_string())); } +#[tokio::test] +async fn test_stream_builder_with_explicit_rowid_keys() { + // The streaming builder reads each row's key from a column (e.g. a storage + // engine's native rowid) instead of synthesising 0..N. 
Keys here are sparse + // and non-monotonic across two batches to prove that works end to end. + let dir = tempdir().unwrap(); + + let batch_schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + // Output schema: key column first, then the stored value column. + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let b1 = RecordBatch::try_new( + batch_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![100_i64, 250])), + Arc::new(StringArray::from(vec![Some("alice"), Some("bob")])), + ], + ) + .unwrap(); + let b2 = RecordBatch::try_new( + batch_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![999_i64])), + Arc::new(StringArray::from(vec![Some("carol")])), + ], + ) + .unwrap(); + + let db_path = dir.path().join("stream.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 4, + provider_schema, + 0, // key (rowid) is column 0 of the input batches + vec![1], // input column 1 (name) → provider field 1 + ) + .unwrap(); + builder.push_batch(&b1).unwrap(); + builder.push_batch(&b2).unwrap(); + let provider = builder.finish().unwrap(); + + // Point-lookup by sparse rowids. + let batches = provider + .fetch_by_keys(&[100, 999], "rowid", None) + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + let names: Vec = batches + .iter() + .flat_map(|b| { + b.column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + }) + .collect(); + assert!(names.contains(&"alice".to_string())); + assert!(names.contains(&"carol".to_string())); + assert!(!names.contains(&"bob".to_string())); // rowid 250 not requested + + // A rowid that was never inserted returns nothing. 
+ let empty = provider.fetch_by_keys(&[42], "rowid", None).await.unwrap(); + assert_eq!(empty.iter().map(|b| b.num_rows()).sum::(), 0); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From d88fdaa657ee841e4854c64010aa5351bf135623 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Fri, 29 May 2026 16:08:28 +0530 Subject: [PATCH 2/3] test(sqlite): cover rollback, uint64 keys, and validation for stream builder --- tests/sqlite_provider_test.rs | 128 ++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index 8438d7b..cbc348e 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -161,6 +161,134 @@ async fn test_stream_builder_with_explicit_rowid_keys() { assert_eq!(empty.iter().map(|b| b.num_rows()).sum::(), 0); } +#[test] +fn test_stream_builder_abandon_rolls_back() { + // Dropping the builder before finish() must roll the transaction back, so + // the table is never persisted. We prove it by re-running begin() on the + // same path: its CREATE TABLE only succeeds if the abandoned build left no + // table behind. + let dir = tempdir().unwrap(); + let db_path = dir.path().join("abandon.db"); + let db = db_path.to_str().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2])), + Arc::new(StringArray::from(vec![Some("a"), Some("b")])), + ], + ) + .unwrap(); + + { + let mut builder = + SqliteSidecarBuilder::begin(db, "models", 1, schema.clone(), 0, vec![1]).unwrap(); + builder.push_batch(&batch).unwrap(); + // builder dropped here without finish() → rollback + } + + // A fresh build on the same path must succeed (table does not already exist). 
+ assert!( + SqliteSidecarBuilder::begin(db, "models", 1, schema, 0, vec![1]).is_ok(), + "abandoned build must not persist its table" + ); +} + +#[tokio::test] +async fn test_stream_builder_uint64_keys() { + // The key column may be UInt64 (as well as Int64); it is stored as SQLite + // INTEGER and looked up via the u64 fetch API. + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![7_u64, 11])), + Arc::new(StringArray::from(vec![Some("x"), Some("y")])), + ], + ) + .unwrap(); + let db_path = dir.path().join("u64.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "t", 2, schema, 0, vec![1]).unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[11], "rowid", None).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); +} + +#[test] +fn test_stream_builder_validation_errors() { + let dir = tempdir().unwrap(); + let db = |n: &str| dir.path().join(n).to_str().unwrap().to_string(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + + // pool_size must be >= 1. + assert!(SqliteSidecarBuilder::begin(&db("a.db"), "t", 0, schema.clone(), 0, vec![1]).is_err()); + + // schema has 2 fields → exactly 1 value column index expected, not 2. + assert!( + SqliteSidecarBuilder::begin(&db("b.db"), "t", 1, schema.clone(), 0, vec![1, 2]).is_err() + ); + + // key_col_index out of range for the pushed batch (2 columns, index 9). 
+ let mut b_oob = + SqliteSidecarBuilder::begin(&db("c.db"), "t", 1, schema.clone(), 9, vec![1]).unwrap(); + let two_col = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64])), + Arc::new(StringArray::from(vec![Some("a")])), + ], + ) + .unwrap(); + assert!(b_oob.push_batch(&two_col).is_err()); + + // A null key value is rejected. + let nullable = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, true), + Field::new("name", DataType::Utf8, true), + ])); + let mut b_null = + SqliteSidecarBuilder::begin(&db("d.db"), "t", 1, nullable.clone(), 0, vec![1]).unwrap(); + let null_key = RecordBatch::try_new( + nullable, + vec![ + Arc::new(Int64Array::from(vec![None, Some(2)])), + Arc::new(StringArray::from(vec![Some("a"), Some("b")])), + ], + ) + .unwrap(); + assert!(b_null.push_batch(&null_key).is_err()); + + // A non-integer key column type is rejected. + let text_key = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Utf8, false), + Field::new("name", DataType::Utf8, true), + ])); + let mut b_text = + SqliteSidecarBuilder::begin(&db("e.db"), "t", 1, text_key.clone(), 0, vec![1]).unwrap(); + let text_batch = RecordBatch::try_new( + text_key, + vec![ + Arc::new(StringArray::from(vec![Some("k")])), + Arc::new(StringArray::from(vec![Some("a")])), + ], + ) + .unwrap(); + assert!(b_text.push_batch(&text_batch).is_err()); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From a289ae4da00edd414b6d82e9ed61d47846cb0b15 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Fri, 29 May 2026 18:02:04 +0530 Subject: [PATCH 3/3] fix(sqlite): bounds-check value_col_indices in push_batch push_batch validated key_col_index but not value_col_indices; an out-of-range entry would panic in row_to_params on batch.column(ci) rather than returning a clean DataFusionError. Validate both, and add a test for the out-of-range value-index case. Addresses PR review nit. 
--- src/sqlite_provider.rs | 5 +++++ tests/sqlite_provider_test.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index de35b76..1c453ca 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -366,6 +366,11 @@ impl SqliteSidecarBuilder { self.key_col_index ))); } + if let Some(&bad) = self.value_col_indices.iter().find(|&&i| i >= ncols) { + return Err(DataFusionError::Execution(format!( + "value column index {bad} out of range for batch with {ncols} columns" + ))); + } let key_col = batch.column(self.key_col_index); let mut stmt = self .conn diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index cbc348e..ee85449 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -254,6 +254,11 @@ fn test_stream_builder_validation_errors() { .unwrap(); assert!(b_oob.push_batch(&two_col).is_err()); + // value_col_index out of range for the pushed batch (clean error, no panic). + let mut b_voob = + SqliteSidecarBuilder::begin(&db("c2.db"), "t", 1, schema.clone(), 0, vec![9]).unwrap(); + assert!(b_voob.push_batch(&two_col).is_err()); + // A null key value is rejected. let nullable = Arc::new(Schema::new(vec![ Field::new("rowid", DataType::Int64, true),