Skip to content

Commit 04ea6c1

Browse files
zhang2014 and claude authored
refactor(query): simplify SpillsBufferPool with owned runtime and configurable settings (#19781)
* refactor(query): simplify SpillsBufferPool with owned runtime and configurable settings - SpillsBufferPool now owns a dedicated Runtime instead of borrowing GlobalIORuntime - Remove SpillTarget from public buffer pool APIs; callers no longer derive it - Add spill_buffer_pool_memory and spill_buffer_pool_workers to SpillConfig - Track buffer pool blocking time via atomic counters for observability - Simplify BufferWriter by removing SpillTarget and redundant comments - Clean up tests to use multi_thread tokio flavor * z * z * fix(query): prevent hang in SpillsBufferPool by spawning Fetch and CreateWriter ops Background workers were directly awaiting Fetch and CreateWriter ops in their event loop, blocking the worker thread for the duration of the I/O. With only 2 workers, concurrent Fetch/CreateWriter ops could starve reader_task_loop tasks (spawned via tokio::spawn onto the same runtime), causing recv_blocking() in SpillsDataReader::read() to hang indefinitely. Fix: spawn Fetch and CreateWriter as independent tasks, consistent with WriterTask and ReaderTask, so workers remain free to dequeue new ops. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * z * z * z * z * z * z --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 51f9cd7 commit 04ea6c1

10 files changed

Lines changed: 354 additions & 470 deletions

File tree

src/query/config/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3481,6 +3481,14 @@ pub struct SpillConfig {
34813481
/// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
34823482
#[clap(long, value_name = "PERCENT", default_value = "0")]
34833483
pub result_set_spilling_disk_quota_ratio: u64,
3484+
3485+
/// Total memory for the spill buffer pool in bytes.
3486+
#[clap(long, value_name = "VALUE", default_value = "209715200")]
3487+
pub spill_buffer_pool_memory: u64,
3488+
3489+
/// Number of worker tasks in the spill buffer pool.
3490+
#[clap(long, value_name = "VALUE", default_value = "2")]
3491+
pub spill_buffer_pool_workers: usize,
34843492
}
34853493

34863494
impl Default for SpillConfig {
@@ -3594,6 +3602,8 @@ mod config_converters {
35943602
window_partition_spilling_disk_quota_ratio: spill
35953603
.window_partition_spilling_disk_quota_ratio,
35963604
result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio,
3605+
buffer_pool_memory: spill.spill_buffer_pool_memory,
3606+
buffer_pool_workers: spill.spill_buffer_pool_workers,
35973607
})
35983608
}
35993609

@@ -3616,6 +3626,8 @@ mod config_converters {
36163626
window_partition_spilling_disk_quota_ratio: value
36173627
.window_partition_spilling_disk_quota_ratio,
36183628
result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio,
3629+
spill_buffer_pool_memory: value.buffer_pool_memory,
3630+
spill_buffer_pool_workers: value.buffer_pool_workers,
36193631
}
36203632
}
36213633
}

src/query/config/src/inner.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,12 @@ pub struct SpillConfig {
433433
/// Maximum percentage of the global local spill quota that HTTP
434434
/// result-set spilling may use for one query.
435435
pub result_set_spilling_disk_quota_ratio: u64,
436+
437+
/// Total memory for the spill buffer pool in bytes.
438+
pub buffer_pool_memory: u64,
439+
440+
/// Number of worker tasks in the spill buffer pool.
441+
pub buffer_pool_workers: usize,
436442
}
437443

438444
impl SpillConfig {
@@ -503,6 +509,8 @@ impl SpillConfig {
503509
window_partition_spilling_disk_quota_ratio: 60,
504510
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
505511
result_set_spilling_disk_quota_ratio: 0,
512+
buffer_pool_memory: 200 * 1024 * 1024,
513+
buffer_pool_workers: 2,
506514
}
507515
}
508516
}
@@ -519,6 +527,8 @@ impl Default for SpillConfig {
519527
window_partition_spilling_disk_quota_ratio: 60,
520528
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
521529
result_set_spilling_disk_quota_ratio: 0,
530+
buffer_pool_memory: 200 * 1024 * 1024,
531+
buffer_pool_workers: 2,
522532
}
523533
}
524534
}

