Skip to content

Commit e141160

Browse files
committed
Merge branch 'main' of https://github.com/datafuselabs/databend into feat/add-billing-history-table-functions
2 parents 0d15353 + 04ea6c1 commit e141160

22 files changed

Lines changed: 1122 additions & 565 deletions

File tree

src/common/io/src/format_settings.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ pub struct OutputFormatSettings {
148148
pub headers: u8,
149149
// used only for compat with old bendSQL driver.
150150
pub http_arrow_use_jsonb: bool,
151+
// used only in http arrow response to gate decimal64 physical type output.
152+
pub http_arrow_use_decimal64: bool,
151153

152154
pub json_compact: bool,
153155
pub json_strings: bool,
@@ -165,6 +167,7 @@ impl Default for OutputFormatSettings {
165167
http_json_result_mode: HttpHandlerDataFormat::Display,
166168
headers: 0,
167169
http_arrow_use_jsonb: false,
170+
http_arrow_use_decimal64: true,
168171
json_compact: false,
169172
json_strings: false,
170173
format_null_as_str: false,

src/query/config/src/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3481,6 +3481,14 @@ pub struct SpillConfig {
34813481
/// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
34823482
#[clap(long, value_name = "PERCENT", default_value = "0")]
34833483
pub result_set_spilling_disk_quota_ratio: u64,
3484+
3485+
/// Total memory for the spill buffer pool in bytes.
3486+
#[clap(long, value_name = "VALUE", default_value = "209715200")]
3487+
pub spill_buffer_pool_memory: u64,
3488+
3489+
/// Number of worker tasks in the spill buffer pool.
3490+
#[clap(long, value_name = "VALUE", default_value = "2")]
3491+
pub spill_buffer_pool_workers: usize,
34843492
}
34853493

34863494
impl Default for SpillConfig {
@@ -3594,6 +3602,8 @@ mod config_converters {
35943602
window_partition_spilling_disk_quota_ratio: spill
35953603
.window_partition_spilling_disk_quota_ratio,
35963604
result_set_spilling_disk_quota_ratio: spill.result_set_spilling_disk_quota_ratio,
3605+
buffer_pool_memory: spill.spill_buffer_pool_memory,
3606+
buffer_pool_workers: spill.spill_buffer_pool_workers,
35973607
})
35983608
}
35993609

@@ -3616,6 +3626,8 @@ mod config_converters {
36163626
window_partition_spilling_disk_quota_ratio: value
36173627
.window_partition_spilling_disk_quota_ratio,
36183628
result_set_spilling_disk_quota_ratio: value.result_set_spilling_disk_quota_ratio,
3629+
spill_buffer_pool_memory: value.buffer_pool_memory,
3630+
spill_buffer_pool_workers: value.buffer_pool_workers,
36193631
}
36203632
}
36213633
}

src/query/config/src/inner.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,12 @@ pub struct SpillConfig {
433433
/// Maximum percentage of the global local spill quota that HTTP
434434
/// result-set spilling may use for one query.
435435
pub result_set_spilling_disk_quota_ratio: u64,
436+
437+
/// Total memory for the spill buffer pool in bytes.
438+
pub buffer_pool_memory: u64,
439+
440+
/// Number of worker tasks in the spill buffer pool.
441+
pub buffer_pool_workers: usize,
436442
}
437443

438444
impl SpillConfig {
@@ -503,6 +509,8 @@ impl SpillConfig {
503509
window_partition_spilling_disk_quota_ratio: 60,
504510
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
505511
result_set_spilling_disk_quota_ratio: 0,
512+
buffer_pool_memory: 200 * 1024 * 1024,
513+
buffer_pool_workers: 2,
506514
}
507515
}
508516
}
@@ -519,6 +527,8 @@ impl Default for SpillConfig {
519527
window_partition_spilling_disk_quota_ratio: 60,
520528
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
521529
result_set_spilling_disk_quota_ratio: 0,
530+
buffer_pool_memory: 200 * 1024 * 1024,
531+
buffer_pool_workers: 2,
522532
}
523533
}
524534
}

src/query/config/src/mask.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ impl SpillConfig {
218218
sort_spilling_disk_quota_ratio,
219219
window_partition_spilling_disk_quota_ratio,
220220
result_set_spilling_disk_quota_ratio,
221+
spill_buffer_pool_memory,
222+
spill_buffer_pool_workers,
221223
} = *self;
222224

