Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType};
use arrow_array::RecordBatch;
use bytes::Bytes;
use chrono::Utc;
use once_cell::sync::Lazy;
use rayon::{ThreadPool, ThreadPoolBuilder};
use tokio::sync::oneshot;
use tracing::error;

Expand Down Expand Up @@ -55,6 +57,12 @@ use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;

pub static INGESTION_THREADPOOL: Lazy<ThreadPool> = Lazy::new(|| {
ThreadPoolBuilder::new()
.build()
.expect("Unable to create Rayon thread pool")
});

// Handler for POST /api/v1/ingest
// ingests events by extracting stream name from header
// creates if stream does not exist
Expand Down
118 changes: 96 additions & 22 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{
DEFAULT_TIMESTAMP_KEY,
format::{LogSource, LogSourceEntry},
},
handlers::DatasetTag,
handlers::{DatasetTag, http::ingest::INGESTION_THREADPOOL},
hottier::StreamHotTier,
metadata::{LogStreamMetadata, SchemaVersion},
metrics,
Expand Down Expand Up @@ -86,7 +86,71 @@ static DISK_WRITE_BATCH_ROWS: Lazy<usize> = Lazy::new(|| {
});

const INPROCESS_DIR_PREFIX: &str = "processing_";
const METRIC_ROW_GROUP_PREP_IN_FLIGHT: usize = 1;
const METRIC_ROW_GROUP_PREP_IN_FLIGHT_VAR: &str = "METRIC_ROW_GROUP_PREP_IN_FLIGHT";
/// Caps how many arrow files feed a single parquet conversion group. A
/// minute with heavy schema-key churn can stage thousands of small arrow
/// files; converting them as one group means one kmerge holding an open
/// reader (and a decoded batch) per file. Chunking bounds that memory
/// while still collapsing thousands of files into a handful of parquets.
static METRIC_ROW_GROUP_PREP_IN_FLIGHT: Lazy<usize> = Lazy::new(|| {
if let Ok(var) = std::env::var(METRIC_ROW_GROUP_PREP_IN_FLIGHT_VAR)
&& let Ok(var) = var.parse::<usize>()
&& var > 0
{
var
} else {
1
}
});

const MAX_ARROW_FILES_PER_PARQUET_VAR: &str = "MAX_ARROW_FILES_PER_PARQUET";
/// Caps how many arrow files feed a single parquet conversion group. A
/// minute with heavy schema-key churn can stage thousands of small arrow
/// files; converting them as one group means one kmerge holding an open
/// reader (and a decoded batch) per file. Chunking bounds that memory
/// while still collapsing thousands of files into a handful of parquets.
static MAX_ARROW_FILES_PER_PARQUET: Lazy<usize> = Lazy::new(|| {
if let Ok(var) = std::env::var(MAX_ARROW_FILES_PER_PARQUET_VAR)
&& let Ok(var) = var.parse::<usize>()
&& var > 0
{
var
} else {
512
}
});

/// Splits any conversion group holding more than MAX_ARROW_FILES_PER_PARQUET
/// arrow files into chunks, giving chunk 1.. a "-{i}" suffix on the parquet
/// file stem so each chunk converts to its own parquet file.
fn chunk_arrow_file_groups(
grouped: HashMap<PathBuf, Vec<PathBuf>>,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let max_files = *MAX_ARROW_FILES_PER_PARQUET;
let mut chunked = HashMap::with_capacity(grouped.len());
for (parquet_path, arrow_files) in grouped {
if arrow_files.len() <= max_files {
chunked.insert(parquet_path, arrow_files);
continue;
}
for (i, chunk) in arrow_files.chunks(max_files).enumerate() {
let chunk_path = if i == 0 {
parquet_path.clone()
} else {
let mut path = parquet_path.clone();
if let (Some(stem), Some(ext)) = (
parquet_path.file_stem().and_then(|s| s.to_str()),
parquet_path.extension().and_then(|e| e.to_str()),
) {
path.set_file_name(format!("{stem}-{i}.{ext}"));
}
path
};
chunked.insert(chunk_path, chunk.to_vec());
}
}
chunked
}

