diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index eaf8b29208aa..d46283fa3993 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -277,6 +277,11 @@ name = "row_selector" harness = false required-features = ["arrow"] +[[bench]] +name = "row_selector_boolean_buffer" +harness = false +required-features = ["arrow"] + [[bench]] name = "row_group_index_reader" required-features = ["arrow"] diff --git a/parquet/benches/row_selector_boolean_buffer.rs b/parquet/benches/row_selector_boolean_buffer.rs new file mode 100644 index 000000000000..c5e1177b4d9f --- /dev/null +++ b/parquet/benches/row_selector_boolean_buffer.rs @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint; +use std::sync::Arc; + +const TOTAL_ROWS: usize = 3_000_000; +const SELECTIVITY_CASES: &[Selectivity] = &[ + Selectivity::new("select01", 1, 100), + Selectivity::new("select10", 1, 10), + Selectivity::new("select33", 1, 3), + Selectivity::new("select80", 4, 5), +]; + +/// Generates a deterministic random row mask with the specified selectivity. +fn generate_random_row_selection(total_rows: usize, selectivity: Selectivity) -> BooleanBuffer { + let mut rng = StdRng::seed_from_u64(0x5E1EC7_u64 ^ selectivity.seed()); + let bools: Vec = (0..total_rows) + .map(|_| rng.random_bool(selectivity.ratio())) + .collect(); + BooleanBuffer::from(bools) +} + +fn generate_fragmented_selection(total_rows: usize, selectivity: Selectivity) -> BooleanBuffer { + let mut builder = BooleanBufferBuilder::new(total_rows); + for row in 0..total_rows { + builder.append(row % selectivity.denominator < selectivity.numerator); + } + builder.finish() +} + +fn generate_clustered_selection(total_rows: usize, selectivity: Selectivity) -> BooleanBuffer { + const RUN: usize = 8 * 1024; + const JITTER: usize = RUN / 2; + let mut rng = StdRng::seed_from_u64(0xC1057E_u64 ^ selectivity.seed()); + let mut builder = BooleanBufferBuilder::new(total_rows); + let mut rows_remaining = total_rows; + let mut run_idx = 0usize; + + while rows_remaining != 0 { + let run_len = rng + .random_range((RUN - JITTER)..=(RUN + JITTER)) + .min(rows_remaining); + let selected = run_idx % selectivity.denominator < selectivity.numerator; + builder.append_n(run_len, selected); + rows_remaining -= run_len; + run_idx += 1; + } + builder.finish() +} + +fn boolean_array(mask: &BooleanBuffer) -> BooleanArray { + BooleanArray::new(mask.clone(), None) +} + +fn criterion_benchmark(c: &mut Criterion) { + let patterns = build_patterns(); + + let mut construction = c.benchmark_group("row_selector_boolean_buffer/construction"); + for case in &patterns { + construction.bench_with_input( + BenchmarkId::new("from_filters", case.label()), + &case.mask, + |b, mask| { + b.iter(|| { + let array = boolean_array(mask); + let selection = RowSelection::from_filters(&[array]); + hint::black_box(selection); + }) + }, + ); + + construction.bench_with_input( + BenchmarkId::new("from_boolean_buffer", case.label()), + &case.mask, + |b, mask| { + b.iter(|| { + let selection = RowSelection::from_boolean_buffer(mask.clone()); + hint::black_box(selection); + }) + }, + ); + } + construction.finish(); + + let parquet_data = write_parquet_file(TOTAL_ROWS); + let mut reader = c.benchmark_group("row_selector_boolean_buffer/reader"); + for case in &patterns { + reader.bench_with_input( + BenchmarkId::new("from_filters", case.label()), + &case.mask, + |b, mask| { + b.iter(|| { + let selection = RowSelection::from_filters(&[boolean_array(mask)]); + let rows = read_rows(&parquet_data, selection); + hint::black_box(rows); + }) + }, + ); + + reader.bench_with_input( + BenchmarkId::new("from_boolean_buffer", case.label()), + &case.mask, + |b, mask| { + b.iter(|| { + let selection = RowSelection::from_boolean_buffer(mask.clone()); + let rows = read_rows(&parquet_data, selection); + hint::black_box(rows); + }) + }, + ); + } + reader.finish(); +} + +fn build_patterns() -> Vec { + let mut patterns = Vec::new(); + for selectivity in SELECTIVITY_CASES { + patterns.push(BenchCase::new( + "fragmented", + *selectivity, + generate_fragmented_selection(TOTAL_ROWS, *selectivity), + )); + patterns.push(BenchCase::new( + "clustered", + *selectivity, + generate_clustered_selection(TOTAL_ROWS, *selectivity), + )); + patterns.push(BenchCase::new( + "random", + *selectivity, + generate_random_row_selection(TOTAL_ROWS, *selectivity), + )); + } + patterns +} + +fn write_parquet_file(total_rows: usize) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let values: ArrayRef = Arc::new(Int32Array::from_iter_values( + (0..total_rows).map(|row| row as i32), + )); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + + let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap(); + writer.write(&batch).unwrap(); + Bytes::from(writer.into_inner().unwrap()) +} + +fn read_rows(parquet_data: &Bytes, selection: RowSelection) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_row_selection(selection) + .build() + .unwrap(); + + reader.map(|batch| batch.unwrap().num_rows()).sum::() +} + +struct BenchCase { + pattern: &'static str, + selectivity: Selectivity, + mask: BooleanBuffer, +} + +impl BenchCase { + fn new(pattern: &'static str, selectivity: Selectivity, mask: BooleanBuffer) -> Self { + Self { + pattern, + selectivity, + mask, + } + } + + fn label(&self) -> String { + format!("{}/{}", self.pattern, self.selectivity.label) + } +} + +#[derive(Clone, Copy)] +struct Selectivity { + label: &'static str, + numerator: usize, + denominator: usize, +} + +impl Selectivity { + const fn new(label: &'static str, numerator: usize, denominator: usize) -> Self { + Self { + label, + numerator, + denominator, + } + } + + fn ratio(self) -> f64 { + self.numerator as f64 / self.denominator as f64 + } + + fn seed(self) -> u64 { + ((self.numerator as u64) << 32) ^ self.denominator as u64 + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 814b8250508e..0424ad3036a8 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -22,7 +22,9 @@ use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef}; use arrow_select::filter::filter_record_batch; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector}; +pub use selection::{ + MaskRunIter, RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector, +}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 3d80526c8282..016fd589025a 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -19,14 +19,15 @@ //! from a Parquet file use crate::arrow::array_reader::ArrayReader; -use crate::arrow::arrow_reader::selection::RowSelectionPolicy; -use crate::arrow::arrow_reader::selection::RowSelectionStrategy; +use crate::arrow::arrow_reader::selection::{ + RowSelectionInner, RowSelectionPolicy, RowSelectionStrategy, mask_to_selectors, +}; use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; use crate::errors::{ParquetError, Result}; use arrow_array::{Array, BooleanArray}; -use arrow_buffer::BooleanBuffer; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; @@ -157,26 +158,7 @@ impl ReadPlanBuilder { None => return RowSelectionStrategy::Selectors, }; - // total_rows: total number of rows selected / skipped - // effective_count: number of non-empty selectors - let (total_rows, effective_count) = - selection.iter().fold((0usize, 0usize), |(rows, count), s| { - if s.row_count > 0 { - (rows + s.row_count, count + 1) - } else { - (rows, count) - } - }); - - if effective_count == 0 { - return RowSelectionStrategy::Mask; - } - - if total_rows < effective_count.saturating_mul(threshold) { - RowSelectionStrategy::Mask - } else { - RowSelectionStrategy::Selectors - } + selection.auto_selection_strategy(threshold) } } } @@ -275,14 +257,23 @@ impl ReadPlanBuilder { } } - // If the predicate selected all rows and there is no prior selection, - // skip creating a RowSelection entirely — this avoids the allocation - // and keeps selection as None which enables coalesced page fetches. + // If the predicate selected all rows, applying it is a no-op. With no + // prior selection this keeps selection as None, enabling coalesced page + // fetches; with a prior selection it avoids rebuilding the same + // selection. let all_selected = filters.iter().all(|f| f.true_count() == f.len()); - if all_selected && self.selection.is_none() { + if all_selected { return Ok(self); } - let raw = RowSelection::from_filters(&filters); + let raw = if self + .selection + .as_ref() + .is_some_and(|s| s.as_mask().is_some()) + { + RowSelection::from_boolean_buffer(filters_to_boolean_buffer(&filters)) + } else { + RowSelection::from_filters(&filters) + }; self.selection = match self.selection.take() { Some(selection) => Some(selection.and_then(&raw)), None => Some(raw), @@ -306,19 +297,8 @@ impl ReadPlanBuilder { row_selection_policy: _, } = self; - let selection = selection.map(|s| s.trim()); - let row_selection_cursor = selection - .map(|s| { - let trimmed = s.trim(); - let selectors: Vec = trimmed.into(); - match selection_strategy { - RowSelectionStrategy::Mask => { - RowSelectionCursor::new_mask_from_selectors(selectors) - } - RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors), - } - }) + .map(|s| build_cursor(s.trim(), selection_strategy)) .unwrap_or(RowSelectionCursor::new_all()); ReadPlan { @@ -328,6 +308,24 @@ impl ReadPlanBuilder { } } +/// Lower a [`RowSelection`] to the cursor form requested by the resolved strategy. +fn build_cursor(selection: RowSelection, strategy: RowSelectionStrategy) -> RowSelectionCursor { + match (strategy, selection.into_inner()) { + (RowSelectionStrategy::Mask, RowSelectionInner::Mask(mask)) => { + RowSelectionCursor::new_mask_from_buffer((*mask).into_mask()) + } + (RowSelectionStrategy::Mask, RowSelectionInner::Selectors(selectors)) => { + RowSelectionCursor::new_mask_from_selectors(selectors) + } + (RowSelectionStrategy::Selectors, RowSelectionInner::Selectors(selectors)) => { + RowSelectionCursor::new_selectors(selectors) + } + (RowSelectionStrategy::Selectors, RowSelectionInner::Mask(mask)) => { + RowSelectionCursor::new_selectors(mask_to_selectors(mask.mask())) + } + } +} + /// Builder for [`ReadPlan`] that applies a limit and offset to the read plan /// /// See [`ReadPlanBuilder::limited`] to create this builder. @@ -413,6 +411,16 @@ impl LimitedReadPlanBuilder { } } +fn filters_to_boolean_buffer(filters: &[BooleanArray]) -> BooleanBuffer { + let total_rows = filters.iter().map(|f| f.len()).sum(); + let mut builder = BooleanBufferBuilder::new(total_rows); + for filter in filters { + assert_eq!(filter.null_count(), 0); + builder.append_buffer(filter.values()); + } + builder.finish() +} + /// A plan reading specific rows from a Parquet Row Group. /// /// See [`ReadPlanBuilder`] to create `ReadPlan`s @@ -476,6 +484,30 @@ mod tests { ); } + #[test] + fn preferred_selection_strategy_handles_dense_mask_backing() { + let bits: Vec<_> = (0..16).map(|i| i % 2 == 0).collect(); + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits)); + let builder = builder_with_selection(selection) + .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 4 }); + assert_eq!( + builder.resolve_selection_strategy(), + RowSelectionStrategy::Mask + ); + } + + #[test] + fn preferred_selection_strategy_handles_sparse_mask_backing() { + let bits: Vec<_> = (0..128).map(|i| i < 64).collect(); + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits)); + let builder = builder_with_selection(selection) + .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 }); + assert_eq!( + builder.resolve_selection_strategy(), + RowSelectionStrategy::Selectors + ); + } + #[test] fn with_predicate_options_limit_pads_tail_when_no_prior_selection() { use crate::arrow::ProjectionMask; @@ -528,6 +560,50 @@ mod tests { ); } + #[test] + fn with_predicate_options_preserves_mask_selection() { + use crate::arrow::ProjectionMask; + use crate::arrow::array_reader::StructArrayReader; + use crate::arrow::array_reader::test_util::make_int32_page_reader; + use crate::arrow::arrow_reader::ArrowPredicateFn; + use arrow_schema::{DataType as ArrowType, Field, Fields}; + + let data: Vec = (0..6).collect(); + let levels = vec![0; data.len()]; + let leaf = make_int32_page_reader(&data, &levels, &levels, 0, 0, None); + let struct_type = ArrowType::Struct(Fields::from(vec![Field::new( + "c0", + ArrowType::Int32, + false, + )])); + let struct_reader = StructArrayReader::new(struct_type, vec![leaf], 0, 0, false, None); + + let prior = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + true, false, true, true, false, true, + ])); + let mut filters = vec![BooleanArray::from(vec![true, false, true, false])]; + let mut predicate = ArrowPredicateFn::new(ProjectionMask::all(), move |batch| { + assert_eq!(batch.num_rows(), 4); + Ok(filters.remove(0)) + }); + + let builder = ReadPlanBuilder::new(16) + .with_selection(Some(prior)) + .with_predicate_options(PredicateOptions::new( + Box::new(struct_reader), + &mut predicate, + )) + .unwrap(); + + let selection = builder.selection().unwrap(); + assert!(selection.as_mask().is_some()); + + let expected = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + true, false, false, true, false, false, + ])); + assert_eq!(selection, &expected); + } + #[test] fn with_predicate_options_limit_handles_null_filters() { use crate::arrow::ProjectionMask; diff --git a/parquet/src/arrow/arrow_reader/selection/boolean.rs b/parquet/src/arrow/arrow_reader/selection/boolean.rs new file mode 100644 index 000000000000..3c749ba5a354 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/selection/boolean.rs @@ -0,0 +1,962 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{RowSelection, RowSelectionInner, RowSelector}; +use crate::errors::ParquetError; +use arrow_array::BooleanArray; +use arrow_buffer::bit_iterator::BitSliceIterator; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use std::cmp::Ordering; +use std::sync::OnceLock; + +/// Mask-backed [`RowSelection`] storage. +/// +/// `selectors` is only populated if callers use the borrowed [`RowSelection::iter`] +/// compatibility API. Internal paths that can stream or consume the bitmap avoid +/// this cache. +#[derive(Debug)] +pub(crate) struct MaskSelection { + mask: BooleanBuffer, + selectors: OnceLock>, +} + +impl MaskSelection { + pub(super) fn new(mask: BooleanBuffer) -> Self { + Self { + mask, + selectors: OnceLock::new(), + } + } + + pub(crate) fn mask(&self) -> &BooleanBuffer { + &self.mask + } + + pub(crate) fn into_mask(self) -> BooleanBuffer { + let Self { mask, .. } = self; + mask + } + + pub(super) fn selectors(&self) -> &[RowSelector] { + self.selectors + .get_or_init(|| mask_to_selectors(&self.mask)) + .as_slice() + } +} + +impl Clone for MaskSelection { + fn clone(&self) -> Self { + Self::new(self.mask.clone()) + } +} + +/// Streaming RLE view of a [`BooleanBuffer`], yielding owned [`RowSelector`]s +/// without allocation. +/// +/// Useful as a zero-cost alternative to [`RowSelection::iter`] for mask-backed +/// selections, via [`RowSelection::as_mask`]: +/// +/// ```ignore +/// if let Some(mask) = selection.as_mask() { +/// for run in MaskRunIter::new(mask) { ... } +/// } +/// ``` +#[derive(Debug)] +pub struct MaskRunIter<'a> { + slices: BitSliceIterator<'a>, + cursor: usize, + total: usize, + pending: Option, + finished: bool, +} + +impl<'a> MaskRunIter<'a> { + /// Create a streaming RLE iterator over a [`BooleanBuffer`]. + pub fn new(mask: &'a BooleanBuffer) -> Self { + Self { + slices: mask.set_slices(), + cursor: 0, + total: mask.len(), + pending: None, + finished: false, + } + } +} + +impl Iterator for MaskRunIter<'_> { + type Item = RowSelector; + + fn next(&mut self) -> Option { + if let Some(p) = self.pending.take() { + return Some(p); + } + if self.finished { + return None; + } + match self.slices.next() { + Some((start, end)) => { + let select = RowSelector::select(end - start); + if start > self.cursor { + let skip = RowSelector::skip(start - self.cursor); + self.pending = Some(select); + self.cursor = end; + Some(skip) + } else { + self.cursor = end; + Some(select) + } + } + None => { + self.finished = true; + if self.cursor < self.total { + let skip = RowSelector::skip(self.total - self.cursor); + self.cursor = self.total; + Some(skip) + } else { + None + } + } + } + } +} + +/// Cursor for iterating a mask-backed [`RowSelection`] +/// +/// This is best for dense selections where there are many small skips +/// or selections. For example, selecting every other row. +#[derive(Debug)] +pub struct MaskCursor { + pub(super) mask: BooleanBuffer, + /// Current absolute offset into the selection + pub(super) position: usize, +} + +impl MaskCursor { + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + self.position >= self.mask.len() + } + + /// Advance through the mask representation, producing the next chunk summary + pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { + let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { + let mask = &self.mask; + + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0; + + while cursor < mask.len() && !mask.value(cursor) { + initial_skip += 1; + cursor += 1; + } + + let mask_start = cursor; + let mut chunk_rows = 0; + let mut selected_rows = 0; + + // Advance until enough rows have been selected to satisfy the batch size, + // or until the mask is exhausted. This mirrors the behaviour of the legacy + // `RowSelector` queue-based iteration. + while cursor < mask.len() && selected_rows < batch_size { + chunk_rows += 1; + if mask.value(cursor) { + selected_rows += 1; + } + cursor += 1; + } + + (initial_skip, chunk_rows, selected_rows, mask_start, cursor) + }; + + self.position = end_position; + + Some(MaskChunk { + initial_skip, + chunk_rows, + selected_rows, + mask_start, + }) + } + + /// Materialise the boolean values for a mask-backed chunk + pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result { + if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() { + return Err(ParquetError::General( + "Internal Error: MaskChunk exceeds mask length".to_string(), + )); + } + Ok(BooleanArray::from( + self.mask.slice(chunk.mask_start, chunk.chunk_rows), + )) + } +} + +/// Result of computing the next chunk to read when using a [`MaskCursor`] +#[derive(Debug)] +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +/// Materialize a [`BooleanBuffer`] into its RLE form. +pub(crate) fn mask_to_selectors(mask: &BooleanBuffer) -> Vec { + let total_rows = mask.len(); + if total_rows == 0 { + return Vec::new(); + } + let mut selectors: Vec = Vec::new(); + let mut last_end = 0; + for (start, end) in mask.set_slices() { + if start > last_end { + selectors.push(RowSelector::skip(start - last_end)); + } + selectors.push(RowSelector::select(end - start)); + last_end = end; + } + if last_end != total_rows { + selectors.push(RowSelector::skip(total_rows - last_end)); + } + selectors +} + +pub(super) fn mask_run_count(mask: &BooleanBuffer) -> usize { + let total_rows = mask.len(); + if total_rows == 0 { + return 0; + } + + let mut run_count = 0; + let mut last_end = 0; + for (start, end) in mask.set_slices() { + if start > last_end { + run_count += 1; + } + if end > start { + run_count += 1; + } + last_end = end; + } + if last_end < total_rows { + run_count += 1; + } + + run_count +} + +/// Bitwise AND of two mask-backed selections. Longer side's tail passes through. +pub(super) fn intersect_masks(l: &BooleanBuffer, r: &BooleanBuffer) -> BooleanBuffer { + if l.len() == r.len() { + return l & r; + } + let common = l.len().min(r.len()); + let head = &l.slice(0, common) & &r.slice(0, common); + let (longer, longer_len) = if l.len() > r.len() { + (l, l.len()) + } else { + (r, r.len()) + }; + let tail = longer.slice(common, longer_len - common); + let mut builder = BooleanBufferBuilder::new(longer_len); + builder.append_buffer(&head); + builder.append_buffer(&tail); + builder.finish() +} + +/// Bitwise OR of two mask-backed selections. Longer side's tail passes through. +pub(super) fn union_masks(l: &BooleanBuffer, r: &BooleanBuffer) -> BooleanBuffer { + if l.len() == r.len() { + return l | r; + } + let common = l.len().min(r.len()); + let head = &l.slice(0, common) | &r.slice(0, common); + let (longer, longer_len) = if l.len() > r.len() { + (l, l.len()) + } else { + (r, r.len()) + }; + let tail = longer.slice(common, longer_len - common); + let mut builder = BooleanBufferBuilder::new(longer_len); + builder.append_buffer(&head); + builder.append_buffer(&tail); + builder.finish() +} + +/// Applies `other` to the selected rows of `mask`, preserving the original row domain. +pub(super) fn and_then_mask(mask: &BooleanBuffer, other: &RowSelection) -> BooleanBuffer { + match &other.inner { + RowSelectionInner::Mask(other_mask) => and_then_masks(mask, other_mask.mask()), + RowSelectionInner::Selectors(selectors) => { + and_then_mask_from_selectors(mask, selectors.iter().copied()) + } + } +} + +fn and_then_mask_from_selectors(mask: &BooleanBuffer, other: I) -> BooleanBuffer +where + I: IntoIterator, +{ + let mut builder = BooleanBufferBuilder::new(mask.len()); + let mut other_iter = other.into_iter(); + let mut current = other_iter.next(); + let mut cursor = 0usize; + + // Iterate only over the set positions in `mask`; the gaps of unset bits + // are filled in bulk with `append_n` instead of bit-by-bit. + for set_idx in mask.set_indices() { + if set_idx > cursor { + builder.append_n(set_idx - cursor, false); + } + cursor = set_idx + 1; + + while current.as_ref().is_some_and(|s| s.row_count == 0) { + current = other_iter.next(); + } + let selector = current + .as_mut() + .expect("selection contains less than the number of selected rows"); + let selected = !selector.skip; + selector.row_count -= 1; + builder.append(selected); + } + if cursor < mask.len() { + builder.append_n(mask.len() - cursor, false); + } + + if current.is_some_and(|s| s.row_count != 0) || other_iter.any(|s| s.row_count != 0) { + panic!("selection exceeds the number of selected rows"); + } + + builder.finish() +} + +fn and_then_masks(mask: &BooleanBuffer, other: &BooleanBuffer) -> BooleanBuffer { + let selected_count = mask.count_set_bits(); + match other.len().cmp(&selected_count) { + Ordering::Less => panic!("selection contains less than the number of selected rows"), + Ordering::Greater => panic!("selection exceeds the number of selected rows"), + Ordering::Equal => {} + } + + let other_true_count = other.count_set_bits(); + if other_true_count == 0 { + return BooleanBuffer::new_unset(mask.len()); + } + if other_true_count == selected_count { + return mask.clone(); + } + + let mut builder = BooleanBufferBuilder::new(mask.len()); + let mut outer_set_indices = mask.set_indices(); + let mut next_selected_ordinal = 0usize; + let mut cursor = 0usize; + + for selected_ordinal in other.set_indices() { + let skip = selected_ordinal - next_selected_ordinal; + let set_idx = outer_set_indices + .nth(skip) + .expect("validated other length matches selected row count"); + if set_idx > cursor { + builder.append_n(set_idx - cursor, false); + } + builder.append(true); + cursor = set_idx + 1; + next_selected_ordinal = selected_ordinal + 1; + } + + if cursor < mask.len() { + builder.append_n(mask.len() - cursor, false); + } + + builder.finish() +} + +/// Split a mask into `(head, tail)` at `row_count`, preserving an empty mask tail +/// when the split point is past the end. +pub(super) fn split_off_mask( + mask: BooleanBuffer, + row_count: usize, +) -> (BooleanBuffer, BooleanBuffer) { + let total = mask.len(); + if row_count >= total { + return (mask, BooleanBuffer::new_unset(0)); + } + + let head = mask.slice(0, row_count); + let tail = mask.slice(row_count, total - row_count); + (head, tail) +} + +/// Trims trailing unset bits from a mask-backed selection. +pub(super) fn trim_mask(mask: &BooleanBuffer) -> Option { + let popcount = mask.count_set_bits(); + let new_len = if popcount == 0 { + 0 + } else { + mask.find_nth_set_bit_position(0, popcount) + }; + (new_len != mask.len()).then(|| mask.slice(0, new_len)) +} + +/// Skips the first `offset` selected rows of a mask-backed selection. +pub(super) fn offset_mask(mask: BooleanBuffer, offset: usize) -> BooleanBuffer { + let popcount = mask.count_set_bits(); + if offset >= popcount { + return BooleanBuffer::new_unset(0); + } + // Position one past the `offset`-th set bit, i.e. the index of the first + // selected row to keep. + let pos = mask.find_nth_set_bit_position(0, offset); + let mut builder = BooleanBufferBuilder::new(mask.len()); + builder.append_n(pos, false); + builder.append_buffer(&mask.slice(pos, mask.len() - pos)); + builder.finish() +} + +/// Keeps only the first `limit` selected rows of a mask-backed selection. +pub(super) fn limit_mask(mask: BooleanBuffer, limit: usize) -> BooleanBuffer { + // `find_nth_set_bit_position` returns `mask.len()` when there are fewer + // than `limit` set bits, so the slice naturally degrades to the original + // mask in that case. + let cut = mask.find_nth_set_bit_position(0, limit); + mask.slice(0, cut) +} + +pub(super) fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let mut builder = BooleanBufferBuilder::new(total_rows); + for selector in selectors { + builder.append_n(selector.row_count, !selector.skip); + } + builder.finish() +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::BooleanArray; + use rand::{Rng, rng}; + + #[test] + fn test_mask_iter_yields_borrowed_selectors() { + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, false, true, true, false, true, false, false, + ])); + + let borrowed: Vec<&RowSelector> = selection.iter().collect(); + assert_eq!( + borrowed, + vec![ + &RowSelector::skip(2), + &RowSelector::select(2), + &RowSelector::skip(1), + &RowSelector::select(1), + &RowSelector::skip(2), + ] + ); + } + + #[test] + fn test_mask_iter_clone_drops_cache() { + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, false, true, true, false, true, false, false, + ])); + + let _ = selection.iter().count(); + match &selection.inner { + RowSelectionInner::Mask(m) => assert!(m.selectors.get().is_some()), + _ => unreachable!(), + } + + let cloned = selection.clone(); + match &cloned.inner { + RowSelectionInner::Mask(m) => assert!(m.selectors.get().is_none()), + _ => unreachable!(), + } + + let round_tripped: Vec = cloned.iter().copied().collect(); + assert_eq!( + round_tripped, + vec![ + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(2), + ] + ); + } + + #[test] + fn test_mask_run_iter_streams_without_cache() { + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, false, true, true, false, true, false, false, + ])); + let mut iter = MaskRunIter::new(selection.as_mask().unwrap()); + + assert_eq!(iter.next(), Some(RowSelector::skip(2))); + assert_eq!(iter.next(), Some(RowSelector::select(2))); + assert_eq!(iter.next(), Some(RowSelector::skip(1))); + assert_eq!(iter.next(), Some(RowSelector::select(1))); + assert_eq!(iter.next(), Some(RowSelector::skip(2))); + assert_eq!(iter.next(), None); + assert_eq!(iter.next(), None); + + let selection = + RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, true, false])); + let mut iter = MaskRunIter::new(selection.as_mask().unwrap()); + assert_eq!(iter.next(), Some(RowSelector::select(2))); + assert_eq!(iter.next(), Some(RowSelector::skip(1))); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_from_boolean_buffer() { + let bits = vec![ + false, false, true, true, false, true, false, false, true, false, false, false, false, + false, false, true, + ]; + let buf = BooleanBuffer::from(bits.clone()); + let selection = RowSelection::from_boolean_buffer(buf.clone()); + + assert!(selection.as_mask().is_some()); + assert_eq!(selection.row_count(), 5); + assert_eq!(selection.skipped_row_count(), 11); + assert!(selection.selects_any()); + + let from_filters = RowSelection::from_filters(&[BooleanArray::from(bits)]); + assert_eq!(selection, from_filters); + + let bits_tail = vec![true, false, true, false, false, false]; + let trimmed = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits_tail)).trim(); + assert!(trimmed.as_mask().is_some()); + assert_eq!(trimmed.as_mask().unwrap().len(), 3); + } + + #[test] + fn test_from_boolean_buffer_empty() { + let empty = RowSelection::from_boolean_buffer(BooleanBuffer::from(Vec::::new())); + assert!(empty.as_mask().is_some()); + assert_eq!(empty.row_count(), 0); + assert_eq!(empty.skipped_row_count(), 0); + assert!(!empty.selects_any()); + assert!(empty.selectors().is_empty()); + } + + #[test] + fn test_from_boolean_buffer_all_unset_does_not_select() { + let all_zero = RowSelection::from_boolean_buffer(BooleanBuffer::new_unset(1024)); + assert!(all_zero.as_mask().is_some()); + assert!(!all_zero.selects_any()); + assert_eq!(all_zero.row_count(), 0); + assert_eq!(all_zero.skipped_row_count(), 1024); + } + + #[test] + fn test_from_boolean_buffer_via_from_impl() { + let buf = BooleanBuffer::from(vec![true, false, true, true]); + let a = RowSelection::from(buf.clone()); + let b = RowSelection::from_boolean_buffer(buf); + assert_eq!(a, b); + assert!(a.as_mask().is_some()); + } + + #[test] + fn test_mask_backing_clone_preserves_backing() { + let buf = BooleanBuffer::from(vec![true, false, true]); + let original = RowSelection::from_boolean_buffer(buf); + let cloned = original.clone(); + assert!(cloned.as_mask().is_some()); + assert_eq!(original, cloned); + } + + #[test] + fn test_mask_backing_mutation_equivalence() { + let bits = vec![true, true, false, false, true, false, true, true]; + + let from_mask = { + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let split = s.split_off(3); + (split, s) + }; + let from_selectors = { + let mut s = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]); + let split = s.split_off(3); + (split, s) + }; + assert_eq!(from_mask.0, from_selectors.0); + assert_eq!(from_mask.1, from_selectors.1); + assert!(from_mask.0.as_mask().is_some()); + assert!(from_mask.1.as_mask().is_some()); + + let limited_mask = + RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())).limit(3); + let limited_sel = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]).limit(3); + assert!(limited_mask.as_mask().is_some()); + assert_eq!(limited_mask, limited_sel); + + let offset_mask = + RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())).offset(2); + let offset_sel = RowSelection::from_filters(&[BooleanArray::from(bits)]).offset(2); + assert!(offset_mask.as_mask().is_some()); + assert_eq!(offset_mask, offset_sel); + } + + #[test] + fn test_mask_backing_fuzz_equivalence() { + let mut rand = rng(); + for _ in 0..100 { + let len = rand.random_range(0..200); + let bits: Vec<_> = (0..len).map(|_| rand.random_bool(0.35)).collect(); + + let from_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let from_filters = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]); + + assert_eq!(from_mask, from_filters); + assert_eq!(from_mask.row_count(), from_filters.row_count()); + assert_eq!( + from_mask.skipped_row_count(), + from_filters.skipped_row_count() + ); + assert_eq!(from_mask.selects_any(), from_filters.selects_any()); + + let inner_len: usize = bits.iter().map(|b| *b as usize).sum(); + let inner_bits: Vec<_> = (0..inner_len).map(|_| rand.random_bool(0.7)).collect(); + let inner = RowSelection::from_filters(&[BooleanArray::from(inner_bits.clone())]); + let inner_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(inner_bits)); + let and_then_mask = from_mask.and_then(&inner); + let and_then_both_masks = from_mask.and_then(&inner_mask); + assert!(and_then_mask.as_mask().is_some()); + assert!(and_then_both_masks.as_mask().is_some()); + assert_eq!(and_then_mask, from_filters.and_then(&inner)); + assert_eq!(and_then_both_masks, and_then_mask); + } + } + + #[test] + fn test_mask_and_then_preserves_backing() { + let outer_bits = vec![false, true, true, false, true, false, true]; + let inner_bits = vec![true, false, true, false]; + let outer_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(outer_bits.clone())); + let inner = RowSelection::from_filters(&[BooleanArray::from(inner_bits.clone())]); + + let result = outer_mask.and_then(&inner); + assert!(result.as_mask().is_some()); + + let outer_selectors = RowSelection::from_filters(&[BooleanArray::from(outer_bits)]); + let expected = outer_selectors.and_then(&inner); + assert_eq!(result, expected); + + let result_mask = result.as_mask().unwrap(); + let actual_bits: Vec<_> = (0..result_mask.len()) + .map(|i| result_mask.value(i)) + .collect(); + assert_eq!( + actual_bits, + vec![false, true, false, false, true, false, false] + ); + } + + #[test] + fn test_mask_and_then_mask_preserves_backing() { + let outer_bits = vec![false, true, true, false, true, false, true, true]; + let inner_bits = vec![false, true, false, true, false]; + let outer_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(outer_bits.clone())); + let inner_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(inner_bits)); + + let result = outer_mask.and_then(&inner_mask); + assert!(result.as_mask().is_some()); + + let outer_selectors = RowSelection::from_filters(&[BooleanArray::from(outer_bits)]); + let inner_selectors = RowSelection::from_filters(&[BooleanArray::from(vec![ + false, true, false, true, false, + ])]); + assert_eq!(result, outer_selectors.and_then(&inner_selectors)); + + let result_mask = result.as_mask().unwrap(); + let actual_bits: Vec<_> = (0..result_mask.len()) + .map(|i| result_mask.value(i)) + .collect(); + assert_eq!( + actual_bits, + vec![false, false, true, false, false, false, true, false] + ); + } + + #[test] + fn test_selector_and_then_mask() { + let outer = + RowSelection::from_filters(&[BooleanArray::from(vec![false, true, true, false, true])]); + let inner = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, false, true])); + + let result = outer.and_then(&inner); + assert!(result.as_mask().is_none()); + assert_eq!( + result, + RowSelection::from_filters(&[BooleanArray::from(vec![ + false, true, false, false, true, + ])]) + ); + } + + #[test] + fn test_mask_offset_past_end_preserves_empty_mask_backing() { + let selection = + RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, false, true])) + .offset(2); + + assert!(selection.as_mask().is_some()); + assert_eq!(selection.as_mask().unwrap().len(), 0); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.skipped_row_count(), 0); + } + + #[test] + fn test_mask_limit_truncates_at_nth_selected_row() { + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, true, false, true, false, true, false, + ])) + .limit(2); + + let mask = selection.as_mask().unwrap(); + assert_eq!(mask.len(), 4); + let actual_bits: Vec<_> = (0..mask.len()).map(|i| mask.value(i)).collect(); + assert_eq!(actual_bits, vec![false, true, false, true]); + } + + #[test] + fn test_mask_intersection_uses_bitwise() { + let a_bits = vec![true, true, false, true, false, true]; + let b_bits = vec![true, false, true, true, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())); + + let r = a.intersection(&b); + assert!(r.as_mask().is_some()); + + let expected: Vec = a_bits.iter().zip(&b_bits).map(|(x, y)| *x && *y).collect(); + let expected_sel = RowSelection::from_filters(&[BooleanArray::from(expected)]); + assert_eq!(r, expected_sel); + } + + #[test] + fn test_mask_union_uses_bitwise() { + let a_bits = vec![true, false, false, true, false, false]; + let b_bits = vec![false, true, false, false, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())); + + let r = a.union(&b); + assert!(r.as_mask().is_some()); + + let expected: Vec = a_bits.iter().zip(&b_bits).map(|(x, y)| *x || *y).collect(); + let expected_sel = RowSelection::from_filters(&[BooleanArray::from(expected)]); + assert_eq!(r, expected_sel); + } + + #[test] + fn test_mixed_mask_selector_intersection_and_union() { + let mask_bits = vec![true, false, true, false, true, false]; + let selector_bits = vec![false, true, true, false, false, true]; + let mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(mask_bits.clone())); + let selectors = RowSelection::from_filters(&[BooleanArray::from(selector_bits.clone())]); + + let intersection_bits: Vec<_> = mask_bits + .iter() + .zip(&selector_bits) + .map(|(x, y)| *x && *y) + .collect(); + let expected_intersection = + RowSelection::from_filters(&[BooleanArray::from(intersection_bits)]); + assert_eq!(mask.intersection(&selectors), expected_intersection); + assert_eq!(selectors.intersection(&mask), expected_intersection); + + let union_bits: Vec<_> = mask_bits + .iter() + .zip(&selector_bits) + .map(|(x, y)| *x || *y) + .collect(); + let expected_union = RowSelection::from_filters(&[BooleanArray::from(union_bits)]); + assert_eq!(mask.union(&selectors), expected_union); + assert_eq!(selectors.union(&mask), expected_union); + } + + #[test] + fn test_mask_intersection_uneven_passes_tail_through() { + let a_bits = vec![true, true, true, true, true]; + let b_bits = vec![true, false, true]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits)); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits)); + + let r = a.intersection(&b); + let r_mask = r.as_mask().unwrap(); + assert_eq!(r_mask.len(), 5); + let bits: Vec = (0..5).map(|i| r_mask.value(i)).collect(); + assert_eq!(bits, vec![true, false, true, true, true]); + } + + #[test] + fn test_mask_union_uneven_passes_tail_through() { + let a_bits = vec![true, false, true]; + let b_bits = vec![false, true, false, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits)); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits)); + + let r = a.union(&b); + let r_mask = r.as_mask().unwrap(); + assert_eq!(r_mask.len(), 5); + let bits: Vec = (0..5).map(|i| r_mask.value(i)).collect(); + assert_eq!(bits, vec![true, true, true, true, false]); + + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, true, false, false, true, + ])); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, false, false])); + let r = a.union(&b); + let r_mask = r.as_mask().unwrap(); + let bits: Vec = (0..5).map(|i| r_mask.value(i)).collect(); + assert_eq!(bits, vec![true, true, false, false, true]); + } + + #[test] + fn test_mask_split_off_preserves_backing() { + let bits: Vec = (0..40).map(|i| i % 3 == 0).collect(); + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let head = s.split_off(15); + + assert!(head.as_mask().is_some()); + assert!(s.as_mask().is_some()); + + let head_sel = RowSelection::from_filters(&[BooleanArray::from(bits[..15].to_vec())]); + let tail_sel = RowSelection::from_filters(&[BooleanArray::from(bits[15..].to_vec())]); + assert_eq!(head, head_sel); + assert_eq!(s, tail_sel); + } + + #[test] + fn test_mask_split_off_past_end_returns_whole() { + let bits = vec![true, false, true]; + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let head = s.split_off(100); + + assert!(head.as_mask().is_some()); + assert_eq!(head.as_mask().unwrap().len(), 3); + // `self` keeps its mask backing and is left empty. + assert!(s.as_mask().is_some()); + assert_eq!(s.as_mask().unwrap().len(), 0); + assert_eq!(s.row_count(), 0); + assert_eq!(s.skipped_row_count(), 0); + } + + #[test] + fn test_mask_offset_exceeds_selected_returns_empty() { + let s = + RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, true, false, true])); + let r = s.offset(10); + assert_eq!(r.row_count(), 0); + assert_eq!(r.skipped_row_count(), 0); + + let from_selectors = + RowSelection::from_filters(&[BooleanArray::from(vec![true, true, false, true])]) + .offset(10); + assert_eq!(r, from_selectors); + } + + #[test] + fn test_mask_limit_exceeds_selected_returns_all() { + let bits = vec![true, true, false, true]; + let s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let r = s.limit(10); + assert_eq!(r.row_count(), 3); + + let from_selectors = RowSelection::from_filters(&[BooleanArray::from(bits)]).limit(10); + assert_eq!(r, from_selectors); + } + + #[test] + fn test_mask_trim_all_zero_collapses_to_empty() { + let s = RowSelection::from_boolean_buffer(BooleanBuffer::new_unset(128)); + let trimmed = s.trim(); + assert!(trimmed.as_mask().is_some()); + assert_eq!(trimmed.as_mask().unwrap().len(), 0); + } + + #[test] + fn test_from_iter_all_mask_preserves_mask_backing() { + let a_bits = vec![true, false, true, true]; + let b_bits = vec![false, true, false]; + let c_bits = vec![true, true, false, false, true]; + + let parts = vec![ + RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())), + RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())), + RowSelection::from_boolean_buffer(BooleanBuffer::from(c_bits.clone())), + ]; + let collected: RowSelection = parts.into_iter().collect(); + + let combined = a_bits + .iter() + .chain(b_bits.iter()) + .chain(c_bits.iter()) + .copied() + .collect::>(); + let expected = RowSelection::from_filters(&[BooleanArray::from(combined)]); + + assert!(collected.as_mask().is_some()); + assert_eq!(collected, expected); + } + + #[test] + fn test_from_iter_mixed_backing_falls_back_to_selectors() { + let a_bits = vec![true, false, true]; + let b_selectors = vec![RowSelector::skip(2), RowSelector::select(3)]; + let c_bits = vec![false, true]; + + let parts = vec![ + RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())), + RowSelection::from(b_selectors), + RowSelection::from_boolean_buffer(BooleanBuffer::from(c_bits.clone())), + ]; + let collected: RowSelection = parts.into_iter().collect(); + + assert!(collected.as_mask().is_none()); + + let combined_bits = vec![ + true, false, true, false, false, true, true, true, false, true, + ]; + let expected = RowSelection::from_filters(&[BooleanArray::from(combined_bits)]); + assert_eq!(collected, expected); + } + + #[test] + fn test_from_iter_empty_yields_empty_selection() { + let collected: RowSelection = std::iter::empty::().collect(); + assert_eq!(collected, RowSelection::default()); + assert!(collected.as_mask().is_some()); + assert_eq!(collected.as_mask().unwrap().len(), 0); + } +} diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection/mod.rs similarity index 65% rename from parquet/src/arrow/arrow_reader/selection.rs rename to parquet/src/arrow/arrow_reader/selection/mod.rs index 2ddf812f9c39..e46195e82e28 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection/mod.rs @@ -16,7 +16,6 @@ // under the License. use crate::arrow::ProjectionMask; -use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; @@ -25,7 +24,15 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -/// Policy for picking a strategy to materialise [`RowSelection`] during execution. +mod boolean; +pub use boolean::MaskRunIter; +pub(crate) use boolean::mask_to_selectors; +use boolean::{ + MaskCursor, MaskSelection, and_then_mask, boolean_mask_from_selectors, intersect_masks, + limit_mask, mask_run_count, offset_mask, split_off_mask, trim_mask, union_masks, +}; + +/// Policy for picking a strategy to materialize [`RowSelection`] during execution. /// /// Note that this is a user-provided preference, and the actual strategy used /// may differ based on safety considerations (e.g. page skipping). @@ -33,7 +40,7 @@ use std::ops::Range; pub enum RowSelectionPolicy { /// Use a queue of [`RowSelector`] values Selectors, - /// Use a boolean mask to materialise the selection + /// Use a boolean mask to materialize the selection Mask, /// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density Auto { @@ -56,7 +63,7 @@ impl Default for RowSelectionPolicy { pub(crate) enum RowSelectionStrategy { /// Use a queue of [`RowSelector`] values Selectors, - /// Use a boolean mask to materialise the selection + /// Use a boolean mask to materialize the selection Mask, } @@ -127,6 +134,13 @@ impl RowSelector { /// RowSelection::from_consecutive_ranges(ranges.into_iter(), 20); /// let actual: Vec = selection.into(); /// assert_eq!(actual, expected); +/// +/// // or directly from a packed bitmap, when the upstream producer already +/// // has one. The bitmap is kept as-is rather than run-length-encoded. +/// use arrow_buffer::BooleanBuffer; +/// let mask = BooleanBuffer::from(vec![true, false, true, true]); +/// let selection = RowSelection::from_boolean_buffer(mask); +/// assert_eq!(selection.row_count(), 3); /// ``` /// /// A [`RowSelection`] maintains the following invariants: @@ -135,12 +149,297 @@ impl RowSelector { /// * Consecutive [`RowSelector`]s alternate skipping or selecting rows /// /// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData -#[derive(Debug, Clone, Default, Eq, PartialEq)] +#[derive(Default, Clone)] pub struct RowSelection { - selectors: Vec, + inner: RowSelectionInner, +} + +/// Internal storage for [`RowSelection`]. +#[derive(Debug, Clone)] +pub(crate) enum RowSelectionInner { + Selectors(Vec), + Mask(Box), +} + +impl Default for RowSelectionInner { + fn default() -> Self { + Self::Selectors(Vec::new()) + } +} + +impl std::fmt::Debug for RowSelection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.inner { + RowSelectionInner::Selectors(s) => f + .debug_struct("RowSelection") + .field("selectors", s) + .finish(), + RowSelectionInner::Mask(m) => f + .debug_struct("RowSelection") + .field("mask_len", &m.mask().len()) + .finish_non_exhaustive(), + } + } +} + +impl PartialEq for RowSelection { + fn eq(&self, other: &Self) -> bool { + match (&self.inner, &other.inner) { + (RowSelectionInner::Selectors(a), RowSelectionInner::Selectors(b)) => a == b, + (RowSelectionInner::Mask(a), RowSelectionInner::Mask(b)) => a.mask() == b.mask(), + (RowSelectionInner::Mask(mask), RowSelectionInner::Selectors(selectors)) + | (RowSelectionInner::Selectors(selectors), RowSelectionInner::Mask(mask)) => { + if selectors + .iter() + .try_fold(0usize, |acc, selector| acc.checked_add(selector.row_count)) + != Some(mask.mask().len()) + { + return false; + } + + let mut slices = mask.mask().set_slices().peekable(); + let mut cursor = 0usize; + + for selector in selectors { + let end = cursor + selector.row_count; + + if selector.skip { + if slices.peek().is_some_and(|(start, _)| *start < end) { + return false; + } + } else { + match slices.next() { + Some((start, slice_end)) if start == cursor && slice_end == end => {} + _ => return false, + } + } + + cursor = end; + } + + slices.next().is_none() + } + } + } +} + +impl Eq for RowSelection {} + +/// Borrowed iterator over the [`RowSelector`]s of a [`RowSelection`]. +#[derive(Debug)] +pub struct RowSelectionIter<'a>(std::slice::Iter<'a, RowSelector>); + +impl<'a> Iterator for RowSelectionIter<'a> { + type Item = &'a RowSelector; + + #[inline] + fn next(&mut self) -> Option { + self.0.next() + } +} + +#[inline] +fn scan_ranges_from_selectors(selectors: I, page_locations: &[PageLocation]) -> Vec> +where + I: IntoIterator, +{ + let mut ranges: Vec> = vec![]; + let mut row_offset = 0; + + let mut pages = page_locations.iter().peekable(); + let mut selectors = selectors.into_iter(); + let mut current_selector = selectors.next(); + let mut current_page = pages.next(); + + let mut current_page_included = false; + + while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { + if !(selector.skip || current_page_included) { + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; + ranges.push(start..end); + current_page_included = true; + } + + if let Some(next_page) = pages.peek() { + if row_offset + selector.row_count > next_page.first_row_index as usize { + let remaining_in_page = next_page.first_row_index as usize - row_offset; + selector.row_count -= remaining_in_page; + row_offset += remaining_in_page; + current_page = pages.next(); + current_page_included = false; + + continue; + } else { + if row_offset + selector.row_count == next_page.first_row_index as usize { + current_page = pages.next(); + current_page_included = false; + } + row_offset += selector.row_count; + current_selector = selectors.next(); + } + } else { + if !(selector.skip || current_page_included) { + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; + ranges.push(start..end); + } + current_selector = selectors.next() + } + } + + ranges +} + +#[inline] +fn expand_to_batch_boundaries_from_selectors( + selectors: I, + batch_size: usize, + total_rows: usize, +) -> RowSelection +where + I: IntoIterator, +{ + let mut expanded_ranges = Vec::new(); + let mut row_offset = 0; + + for selector in selectors { + if selector.skip { + row_offset += selector.row_count; + } else { + let start = row_offset; + let end = row_offset + selector.row_count; + + // Expand start to batch boundary + let expanded_start = (start / batch_size) * batch_size; + // Expand end to batch boundary + let expanded_end = end.div_ceil(batch_size) * batch_size; + let expanded_end = expanded_end.min(total_rows); + + expanded_ranges.push(expanded_start..expanded_end); + row_offset += selector.row_count; + } + } + + // Sort ranges by start position + expanded_ranges.sort_by_key(|range| range.start); + + // Merge overlapping or consecutive ranges + let mut merged_ranges: Vec> = Vec::new(); + for range in expanded_ranges { + if let Some(last) = merged_ranges.last_mut() { + if range.start <= last.end { + // Overlapping or consecutive - merge them + last.end = last.end.max(range.end); + } else { + // No overlap - add new range + merged_ranges.push(range); + } + } else { + // First range + merged_ranges.push(range); + } + } + + RowSelection::from_consecutive_ranges(merged_ranges.into_iter(), total_rows) } impl RowSelection { + fn from_selectors(selectors: Vec) -> Self { + Self { + inner: RowSelectionInner::Selectors(selectors), + } + } + + /// Create a [`RowSelection`] from a packed [`BooleanBuffer`]. + /// + /// Each set bit selects a row, each unset bit skips one. Unlike + /// [`Self::from_filters`], the bitmap is kept as-is rather than + /// eagerly run-length-encoded. [`Self::iter`] materializes and caches the + /// RLE form on first use; use [`MaskRunIter`] to stream the RLE form + /// directly from the bitmap. + pub fn from_boolean_buffer(mask: BooleanBuffer) -> Self { + Self { + inner: RowSelectionInner::Mask(Box::new(MaskSelection::new(mask))), + } + } + + /// Returns the underlying mask if this selection is mask-backed. + /// + /// Public so that engines composing selections (e.g. DataFusion's + /// `ParquetAccessPlan::into_overall_row_selection`) can concatenate + /// mask-backed selections without materialising the RLE form. + pub fn as_mask(&self) -> Option<&BooleanBuffer> { + match &self.inner { + RowSelectionInner::Mask(m) => Some(m.mask()), + _ => None, + } + } + + /// Consume the selection and return its internal storage. + pub(crate) fn into_inner(self) -> RowSelectionInner { + self.inner + } + + /// Choose the automatic materialisation strategy without converting between + /// selector and mask backing. + #[inline] + pub(crate) fn auto_selection_strategy(&self, threshold: usize) -> RowSelectionStrategy { + let (total_rows, effective_count) = match &self.inner { + RowSelectionInner::Selectors(selectors) => { + selectors.iter().fold((0usize, 0usize), |(rows, count), s| { + if s.row_count > 0 { + (rows + s.row_count, count + 1) + } else { + (rows, count) + } + }) + } + RowSelectionInner::Mask(mask) => { + let mask = mask.mask(); + let total_rows = mask.len(); + (total_rows, mask_run_count(mask)) + } + }; + + if effective_count == 0 { + return RowSelectionStrategy::Mask; + } + + if total_rows < effective_count.saturating_mul(threshold) { + RowSelectionStrategy::Mask + } else { + RowSelectionStrategy::Selectors + } + } + + #[cfg(test)] + fn selectors(&self) -> Vec { + self.iter().copied().collect() + } + + fn into_selectors_vec(self) -> Vec { + match self.inner { + RowSelectionInner::Selectors(s) => s, + RowSelectionInner::Mask(m) => mask_to_selectors(m.mask()), + } + } + + /// Promote a mask-backed selection to selector backing in place. + fn selectors_mut(&mut self) -> &mut Vec { + if let RowSelectionInner::Mask(_) = &self.inner { + let mask = match std::mem::take(&mut self.inner) { + RowSelectionInner::Mask(m) => m, + RowSelectionInner::Selectors(_) => unreachable!(), + }; + self.inner = RowSelectionInner::Selectors(mask_to_selectors(mask.mask())); + } + match &mut self.inner { + RowSelectionInner::Selectors(s) => s, + RowSelectionInner::Mask(_) => unreachable!(), + } + } + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -191,7 +490,7 @@ impl RowSelection { selectors.push(RowSelector::skip(total_rows - last_end)) } - Self { selectors } + Self::from_selectors(selectors) } /// Given an offset index, return the byte ranges for all data pages selected by `self` @@ -202,52 +501,14 @@ impl RowSelection { /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec> { - let mut ranges: Vec> = vec![]; - let mut row_offset = 0; - - let mut pages = page_locations.iter().peekable(); - let mut selectors = self.selectors.iter().cloned(); - let mut current_selector = selectors.next(); - let mut current_page = pages.next(); - - let mut current_page_included = false; - - while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { - if !(selector.skip || current_page_included) { - let start = page.offset as u64; - let end = start + page.compressed_page_size as u64; - ranges.push(start..end); - current_page_included = true; + match &self.inner { + RowSelectionInner::Selectors(selectors) => { + scan_ranges_from_selectors(selectors.iter().copied(), page_locations) } - - if let Some(next_page) = pages.peek() { - if row_offset + selector.row_count > next_page.first_row_index as usize { - let remaining_in_page = next_page.first_row_index as usize - row_offset; - selector.row_count -= remaining_in_page; - row_offset += remaining_in_page; - current_page = pages.next(); - current_page_included = false; - - continue; - } else { - if row_offset + selector.row_count == next_page.first_row_index as usize { - current_page = pages.next(); - current_page_included = false; - } - row_offset += selector.row_count; - current_selector = selectors.next(); - } - } else { - if !(selector.skip || current_page_included) { - let start = page.offset as u64; - let end = start + page.compressed_page_size as u64; - ranges.push(start..end); - } - current_selector = selectors.next() + RowSelectionInner::Mask(mask) => { + scan_ranges_from_selectors(MaskRunIter::new(mask.mask()), page_locations) } } - - ranges } /// Returns true if this selection would skip any data pages within the provided columns @@ -285,10 +546,21 @@ impl RowSelection { /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { + if matches!(&self.inner, RowSelectionInner::Mask(_)) { + let mask = match std::mem::take(&mut self.inner) { + RowSelectionInner::Mask(m) => m, + RowSelectionInner::Selectors(_) => unreachable!(), + }; + let (head, tail) = split_off_mask((*mask).into_mask(), row_count); + self.inner = RowSelectionInner::Mask(Box::new(MaskSelection::new(tail))); + return Self::from_boolean_buffer(head); + } + + let selectors = self.selectors_mut(); let mut total_count = 0; // Find the index where the selector exceeds the row count - let find = self.selectors.iter().position(|selector| { + let find = selectors.iter().position(|selector| { total_count += selector.row_count; total_count > row_count }); @@ -296,29 +568,27 @@ impl RowSelection { let split_idx = match find { Some(idx) => idx, None => { - let selectors = std::mem::take(&mut self.selectors); - return Self { selectors }; + let drained = std::mem::take(selectors); + return Self::from_selectors(drained); } }; - let mut remaining = self.selectors.split_off(split_idx); + let mut remaining = selectors.split_off(split_idx); - // Always present as `split_idx < self.selectors.len` + // Always present as `split_idx < selectors.len` let next = remaining.first_mut().unwrap(); let overflow = total_count - row_count; if next.row_count != overflow { - self.selectors.push(RowSelector { + selectors.push(RowSelector { row_count: next.row_count - overflow, skip: next.skip, }) } next.row_count = overflow; - std::mem::swap(&mut remaining, &mut self.selectors); - Self { - selectors: remaining, - } + std::mem::swap(&mut remaining, selectors); + Self::from_selectors(remaining) } /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s. @@ -343,66 +613,21 @@ impl RowSelection { /// by this RowSelection /// pub fn and_then(&self, other: &Self) -> Self { - let mut selectors = vec![]; - let mut first = self.selectors.iter().cloned().peekable(); - let mut second = other.selectors.iter().cloned().peekable(); - - let mut to_skip = 0; - while let Some(b) = second.peek_mut() { - let a = first - .peek_mut() - .expect("selection exceeds the number of selected rows"); - - if b.row_count == 0 { - second.next().unwrap(); - continue; - } - - if a.row_count == 0 { - first.next().unwrap(); - continue; - } - - if a.skip { - // Records were skipped when producing second - to_skip += a.row_count; - first.next().unwrap(); - continue; + match (&self.inner, &other.inner) { + (RowSelectionInner::Mask(mask), _) => { + Self::from_boolean_buffer(and_then_mask(mask.mask(), other)) } - - let skip = b.skip; - let to_process = a.row_count.min(b.row_count); - - a.row_count -= to_process; - b.row_count -= to_process; - - match skip { - true => to_skip += to_process, - false => { - if to_skip != 0 { - selectors.push(RowSelector::skip(to_skip)); - to_skip = 0; - } - selectors.push(RowSelector::select(to_process)) - } + (RowSelectionInner::Selectors(first), RowSelectionInner::Selectors(second)) => { + and_then_row_selections(first, second) } - } - - for v in first { - if v.row_count != 0 { - assert!( - v.skip, - "selection contains less than the number of selected rows" - ); - to_skip += v.row_count + (RowSelectionInner::Selectors(first), RowSelectionInner::Mask(second)) => { + let mut selectors = vec![]; + let mut first = first.iter().copied().peekable(); + let mut second = MaskRunIter::new(second.mask()).peekable(); + and_then_iter(&mut selectors, &mut first, &mut second); + Self::from_selectors(selectors) } } - - if to_skip != 0 { - selectors.push(RowSelector::skip(to_skip)); - } - - Self { selectors } } /// Compute the intersection of two [`RowSelection`] @@ -412,7 +637,22 @@ impl RowSelection { /// /// returned: NNNNNNNNYYNYN pub fn intersection(&self, other: &Self) -> Self { - intersect_row_selections(&self.selectors, &other.selectors) + match (&self.inner, &other.inner) { + (RowSelectionInner::Mask(l), RowSelectionInner::Mask(r)) => { + Self::from_boolean_buffer(intersect_masks(l.mask(), r.mask())) + } + (RowSelectionInner::Selectors(l), RowSelectionInner::Selectors(r)) => { + intersect_row_selections(l, r) + } + (RowSelectionInner::Selectors(l), RowSelectionInner::Mask(r)) => { + let r = mask_to_selectors(r.mask()); + intersect_row_selections(l, &r) + } + (RowSelectionInner::Mask(l), RowSelectionInner::Selectors(r)) => { + let l = mask_to_selectors(l.mask()); + intersect_row_selections(&l, r) + } + } } /// Compute the union of two [`RowSelection`] @@ -422,96 +662,151 @@ impl RowSelection { /// /// returned: NYYYYYNNYYNYN pub fn union(&self, other: &Self) -> Self { - union_row_selections(&self.selectors, &other.selectors) + match &self.inner { + RowSelectionInner::Mask(l) => match &other.inner { + RowSelectionInner::Mask(r) => { + Self::from_boolean_buffer(union_masks(l.mask(), r.mask())) + } + RowSelectionInner::Selectors(r) => { + let l = mask_to_selectors(l.mask()); + union_row_selections(&l, r) + } + }, + RowSelectionInner::Selectors(l) => match &other.inner { + RowSelectionInner::Mask(r) => { + let r = mask_to_selectors(r.mask()); + union_row_selections(l, &r) + } + RowSelectionInner::Selectors(r) => union_row_selections(l, r), + }, + } } /// Returns `true` if this [`RowSelection`] selects any rows pub fn selects_any(&self) -> bool { - self.selectors.iter().any(|x| !x.skip) + match &self.inner { + RowSelectionInner::Selectors(s) => s.iter().any(|x| !x.skip), + RowSelectionInner::Mask(m) => m.mask().set_indices().next().is_some(), + } } /// Trims this [`RowSelection`] removing any trailing skips pub(crate) fn trim(mut self) -> Self { - while self.selectors.last().map(|x| x.skip).unwrap_or(false) { - self.selectors.pop(); + if let RowSelectionInner::Mask(m) = &self.inner { + if let Some(mask) = trim_mask(m.mask()) { + return Self::from_boolean_buffer(mask); + } + return self; + } + let selectors = self.selectors_mut(); + while selectors.last().map(|x| x.skip).unwrap_or(false) { + selectors.pop(); } self } /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows - pub(crate) fn offset(mut self, offset: usize) -> Self { + pub(crate) fn offset(self, offset: usize) -> Self { if offset == 0 { return self; } + let mut selectors = match self.inner { + RowSelectionInner::Mask(mask) => { + return Self::from_boolean_buffer(offset_mask((*mask).into_mask(), offset)); + } + RowSelectionInner::Selectors(selectors) => selectors, + }; let mut selected_count = 0; let mut skipped_count = 0; // Find the index where the selector exceeds the row count - let find = self - .selectors - .iter() - .position(|selector| match selector.skip { - true => { - skipped_count += selector.row_count; - false - } - false => { - selected_count += selector.row_count; - selected_count > offset - } - }); + let find = selectors.iter().position(|selector| match selector.skip { + true => { + skipped_count += selector.row_count; + false + } + false => { + selected_count += selector.row_count; + selected_count > offset + } + }); let split_idx = match find { Some(idx) => idx, None => { - self.selectors.clear(); - return self; + selectors.clear(); + return Self::from_selectors(selectors); } }; - let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1); - selectors.push(RowSelector::skip(skipped_count + offset)); - selectors.push(RowSelector::select(selected_count - offset)); - selectors.extend_from_slice(&self.selectors[split_idx + 1..]); + let mut new_selectors = Vec::with_capacity(selectors.len() - split_idx + 1); + new_selectors.push(RowSelector::skip(skipped_count + offset)); + new_selectors.push(RowSelector::select(selected_count - offset)); + new_selectors.extend_from_slice(&selectors[split_idx + 1..]); - Self { selectors } + Self::from_selectors(new_selectors) } /// Limit this [`RowSelection`] to only select `limit` rows - pub(crate) fn limit(mut self, mut limit: usize) -> Self { + pub(crate) fn limit(self, mut limit: usize) -> Self { + let mut selectors = match self.inner { + RowSelectionInner::Mask(mask) => { + return Self::from_boolean_buffer(limit_mask((*mask).into_mask(), limit)); + } + RowSelectionInner::Selectors(selectors) => selectors, + }; if limit == 0 { - self.selectors.clear(); + selectors.clear(); } - for (idx, selection) in self.selectors.iter_mut().enumerate() { + for (idx, selection) in selectors.iter_mut().enumerate() { if !selection.skip { if selection.row_count >= limit { selection.row_count = limit; - self.selectors.truncate(idx + 1); + selectors.truncate(idx + 1); break; } else { limit -= selection.row_count; } } } - self + Self::from_selectors(selectors) } - /// Returns an iterator over the [`RowSelector`]s for this - /// [`RowSelection`]. - pub fn iter(&self) -> impl Iterator { - self.selectors.iter() + /// Returns a borrowed iterator yielding the [`RowSelector`]s for this selection. + /// + /// Mask-backed selections materialize a `Vec` cache on first + /// call (one allocation, `O(set_slices)` work) so the iterator can hand out + /// `&RowSelector`; the cache is not copied on clone. For single-pass walks + /// over mask-backed selections, prefer streaming directly via + /// [`Self::as_mask`] + [`MaskRunIter::new`] — that path is allocation-free + /// and avoids populating the cache. + pub fn iter(&self) -> RowSelectionIter<'_> { + match &self.inner { + RowSelectionInner::Selectors(s) => RowSelectionIter(s.iter()), + RowSelectionInner::Mask(m) => RowSelectionIter(m.selectors().iter()), + } } /// Returns the number of selected rows pub fn row_count(&self) -> usize { - self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() + match &self.inner { + RowSelectionInner::Selectors(s) => { + s.iter().filter(|x| !x.skip).map(|x| x.row_count).sum() + } + RowSelectionInner::Mask(m) => m.mask().count_set_bits(), + } } /// Returns the number of de-selected rows pub fn skipped_row_count(&self) -> usize { - self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() + match &self.inner { + RowSelectionInner::Selectors(s) => { + s.iter().filter(|x| x.skip).map(|x| x.row_count).sum() + } + RowSelectionInner::Mask(m) => m.mask().len() - m.mask().count_set_bits(), + } } /// Expands the selection to align with batch boundaries. @@ -522,48 +817,18 @@ impl RowSelection { return self.clone(); } - let mut expanded_ranges = Vec::new(); - let mut row_offset = 0; - - for selector in &self.selectors { - if selector.skip { - row_offset += selector.row_count; - } else { - let start = row_offset; - let end = row_offset + selector.row_count; - - // Expand start to batch boundary - let expanded_start = (start / batch_size) * batch_size; - // Expand end to batch boundary - let expanded_end = end.div_ceil(batch_size) * batch_size; - let expanded_end = expanded_end.min(total_rows); - - expanded_ranges.push(expanded_start..expanded_end); - row_offset += selector.row_count; - } - } - - // Sort ranges by start position - expanded_ranges.sort_by_key(|range| range.start); - - // Merge overlapping or consecutive ranges - let mut merged_ranges: Vec> = Vec::new(); - for range in expanded_ranges { - if let Some(last) = merged_ranges.last_mut() { - if range.start <= last.end { - // Overlapping or consecutive - merge them - last.end = last.end.max(range.end); - } else { - // No overlap - add new range - merged_ranges.push(range); - } - } else { - // First range - merged_ranges.push(range); - } + match &self.inner { + RowSelectionInner::Selectors(selectors) => expand_to_batch_boundaries_from_selectors( + selectors.iter().copied(), + batch_size, + total_rows, + ), + RowSelectionInner::Mask(mask) => expand_to_batch_boundaries_from_selectors( + MaskRunIter::new(mask.mask()), + batch_size, + total_rows, + ), } - - Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows) } } @@ -573,6 +838,12 @@ impl From> for RowSelection { } } +impl From for RowSelection { + fn from(mask: BooleanBuffer) -> Self { + Self::from_boolean_buffer(mask) + } +} + impl FromIterator for RowSelection { fn from_iter>(iter: T) -> Self { let iter = iter.into_iter(); @@ -599,19 +870,129 @@ impl FromIterator for RowSelection { } } - Self { selectors } + Self::from_selectors(selectors) } } impl From for Vec { fn from(r: RowSelection) -> Self { - r.selectors + r.into_selectors_vec() } } impl From for VecDeque { fn from(r: RowSelection) -> Self { - r.selectors.into() + r.into_selectors_vec().into() + } +} + +impl FromIterator for RowSelection { + /// Concatenate multiple [`RowSelection`]s in iterator order. + /// + /// When every input is mask-backed the result stays mask-backed + /// (`BooleanBuffer`s are appended); otherwise falls back to flattening + /// through the per-`RowSelector` form. + fn from_iter>(iter: T) -> Self { + let items: Vec = iter.into_iter().collect(); + + let all_mask = items + .iter() + .all(|s| matches!(&s.inner, RowSelectionInner::Mask(_))); + + if all_mask { + let total_len: usize = items + .iter() + .map(|s| match &s.inner { + RowSelectionInner::Mask(m) => m.mask().len(), + RowSelectionInner::Selectors(_) => unreachable!(), + }) + .sum(); + let mut builder = BooleanBufferBuilder::new(total_len); + for item in items { + match item.into_inner() { + RowSelectionInner::Mask(m) => builder.append_buffer(m.mask()), + RowSelectionInner::Selectors(_) => unreachable!(), + } + } + return Self::from_boolean_buffer(builder.finish()); + } + + items + .into_iter() + .flat_map(|s| s.into_selectors_vec()) + .collect() + } +} + +fn and_then_row_selections(first: &[RowSelector], second: &[RowSelector]) -> RowSelection { + let mut selectors = vec![]; + let mut first = first.iter().copied().peekable(); + let mut second = second.iter().copied().peekable(); + and_then_iter(&mut selectors, &mut first, &mut second); + RowSelection::from_selectors(selectors) +} + +fn and_then_iter( + selectors: &mut Vec, + first: &mut std::iter::Peekable, + second: &mut std::iter::Peekable, +) where + I: Iterator, + J: Iterator, +{ + let mut to_skip = 0; + while let Some(b) = second.peek_mut() { + let a = first + .peek_mut() + .expect("selection exceeds the number of selected rows"); + + if b.row_count == 0 { + second.next().unwrap(); + continue; + } + + if a.row_count == 0 { + first.next().unwrap(); + continue; + } + + if a.skip { + // Records were skipped when producing second + to_skip += a.row_count; + first.next().unwrap(); + continue; + } + + let skip = b.skip; + let to_process = a.row_count.min(b.row_count); + + a.row_count -= to_process; + b.row_count -= to_process; + + match skip { + true => to_skip += to_process, + false => { + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); + to_skip = 0; + } + selectors.push(RowSelector::select(to_process)) + } + } + } + + for v in first { + if v.row_count != 0 { + assert!( + v.skip, + "selection contains less than the number of selected rows" + ); + to_skip += v.row_count + } + } + + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); } } @@ -761,82 +1142,6 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec iter.collect() } -/// Cursor for iterating a mask-backed [`RowSelection`] -/// -/// This is best for dense selections where there are many small skips -/// or selections. For example, selecting every other row. -#[derive(Debug)] -pub struct MaskCursor { - mask: BooleanBuffer, - /// Current absolute offset into the selection - position: usize, -} - -impl MaskCursor { - /// Returns `true` when no further rows remain - pub fn is_empty(&self) -> bool { - self.position >= self.mask.len() - } - - /// Advance through the mask representation, producing the next chunk summary - pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { - let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { - let mask = &self.mask; - - if self.position >= mask.len() { - return None; - } - - let start_position = self.position; - let mut cursor = start_position; - let mut initial_skip = 0; - - while cursor < mask.len() && !mask.value(cursor) { - initial_skip += 1; - cursor += 1; - } - - let mask_start = cursor; - let mut chunk_rows = 0; - let mut selected_rows = 0; - - // Advance until enough rows have been selected to satisfy the batch size, - // or until the mask is exhausted. This mirrors the behaviour of the legacy - // `RowSelector` queue-based iteration. - while cursor < mask.len() && selected_rows < batch_size { - chunk_rows += 1; - if mask.value(cursor) { - selected_rows += 1; - } - cursor += 1; - } - - (initial_skip, chunk_rows, selected_rows, mask_start, cursor) - }; - - self.position = end_position; - - Some(MaskChunk { - initial_skip, - chunk_rows, - selected_rows, - mask_start, - }) - } - - /// Materialise the boolean values for a mask-backed chunk - pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result { - if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() { - return Err(ParquetError::General( - "Internal Error: MaskChunk exceeds mask length".to_string(), - )); - } - Ok(BooleanArray::from( - self.mask.slice(chunk.mask_start, chunk.chunk_rows), - )) - } -} - /// Cursor for iterating a selector-backed [`RowSelection`] /// /// This is best for sparse selections where large contiguous @@ -872,24 +1177,11 @@ impl SelectorsCursor { } } -/// Result of computing the next chunk to read when using a [`MaskCursor`] -#[derive(Debug)] -pub struct MaskChunk { - /// Number of leading rows to skip before reaching selected rows - pub initial_skip: usize, - /// Total rows covered by this chunk (selected + skipped) - pub chunk_rows: usize, - /// Rows actually selected within the chunk - pub selected_rows: usize, - /// Starting offset within the mask where the chunk begins - pub mask_start: usize, -} - /// Cursor for iterating a [`RowSelection`] during execution within a /// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan). /// /// This keeps per-reader state such as the current position and delegates the -/// actual storage strategy to the internal `RowSelectionBacking`. +/// actual storage strategy to the internal `RowSelectionInner`. #[derive(Debug)] pub enum RowSelectionCursor { /// Reading all rows @@ -909,6 +1201,11 @@ impl RowSelectionCursor { }) } + /// Create a [`MaskCursor`] cursor backed by an existing bitmask. + pub(crate) fn new_mask_from_buffer(mask: BooleanBuffer) -> Self { + Self::Mask(MaskCursor { mask, position: 0 }) + } + /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors pub(crate) fn new_selectors(selectors: Vec) -> Self { Self::Selectors(SelectorsCursor { @@ -923,15 +1220,6 @@ impl RowSelectionCursor { } } -fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { - let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); - let mut builder = BooleanBufferBuilder::new(total_rows); - for selector in selectors { - builder.append_n(selector.row_count, !selector.skip); - } - builder.finish() -} - #[cfg(test)] mod tests { use super::*; @@ -949,14 +1237,14 @@ mod tests { let selection = RowSelection::from_filters(&filters[..1]); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(3), RowSelector::select(4)] ); let selection = RowSelection::from_filters(&filters[..2]); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(3), RowSelector::select(6), @@ -968,7 +1256,7 @@ mod tests { let selection = RowSelection::from_filters(&filters); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(3), RowSelector::select(6), @@ -980,7 +1268,7 @@ mod tests { let selection = RowSelection::from_filters(&filters[2..3]); assert!(!selection.selects_any()); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::skip(4)]); } #[test] @@ -993,9 +1281,9 @@ mod tests { ]); let split = selection.split_off(34); - assert_eq!(split.selectors, vec![RowSelector::skip(34)]); + assert_eq!(split.selectors(), vec![RowSelector::skip(34)]); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::select(12), RowSelector::skip(3), @@ -1004,9 +1292,9 @@ mod tests { ); let split = selection.split_off(5); - assert_eq!(split.selectors, vec![RowSelector::select(5)]); + assert_eq!(split.selectors(), vec![RowSelector::select(5)]); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::select(7), RowSelector::skip(3), @@ -1016,20 +1304,20 @@ mod tests { let split = selection.split_off(8); assert_eq!( - split.selectors, + split.selectors(), vec![RowSelector::select(7), RowSelector::skip(1)] ); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(2), RowSelector::select(35)] ); let split = selection.split_off(200); assert_eq!( - split.selectors, + split.selectors(), vec![RowSelector::skip(2), RowSelector::select(35)] ); - assert!(selection.selectors.is_empty()); + assert!(selection.selectors().is_empty()); } #[test] @@ -1044,7 +1332,7 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(2), RowSelector::select(3), @@ -1057,7 +1345,7 @@ mod tests { let selection = selection.offset(5); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(30), RowSelector::select(5), @@ -1068,7 +1356,7 @@ mod tests { let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(33), RowSelector::select(2), @@ -1079,13 +1367,13 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(68), RowSelector::select(6),] ); let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(71), RowSelector::select(3),] ); } @@ -1132,7 +1420,7 @@ mod tests { ]); assert_eq!( - a.and_then(&b).selectors, + a.and_then(&b).selectors(), vec![ RowSelector::select(2), RowSelector::skip(1), @@ -1185,30 +1473,30 @@ mod tests { fn test_combine_2elements() { let a = vec![RowSelector::select(10), RowSelector::select(5)]; let a_expect = vec![RowSelector::select(15)]; - assert_eq!(RowSelection::from_iter(a).selectors, a_expect); + assert_eq!(RowSelection::from_iter(a).selectors(), a_expect); let b = vec![RowSelector::select(10), RowSelector::skip(5)]; let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)]; - assert_eq!(RowSelection::from_iter(b).selectors, b_expect); + assert_eq!(RowSelection::from_iter(b).selectors(), b_expect); let c = vec![RowSelector::skip(10), RowSelector::select(5)]; let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)]; - assert_eq!(RowSelection::from_iter(c).selectors, c_expect); + assert_eq!(RowSelection::from_iter(c).selectors(), c_expect); let d = vec![RowSelector::skip(10), RowSelector::skip(5)]; let d_expect = vec![RowSelector::skip(15)]; - assert_eq!(RowSelection::from_iter(d).selectors, d_expect); + assert_eq!(RowSelection::from_iter(d).selectors(), d_expect); } #[test] fn test_from_one_and_empty() { let a = vec![RowSelector::select(10)]; let selection1 = RowSelection::from(a.clone()); - assert_eq!(selection1.selectors, a); + assert_eq!(selection1.selectors(), a); let b = vec![]; let selection1 = RowSelection::from(b.clone()); - assert_eq!(selection1.selectors, b) + assert_eq!(selection1.selectors(), b) } #[test] @@ -1253,7 +1541,7 @@ mod tests { let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![ RowSelector::select(5), RowSelector::skip(4), @@ -1271,7 +1559,7 @@ mod tests { let b = vec![RowSelector::select(36), RowSelector::skip(36)]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(3), RowSelector::skip(69)] ); @@ -1286,7 +1574,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); @@ -1300,7 +1588,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); } @@ -1328,7 +1616,7 @@ mod tests { let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]); - let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum(); + let total_rows: usize = expected.selectors().iter().map(|s| s.row_count).sum(); assert_eq!(a_len, total_rows); assert_eq!(a.and_then(&b), expected); @@ -1345,10 +1633,10 @@ mod tests { RowSelector::select(4), ]; - let round_tripped = RowSelection::from(selectors.clone()) + let round_tripped: Vec = RowSelection::from(selectors.clone()) .iter() - .cloned() - .collect::>(); + .copied() + .collect(); assert_eq!(selectors, round_tripped); } @@ -1369,7 +1657,7 @@ mod tests { let limited = selection.clone().limit(5); let expected = vec![RowSelector::select(5)]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(15); let expected = vec![ @@ -1377,11 +1665,11 @@ mod tests { RowSelector::skip(10), RowSelector::select(5), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(0); let expected = vec![]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(30); let expected = vec![ @@ -1391,7 +1679,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.limit(100); let expected = vec![ @@ -1401,7 +1689,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); } #[test] @@ -1542,7 +1830,7 @@ mod tests { let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(1), RowSelector::select(2), @@ -1568,7 +1856,7 @@ mod tests { RowSelector::skip(0), RowSelector::select(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::select(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::select(4)]); let selection = RowSelection::from(vec![ RowSelector::select(0), @@ -1576,7 +1864,7 @@ mod tests { RowSelector::select(0), RowSelector::skip(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::skip(4)]); } #[test] @@ -1600,7 +1888,7 @@ mod tests { let result = a.intersection(&b); assert_eq!( - result.selectors, + result.selectors(), vec![ RowSelector::skip(30), RowSelector::select(10), @@ -1636,11 +1924,11 @@ mod tests { // NYYYYYN assert_eq!( - result.iter().collect::>(), + result.iter().copied().collect::>(), vec![ - &RowSelector::skip(10), - &RowSelector::select(50), - &RowSelector::skip(10), + RowSelector::skip(10), + RowSelector::select(50), + RowSelector::skip(10), ] ); } @@ -1689,7 +1977,7 @@ mod tests { RowSelector::select(35), ]; - assert_eq!(selection.trim().selectors, expected); + assert_eq!(selection.trim().selectors(), expected); let selection = RowSelection::from(vec![ RowSelector::skip(34), @@ -1699,6 +1987,6 @@ mod tests { let expected = vec![RowSelector::skip(34), RowSelector::select(12)]; - assert_eq!(selection.trim().selectors, expected); + assert_eq!(selection.trim().selectors(), expected); } }