Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ name = "arrow_reader_row_filter"
required-features = ["arrow", "async"]
harness = false

[[bench]]
name = "arrow_reader_materialization_policy"
required-features = ["arrow", "async"]
harness = false

[[bench]]
name = "arrow_reader_clickbench"
required-features = ["arrow", "async", "object_store"]
Expand Down
268 changes: 268 additions & 0 deletions parquet/benches/arrow_reader_common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::basic::Compression;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
use rand::{Rng, SeedableRng, rngs::StdRng};
use std::ops::Range;
use std::sync::Arc;

pub(crate) const COLUMN_NAMES: [&str; 4] = ["int64", "float64", "utf8View", "ts"];
pub(crate) const UTF8_VIEW_MISSING_VALUE: &str = "__arrow_rs_missing__";
pub(crate) const TOTAL_ROWS: usize = 500_000;
pub(crate) const ROW_GROUP_SIZE: usize = 100_000;

/// Generates a random string. Has a 50% chance to generate a short string
/// (3-11 characters) or a long string (13-20 characters).
fn random_string(rng: &mut StdRng) -> String {
let charset = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
let is_long = rng.random_bool(0.5);
let len = if is_long {
rng.random_range(13..21)
} else {
rng.random_range(3..12)
};
(0..len)
.map(|_| charset[rng.random_range(0..charset.len())] as char)
.collect()
}

/// Creates an int64 array of a given size with random integers in [0, 100).
/// Then, it overwrites a single random index with 9999 to serve as the unique
/// value for point lookup.
fn create_int64_array(size: usize) -> ArrayRef {
let mut rng = StdRng::seed_from_u64(42);
let mut values: Vec<i64> = (0..size).map(|_| rng.random_range(0..100)).collect();
let unique_index = rng.random_range(0..size);
values[unique_index] = 9999;
Arc::new(Int64Array::from(values)) as ArrayRef
}

/// Creates a float64 array of a given size with random floats in [0.0, 100.0).
fn create_float64_array(size: usize) -> ArrayRef {
let mut rng = StdRng::seed_from_u64(43);
let values: Vec<f64> = (0..size).map(|_| rng.random_range(0.0..100.0)).collect();
Arc::new(Float64Array::from(values)) as ArrayRef
}

fn append_utf8_view_value(builder: &mut StringViewBuilder, value: &str) {
if builder.len() % 1_000 == 0 {
builder.append_value(UTF8_VIEW_MISSING_VALUE);
} else {
builder.append_value(value);
}
}

/// Creates a utf8View array of a given size with random strings.
///
/// This is modeled after the "SearchPhrase" column in the ClickBench benchmark.
///
/// See <https://github.com/apache/arrow-rs/issues/7460> for calculations.
///
/// The important ClickBench data properties are:
/// * Selectivity is: 13172392 / 99997497 = 0.132
/// * Number of RowSelections = 14054784
/// * Average run length of each RowSelection: 99997497 / 14054784 = 7.114
///
/// A 100K-row reference generated by this shape has:
/// * Selectivity is: 15144 / 100000 = 0.15144
/// * Number of RowSelections = 12904
/// * Average run length of each RowSelection: 100000 / 12904 = 7.75
fn create_utf8_view_array(size: usize) -> ArrayRef {
const AVG_RUN_LENGTH: usize = 4;
const EMPTY_DENSITY: u32 = 85;

let mut builder = StringViewBuilder::with_capacity(size);
let mut rng = StdRng::seed_from_u64(44);
while builder.len() < size {
let mut run_length = rng.random_range(1..AVG_RUN_LENGTH);
if builder.len() + run_length > size {
run_length = size - builder.len();
}

let choice = rng.random_range(0..100);
if choice < EMPTY_DENSITY {
for _ in 0..run_length {
append_utf8_view_value(&mut builder, "");
}
} else {
for _ in 0..run_length {
append_utf8_view_value(&mut builder, &random_string(&mut rng));
}
}
}
Arc::new(builder.finish()) as ArrayRef
}

/// Creates a ts (timestamp) array of a given size. Each value is computed as
/// i % 10_000, which simulates repeating blocks to model clustered patterns.
fn create_ts_array(size: usize) -> ArrayRef {
let values: Vec<i64> = (0..size).map(|i| (i % 10_000) as i64).collect();
Arc::new(TimestampMillisecondArray::from(values)) as ArrayRef
}

