@@ -50,7 +50,11 @@ pub struct Worker {
5050 receiver : mpsc:: Receiver < WorkerMessage > ,
5151 output_sink : Arc < dyn OutputSink > ,
5252 /// Map from (agg_id, group_key) to per-group state.
53- group_states : HashMap < ( u64 , String ) , GroupState > ,
53+ /// Per-group state, keyed by `agg_id` then by an interned `Arc<str>`
54+ /// group key. Nesting lets the per-sample hot path look up by `&str`
55+ /// (no allocation); the group-key string is allocated once, on first
56+ /// sight of the group, and shared via the `Arc`.
57+ group_states : HashMap < u64 , HashMap < Arc < str > , GroupState > > ,
5458 /// Aggregation configs, keyed by aggregation_id.
5559 agg_configs : HashMap < u64 , Arc < AggregationConfig > > ,
5660 /// Allowed lateness in ms.
@@ -185,25 +189,18 @@ impl Worker {
185189 Vec :: new ( ) ;
186190
187191 for agg_id in & removed_ids {
188- // Drain all group states for this agg_id.
189- let removed_keys: Vec < _ > = self
190- . group_states
191- . keys ( )
192- . filter ( |( id, _) | id == agg_id)
193- . cloned ( )
194- . collect ( ) ;
195-
196- for key in removed_keys {
197- let Some ( state) = self . group_states . remove ( & key) else {
198- continue ;
199- } ;
192+ // Drain all group states for this agg_id in one move.
193+ let Some ( inner) = self . group_states . remove ( agg_id) else {
194+ continue ;
195+ } ;
196+
197+ for ( group_key_str, state) in inner {
200198 if state. previous_watermark_ms == i64:: MIN {
201199 continue ; // No samples received — nothing to emit.
202200 }
203201 // Force-close all open windows by advancing the watermark
204202 // to i64::MAX. No new samples will arrive for this group.
205- let ( group_key_str, mut active_panes) =
206- ( key. 1 . clone ( ) , state. active_panes ) ;
203+ let mut active_panes = state. active_panes ;
207204 let closed = state
208205 . window_manager
209206 . closed_windows ( state. previous_watermark_ms , i64:: MAX ) ;
@@ -240,7 +237,7 @@ impl Worker {
240237 }
241238
242239 self . group_count
243- . store ( self . group_states . len ( ) , Ordering :: Relaxed ) ;
240+ . store ( self . total_groups ( ) , Ordering :: Relaxed ) ;
244241 info ! (
245242 "Worker {}: evicted {} removed agg_id(s) {:?}" ,
246243 self . id,
@@ -264,31 +261,44 @@ impl Worker {
264261 info ! (
265262 "Worker {} stopped, {} active groups" ,
266263 self . id,
267- self . group_states . len ( )
264+ self . total_groups ( )
268265 ) ;
269266 }
270267
268+ /// Total number of live groups across all agg_ids.
269+ fn total_groups ( & self ) -> usize {
270+ self . group_states . values ( ) . map ( |m| m. len ( ) ) . sum ( )
271+ }
272+
271273 /// Get or create the GroupState for a (agg_id, group_key) pair.
272274 /// Returns None if agg_id has no matching config.
273275 fn get_or_create_group_state (
274276 & mut self ,
275277 agg_id : u64 ,
276278 group_key : & str ,
277279 ) -> Option < & mut GroupState > {
278- let key = ( agg_id, group_key. to_string ( ) ) ;
279- if !self . group_states . contains_key ( & key) {
280- let config = self . agg_configs . get ( & agg_id) ?;
280+ // Fast path: group already exists — borrow-based lookup, no allocation.
281+ let exists = self
282+ . group_states
283+ . get ( & agg_id)
284+ . is_some_and ( |m| m. contains_key ( group_key) ) ;
285+ if !exists {
286+ // Creation path: requires a config, and allocates the interned key once.
287+ let config = Arc :: clone ( self . agg_configs . get ( & agg_id) ?) ;
281288 let gs = GroupState {
282289 window_manager : WindowManager :: new ( config. window_size , config. slide_interval ) ,
283- config : Arc :: clone ( config ) ,
290+ config,
284291 active_panes : BTreeMap :: new ( ) ,
285292 previous_watermark_ms : i64:: MIN ,
286293 } ;
287- self . group_states . insert ( key. clone ( ) , gs) ;
294+ self . group_states
295+ . entry ( agg_id)
296+ . or_default ( )
297+ . insert ( Arc :: from ( group_key) , gs) ;
288298 self . group_count
289- . store ( self . group_states . len ( ) , Ordering :: Relaxed ) ;
299+ . store ( self . total_groups ( ) , Ordering :: Relaxed ) ;
290300 }
291- self . group_states . get_mut ( & key )
301+ self . group_states . get_mut ( & agg_id ) ? . get_mut ( group_key )
292302 }
293303
294304 /// Process a batch of samples for a specific (agg_id, group_key).
@@ -305,17 +315,16 @@ impl Worker {
305315 let allowed_lateness_ms = self . allowed_lateness_ms ;
306316 let late_data_policy = self . late_data_policy ;
307317
308- if self . get_or_create_group_state ( agg_id, group_key) . is_none ( ) {
309- warn ! (
310- "Worker {} skipping samples for unknown agg_id={}, group_key={}" ,
311- self . id, agg_id, group_key
312- ) ;
313- return Ok ( ( ) ) ;
314- }
315- let state = self
316- . group_states
317- . get_mut ( & ( agg_id, group_key. to_string ( ) ) )
318- . unwrap ( ) ;
318+ let state = match self . get_or_create_group_state ( agg_id, group_key) {
319+ Some ( state) => state,
320+ None => {
321+ warn ! (
322+ "Worker {} skipping samples for unknown agg_id={}, group_key={}" ,
323+ self . id, agg_id, group_key
324+ ) ;
325+ return Ok ( ( ) ) ;
326+ }
327+ } ;
319328
320329 // Find the max timestamp in this batch to advance the watermark
321330 let batch_max_ts = samples
@@ -470,6 +479,7 @@ impl Worker {
470479 let worker_wm = self
471480 . group_states
472481 . values ( )
482+ . flat_map ( |m| m. values ( ) )
473483 . map ( |s| s. previous_watermark_ms )
474484 . filter ( |& wm| wm != i64:: MIN )
475485 . max ( )
@@ -484,44 +494,46 @@ impl Worker {
484494 // Step 4: For each group, advance watermark and close due windows.
485495 let mut emit_batch: Vec < ( PrecomputedOutput , Box < dyn AggregateCore > ) > = Vec :: new ( ) ;
486496
487- for ( ( agg_id, group_key) , state) in & mut self . group_states {
488- if state. previous_watermark_ms == i64:: MIN {
489- continue ; // No samples received yet — no panes to close.
490- }
497+ for ( agg_id, inner) in & mut self . group_states {
498+ for ( group_key, state) in inner. iter_mut ( ) {
499+ if state. previous_watermark_ms == i64:: MIN {
500+ continue ; // No samples received yet — no panes to close.
501+ }
491502
492- // Effective watermark: max(group's own, global) + 1ms for boundary.
493- let propagated_wm = if global_wm != i64:: MIN {
494- state. previous_watermark_ms . max ( global_wm)
495- } else {
496- state. previous_watermark_ms
497- } ;
498- let effective_wm = propagated_wm. saturating_add ( 1 ) ;
499-
500- let closed = state
501- . window_manager
502- . closed_windows ( state. previous_watermark_ms , effective_wm) ;
503-
504- for window_start in & closed {
505- let ( _, window_end) = state. window_manager . window_bounds ( * window_start) ;
506- let pane_starts = state. window_manager . panes_for_window ( * window_start) ;
507-
508- if let Some ( accumulator) =
509- merge_panes_for_window ( & mut state. active_panes , & pane_starts)
510- {
511- let key = build_group_key_label_values ( group_key) ;
512- let output = PrecomputedOutput :: new (
513- * window_start as u64 ,
514- window_end as u64 ,
515- Some ( key) ,
516- * agg_id,
517- ) ;
518- emit_batch. push ( ( output, accumulator) ) ;
503+ // Effective watermark: max(group's own, global) + 1ms for boundary.
504+ let propagated_wm = if global_wm != i64:: MIN {
505+ state. previous_watermark_ms . max ( global_wm)
506+ } else {
507+ state. previous_watermark_ms
508+ } ;
509+ let effective_wm = propagated_wm. saturating_add ( 1 ) ;
510+
511+ let closed = state
512+ . window_manager
513+ . closed_windows ( state. previous_watermark_ms , effective_wm) ;
514+
515+ for window_start in & closed {
516+ let ( _, window_end) = state. window_manager . window_bounds ( * window_start) ;
517+ let pane_starts = state. window_manager . panes_for_window ( * window_start) ;
518+
519+ if let Some ( accumulator) =
520+ merge_panes_for_window ( & mut state. active_panes , & pane_starts)
521+ {
522+ let key = build_group_key_label_values ( group_key) ;
523+ let output = PrecomputedOutput :: new (
524+ * window_start as u64 ,
525+ window_end as u64 ,
526+ Some ( key) ,
527+ * agg_id,
528+ ) ;
529+ emit_batch. push ( ( output, accumulator) ) ;
530+ }
519531 }
520- }
521532
522- // Update group watermark to reflect the advancement.
523- if effective_wm > state. previous_watermark_ms {
524- state. previous_watermark_ms = effective_wm;
533+ // Update group watermark to reflect the advancement.
534+ if effective_wm > state. previous_watermark_ms {
535+ state. previous_watermark_ms = effective_wm;
536+ }
525537 }
526538 }
527539
0 commit comments