11use indexmap:: IndexMap ;
22use serde_yaml:: Value as YamlValue ;
3+ use sketch_db_common:: enums:: CleanupPolicy ;
34use std:: collections:: HashMap ;
45use std:: time:: { SystemTime , UNIX_EPOCH } ;
56
67use crate :: config:: input:: SQLControllerConfig ;
78use crate :: error:: ControllerError ;
89use crate :: output:: generator:: {
9- key_by_labels_to_yaml , params_to_yaml , parse_cleanup_policy, GeneratorOutput ,
10+ build_aggregation_entry , build_queries_yaml , parse_cleanup_policy, GeneratorOutput ,
1011} ;
1112use crate :: planner:: single_query:: IntermediateAggConfig ;
1213use crate :: planner:: sql_single_query:: SQLSingleQueryProcessor ;
@@ -110,93 +111,8 @@ pub fn generate_sql_plan(
110111 } )
111112}
112113
113- fn build_sql_streaming_yaml (
114- config : & SQLControllerConfig ,
115- dedup_map : & IndexMap < String , IntermediateAggConfig > ,
116- id_map : & HashMap < String , u32 > ,
117- ) -> Result < YamlValue , ControllerError > {
118- let aggregations: Vec < YamlValue > = dedup_map
119- . iter ( )
120- . map ( |( key, cfg) | {
121- let id = id_map[ key] ;
122- let mut map = serde_yaml:: Mapping :: new ( ) ;
123- map. insert (
124- YamlValue :: String ( "aggregationId" . to_string ( ) ) ,
125- YamlValue :: Number ( id. into ( ) ) ,
126- ) ;
127- map. insert (
128- YamlValue :: String ( "aggregationSubType" . to_string ( ) ) ,
129- YamlValue :: String ( cfg. aggregation_sub_type . clone ( ) ) ,
130- ) ;
131- map. insert (
132- YamlValue :: String ( "aggregationType" . to_string ( ) ) ,
133- YamlValue :: String ( cfg. aggregation_type . clone ( ) ) ,
134- ) ;
135-
136- // labels
137- let mut labels_map = serde_yaml:: Mapping :: new ( ) ;
138- labels_map. insert (
139- YamlValue :: String ( "aggregated" . to_string ( ) ) ,
140- key_by_labels_to_yaml ( & cfg. aggregated_labels ) ,
141- ) ;
142- labels_map. insert (
143- YamlValue :: String ( "grouping" . to_string ( ) ) ,
144- key_by_labels_to_yaml ( & cfg. grouping_labels ) ,
145- ) ;
146- labels_map. insert (
147- YamlValue :: String ( "rollup" . to_string ( ) ) ,
148- key_by_labels_to_yaml ( & cfg. rollup_labels ) ,
149- ) ;
150- map. insert (
151- YamlValue :: String ( "labels" . to_string ( ) ) ,
152- YamlValue :: Mapping ( labels_map) ,
153- ) ;
154-
155- map. insert (
156- YamlValue :: String ( "metric" . to_string ( ) ) ,
157- YamlValue :: String ( cfg. metric . clone ( ) ) ,
158- ) ;
159- map. insert (
160- YamlValue :: String ( "parameters" . to_string ( ) ) ,
161- params_to_yaml ( & cfg. parameters ) ,
162- ) ;
163- map. insert (
164- YamlValue :: String ( "slideInterval" . to_string ( ) ) ,
165- YamlValue :: Number ( cfg. slide_interval . into ( ) ) ,
166- ) ;
167- map. insert (
168- YamlValue :: String ( "spatialFilter" . to_string ( ) ) ,
169- YamlValue :: String ( cfg. spatial_filter . clone ( ) ) ,
170- ) ;
171- map. insert (
172- YamlValue :: String ( "table_name" . to_string ( ) ) ,
173- match & cfg. table_name {
174- Some ( t) => YamlValue :: String ( t. clone ( ) ) ,
175- None => YamlValue :: Null ,
176- } ,
177- ) ;
178- map. insert (
179- YamlValue :: String ( "value_column" . to_string ( ) ) ,
180- match & cfg. value_column {
181- Some ( v) => YamlValue :: String ( v. clone ( ) ) ,
182- None => YamlValue :: Null ,
183- } ,
184- ) ;
185- map. insert (
186- YamlValue :: String ( "windowSize" . to_string ( ) ) ,
187- YamlValue :: Number ( cfg. window_size . into ( ) ) ,
188- ) ;
189- map. insert (
190- YamlValue :: String ( "windowType" . to_string ( ) ) ,
191- YamlValue :: String ( cfg. window_type . clone ( ) ) ,
192- ) ;
193-
194- YamlValue :: Mapping ( map)
195- } )
196- . collect ( ) ;
197-
198- // Build tables section
199- let tables_seq: Vec < YamlValue > = config
114+ fn build_tables_yaml ( config : & SQLControllerConfig ) -> Vec < YamlValue > {
115+ config
200116 . tables
201117 . iter ( )
202118 . map ( |t| {
@@ -229,6 +145,17 @@ fn build_sql_streaming_yaml(
229145 ) ;
230146 YamlValue :: Mapping ( map)
231147 } )
148+ . collect ( )
149+ }
150+
151+ fn build_sql_streaming_yaml (
152+ config : & SQLControllerConfig ,
153+ dedup_map : & IndexMap < String , IntermediateAggConfig > ,
154+ id_map : & HashMap < String , u32 > ,
155+ ) -> Result < YamlValue , ControllerError > {
156+ let aggregations: Vec < YamlValue > = dedup_map
157+ . iter ( )
158+ . map ( |( key, cfg) | build_aggregation_entry ( id_map[ key] , cfg) )
232159 . collect ( ) ;
233160
234161 let mut root = serde_yaml:: Mapping :: new ( ) ;
@@ -238,15 +165,15 @@ fn build_sql_streaming_yaml(
238165 ) ;
239166 root. insert (
240167 YamlValue :: String ( "tables" . to_string ( ) ) ,
241- YamlValue :: Sequence ( tables_seq ) ,
168+ YamlValue :: Sequence ( build_tables_yaml ( config ) ) ,
242169 ) ;
243170
244171 Ok ( YamlValue :: Mapping ( root) )
245172}
246173
247174fn build_sql_inference_yaml (
248175 config : & SQLControllerConfig ,
249- cleanup_policy : sketch_db_common :: enums :: CleanupPolicy ,
176+ cleanup_policy : CleanupPolicy ,
250177 cleanup_policy_str : & str ,
251178 query_keys_map : & IndexMap < String , Vec < ( String , Option < u64 > ) > > ,
252179 id_map : & HashMap < String , u32 > ,
@@ -257,100 +184,18 @@ fn build_sql_inference_yaml(
257184 YamlValue :: String ( cleanup_policy_str. to_string ( ) ) ,
258185 ) ;
259186
260- let queries: Vec < YamlValue > = query_keys_map
261- . iter ( )
262- . map ( |( query_str, keys) | {
263- let aggregations: Vec < YamlValue > = keys
264- . iter ( )
265- . map ( |( key, cleanup_param) | {
266- let agg_id = id_map[ key] ;
267- let mut agg_map = serde_yaml:: Mapping :: new ( ) ;
268- agg_map. insert (
269- YamlValue :: String ( "aggregation_id" . to_string ( ) ) ,
270- YamlValue :: Number ( agg_id. into ( ) ) ,
271- ) ;
272- if let Some ( param) = cleanup_param {
273- match cleanup_policy {
274- sketch_db_common:: enums:: CleanupPolicy :: CircularBuffer => {
275- agg_map. insert (
276- YamlValue :: String ( "num_aggregates_to_retain" . to_string ( ) ) ,
277- YamlValue :: Number ( ( * param) . into ( ) ) ,
278- ) ;
279- }
280- sketch_db_common:: enums:: CleanupPolicy :: ReadBased => {
281- agg_map. insert (
282- YamlValue :: String ( "read_count_threshold" . to_string ( ) ) ,
283- YamlValue :: Number ( ( * param) . into ( ) ) ,
284- ) ;
285- }
286- sketch_db_common:: enums:: CleanupPolicy :: NoCleanup => { }
287- }
288- }
289- YamlValue :: Mapping ( agg_map)
290- } )
291- . collect ( ) ;
292-
293- let mut q_map = serde_yaml:: Mapping :: new ( ) ;
294- q_map. insert (
295- YamlValue :: String ( "aggregations" . to_string ( ) ) ,
296- YamlValue :: Sequence ( aggregations) ,
297- ) ;
298- q_map. insert (
299- YamlValue :: String ( "query" . to_string ( ) ) ,
300- YamlValue :: String ( query_str. clone ( ) ) ,
301- ) ;
302- YamlValue :: Mapping ( q_map)
303- } )
304- . collect ( ) ;
305-
306- // Build tables section
307- let tables_seq: Vec < YamlValue > = config
308- . tables
309- . iter ( )
310- . map ( |t| {
311- let mut map = serde_yaml:: Mapping :: new ( ) ;
312- map. insert (
313- YamlValue :: String ( "name" . to_string ( ) ) ,
314- YamlValue :: String ( t. name . clone ( ) ) ,
315- ) ;
316- map. insert (
317- YamlValue :: String ( "time_column" . to_string ( ) ) ,
318- YamlValue :: String ( t. time_column . clone ( ) ) ,
319- ) ;
320- map. insert (
321- YamlValue :: String ( "value_columns" . to_string ( ) ) ,
322- YamlValue :: Sequence (
323- t. value_columns
324- . iter ( )
325- . map ( |c| YamlValue :: String ( c. clone ( ) ) )
326- . collect ( ) ,
327- ) ,
328- ) ;
329- map. insert (
330- YamlValue :: String ( "metadata_columns" . to_string ( ) ) ,
331- YamlValue :: Sequence (
332- t. metadata_columns
333- . iter ( )
334- . map ( |c| YamlValue :: String ( c. clone ( ) ) )
335- . collect ( ) ,
336- ) ,
337- ) ;
338- YamlValue :: Mapping ( map)
339- } )
340- . collect ( ) ;
341-
342187 let mut root = serde_yaml:: Mapping :: new ( ) ;
343188 root. insert (
344189 YamlValue :: String ( "cleanup_policy" . to_string ( ) ) ,
345190 YamlValue :: Mapping ( cleanup_map) ,
346191 ) ;
347192 root. insert (
348193 YamlValue :: String ( "queries" . to_string ( ) ) ,
349- YamlValue :: Sequence ( queries ) ,
194+ YamlValue :: Sequence ( build_queries_yaml ( cleanup_policy , query_keys_map , id_map ) ) ,
350195 ) ;
351196 root. insert (
352197 YamlValue :: String ( "tables" . to_string ( ) ) ,
353- YamlValue :: Sequence ( tables_seq ) ,
198+ YamlValue :: Sequence ( build_tables_yaml ( config ) ) ,
354199 ) ;
355200
356201 Ok ( YamlValue :: Mapping ( root) )
0 commit comments