@@ -303,8 +303,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
303303
304304 // Send a few raw samples — no need to advance watermark.
305305 println ! ( "\n === Sending raw-mode samples ===" ) ;
306- let raw_timestamps = vec ! [ 100_000i64 , 101_000 , 102_000 ] ;
307- let raw_values = vec ! [ 42.0f64 , 43.0 , 44.0 ] ;
306+ let raw_timestamps = [ 100_000i64 , 101_000 , 102_000 ] ;
307+ let raw_values = [ 42.0f64 , 43.0 , 44.0 ] ;
308308 for ( & ts, & val) in raw_timestamps. iter ( ) . zip ( raw_values. iter ( ) ) {
309309 let body = build_remote_write_body ( vec ! [ make_sample( "fake_metric" , "groupA" , ts, val) ] ) ;
310310 let resp = client
@@ -353,7 +353,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
353353 for series_idx in 0 ..10 {
354354 let label_val = format ! ( "batch_{series_idx}" ) ;
355355 for t in 0 ..100 {
356- let ts = 200_000 + ( series_idx as i64 ) * 1000 + t; // unique ts per sample
356+ let ts = 200_000 + series_idx * 1000 + t; // unique ts per sample
357357 let val = ( series_idx * 100 + t) as f64 ;
358358 batch_timeseries. push ( make_sample ( "fake_metric" , & label_val, ts, val) ) ;
359359 }
@@ -408,7 +408,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
408408 // Pre-build all request bodies in parallel using rayon-style chunking via tokio tasks.
409409 // Each task builds its share of requests, then we flatten the results.
410410 let num_build_tasks = num_concurrent_senders;
411- let requests_per_task = ( num_requests as usize + num_build_tasks - 1 ) / num_build_tasks;
411+ let requests_per_task = ( num_requests as usize ) . div_ceil ( num_build_tasks) ;
412412 let mut build_handles = Vec :: with_capacity ( num_build_tasks) ;
413413 for task_idx in 0 ..num_build_tasks {
414414 let start = task_idx * requests_per_task;
@@ -566,6 +566,17 @@ struct BenchResult {
566566 batch_latency_ms : f64 ,
567567}
568568
569+ struct BenchRunConfig {
570+ label : String ,
571+ port : u16 ,
572+ streaming_config : Arc < StreamingConfig > ,
573+ num_workers : usize ,
574+ num_concurrent_senders : usize ,
575+ num_requests : u64 ,
576+ samples_per_request : u64 ,
577+ num_series : u64 ,
578+ }
579+
569580/// Build an AggregationConfig for Sum with specified window parameters.
570581fn make_sum_agg_config (
571582 agg_id : u64 ,
@@ -602,15 +613,18 @@ fn make_sum_agg_config(
602613/// Run a single windowed benchmark and return the results.
603614async fn run_single_bench (
604615 client : & reqwest:: Client ,
605- label : & str ,
606- port : u16 ,
607- streaming_config : Arc < StreamingConfig > ,
608- num_workers : usize ,
609- num_concurrent_senders : usize ,
610- num_requests : u64 ,
611- samples_per_request : u64 ,
612- num_series : u64 ,
616+ config : BenchRunConfig ,
613617) -> Result < BenchResult , Box < dyn std:: error:: Error + Send + Sync > > {
618+ let BenchRunConfig {
619+ label,
620+ port,
621+ streaming_config,
622+ num_workers,
623+ num_concurrent_senders,
624+ num_requests,
625+ samples_per_request,
626+ num_series,
627+ } = config;
614628 let total_samples = num_requests * samples_per_request;
615629
616630 let noop_sink = Arc :: new ( NoopOutputSink :: new ( ) ) ;
@@ -728,7 +742,7 @@ async fn run_single_bench(
728742 println ! ( " Batch latency: {batch_latency_ms:.1}ms" ) ;
729743
730744 Ok ( BenchResult {
731- label : label . to_string ( ) ,
745+ label,
732746 send_throughput,
733747 e2e_throughput,
734748 batch_latency_ms,
@@ -760,14 +774,16 @@ async fn run_windowed_benchmarks(
760774
761775 let r = run_single_bench (
762776 client,
763- label,
764- port,
765- sc,
766- 4 ,
767- 4 , // concurrent senders to saturate workers
768- num_requests,
769- samples_per_request,
770- num_series,
777+ BenchRunConfig {
778+ label : label. to_string ( ) ,
779+ port,
780+ streaming_config : sc,
781+ num_workers : 4 ,
782+ num_concurrent_senders : 4 , // concurrent senders to saturate workers
783+ num_requests,
784+ samples_per_request,
785+ num_series,
786+ } ,
771787 )
772788 . await ?;
773789 results. push ( r) ;
@@ -802,14 +818,16 @@ async fn run_scalability_benchmark(
802818
803819 let r = run_single_bench (
804820 client,
805- & label,
806- port,
807- sc,
808- num_workers,
809- num_workers, // concurrent senders match worker count
810- num_requests,
811- samples_per_request,
812- num_series,
821+ BenchRunConfig {
822+ label,
823+ port,
824+ streaming_config : sc,
825+ num_workers,
826+ num_concurrent_senders : num_workers, // concurrent senders match worker count
827+ num_requests,
828+ samples_per_request,
829+ num_series,
830+ } ,
813831 )
814832 . await ?;
815833 results. push ( r) ;
0 commit comments