From e5ab2710fb3854a88c6b73cf96d177707c7dc0af Mon Sep 17 00:00:00 2001 From: jeadie Date: Mon, 29 Jun 2026 14:47:13 +1000 Subject: [PATCH 1/7] =?UTF-8?q?Specialize=20`filter`=20for=20list-like=20a?= =?UTF-8?q?rrays=20(List/LargeList/FixedSizeList/Map/=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `FilterPredicate::filter` previously fell back to the generic `MutableArrayData` path for `List`/`LargeList`/`FixedSizeList`/`Map`. This adds specialized kernels that map each retained run of parent rows to a contiguous range of child elements and reuse the already-vectorized per-type child filter kernels, instead of the generic byte-copy fallback. Child handling is selectivity-aware (work is proportional to retained runs and elements, not the full child length) and streams ranges without an intermediate `Vec`: byte children go straight to `FilterBytes`, nested lists recurse, and others use a `Slices` predicate. A child-type allowlist keeps types that can't beat the fallback (dense `Union`, `RunEndEncoded`) on `MutableArrayData`, and a cheap selectivity guard routes dense `Map` filters to the fallback too. Adds benchmarks for the affected types in `arrow/benches/filter_kernels.rs`. 
--- arrow-select/src/filter.rs | 720 +++++++++++++++++++++++++++++++- arrow/benches/filter_kernels.rs | 406 +++++++++++++++++- 2 files changed, 1124 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 1de22e79c0de..c20c0678a0e7 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -23,7 +23,8 @@ use std::sync::Arc; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::cast::AsArray; use arrow_array::types::{ - ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, ByteViewType, RunEndIndexType, + ArrowDictionaryKeyType, ArrowPrimitiveType, BinaryType, ByteArrayType, ByteViewType, + LargeBinaryType, LargeUtf8Type, RunEndIndexType, Utf8Type, }; use arrow_array::*; use arrow_buffer::{ @@ -556,6 +557,19 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result { Ok(Arc::new(filter_fixed_size_binary(values.as_fixed_size_binary(), predicate))) } + DataType::FixedSizeList(_, _) => { + Ok(Arc::new(filter_fixed_size_list(values.as_fixed_size_list(), predicate)?)) + } + // For dense `Map` filters only specialize when the filter is selective enough to win. 
+ DataType::Map(_, _) if !filter_keeps_most(predicate, values.len()) => { + Ok(Arc::new(filter_map(values.as_map(), predicate)?)) + } + DataType::List(field) if filter_list_child_is_fast(field.data_type()) => { + Ok(Arc::new(filter_list::(values.as_list::(), predicate)?)) + } + DataType::LargeList(field) if filter_list_child_is_fast(field.data_type()) => { + Ok(Arc::new(filter_list::(values.as_list::(), predicate)?)) + } DataType::ListView(_) => { Ok(Arc::new(filter_list_view::(values.as_list_view(), predicate))) } @@ -608,6 +622,44 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result bool { + child.is_primitive() + || matches!( + child, + DataType::Boolean + | DataType::Null + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary + | DataType::BinaryView + | DataType::FixedSizeBinary(_) + | DataType::FixedSizeList(_, _) + | DataType::Dictionary(_, _) + | DataType::Struct(_) + | DataType::Map(_, _) + | DataType::Union(_, UnionMode::Sparse) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::ListView(_) + | DataType::LargeListView(_) + ) +} + /// Filter any supported [`RunArray`] based on a [`FilterPredicate`] fn filter_run_end_array( array: &RunArray, @@ -1070,6 +1122,302 @@ fn filter_sparse_union( }) } +/// Whether the filter retains most of `len` rows (a "dense" filter) +fn filter_keeps_most(predicate: &FilterPredicate, len: usize) -> bool { + predicate.count as f64 > len as f64 * FILTER_SLICES_SELECTIVITY_THRESHOLD +} + +/// An iterator of `[start, end)` ranges. Used for both retained parent-row ranges and +/// the child element ranges they map to. +type RangeIter<'a> = Box + 'a>; + +/// Iterates the retained rows of `predicate` as `[start, end)` ranges according +/// to its strategy. 
+fn predicate_row_ranges(predicate: &FilterPredicate) -> RangeIter<'_> { + match &predicate.strategy { + IterationStrategy::Slices(slices) => Box::new(slices.iter().copied()), + IterationStrategy::SlicesIterator => Box::new(SlicesIterator::new(&predicate.filter)), + IterationStrategy::IndexIterator => { + Box::new(IndexIterator::new(&predicate.filter, predicate.count).map(|i| (i, i + 1))) + } + IterationStrategy::Indices(indices) => Box::new(indices.iter().map(|&i| (i, i + 1))), + IterationStrategy::All | IterationStrategy::None => unreachable!(), + } +} + +/// `filter` for `List`/`LargeList`: returns a new list array containing only the +/// rows selected by `predicate`. +/// +/// # How it works +/// +/// A `List`/`LargeList` stores child elements contiguously. Because the offsets are +/// monotonically non-decreasing, a *run* of consecutive retained rows +/// `[start, end)` maps to a single contiguous block of child elements +/// `[offsets[start], offsets[end])`. Filtering a list is therefore two steps: +/// +/// 1. Rebuild the offset buffer from the retained rows' lengths. +/// 2. Filter the child `values` down to those contiguous element ranges. 
+/// +/// ```text +/// input: [ [a,b,c] [d,e] [f,g,h] [i,j,k] ] keep rows 0, 2, 3 +/// offsets: 0 3 5 8 11 +/// values: [ a b c d e f g h i j k ] +/// └──────┘ (drop) └──────────────────┘ +/// +/// retained row runs -> [0, 1) [2, 4) +/// child element blocks = [offsets[0], [offsets[2], +/// offsets[1]) offsets[4]) +/// = [0, 3) [5, 11) rows 2 & 3 are +/// a b c f g h i j k adjacent -> one copy +/// +/// output: [ [a,b,c] [f,g,h] [i,j,k] ] new offsets: 0, 3, 6, 9 +/// ``` +/// +fn filter_list( + array: &GenericListArray, + predicate: &FilterPredicate, +) -> Result, ArrowError> { + let make_row_ranges = || predicate_row_ranges(predicate); + filter_list_inner(array, &make_row_ranges, predicate.count) +} + +/// Core of [`filter_list`], driven by a function producing the retained *row* +/// ranges (`[start, end)`) of `array` rather than a [`FilterPredicate`]. This +/// lets nested lists recurse with streamed child ranges. The range source is +/// type-erased (`&dyn Fn` returning a boxed iterator) so the recursion is a +/// single instantiation per offset type, not an unbounded tower of closure +/// types. +fn filter_list_inner<'a, OffsetType: OffsetSizeTrait>( + array: &'a GenericListArray, + make_row_ranges: &dyn Fn() -> RangeIter<'a>, + count: usize, +) -> Result, ArrowError> { + let (offsets, filtered_values) = + filter_list_offsets_and_child(array.offsets(), array.values(), make_row_ranges, count)?; + + let field = match array.data_type() { + DataType::List(field) | DataType::LargeList(field) => field.clone(), + _ => unreachable!(), + }; + let nulls = filter_nulls_ranges(array.nulls(), make_row_ranges(), count); + + Ok(GenericListArray::new( + field, + offsets, + filtered_values, + nulls, + )) +} + +/// Rebuilds the offset buffer for the retained rows and filters the child `values` by the corresponding contiguous element ranges. +/// +/// Returns the new, filtered [`OffsetBuffer`] and [`ArrayRef`]. 
+fn filter_list_offsets_and_child<'a, O: OffsetSizeTrait>( + offsets: &'a OffsetBuffer, + values: &'a ArrayRef, + make_row_ranges: &dyn Fn() -> RangeIter<'a>, + count: usize, +) -> Result<(OffsetBuffer, ArrayRef), ArrowError> { + // Rebuild offsets from the retained rows' lengths, reading consecutive + // offsets sequentially (mirrors arrow-data's `try_extend_offsets`). + let mut new_offsets = Vec::with_capacity(count + 1); + new_offsets.push(O::usize_as(0)); + let mut acc = 0usize; // running length of selected child elements + for (start, end) in make_row_ranges() { + for window in offsets[start..=end].windows(2) { + acc += (window[1] - window[0]).as_usize(); + new_offsets.push(O::usize_as(acc)); + } + } + + // Each retained row run maps to one contiguous block of child elements. + let make_child_ranges = || -> RangeIter<'a> { + Box::new( + make_row_ranges() + .map(|(start, end)| (offsets[start].as_usize(), offsets[end].as_usize())), + ) + }; + let filtered = filter_list_child(values, &make_child_ranges, acc)?; + + Ok((OffsetBuffer::new(ScalarBuffer::from(new_offsets)), filtered)) +} + +/// Specialised method to filter a [`FixedSizeListArray`] by a [`FilterPredicate`]. +/// +/// Each list occupies exactly `size` contiguous child elements (no offsets), so +/// a run of retained rows `[start, end)` maps to one contiguous child block +/// `[start * size, end * size)`. The child is filtered with those blocks via the +/// shared [`filter_list_child`] machinery, reusing the specialized per-type +/// kernels instead of [`MutableArrayData`]. 
+fn filter_fixed_size_list( + array: &FixedSizeListArray, + predicate: &FilterPredicate, +) -> Result { + let size = array.value_length() as usize; + let child_count = predicate.count * size; + + let child_ranges = || -> RangeIter<'_> { + Box::new( + predicate_row_ranges(predicate).map(move |(start, end)| (start * size, end * size)), + ) + }; + let filtered_values = filter_list_child(array.values(), &child_ranges, child_count)?; + + let field = match array.data_type() { + DataType::FixedSizeList(field, _) => field.clone(), + _ => unreachable!(), + }; + let nulls = predicate.filter_nulls(array.nulls()); + Ok(FixedSizeListArray::new( + field, + size as i32, + filtered_values, + nulls, + )) +} + +/// A map is structurally a `List>`: i32 offsets over a struct +/// `entries` child. Filter the `entries` struct with the offsets from the retained rows (via +/// shared [`filter_list_child`]). +fn filter_map(array: &MapArray, predicate: &FilterPredicate) -> Result { + let make_row_ranges = || predicate_row_ranges(predicate); + let entries: ArrayRef = Arc::new(array.entries().clone()); + let (offsets, filtered_entries) = filter_list_offsets_and_child( + array.offsets(), + &entries, + &make_row_ranges, + predicate.count, + )?; + + let (field, ordered) = match array.data_type() { + DataType::Map(field, ordered) => (field.clone(), *ordered), + _ => unreachable!(), + }; + let nulls = predicate.filter_nulls(array.nulls()); + Ok(MapArray::new( + field, + offsets, + filtered_entries.as_struct().clone(), + nulls, + ordered, + )) +} + +/// Filters the `values` child of a list-like array (`List`/`LargeList`/ +/// `FixedSizeList`) given a function producing the retained child-element +/// ranges. Byte children stream into [`FilterBytes`]; other children reuse the +/// specialized [`filter_array`] kernels via a `Slices` predicate (which copies +/// directly from the ranges and never reads `FilterPredicate::filter`, so an +/// empty filter suffices). 
+#[allow( + clippy::needless_lifetimes, + reason = "`'a` couples the boxed iterator's lifetime to `values` so the nested-list \ + recursion typechecks; eliding it does not compile (E0106)" +)] +fn filter_list_child<'a>( + values: &'a ArrayRef, + make_ranges: &dyn Fn() -> RangeIter<'a>, + child_count: usize, +) -> Result { + Ok(match values.data_type() { + DataType::Utf8 => Arc::new(filter_list_bytes::( + values.as_string::(), + make_ranges, + child_count, + )), + DataType::LargeUtf8 => Arc::new(filter_list_bytes::( + values.as_string::(), + make_ranges, + child_count, + )), + DataType::Binary => Arc::new(filter_list_bytes::( + values.as_binary::(), + make_ranges, + child_count, + )), + DataType::LargeBinary => Arc::new(filter_list_bytes::( + values.as_binary::(), + make_ranges, + child_count, + )), + // Nested lists recurse with the same streamed ranges (the child element + // ranges are exactly the inner list's retained row ranges). + DataType::List(_) => Arc::new(filter_list_inner::( + values.as_list::(), + make_ranges, + child_count, + )?), + DataType::LargeList(_) => Arc::new(filter_list_inner::( + values.as_list::(), + make_ranges, + child_count, + )?), + _ => { + let ranges: Vec<(usize, usize)> = make_ranges().collect(); + let child_predicate = FilterPredicate { + filter: BooleanArray::from(Vec::::new()), + count: child_count, + strategy: IterationStrategy::Slices(ranges), + }; + filter_array(values, &child_predicate)? + } + }) +} + +/// Filters a byte child of a list by streaming child-element ranges into +/// [`FilterBytes`], avoiding an intermediate `Vec` of ranges. `make_ranges` +/// produces a fresh iterator over the child ranges each call (one per pass: +/// offsets, values, nulls). 
+fn filter_list_bytes( + child: &GenericByteArray, + make_ranges: F, + child_count: usize, +) -> GenericByteArray +where + T: ByteArrayType, + F: Fn() -> I, + I: Iterator, +{ + let mut filter = FilterBytes::new(child_count, child); + filter.extend_offsets_slices(make_ranges(), child_count); + filter.extend_slices(make_ranges()); + + // SAFETY: `dst_offsets` starts at `[0]` and grows monotonically. + let offsets = unsafe { OffsetBuffer::new_unchecked(filter.dst_offsets.into()) }; + let nulls = filter_nulls_ranges(child.nulls(), make_ranges(), child_count); + // SAFETY: offsets index into `dst_values`, each value copied byte-for-byte. + unsafe { GenericByteArray::new_unchecked(offsets, filter.dst_values.into(), nulls) } +} + +/// Builds the null buffer for a filtered child by copying the validity bits of +/// the selected element ranges (mirrors [`filter_bits`] for the `Slices` case, +/// but streamed from an iterator rather than a materialized predicate). +fn filter_nulls_ranges( + nulls: Option<&NullBuffer>, + ranges: impl Iterator, + count: usize, +) -> Option { + let nulls = nulls?; + if nulls.null_count() == 0 { + return None; + } + let inner = nulls.inner(); + let src = inner.values(); + let offset = inner.offset(); + + let mut builder = BooleanBufferBuilder::new(count); + for (start, end) in ranges { + builder.append_packed_range(start + offset..end + offset, src); + } + let buffer = builder.finish(); + let null_count = count - buffer.count_set_bits(); + if null_count == 0 { + return None; + } + // SAFETY: `null_count` was derived from `buffer` above. 
+ Some(unsafe { NullBuffer::new_unchecked(buffer, null_count) }) +} + /// `filter` implementation for list views fn filter_list_view( array: &GenericListViewArray, @@ -1655,6 +2003,376 @@ mod tests { test_case_filter_sliced_list_view::(); } + #[test] + fn test_filter_list_string_child() { + // [["a","b"], null, [], ["c"], ["d","e","f"]] + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.values().append_value("a"); + builder.values().append_value("b"); + builder.append(true); + builder.append(false); + builder.append(true); + builder.values().append_value("c"); + builder.append(true); + builder.values().append_value("d"); + builder.values().append_value("e"); + builder.values().append_value("f"); + builder.append(true); + let array = builder.finish(); + + // keep rows 0, 2, 4 -> [["a","b"], [], ["d","e","f"]] + let mask = BooleanArray::from(vec![true, false, true, false, true]); + let result = filter(&array, &mask).unwrap(); + + let mut expected = ListBuilder::new(StringBuilder::new()); + expected.values().append_value("a"); + expected.values().append_value("b"); + expected.append(true); + expected.append(true); + expected.values().append_value("d"); + expected.values().append_value("e"); + expected.values().append_value("f"); + expected.append(true); + let expected = expected.finish(); + + assert_eq!(result.as_list::(), &expected); + } + + #[test] + fn test_filter_nested_list() { + // List> exercises the recursive child filter path. 
+ let mut builder = ListBuilder::new(ListBuilder::new(Int32Builder::new())); + // row 0: [[1], [2, 3]] + builder.values().values().append_value(1); + builder.values().append(true); + builder.values().values().append_value(2); + builder.values().values().append_value(3); + builder.values().append(true); + builder.append(true); + // row 1: [[4]] + builder.values().values().append_value(4); + builder.values().append(true); + builder.append(true); + // row 2: null + builder.append(false); + let array = builder.finish(); + + let mask = BooleanArray::from(vec![false, true, true]); + let result = filter(&array, &mask).unwrap(); + + // Cross-check against the independently-implemented `take` kernel. + let indices = UInt32Array::from(vec![1, 2]); + let expected = crate::take::take(&array, &indices, None).unwrap(); + assert_eq!(&result, &expected); + } + + /// Randomized cross-check of `filter` for `List`/`LargeList` against the + /// independently-implemented `take` kernel: `filter(a, mask)` must be + /// logically equal to `take(a, indices_of_true(mask))`. + #[test] + fn test_filter_list_matches_take() { + let mut rng = rng(); + + for _ in 0..200 { + let n = rng.random_range(1..96usize); + + // List (specialized path) + let mut int_builder = ListBuilder::new(Int32Builder::new()); + // List (specialized view path) + let mut view_builder = ListBuilder::new(StringViewBuilder::new()); + // List (specialized path) + let mut fsb_builder = ListBuilder::new(FixedSizeBinaryBuilder::new(4)); + // List> (specialized path) + let mut dict_builder = ListBuilder::new(StringDictionaryBuilder::::new()); + // List / / / (specialized + // byte paths, incl. 
the i64-offset variants) + let mut str_builder = ListBuilder::new(StringBuilder::new()); + let mut lstr_builder = ListBuilder::new(LargeStringBuilder::new()); + let mut bin_builder = ListBuilder::new(BinaryBuilder::new()); + let mut lbin_builder = ListBuilder::new(LargeBinaryBuilder::new()); + // List> (falls back; exercises nested correctness) + let mut nested_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new())); + + for _ in 0..n { + if rng.random_bool(0.2) { + int_builder.append_null(); + view_builder.append_null(); + fsb_builder.append_null(); + dict_builder.append_null(); + str_builder.append_null(); + lstr_builder.append_null(); + bin_builder.append_null(); + lbin_builder.append_null(); + nested_builder.append_null(); + continue; + } + let len = rng.random_range(0..6); + for _ in 0..len { + if rng.random_bool(0.2) { + int_builder.values().append_null(); + view_builder.values().append_null(); + fsb_builder.values().append_null(); + dict_builder.values().append_null(); + str_builder.values().append_null(); + lstr_builder.values().append_null(); + bin_builder.values().append_null(); + lbin_builder.values().append_null(); + nested_builder.values().append_null(); + } else { + let v = rng.random_range(0..100u8); + int_builder.values().append_value(v as i32); + // Mix inline (<=12 byte) and non-inline view strings + view_builder + .values() + .append_value("y".repeat(rng.random_range(0..20))); + fsb_builder.values().append_value([v; 4]).unwrap(); + dict_builder.values().append_value(format!("{}", v % 8)); + let s = "x".repeat(rng.random_range(0..16)); + str_builder.values().append_value(&s); + lstr_builder.values().append_value(&s); + bin_builder.values().append_value(s.as_bytes()); + lbin_builder.values().append_value(s.as_bytes()); + for _ in 0..rng.random_range(0..3) { + nested_builder + .values() + .values() + .append_value(rng.random_range(0..100)); + } + nested_builder.values().append(true); + } + } + int_builder.append(true); + 
view_builder.append(true); + fsb_builder.append(true); + dict_builder.append(true); + str_builder.append(true); + lstr_builder.append(true); + bin_builder.append(true); + lbin_builder.append(true); + nested_builder.append(true); + } + let arrays: [ArrayRef; 9] = [ + Arc::new(int_builder.finish()), + Arc::new(view_builder.finish()), + Arc::new(fsb_builder.finish()), + Arc::new(dict_builder.finish()), + Arc::new(str_builder.finish()), + Arc::new(lstr_builder.finish()), + Arc::new(bin_builder.finish()), + Arc::new(lbin_builder.finish()), + Arc::new(nested_builder.finish()), + ]; + + for array in &arrays { + check_filter_matches_take(array.as_ref(), &mut rng); + // Also exercise sliced inputs (offsets[0] != 0). + if n > 3 { + let offset = rng.random_range(0..n / 2); + let length = rng.random_range(1..(n - offset)); + check_filter_matches_take(&array.slice(offset, length), &mut rng); + } + } + } + } + + fn check_filter_matches_take(arr: &dyn Array, rng: &mut impl Rng) { + let len = arr.len(); + let mask: BooleanArray = (0..len).map(|_| rng.random_bool(0.5)).collect(); + let filtered = filter(arr, &mask).unwrap(); + let indices: UInt32Array = mask + .iter() + .enumerate() + .filter_map(|(i, b)| (b == Some(true)).then_some(i as u32)) + .collect(); + let expected = crate::take::take(arr, &indices, None).unwrap(); + assert_eq!(&filtered, &expected, "len={len}"); + } + + /// Cross-check `filter` for exotic list children against `take`: + /// `ListView` (specialized), `FixedSizeList` and `RunEndEncoded` (fallback). + /// All children are length `total` and wrapped in a `List` with random + /// per-row lengths + validity. 
+ #[test] + fn test_filter_list_exotic_children_match_take() { + let mut rng = rng(); + + for _ in 0..100 { + let n = rng.random_range(1..48usize); + let mut row_lens: Vec> = Vec::with_capacity(n); + let mut total = 0usize; + for _ in 0..n { + if rng.random_bool(0.2) { + row_lens.push(None); + } else { + let l = rng.random_range(0..5); + total += l; + row_lens.push(Some(l)); + } + } + + // Build the three child arrays, each of length `total`. + let mut lv = GenericListViewBuilder::::new(Int32Builder::new()); + let mut fsl = FixedSizeListBuilder::new(Int32Builder::new(), 2); + let mut ree = PrimitiveRunBuilder::::new(); + for _ in 0..total { + lv.values().append_value(rng.random_range(0..50)); + lv.values().append_value(rng.random_range(0..50)); + lv.append(true); + fsl.values().append_value(rng.random_range(0..50)); + fsl.values().append_value(rng.random_range(0..50)); + fsl.append(true); + ree.append_value(rng.random_range(0..5)); + } + let children: [ArrayRef; 3] = [ + Arc::new(lv.finish()), + Arc::new(fsl.finish()), + Arc::new(ree.finish()), + ]; + + // Wrap each child in a List with offsets/validity from `row_lens`. 
+ let mut offsets = vec![0i32]; + let mut acc = 0i32; + let mut validity = BooleanBufferBuilder::new(n); + for rl in &row_lens { + if let Some(l) = rl { + acc += *l as i32; + validity.append(true); + } else { + validity.append(false); + } + offsets.push(acc); + } + let nulls = NullBuffer::new(validity.finish()); + + for child in &children { + let field = Arc::new(Field::new_list_field(child.data_type().clone(), true)); + let list = ListArray::new( + field, + OffsetBuffer::new(offsets.clone().into()), + Arc::clone(child), + Some(nulls.clone()), + ); + check_filter_matches_take(&list, &mut rng); + if n > 3 { + let offset = rng.random_range(0..n / 2); + let length = rng.random_range(1..(n - offset)); + check_filter_matches_take(&list.slice(offset, length), &mut rng); + } + } + } + } + + /// Cross-check `filter` for `List`, `List` and `List` + /// against `take`. `Struct`/sparse `Union` take the specialized path; `Map` + /// uses the fallback — both must agree with `take`. + #[test] + fn test_filter_list_struct_map_union_match_take() { + let mut rng = rng(); + + for _ in 0..100 { + let n = rng.random_range(1..64usize); + + // List> + let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]); + let mut struct_builder = + ListBuilder::new(StructBuilder::from_fields(struct_fields.clone(), 0)); + // List> + let mut map_builder = ListBuilder::new(MapBuilder::new( + None, + Int32Builder::new(), + Int32Builder::new(), + )); + + // List built manually (no union builder): per-row lengths drive + // a sparse union child and the list offsets/validity. 
+ let union_fields = [ + (0_i8, Arc::new(Field::new("a", DataType::Int32, false))), + (1_i8, Arc::new(Field::new("b", DataType::Float64, false))), + ] + .into_iter() + .collect::(); + let mut type_ids: Vec = Vec::new(); + let mut ua = Int32Builder::new(); + let mut ub = Float64Builder::new(); + let mut union_offsets: Vec = vec![0]; + let mut union_validity = BooleanBufferBuilder::new(n); + let mut union_acc = 0i32; + + for _ in 0..n { + let row_null = rng.random_bool(0.2); + let len = if row_null { 0 } else { rng.random_range(0..6) }; + + // struct + if row_null { + struct_builder.append_null(); + } else { + for _ in 0..len { + let sb = struct_builder.values(); + sb.field_builder::(0) + .unwrap() + .append_value(rng.random_range(0..100)); + sb.append(true); + } + struct_builder.append(true); + } + + // map + if row_null { + map_builder.append_null(); + } else { + for _ in 0..len { + let mb = map_builder.values(); + for _ in 0..rng.random_range(0..3) { + mb.keys().append_value(rng.random_range(0..10)); + mb.values().append_value(rng.random_range(0..100)); + } + mb.append(true).unwrap(); + } + map_builder.append(true); + } + + // union + for _ in 0..len { + type_ids.push(rng.random_range(0..2) as i8); + ua.append_value(rng.random_range(0..100)); + ub.append_value(rng.random_range(0..100) as f64); + } + union_acc += len as i32; + union_offsets.push(union_acc); + union_validity.append(!row_null); + } + + let union = UnionArray::try_new( + union_fields, + ScalarBuffer::from(type_ids), + None, + vec![Arc::new(ua.finish()), Arc::new(ub.finish())], + ) + .unwrap(); + let union_field = Arc::new(Field::new_list_field(union.data_type().clone(), true)); + let union_list = ListArray::new( + union_field, + OffsetBuffer::new(union_offsets.into()), + Arc::new(union), + Some(NullBuffer::new(union_validity.finish())), + ); + + let arrays: [ArrayRef; 3] = [ + Arc::new(struct_builder.finish()), + Arc::new(map_builder.finish()), + Arc::new(union_list), + ]; + for array in &arrays { + 
check_filter_matches_take(array.as_ref(), &mut rng); + if n > 3 { + let offset = rng.random_range(0..n / 2); + let length = rng.random_range(1..(n - offset)); + check_filter_matches_take(&array.slice(offset, length), &mut rng); + } + } + } + } + #[test] fn test_slice_iterator_bits() { let filter_values = (0..64).map(|i| i == 1).collect::>(); diff --git a/arrow/benches/filter_kernels.rs b/arrow/benches/filter_kernels.rs index ff117f9d63f5..7a90e161d13e 100644 --- a/arrow/benches/filter_kernels.rs +++ b/arrow/benches/filter_kernels.rs @@ -22,8 +22,11 @@ use arrow::compute::{FilterBuilder, FilterPredicate, filter_record_batch}; use arrow::util::bench_util::*; use arrow::array::*; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::compute::filter; -use arrow::datatypes::{Field, Float32Type, Int32Type, Int64Type, Schema, UInt8Type}; +use arrow::datatypes::{ + DataType, Field, Float32Type, Int32Type, Int64Type, Schema, UInt8Type, UnionFields, +}; use arrow_array::types::Decimal128Type; use criterion::{Criterion, criterion_group, criterion_main}; @@ -295,6 +298,407 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("filter run array low selectivity (kept 1/1024)", |b| { b.iter(|| bench_built_filter(&sparse_filter, &data_array)) }); + + // List with ~5 elements per row exercises the specialized list filter + let mut list_builder = ListBuilder::new(Int32Builder::new()); + for i in 0..size { + for j in 0..5 { + list_builder.values().append_value((i + j) as i32); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list i32 (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter list i32 high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter list i32 low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List exercises the 
specialized byte child kernel + let mut list_builder = ListBuilder::new(StringBuilder::new()); + for i in 0..size { + for j in 0..5 { + list_builder.values().append_value(format!("{}", i + j)); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list utf8 (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter list utf8 high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter list utf8 low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List exercises the specialized byte-view child kernel + // (Utf8View is DataFusion's default string type) + let mut list_builder = ListBuilder::new(StringViewBuilder::new()); + for i in 0..size { + for j in 0..5 { + list_builder.values().append_value(format!("{}", i + j)); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list utf8view (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list utf8view high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list utf8view low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List> exercises the recursive child filter path + let mut list_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new())); + for i in 0..size { + for j in 0..3 { + for k in 0..2 { + list_builder + .values() + .values() + .append_value((i + j + k) as i32); + } + list_builder.values().append(true); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list nested (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list 
nested high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list nested low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List — fixed-width child (allowlist candidate) + let mut list_builder = ListBuilder::new(FixedSizeBinaryBuilder::new(8)); + for i in 0..size { + for j in 0..5 { + list_builder + .values() + .append_value([(i + j) as u8; 8]) + .unwrap(); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list fixedsizebinary (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list fixedsizebinary high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function( + "filter list fixedsizebinary low selectivity (kept 1/1024)", + |b| b.iter(|| bench_built_filter(&sparse_filter, &data_array)), + ); + + // List> — dictionary child (allowlist candidate) + let mut list_builder = ListBuilder::new(StringDictionaryBuilder::::new()); + for i in 0..size { + for j in 0..5 { + list_builder + .values() + .append_value(format!("{}", (i + j) % 128)); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list dict (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter list dict high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter list dict low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List + let mut list_builder = ListBuilder::new(BinaryBuilder::new()); + for i in 0..size { + for j in 0..5 { + list_builder + .values() + .append_value(format!("{}", i + j).as_bytes()); + } + list_builder.append(true); + } + let data_array = 
list_builder.finish(); + c.bench_function("filter list binary (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list binary high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list binary low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List + let mut list_builder = ListBuilder::new(LargeStringBuilder::new()); + for i in 0..size { + for j in 0..5 { + list_builder.values().append_value(format!("{}", i + j)); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list largeutf8 (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list largeutf8 high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list largeutf8 low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List + let mut list_builder = ListBuilder::new(LargeBinaryBuilder::new()); + for i in 0..size { + for j in 0..5 { + list_builder + .values() + .append_value(format!("{}", i + j).as_bytes()); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list largebinary (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list largebinary high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function( + "filter list largebinary low selectivity (kept 1/1024)", + |b| b.iter(|| bench_built_filter(&sparse_filter, &data_array)), + ); + + // List> + let struct_fields = vec![Field::new("a", DataType::Int32, true)]; + let mut list_builder = ListBuilder::new(StructBuilder::from_fields(struct_fields, 0)); + for i in 
0..size { + for j in 0..5 { + let sb = list_builder.values(); + sb.field_builder::(0) + .unwrap() + .append_value((i + j) as i32); + sb.append(true); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list struct (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list struct high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list struct low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List> + let mut list_builder = ListBuilder::new(MapBuilder::new( + None, + StringBuilder::new(), + Int32Builder::new(), + )); + for i in 0..size { + for j in 0..5 { + let mb = list_builder.values(); + mb.keys().append_value(format!("k{j}")); + mb.values().append_value((i + j) as i32); + mb.append(true).unwrap(); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list map (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter list map high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter list map low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List (sparse: Int32 | Float64) + let child_len = size * 5; + let mut type_ids = Vec::with_capacity(child_len); + let mut union_a = Int32Builder::with_capacity(child_len); + let mut union_b = Float64Builder::with_capacity(child_len); + for idx in 0..child_len { + type_ids.push((idx % 2) as i8); + union_a.append_value(idx as i32); + union_b.append_value(idx as f64); + } + let union_fields = [ + (0, Arc::new(Field::new("a", DataType::Int32, false))), + (1, Arc::new(Field::new("b", DataType::Float64, false))), + ] + .into_iter() + .collect::(); + let 
union = UnionArray::try_new( + union_fields, + ScalarBuffer::from(type_ids), + None, + vec![Arc::new(union_a.finish()), Arc::new(union_b.finish())], + ) + .unwrap(); + let union_offsets = OffsetBuffer::from_lengths(std::iter::repeat_n(5usize, size)); + let union_list_field = Arc::new(Field::new_list_field(union.data_type().clone(), false)); + let data_array = ListArray::new(union_list_field, union_offsets, Arc::new(union), None); + c.bench_function("filter list union (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter list union high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter list union low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // --- Scale sensitivity of the List regression (1/2 selectivity) --- + // Vary value length: short strings are bookkeeping-bound, long strings are + // memcpy-bound (the memcpy is identical in both paths). + for str_len in [8usize, 64, 256] { + let mut lb = ListBuilder::new(StringBuilder::new()); + for _ in 0..size { + for _ in 0..5 { + lb.values().append_value("x".repeat(str_len)); + } + lb.append(true); + } + let data_array = lb.finish(); + let id = format!("filter list utf8 len{str_len} (kept 1/2)"); + c.bench_function(&id, |b| b.iter(|| bench_built_filter(&filter, &data_array))); + } + + // Scale rows 10x (short strings) to confirm row count does not move the ratio. 
+ let big = size * 10; + let big_filter_array = create_boolean_array(big, 0.0, 0.5); + let big_filter = FilterBuilder::new(&big_filter_array).optimize().build(); + let mut lb = ListBuilder::new(StringBuilder::new()); + for i in 0..big { + for j in 0..5 { + lb.values().append_value(format!("{}", i + j)); + } + lb.append(true); + } + let data_array = lb.finish(); + c.bench_function("filter list utf8 10xrows (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&big_filter, &data_array)) + }); + + // List> — specialized (filter_list_view shares values buffer) + let mut list_builder = + ListBuilder::new(GenericListViewBuilder::::new(Int32Builder::new())); + for i in 0..size { + for j in 0..5 { + list_builder + .values() + .append_value([Some((i + j) as i32), Some((i + j + 1) as i32)]); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list listview (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list listview high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter list listview low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // List> — no filter_array kernel (stays on fallback) + let mut list_builder = ListBuilder::new(FixedSizeListBuilder::new(Int32Builder::new(), 2)); + for i in 0..size { + for j in 0..5 { + let fsl = list_builder.values(); + fsl.values().append_value((i + j) as i32); + fsl.values().append_value((i + j + 1) as i32); + fsl.append(true); + } + list_builder.append(true); + } + let data_array = list_builder.finish(); + c.bench_function("filter list fixedsizelist (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter list fixedsizelist high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + 
c.bench_function( + "filter list fixedsizelist low selectivity (kept 1/1024)", + |b| b.iter(|| bench_built_filter(&sparse_filter, &data_array)), + ); + + // FixedSizeList filtered directly (exercises filter_fixed_size_list) + let mut fsl = FixedSizeListBuilder::new(Int32Builder::new(), 4); + for i in 0..size { + for j in 0..4 { + fsl.values().append_value((i + j) as i32); + } + fsl.append(true); + } + let data_array = fsl.finish(); + c.bench_function("filter fixedsizelist (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function( + "filter fixedsizelist high selectivity (kept 1023/1024)", + |b| b.iter(|| bench_built_filter(&dense_filter, &data_array)), + ); + c.bench_function("filter fixedsizelist low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); + + // Map filtered directly (exercises filter_map) + let mut mb = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new()); + for i in 0..size { + for j in 0..3 { + mb.keys().append_value(format!("k{j}")); + mb.values().append_value((i + j) as i32); + } + mb.append(true).unwrap(); + } + let data_array = mb.finish(); + c.bench_function("filter map (kept 1/2)", |b| { + b.iter(|| bench_built_filter(&filter, &data_array)) + }); + c.bench_function("filter map high selectivity (kept 1023/1024)", |b| { + b.iter(|| bench_built_filter(&dense_filter, &data_array)) + }); + c.bench_function("filter map low selectivity (kept 1/1024)", |b| { + b.iter(|| bench_built_filter(&sparse_filter, &data_array)) + }); } criterion_group!(benches, add_benchmark); From 3a5b1de6f1963a4485d807d64e09028e27f6c20f Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 13:42:27 +1000 Subject: [PATCH 2/7] perf(filter): vectorize list/byte offset rebuild (per-run, not per-element) The specialized list/byte-child filter rebuilt offsets with a per-element, loop-carried accumulation (cur_offset += len; push). 
On x86-64 this lost to the generic fallback's bulk copy at high selectivity (~1.5x slower on 'filter list high selectivity (kept 1023/1024)'). Within a retained run the source offsets are contiguous and monotonic, so the run's new offsets are the source slice shifted by a constant. Emitting them as a map over the contiguous source slice removes the loop-carried dependency and lets the compiler vectorize. Applied to both the child-byte offset rebuild (FilterBytes::extend_offsets_slices) and the parent list offset rebuild (filter_list_offsets_and_child). --- arrow-select/src/filter.rs | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index c20c0678a0e7..0912d6138b14 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -927,12 +927,22 @@ where fn extend_offsets_slices(&mut self, iter: impl Iterator, count: usize) { self.dst_offsets.reserve_exact(count); for (start, end) in iter { - // These can only fail if `array` contains invalid data - for idx in start..end { - let (_, _, len) = self.get_value_range(idx); - self.cur_offset += len; - self.dst_offsets.push(self.cur_offset); - } + // A retained run is contiguous in the source, so its source offsets are + // monotonic and the run's new offsets are just that slice shifted by a + // constant (`base - src_base`). Emitting them as a `map` over the + // contiguous source slice avoids the loop-carried `cur_offset += len` + // dependency of a per-element accumulation, letting the compiler + // vectorize the rebuild (matters at high selectivity, where the run + // covers most elements). 
+ let base = self.cur_offset.as_usize(); + let src_base = self.src_offsets[start].as_usize(); + self.dst_offsets.extend( + self.src_offsets[start + 1..=end] + .iter() + .map(|&o| OffsetSize::usize_as(base + (o.as_usize() - src_base))), + ); + self.cur_offset = + OffsetSize::usize_as(base + (self.src_offsets[end].as_usize() - src_base)); } } @@ -1224,10 +1234,17 @@ fn filter_list_offsets_and_child<'a, O: OffsetSizeTrait>( new_offsets.push(O::usize_as(0)); let mut acc = 0usize; // running length of selected child elements for (start, end) in make_row_ranges() { - for window in offsets[start..=end].windows(2) { - acc += (window[1] - window[0]).as_usize(); - new_offsets.push(O::usize_as(acc)); - } + // Offsets within a retained run are contiguous and monotonic, so the new + // offsets are the source slice shifted by a constant (`acc - src_base`). + // Mapping over the contiguous slice avoids the loop-carried `acc +=` + // dependency of the per-window accumulation and vectorizes. + let src_base = offsets[start].as_usize(); + new_offsets.extend( + offsets[start + 1..=end] + .iter() + .map(|&o| O::usize_as(acc + (o.as_usize() - src_base))), + ); + acc += offsets[end].as_usize() - src_base; } // Each retained row run maps to one contiguous block of child elements. From ecd4fdf3cd5c7f226b76a9dfa7d69c1bf145b111 Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 14:13:52 +1000 Subject: [PATCH 3/7] docs(filter): update stale offset-monotonicity SAFETY comments After vectorizing the offset rebuild, monotonicity of the byte offset buffers no longer comes from incrementally growing `cur_offset` in the slice path; it derives from source-offset monotonicity (source offsets shifted by a non-decreasing base). Update the two `new_unchecked` SAFETY comments to match. 
--- arrow-select/src/filter.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 0912d6138b14..8049ab91fad2 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -989,8 +989,9 @@ where IterationStrategy::All | IterationStrategy::None => unreachable!(), } - // SAFETY: `dst_offsets` starts at `[0]` and only grows by the running - // `cur_offset`, so it is monotonically non-decreasing. + // SAFETY: `dst_offsets` starts at `[0]` and is monotonically non-decreasing: + // the index paths accumulate a non-negative `cur_offset`, and the slice path + // emits source offsets (themselves monotonic) shifted by a non-decreasing base. let offsets = unsafe { OffsetBuffer::new_unchecked(filter.dst_offsets.into()) }; let nulls = predicate.filter_nulls(array.nulls()); @@ -1399,7 +1400,9 @@ where filter.extend_offsets_slices(make_ranges(), child_count); filter.extend_slices(make_ranges()); - // SAFETY: `dst_offsets` starts at `[0]` and grows monotonically. + // SAFETY: `dst_offsets` starts at `[0]` and is monotonically non-decreasing: + // each retained run emits source offsets (themselves monotonic) shifted by a + // non-decreasing base. let offsets = unsafe { OffsetBuffer::new_unchecked(filter.dst_offsets.into()) }; let nulls = filter_nulls_ranges(child.nulls(), make_ranges(), child_count); // SAFETY: offsets index into `dst_values`, each value copied byte-for-byte. From f5031eac58c56c583eda6492d23be6994ca59400 Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 16:31:01 +1000 Subject: [PATCH 4/7] perf(filter): stream primitive list children instead of Vec + re-dispatch Non-byte list children went through filter_list_child's fallback branch: collect all child ranges into a Vec<(usize,usize)>, wrap in a FilterPredicate{Slices}, and re-dispatch through filter_array. 
At ~50% selectivity the retained runs are short, so that per-range collection + dispatch overhead dominated the (tiny) copy work and lost to the generic MutableArrayData fallback on x86 (while winning on ARM). Add filter_list_primitive, a streaming primitive child filter mirroring filter_list_bytes: copy native values per run straight into the output buffer, no Vec of ranges, no re-dispatch. Route primitive children to it in filter_list_child. --- arrow-select/src/filter.rs | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index aec55325b3b6..d132db7a5605 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -1364,6 +1364,22 @@ fn filter_list_child<'a>( make_ranges, child_count, )?), + // Primitive children stream native values straight into a buffer (like the + // byte path), avoiding the `Vec<(usize, usize)>` range collection and the + // `filter_array`/`FilterPredicate` re-dispatch. That overhead is what made + // the generic fallback win at ~50% selectivity on x86 for non-byte children, + // where the retained runs are short and the copy work is tiny. + dt if dt.is_primitive() => { + let arr: &dyn Array = values.as_ref(); + downcast_primitive_array! { + arr => { + let out: ArrayRef = + Arc::new(filter_list_primitive(arr, make_ranges, child_count)); + out + } + _ => unreachable!("{dt} passed is_primitive()"), + } + } _ => { let ranges: Vec<(usize, usize)> = make_ranges().collect(); let child_predicate = FilterPredicate { @@ -1376,6 +1392,36 @@ fn filter_list_child<'a>( }) } +/// Filters a primitive child of a list by streaming child-element ranges directly +/// into a values buffer, mirroring [`filter_list_bytes`]. Avoids materializing a +/// `Vec` of ranges and re-dispatching through [`filter_array`]; at ~50% selectivity +/// the runs are short and that per-range/dispatch overhead dominates the actual copy. 
+fn filter_list_primitive( + child: &PrimitiveArray, + make_ranges: F, + child_count: usize, +) -> PrimitiveArray +where + T: ArrowPrimitiveType, + F: Fn() -> I, + I: Iterator, +{ + let src = child.values(); + let mut buffer: Vec = Vec::with_capacity(child_count); + for (start, end) in make_ranges() { + // SAFETY: ranges are derived from the child offsets, so they are in-bounds. + buffer.extend_from_slice(unsafe { src.get_unchecked(start..end) }); + } + let nulls = filter_nulls_ranges(child.nulls(), make_ranges(), child_count); + let arr = PrimitiveArray::::new(ScalarBuffer::from(buffer), nulls); + // Preserve the concrete logical type (timestamps with tz, decimals, etc.). + if child.data_type() == &T::DATA_TYPE { + arr + } else { + arr.with_data_type(child.data_type().clone()) + } +} + /// Filters a byte child of a list by streaming child-element ranges into /// [`FilterBytes`], avoiding an intermediate `Vec` of ranges. `make_ranges` /// produces a fresh iterator over the child ranges each call (one per pass: From 0ef538501405c00c15ad846ad82248a824de3b47 Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 19:27:54 +1000 Subject: [PATCH 5/7] fix(filter): remove now-dead FilterBytes::get_value_range The offset-rebuild vectorization removed its only caller (the per-element extend_offsets_slices loop); drop the method to satisfy -D dead_code. 
--- arrow-select/src/filter.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index d132db7a5605..f878017bd72a 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -891,16 +891,6 @@ where self.src_offsets[idx].as_usize() } - /// Returns the start and end of the value at index `idx` along with its length - #[inline] - fn get_value_range(&self, idx: usize) -> (usize, usize, OffsetSize) { - // These can only fail if `array` contains invalid data - let start = self.get_value_offset(idx); - let end = self.get_value_offset(idx + 1); - let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); - (start, end, len) - } - fn extend_offsets_idx(&mut self, iter: impl Iterator<Item = usize>) { self.dst_offsets.extend(iter.map(|idx| { let start = self.src_offsets[idx].as_usize(); From e25c0d6d09d3fd192a889e13f92cdce29b0d996e Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 19:29:42 +1000 Subject: [PATCH 6/7] refactor(filter): use safe slice indexing in filter_list_primitive The value copy is per-run (bounds-checked once per range, negligible) and now matches the sibling byte path (extend_slices), removing an unnecessary unsafe. --- arrow-select/src/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index f878017bd72a..8a264af29918 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -1399,8 +1399,8 @@ where let src = child.values(); let mut buffer: Vec<T::Native> = Vec::with_capacity(child_count); for (start, end) in make_ranges() { - // SAFETY: ranges are derived from the child offsets, so they are in-bounds. - buffer.extend_from_slice(unsafe { src.get_unchecked(start..end) }); + // Safe slice index (bounds-checked once per run); mirrors `extend_slices`. 
+ buffer.extend_from_slice(&src[start..end]); } let nulls = filter_nulls_ranges(child.nulls(), make_ranges(), child_count); let arr = PrimitiveArray::<T>::new(ScalarBuffer::from(buffer), nulls); From 6cc97b646dd3bc4461b5d53d9c89e1b8bfe86e93 Mon Sep 17 00:00:00 2001 From: jeadie Date: Fri, 3 Jul 2026 19:31:19 +1000 Subject: [PATCH 7/7] docs(filter): expand SAFETY comment on filter_list_bytes byte-array construction Match the pre-existing filter_bytes comment: call out UTF-8 validity preservation and the offsets/nulls length invariant, not just byte-for-byte copying. --- arrow-select/src/filter.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 8a264af29918..743ecd030592 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -1435,7 +1435,9 @@ where // non-decreasing base. let offsets = unsafe { OffsetBuffer::new_unchecked(filter.dst_offsets.into()) }; let nulls = filter_nulls_ranges(child.nulls(), make_ranges(), child_count); - // SAFETY: offsets index into `dst_values`, each value copied byte-for-byte. + // SAFETY: `offsets` index into `dst_values` by construction, and each slot is a + // byte-for-byte copy from `child`, so UTF-8 validity (if any) is preserved. + // Length invariant: `offsets.len() - 1 == child_count == nulls.len()`. unsafe { GenericByteArray::new_unchecked(offsets, filter.dst_values.into(), nulls) } }