diff --git a/parquet/src/arrow/array_reader/fixed_size_list_array.rs b/parquet/src/arrow/array_reader/fixed_size_list_array.rs index 8ef3bd6c2a4b..518cd8625ed5 100644 --- a/parquet/src/arrow/array_reader/fixed_size_list_array.rs +++ b/parquet/src/arrow/array_reader/fixed_size_list_array.rs @@ -226,7 +226,7 @@ mod tests { use super::*; use crate::arrow::{ ArrowWriter, - array_reader::{ListArrayReader, test_util::InMemoryArrayReader}, + array_reader::{ListArrayReader, test_util::make_int32_page_reader}, arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader}, }; use arrow::datatypes::{Field, Int32Type}; @@ -254,28 +254,16 @@ mod tests { 3, ); - let array = Arc::new(PrimitiveArray::::from(vec![ - None, - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - Some(5), - None, - None, - None, - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]), - Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 5], + &[0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2], + &[0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1], + 3, + 1, ); let mut list_array_reader = FixedSizeListArrayReader::new( - Box::new(item_array_reader), + item_array_reader, 3, ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 3), 2, @@ -303,25 +291,16 @@ mod tests { 2, ); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - Some(3), - None, - None, - Some(4), - Some(5), - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![2, 1, 2, 2, 1, 1, 2, 2]), - Some(vec![0, 1, 0, 1, 0, 1, 0, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 5], + &[2, 1, 2, 2, 1, 1, 2, 2], + &[0, 1, 0, 1, 0, 1, 0, 1], + 2, + 1, ); let mut list_array_reader = FixedSizeListArrayReader::new( - Box::new(item_array_reader), + item_array_reader, 2, ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 2), 1, @@ -381,28 +360,15 @@ mod tests { let expected = FixedSizeListArray::from(l1); - let values = Arc::new(PrimitiveArray::::from(vec![ - None, - Some(1), - Some(2), - None, - Some(3), - None, - Some(4), - Some(5), - None, - None, - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - values, - Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]), - Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 5], + &[0, 5, 5, 4, 5, 0, 5, 5, 4, 4], + &[0, 0, 2, 0, 2, 0, 0, 2, 0, 2], + 5, + 2, ); - let l2 = - FixedSizeListArrayReader::new(Box::new(item_array_reader), 2, l2_type, 4, 2, false); + let l2 = FixedSizeListArrayReader::new(item_array_reader, 2, l2_type, 4, 2, false); let mut l1 = FixedSizeListArrayReader::new(Box::new(l2), 1, l1_type, 3, 1, true); let expected_1 = expected.slice(0, 2); @@ -423,18 +389,10 @@ mod tests { 0, ); - let array = Arc::new(PrimitiveArray::::from(vec![ - None, None, None, None, - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![0, 1, 0, 1]), - Some(vec![0, 0, 0, 0]), - ); + let item_array_reader = make_int32_page_reader(&[], &[0, 1, 0, 1], &[0, 0, 0, 0], 2, 1); let mut list_array_reader = FixedSizeListArrayReader::new( - Box::new(item_array_reader), + item_array_reader, 0, ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 0), 2, @@ -467,33 +425,20 @@ mod tests { builder.append(false); let expected = builder.finish(); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(3), - None, - Some(4), - None, - Some(5), - Some(6), - None, - None, - None, - ])); - let inner_type = ArrowType::List(Arc::new(Field::new_list_field(ArrowType::Int32, true))); let list_type = ArrowType::FixedSizeList(Arc::new(Field::new_list_field(inner_type.clone(), true)), 2); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), - Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), + let item_array_reader = make_int32_page_reader( + &[1, 3, 4, 5, 6], + &[5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0], + &[0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0], + 5, + 2, ); let inner_array_reader = - ListArrayReader::::new(Box::new(item_array_reader), inner_type, 4, 2, true); + ListArrayReader::::new(item_array_reader, inner_type, 4, 2, true); let mut list_array_reader = FixedSizeListArrayReader::new(Box::new(inner_array_reader), 2, list_type, 2, 1, true); diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index a7adc01b912d..e6c834096902 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -248,7 +248,7 @@ mod tests { use super::*; use crate::arrow::array_reader::ArrayReaderBuilder; use crate::arrow::array_reader::list_array::ListArrayReader; - use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::array_reader::test_util::make_int32_page_reader; use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; @@ -257,7 +257,7 @@ mod tests { use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::SchemaDescriptor; - use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type}; + use arrow::datatypes::{Field, Int32Type}; use arrow_array::{Array, PrimitiveArray}; use arrow_data::ArrayDataBuilder; use arrow_schema::Fields; @@ -354,36 +354,15 @@ mod tests { let expected = GenericListArray::::from(l1); - let values = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - None, - Some(4), - None, - None, - Some(7), - None, - Some(1), - Some(2), - Some(3), - Some(4), - None, - Some(6), - None, - None, - None, - Some(11), - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - values, - Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]), - Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]), + let item_array_reader = make_int32_page_reader( + &[1, 4, 7, 1, 2, 3, 4, 6, 11], + &[6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6], + &[0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0], + 6, + 3, ); - let l3 = - ListArrayReader::::new(Box::new(item_array_reader), l3_type, 5, 3, true); + let l3 = ListArrayReader::::new(item_array_reader, l3_type, 5, 3, true); let l2 = ListArrayReader::::new(Box::new(l3), l2_type, 3, 2, false); @@ -411,28 +390,16 @@ mod tests { Some(vec![None, Some(1)]), ]); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - None, - None, - None, - Some(1), - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]), - Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 1], + &[2, 1, 2, 0, 2, 2, 0, 0, 1, 2], + &[0, 1, 1, 0, 0, 1, 0, 0, 0, 1], + 2, + 1, ); let mut list_array_reader = ListArrayReader::::new( - Box::new(item_array_reader), + item_array_reader, list_type::(ArrowType::Int32, true), 1, 1, @@ -460,31 +427,16 @@ mod tests { Some(vec![None, Some(1)]), ]); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - None, - Some(3), - Some(4), - None, - None, - None, - None, - None, - Some(1), - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]), - Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 1], + &[3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3], + &[0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1], + 3, + 1, ); let mut list_array_reader = ListArrayReader::::new( - Box::new(item_array_reader), + item_array_reader, list_type::(ArrowType::Int32, true), 2, 1, diff --git a/parquet/src/arrow/array_reader/list_view_array.rs b/parquet/src/arrow/array_reader/list_view_array.rs index 294135d41f8b..357ab9dc14ae 100644 --- a/parquet/src/arrow/array_reader/list_view_array.rs +++ b/parquet/src/arrow/array_reader/list_view_array.rs @@ -106,9 +106,8 @@ impl ArrayReader for ListViewArrayReader() { // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]] @@ -125,27 +124,12 @@ mod tests { Some(vec![None, Some(1)]), ]); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - None, - Some(3), - Some(4), - None, - None, - None, - None, - None, - Some(1), - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]), - Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 1], + &[3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3], + &[0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1], + 3, + 1, ); let field = Arc::new(arrow_schema::Field::new_list_field(ArrowType::Int32, true)); @@ -155,13 +139,8 @@ mod tests { ArrowType::ListView(field) }; - let mut list_view_array_reader = ListViewArrayReader::::new( - Box::new(item_array_reader), - data_type, - 2, - 1, - true, - ); + let mut list_view_array_reader = + ListViewArrayReader::::new(item_array_reader, data_type, 2, 1, true); let actual = list_view_array_reader.next_batch(1024).unwrap(); let actual = actual @@ -184,24 +163,12 @@ mod tests { Some(vec![None, Some(1)]), ]); - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - None, - None, - None, - Some(1), - ])); - - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]), - Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]), + let item_array_reader = make_int32_page_reader( + &[1, 2, 3, 4, 1], + &[2, 1, 2, 0, 2, 2, 0, 0, 1, 2], + &[0, 1, 1, 0, 0, 1, 0, 0, 0, 1], + 2, + 1, ); let field = Arc::new(arrow_schema::Field::new_list_field(ArrowType::Int32, true)); @@ -211,13 +178,8 @@ mod tests { ArrowType::ListView(field) }; - let mut list_view_array_reader = ListViewArrayReader::::new( - Box::new(item_array_reader), - data_type, - 1, - 1, - false, - ); + let mut list_view_array_reader = + ListViewArrayReader::::new(item_array_reader, data_type, 1, 1, false); let actual = list_view_array_reader.next_batch(1024).unwrap(); let actual = actual diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index 63b20eb0d655..da92d410f32b 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -224,39 +224,27 @@ impl ArrayReader for StructArrayReader { mod tests { use super::*; use crate::arrow::array_reader::ListArrayReader; - use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::array_reader::test_util::make_int32_page_reader; use arrow::buffer::Buffer; use arrow::datatypes::Field; use arrow_array::cast::AsArray; - use arrow_array::{Array, Int32Array, ListArray}; + use arrow_array::{Array, ListArray}; use arrow_schema::Fields; #[test] fn test_struct_array_reader() { - let array_1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); - let array_reader_1 = InMemoryArrayReader::new( - ArrowType::Int32, - array_1.clone(), - Some(vec![0, 1, 2, 3, 1]), - Some(vec![0, 1, 1, 1, 1]), - ); + let array_reader_1 = make_int32_page_reader(&[4], &[0, 1, 2, 3, 1], &[0, 1, 1, 1, 1], 3, 1); - let array_2 = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1])); - let array_reader_2 = InMemoryArrayReader::new( - ArrowType::Int32, - array_2.clone(), - Some(vec![0, 1, 3, 1, 2]), - Some(vec![0, 1, 1, 1, 1]), - ); + let array_reader_2 = make_int32_page_reader(&[3], &[0, 1, 3, 1, 2], &[0, 1, 1, 1, 1], 3, 1); let struct_type = ArrowType::Struct(Fields::from(vec![ - Field::new("f1", array_1.data_type().clone(), true), - Field::new("f2", array_2.data_type().clone(), true), + Field::new("f1", ArrowType::Int32, true), + Field::new("f2", ArrowType::Int32, true), ])); let mut struct_array_reader = StructArrayReader::new( struct_type, - vec![Box::new(array_reader_1), Box::new(array_reader_2)], + vec![array_reader_1, array_reader_2], 1, 1, true, @@ -306,28 +294,11 @@ mod tests { )]; let expected = StructArray::from((struct_fields, validity)); - let array = Arc::new(Int32Array::from_iter(vec![ - Some(1), - Some(2), - None, - None, - None, - None, - ])); - let reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![4, 4, 3, 2, 1, 0]), - Some(vec![0, 1, 1, 0, 0, 0]), - ); + let reader = + make_int32_page_reader(&[1, 2], &[4, 4, 3, 2, 1, 0], &[0, 1, 1, 0, 0, 0], 4, 1); - let list_reader = ListArrayReader::::new( - Box::new(reader), - expected_l.data_type().clone(), - 3, - 1, - true, - ); + let list_reader = + ListArrayReader::::new(reader, expected_l.data_type().clone(), 3, 1, true); let mut struct_reader = StructArrayReader::new( expected.data_type().clone(), diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 45af5a2777b3..afba7c2c30c2 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -15,16 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::{Array, ArrayRef}; -use arrow_schema::DataType as ArrowType; use bytes::Bytes; -use std::any::Any; use std::sync::Arc; use crate::arrow::array_reader::ArrayReader; use crate::basic::{ConvertedType, Encoding, Type as PhysicalType}; use crate::column::page::{PageIterator, PageReader}; -use crate::data_type::{ByteArray, ByteArrayType}; +use crate::data_type::{ByteArray, ByteArrayType, Int32Type}; use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder}; use crate::errors::Result; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; @@ -91,112 +88,51 @@ pub fn byte_array_all_encodings( (pages, encoded_dictionary) } -/// Array reader for test. -pub struct InMemoryArrayReader { - data_type: ArrowType, - array: ArrayRef, - def_levels: Option>, - rep_levels: Option>, - last_idx: usize, - cur_idx: usize, - need_consume_records: usize, -} - -impl InMemoryArrayReader { - pub fn new( - data_type: ArrowType, - array: ArrayRef, - def_levels: Option>, - rep_levels: Option>, - ) -> Self { - assert!( - def_levels - .as_ref() - .map(|d| d.len() == array.len()) - .unwrap_or(true) - ); - - assert!( - rep_levels - .as_ref() - .map(|r| r.len() == array.len()) - .unwrap_or(true) - ); - - Self { - data_type, - array, - def_levels, - rep_levels, - cur_idx: 0, - last_idx: 0, - need_consume_records: 0, - } - } -} - -impl ArrayReader for InMemoryArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn read_records(&mut self, batch_size: usize) -> Result { - assert_ne!(batch_size, 0); - // This replicates the logical normally performed by - // RecordReader to delimit semantic records - let read = match &self.rep_levels { - Some(rep_levels) => { - let rep_levels = &rep_levels[self.cur_idx..]; - let mut levels_read = 0; - let mut records_read = 0; - while levels_read < rep_levels.len() && records_read < batch_size { - if rep_levels[levels_read] == 0 { - records_read += 1; // Start of new record - } - levels_read += 1; - } - - // Find end of current record - while levels_read < rep_levels.len() && rep_levels[levels_read] != 0 { - levels_read += 1 - } - levels_read - } - None => batch_size.min(self.array.len() - self.cur_idx), - }; - self.need_consume_records += read; - Ok(read) - } - - fn consume_batch(&mut self) -> Result { - let batch_size = self.need_consume_records; - assert_ne!(batch_size, 0); - self.last_idx = self.cur_idx; - self.cur_idx += batch_size; - self.need_consume_records = 0; - Ok(self.array.slice(self.last_idx, batch_size)) - } +/// Build a real `PrimitiveArrayReader` from raw non-null values +/// and definition/repetition levels. This exercises the full production +/// `RecordReader` code path (including selective padding when a parent +/// `ListArrayReader` calls `enable_selective_padding`). +/// +/// `values` must contain only the non-null values (entries where +/// `def_levels[i] == max_def_level`), in order, as Parquet encodes them. +pub fn make_int32_page_reader( + values: &[i32], + def_levels: &[i16], + rep_levels: &[i16], + max_def_level: i16, + max_rep_level: i16, +) -> Box { + use crate::arrow::array_reader::PrimitiveArrayReader; + use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE; + use crate::util::InMemoryPageIterator; + use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}; + + let leaf_type = Type::primitive_type_builder("leaf", PhysicalType::INT32) + .build() + .unwrap(); - fn skip_records(&mut self, num_records: usize) -> Result { - let array = self.next_batch(num_records)?; - Ok(array.len()) - } + let desc = Arc::new(ColumnDescriptor::new( + Arc::new(leaf_type), + max_def_level, + max_rep_level, + ColumnPath::new(vec![]), + )); - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels - .as_ref() - .map(|l| &l[self.last_idx..self.cur_idx]) + let mut pb = DataPageBuilderImpl::new(desc.clone(), def_levels.len() as u32, true); + if max_rep_level > 0 { + pb.add_rep_levels(max_rep_level, rep_levels); } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels - .as_ref() - .map(|l| &l[self.last_idx..self.cur_idx]) + if max_def_level > 0 { + pb.add_def_levels(max_def_level, def_levels); } + pb.add_values::(Encoding::PLAIN, values); + + let pages = vec![vec![pb.consume()]]; + let page_iter = InMemoryPageIterator::new(pages); + Box::new( + PrimitiveArrayReader::::new(Box::new(page_iter), desc, None, DEFAULT_BATCH_SIZE) + .unwrap(), + ) } /// Iterator for testing reading empty columns diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index ac2e105ecf4f..674ae2b8d964 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -536,11 +536,9 @@ mod tests { fn with_predicate_options_limit_pads_tail_when_no_prior_selection() { use crate::arrow::ProjectionMask; use crate::arrow::array_reader::StructArrayReader; - use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::array_reader::test_util::make_int32_page_reader; use crate::arrow::arrow_reader::ArrowPredicateFn; - use arrow_array::Int32Array; use arrow_schema::{DataType as ArrowType, Field, Fields}; - use std::sync::Arc; // 100 rows, all match the predicate. Limit stops the loop after 10 // matches — but the resulting RowSelection must still describe the @@ -550,14 +548,14 @@ mod tests { const LIMIT: usize = 10; let data: Vec = (0..TOTAL_ROWS as i32).collect(); - let array = Arc::new(Int32Array::from(data)); - let leaf = InMemoryArrayReader::new(ArrowType::Int32, array.clone(), None, None); + let levels = vec![0; TOTAL_ROWS]; + let leaf = make_int32_page_reader(&data, &levels, &levels, 0, 0); let struct_type = ArrowType::Struct(Fields::from(vec![Field::new( "c0", ArrowType::Int32, false, )])); - let struct_reader = StructArrayReader::new(struct_type, vec![Box::new(leaf)], 0, 0, false); + let struct_reader = StructArrayReader::new(struct_type, vec![leaf], 0, 0, false); let mut predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| { Ok(BooleanArray::from(vec![true; batch.num_rows()]))