struct PreparedMetricRowGroup {
batch: RecordBatch,
Expand All @@ -100,10 +164,20 @@ fn arrow_path_to_parquet(
) -> Option<PathBuf> {
let filename = path.file_stem()?.to_str()?;
let (_, front) = filename.split_once('.')?;
if !front.contains('.') {
warn!("Skipping unexpected arrow file without `.`: {}", filename);
// Writers may suffix the filename with a per-file ULID after the
// ".data" marker (ONE_PARQUET_PER_ARROW). Truncate at the marker so
// the parquet grouping key stays per-minute: with high schema-key
// churn, keying on the full name made every arrow file its own
// conversion group (thousands per minute), which exploded conversion
// parallelism/memory and the object-store file count.
let Some(idx) = front.rfind(".data") else {
warn!(
"Skipping unexpected arrow file without `.data`: {}",
filename
);
return None;
}
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let front = &front[..idx + ".data".len()];
let filename_with_random_number = format!("{front}.{random_string}.parquet");
let mut parquet_path = stream_staging_path.to_owned();
parquet_path.push(filename_with_random_number);
Expand Down Expand Up @@ -352,7 +426,7 @@ impl Stream {
}
}
}
grouped
chunk_arrow_file_groups(grouped)
}

/// Returns a mapping for inprocess arrow files (init_signal=true).
Expand All @@ -370,7 +444,7 @@ impl Stream {
warn!("Unexpected arrow file: {}", inprocess_file.display());
}
}
grouped
chunk_arrow_file_groups(grouped)
}

/// Returns arrow files for conversion, filtering by time and removing invalid files.
Expand Down Expand Up @@ -757,7 +831,7 @@ impl Stream {
time_partition_field: String,
) -> std::sync::mpsc::Receiver<Result<PreparedMetricRowGroup, StagingError>> {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
rayon::spawn(move || {
INGESTION_THREADPOOL.spawn(move || {
let _ = tx.send(Self::prepare_metric_row_group(
schema,
buffer,
Expand Down Expand Up @@ -856,7 +930,7 @@ impl Stream {
let merged_schema = record_reader.merged_schema();
let props =
self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
// schemas.push(merged_schema.clone());

let schema = Arc::new(merged_schema.clone());

let part_path = parquet_path.with_extension("part");
Expand Down Expand Up @@ -938,7 +1012,7 @@ impl Stream {
// each page actually carries.
let target = self.options.row_group_size;
let buffer_capacity = record_reader.readers.len();
let mut pending_row_groups = VecDeque::with_capacity(METRIC_ROW_GROUP_PREP_IN_FLIGHT);
let mut pending_row_groups = VecDeque::with_capacity(*METRIC_ROW_GROUP_PREP_IN_FLIGHT);
let mut buffer: Vec<RecordBatch> = Vec::with_capacity(buffer_capacity);
let mut buffered_rows: usize = 0;
let mut merged_iter =
Expand All @@ -951,6 +1025,12 @@ impl Stream {
buffered_rows += record_rows;
buffer.push(record);
if buffered_rows >= target {
if pending_row_groups.len() >= *METRIC_ROW_GROUP_PREP_IN_FLIGHT
&& let Some(rx) = pending_row_groups.pop_front()
{
let prepared = Self::receive_prepared_metric_row_group(rx)?;
writer.write(&prepared.batch)?;
}
let row_group_buffer =
std::mem::replace(&mut buffer, Vec::with_capacity(buffer_capacity));
let next_row_group = Self::spawn_metric_row_group_prepare(
Expand All @@ -959,29 +1039,23 @@ impl Stream {
time_partition_field.clone(),
);
pending_row_groups.push_back(next_row_group);
if pending_row_groups.len() > METRIC_ROW_GROUP_PREP_IN_FLIGHT
&& let Some(rx) = pending_row_groups.pop_front()
{
let prepared = Self::receive_prepared_metric_row_group(rx)?;
writer.write(&prepared.batch)?;
}
buffer.clear();
buffered_rows = 0;
}
}
if !buffer.is_empty() {
if pending_row_groups.len() >= *METRIC_ROW_GROUP_PREP_IN_FLIGHT
&& let Some(rx) = pending_row_groups.pop_front()
{
let prepared = Self::receive_prepared_metric_row_group(rx)?;
writer.write(&prepared.batch)?;
}
let next_row_group = Self::spawn_metric_row_group_prepare(
schema.clone(),
buffer,
time_partition_field.clone(),
);
pending_row_groups.push_back(next_row_group);
if pending_row_groups.len() > METRIC_ROW_GROUP_PREP_IN_FLIGHT
&& let Some(rx) = pending_row_groups.pop_front()
{
let prepared = Self::receive_prepared_metric_row_group(rx)?;
writer.write(&prepared.batch)?;
}
}
while let Some(rx) = pending_row_groups.pop_front() {
let prepared = Self::receive_prepared_metric_row_group(rx)?;
Expand Down
Loading