/// Creates the standard row-filter benchmark RecordBatch with 4 root columns:
/// int64, float64, utf8View, and ts.
pub(crate) fn create_record_batch(size: usize) -> RecordBatch {
let fields = vec![
Field::new("int64", DataType::Int64, false),
Field::new("float64", DataType::Float64, false),
Field::new("utf8View", DataType::Utf8View, true),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
];
let schema = Arc::new(Schema::new(fields));

let arrays: Vec<ArrayRef> = vec![
create_int64_array(size),
create_float64_array(size),
create_utf8_view_array(size),
create_ts_array(size),
];
RecordBatch::try_new(schema, arrays).unwrap()
}

/// Writes the standard RecordBatch to an in memory Parquet buffer.
pub(crate) fn write_parquet_file() -> Vec<u8> {
write_parquet_file_with_rows(TOTAL_ROWS, ROW_GROUP_SIZE)
}

/// Writes a standard RecordBatch with a configurable shape to an in memory
/// Parquet buffer.
pub(crate) fn write_parquet_file_with_rows(total_rows: usize, row_group_size: usize) -> Vec<u8> {
let batch = create_record_batch(total_rows);
write_record_batch_to_parquet(&batch, row_group_size)
}

pub(crate) fn write_record_batch_to_parquet(batch: &RecordBatch, row_group_size: usize) -> Vec<u8> {
let schema = batch.schema();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_row_count(Some(row_group_size))
.build();
let mut buffer = vec![];
{
let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
writer.write(batch).unwrap();
writer.close().unwrap();
}
buffer
}

pub(crate) fn read_projection_for_post_filter(
output_projection: &[usize],
filter_projection: &[usize],
) -> Vec<usize> {
let mut read_projection = output_projection.to_vec();
for filter_idx in filter_projection {
if !read_projection.contains(filter_idx) {
read_projection.push(*filter_idx);
}
}
read_projection.sort_unstable();
read_projection
}

pub(crate) fn projection_names(projection: &[usize]) -> Vec<&'static str> {
projection.iter().map(|idx| COLUMN_NAMES[*idx]).collect()
}

pub(crate) fn filter_projected_record_batch(
batch: &RecordBatch,
filter: &BooleanArray,
output_column_names: &[&str],
) -> arrow::error::Result<RecordBatch> {
let output_projection = output_column_names
.iter()
.map(|name| batch.schema().index_of(name))
.collect::<arrow::error::Result<Vec<_>>>()?;
let output = batch.project(&output_projection)?;
arrow_select::filter::filter_record_batch(&output, filter)
}

pub(crate) fn post_filter_projected_num_rows(
batch: &RecordBatch,
filter: &BooleanArray,
output_column_names: &[&str],
) -> arrow::error::Result<usize> {
if output_column_names.is_empty() {
return Ok(filter.true_count());
}

let output = filter_projected_record_batch(batch, filter, output_column_names)?;
Ok(output.num_rows())
}

/// Adapter to read asynchronously from in memory bytes and always loads the
/// metadata with page indexes.
#[derive(Debug, Clone)]
pub(crate) struct InMemoryReader {
inner: Bytes,
metadata: Arc<ParquetMetaData>,
}

impl InMemoryReader {
pub(crate) fn try_new(inner: &Bytes) -> parquet::errors::Result<Self> {
let mut metadata_reader =
ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
metadata_reader.try_parse(inner)?;
let metadata = metadata_reader.finish().map(Arc::new)?;

Ok(Self {
inner: inner.clone(),
metadata,
})
}

pub(crate) fn metadata(&self) -> &Arc<ParquetMetaData> {
&self.metadata
}

#[allow(dead_code)]
pub(crate) fn into_inner(self) -> Bytes {
self.inner
}
}

impl AsyncFileReader for InMemoryReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
let data = self.inner.slice(range.start as usize..range.end as usize);
async move { Ok(data) }.boxed()
}

fn get_metadata<'a>(
&'a mut self,
_options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata = Arc::clone(&self.metadata);
async move { Ok(metadata) }.boxed()
}
}
Loading
Loading