223225
Self {
@@ -227,8 +229,9 @@ impl SpillConfig {
227229
storage: storage.as_ref().map(|storage| storage.mask_display()),
228230
sort_spilling_disk_quota_ratio,
229231
window_partition_spilling_disk_quota_ratio,
230-
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
231232
result_set_spilling_disk_quota_ratio,
233+
spill_buffer_pool_memory,
234+
spill_buffer_pool_workers,
232235
}
233236
}
234237
}
@@ -384,8 +387,9 @@ mod tests {
384387
spill_local_disk_max_bytes: 10,
385388
sort_spilling_disk_quota_ratio: 60,
386389
window_partition_spilling_disk_quota_ratio: 30,
387-
// TODO: keep 0 to avoid deleting local result-set spill dir before HTTP pagination finishes.
388390
result_set_spilling_disk_quota_ratio: 0,
391+
spill_buffer_pool_memory: 209715200,
392+
spill_buffer_pool_workers: 2,
389393
storage: Some(StorageConfig {
390394
typ: "s3".to_string(),
391395
s3: S3StorageConfig {

src/query/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ storage-hdfs = ["opendal/services-hdfs", "databend-common-storage/storage-hdfs"]
3232
anyhow = { workspace = true }
3333
arrow-array = { workspace = true }
3434
arrow-buffer = { workspace = true }
35+
arrow-cast = { workspace = true }
3536
arrow-flight = { workspace = true }
3637
arrow-ipc = { workspace = true }
3738
arrow-json = { workspace = true }
@@ -185,7 +186,6 @@ uuid = { workspace = true }
185186
walkdir = { workspace = true }
186187

187188
[dev-dependencies]
188-
arrow-cast = { workspace = true }
189189
databend-common-sql-test-support = { workspace = true }
190190
databend-storages-common-pruner = { workspace = true }
191191
geo = { workspace = true }

src/query/service/src/global_services.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl GlobalServices {
115115
// 4. cluster discovery init.
116116
ClusterDiscovery::init(config, version).await?;
117117

118-
SpillsBufferPool::init();
118+
SpillsBufferPool::init(&config.spill)?;
119119
// TODO(xuanwo):
120120
//
121121
// This part is a bit complex because catalog are used widely in different

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ struct PayloadWriter {
5454
impl PayloadWriter {
5555
fn try_create(prefix: &str) -> Result<Self> {
5656
let data_operator = DataOperator::instance();
57-
let target = SpillTarget::from_storage_params(data_operator.spill_params());
5857
let operator = data_operator.spill_operator();
5958
let buffer_pool = SpillsBufferPool::instance();
6059
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
61-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
60+
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
6261

6362
Ok(PayloadWriter {
6463
path: file_path,
@@ -414,10 +413,11 @@ fn restore_payload(
414413
data_schema.clone(),
415414
vec![row_group.clone()],
416415
target,
416+
read_setting,
417417
)?;
418418

419419
let instant = Instant::now();
420-
let data_block = reader.read(read_setting)?;
420+
let data_block = reader.read()?;
421421
let elapsed = instant.elapsed();
422422

423423
let read_size = reader.read_bytes();

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,10 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
278278
self.desc.build_schema.clone(),
279279
data.row_groups,
280280
target,
281+
self.read_settings,
281282
)?;
282283

283-
while let Some(data_block) = reader.read(self.read_settings)? {
284+
while let Some(data_block) = reader.read()? {
284285
self.memory_hash_join.add_block(Some(data_block))?;
285286
}
286287
}
@@ -417,12 +418,11 @@ pub struct GraceJoinPartition {
417418
impl GraceJoinPartition {
418419
pub fn create(prefix: &str) -> Result<GraceJoinPartition> {
419420
let data_operator = DataOperator::instance();
420-
let target = SpillTarget::from_storage_params(data_operator.spill_params());
421421

422422
let operator = data_operator.spill_operator();
423423
let buffer_pool = SpillsBufferPool::instance();
424424
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
425-
let spills_data_writer = buffer_pool.writer(operator, file_path.clone(), target)?;
425+
let spills_data_writer = buffer_pool.writer(operator, file_path.clone())?;
426426

427427
Ok(GraceJoinPartition {
428428
path: file_path,
@@ -500,6 +500,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
500500
self.join.desc.probe_schema.clone(),
501501
data.row_groups,
502502
target,
503+
self.join.read_settings,
503504
)?;
504505
self.spills_reader = Some(reader);
505506
break;
@@ -511,7 +512,7 @@ impl<'a, T: GraceMemoryJoin> RestoreProbeStream<'a, T> {
511512
}
512513

513514
if let Some(mut spills_reader) = self.spills_reader.take() {
514-
if let Some(v) = spills_reader.read(self.join.read_settings)? {
515+
if let Some(v) = spills_reader.read()? {
515516
self.spills_reader = Some(spills_reader);
516517
return Ok(Some(v));
517518
}

0 commit comments

Comments
 (0)