src/query/config/src/mask.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ impl SpillConfig {
218218
sort_spilling_disk_quota_ratio,
219219
window_partition_spilling_disk_quota_ratio,
220220
result_set_spilling_disk_quota_ratio,
221+
spill_buffer_pool_memory,
222+
spill_buffer_pool_workers,
221223
} = *self;
222224

223225
Self {
@@ -227,8 +229,9 @@ impl SpillConfig {
227229
storage: storage.as_ref().map(|storage| storage.mask_display()),
228230
sort_spilling_disk_quota_ratio,
229231
window_partition_spilling_disk_quota_ratio,
230-
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
231232
result_set_spilling_disk_quota_ratio,
233+
spill_buffer_pool_memory,
234+
spill_buffer_pool_workers,
232235
}
233236
}
234237
}
@@ -384,8 +387,9 @@ mod tests {
384387
spill_local_disk_max_bytes: 10,
385388
sort_spilling_disk_quota_ratio: 60,
386389
window_partition_spilling_disk_quota_ratio: 30,
387-
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
388390
result_set_spilling_disk_quota_ratio: 0,
391+
spill_buffer_pool_memory: 209715200,
392+
spill_buffer_pool_workers: 2,
389393
storage: Some(StorageConfig {
390394
typ: "s3".to_string(),
391395
s3: S3StorageConfig {

src/query/service/src/global_services.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl GlobalServices {
115115
// 4. cluster discovery init.
116116
ClusterDiscovery::init(config, version).await?;
117117

118-
SpillsBufferPool::init();
118+
SpillsBufferPool::init(&config.spill)?;
119119
// TODO(xuanwo):
120120
//
121121
// This part is a bit complex because catalog are used widely in different

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ struct PayloadWriter {
5454
impl PayloadWriter {
5555
fn try_create(prefix: &str) -> Result<Self> {
5656
let data_operator = DataOperator::instance();
57-
let target = SpillTarget::from_storage_params(data_operator.spill_params());
5857
let operator = data_operator.spill_operator();
5958
let buffer_pool = SpillsBufferPool::instance();
6059
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
61-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
60+
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
6261

6362
Ok(PayloadWriter {
6463
path: file_path,
@@ -414,10 +413,11 @@ fn restore_payload(
414413
data_schema.clone(),
415414
vec![row_group.clone()],
416415
target,
416+
read_setting,
417417
)?;
418418

419419
let instant = Instant::now();
420-
let data_block = reader.read(read_setting)?;
420+
let data_block = reader.read()?;
421421
let elapsed = instant.elapsed();
422422

423423
let read_size = reader.read_bytes();

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,10 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
278278
self.desc.build_schema.clone(),
279279
data.row_groups,
280280
target,
281+
self.read_settings,
281282
)?;
282283

283-
while let Some(data_block) = reader.read(self.read_settings)? {
284+
while let Some(data_block) = reader.read()? {
284285
self.memory_hash_join.add_block(Some(data_block))?;
285286
}
286287
}
@@ -417,12 +418,11 @@ pub struct GraceJoinPartition {
417418
impl GraceJoinPartition {
418419
pub fn create(prefix: &str) -> Result<GraceJoinPartition> {
419420
let data_operator = DataOperator::instance();
420-
let target = SpillTarget::from_storage_params(data_operator.spill_params());
421421

422422
let operator = data_operator.spill_operator();
423423
let buffer_pool = SpillsBufferPool::instance();
424424
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
425-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
425+
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
426426

427427
Ok(GraceJoinPartition {
428428
path: file_path,
@@ -500,6 +500,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
500500
self.join.desc.probe_schema.clone(),
501501
data.row_groups,
502502
target,
503+
self.join.read_settings,
503504
)?;
504505
self.spills_reader = Some(reader);
505506
break;
@@ -511,7 +512,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
511512
}
512513

513514
if let Some(mut spills_reader) = self.spills_reader.take() {
514-
if let Some(v) = spills_reader.read(self.join.read_settings)? {
515+
if let Some(v) = spills_reader.read()? {
515516
self.spills_reader = Some(spills_reader);
516517
return Ok(Some(v));
517518
}

0 commit comments

Comments (0)