@@ -1260,3 +1260,212 @@ impl SimpleEngine {
12601260 ) )
12611261 }
12621262}
1263+
/// PromQL `topk(k, …)` end-to-end coverage, backed by `CountMinSketchWithHeap`.
///
/// These tests drive the PromQL side of the top-k flag split, i.e.
/// `execute_query_pipeline(ctx, true, true)`. SQL uses `true, false`; PromQL
/// switches formatting on, so every result row leads with the metric name as
/// its first label value (Prometheus series shape).
#[cfg(test)]
mod topk_pipeline_tests {
    use super::SimpleEngine;
    use crate::data_model::{
        AggregationConfig, AggregationReference, AggregationType, CleanupPolicy, InferenceConfig,
        PrecomputedOutput, PromQLSchema, QueryConfig, QueryLanguage, SchemaConfig, StreamingConfig,
        WindowType,
    };
    use crate::engines::QueryResult;
    use crate::precompute_operators::CountMinSketchWithHeapAccumulator;
    use crate::stores::simple_map_store::SimpleMapStore;
    use crate::stores::Store;
    use crate::utils::http::convert_query_result_to_prometheus;
    use promql_utilities::data_model::KeyByLabelNames;
    use promql_utilities::query_logics::enums::Statistic;
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    /// Identifier shared by the aggregation config, the query config and the stored output.
    const AGG_ID: u64 = 101;
    /// Metric name used by the schema, the aggregation and the query under test.
    const METRIC: &str = "transfer_events";
    // A multiple of 1000ms, i.e. aligned to the 1s scrape interval.
    const QUERY_TIME: f64 = 1_759_276_810.0;
    /// The PromQL expression every test in this module executes.
    const TOPK_QUERY: &str = "topk(10, transfer_events)";

    /// Label-name set holding only `srcip`, the single label these tests key on.
    fn srcip_key_labels() -> KeyByLabelNames {
        KeyByLabelNames::new(vec![String::from("srcip")])
    }

    /// Formats the synthetic source address for index `i` (`10.0.0.<i>`).
    fn srcip_for(i: u64) -> String {
        format!("10.0.0.{i}")
    }

    /// Builds a PromQL `SimpleEngine` over a fresh `SimpleMapStore`.
    ///
    /// The engine is configured with one `CountMinSketchWithHeap` aggregation
    /// (`AGG_ID`) on `METRIC`, and one query config mapping `TOPK_QUERY` to that
    /// aggregation. The store handle is returned as well so tests can insert
    /// precomputed outputs.
    fn build_topk_engine() -> (SimpleEngine, Arc<SimpleMapStore>) {
        // Streaming side: a single tumbling aggregation that sketches `srcip`.
        let aggregation = AggregationConfig {
            aggregation_id: AGG_ID,
            aggregation_type: AggregationType::CountMinSketchWithHeap,
            aggregation_sub_type: String::new(),
            parameters: HashMap::new(),
            grouping_labels: KeyByLabelNames::empty(),
            aggregated_labels: srcip_key_labels(),
            rollup_labels: KeyByLabelNames::empty(),
            original_yaml: String::new(),
            window_size: 1,
            slide_interval: 1,
            window_type: WindowType::Tumbling,
            spatial_filter: String::new(),
            spatial_filter_normalized: String::new(),
            metric: String::from(METRIC),
            num_aggregates_to_retain: None,
            read_count_threshold: None,
            table_name: None,
            value_column: None,
        };
        let streaming_config = Arc::new(StreamingConfig {
            aggregation_configs: HashMap::from([(AGG_ID, aggregation)]),
        });
        let store = Arc::new(SimpleMapStore::new(
            streaming_config.clone(),
            CleanupPolicy::NoCleanup,
        ));

        // Inference side: schema for the metric plus the query -> aggregation mapping.
        let schema = PromQLSchema::new().add_metric(String::from(METRIC), srcip_key_labels());
        let topk_query_config = QueryConfig::new(String::from(TOPK_QUERY))
            .add_aggregation(AggregationReference::new(AGG_ID, None));
        let inference_config = InferenceConfig {
            schema: SchemaConfig::PromQL(schema),
            query_configs: vec![topk_query_config],
            cleanup_policy: CleanupPolicy::NoCleanup,
        };

        let engine = SimpleEngine::new(
            store.clone(),
            inference_config,
            streaming_config,
            // NOTE(review): presumably the scrape interval in seconds — confirm
            // against `SimpleEngine::new`.
            1,
            QueryLanguage::promql,
        );
        (engine, store)
    }

    /// Context building alone must recognise `topk`, carry `k`, use one
    /// aggregation for both key and value, skip the keys query, and expose
    /// `__name__` + `srcip` as output labels.
    #[test]
    fn detects_topk_and_resolves_self_keyed_heap() {
        let (engine, _store) = build_topk_engine();
        let context = engine
            .build_query_execution_context_promql(String::from(TOPK_QUERY), QUERY_TIME)
            .expect("topk(k, metric) should build a context via the query_config path");

        let metadata = &context.metadata;
        assert_eq!(
            metadata.statistic_to_compute,
            Statistic::Topk,
            "topk(...) must resolve to Statistic::Topk",
        );

        let k_kwarg = metadata.query_kwargs.get("k").map(|k| k.as_str());
        assert_eq!(
            k_kwarg,
            Some("10"),
            "the topk k argument should be threaded through as the `k` kwarg",
        );

        // Key and value are expected to come from the same aggregation.
        let agg_info = &context.agg_info;
        assert_eq!(
            agg_info.aggregation_id_for_key,
            agg_info.aggregation_id_for_value,
        );
        assert!(context.store_plan.keys_query.is_none());

        let expected_labels = vec![String::from("__name__"), String::from("srcip")];
        assert_eq!(
            metadata.query_output_labels.labels,
            expected_labels,
            "topk PromQL rows zip to {{ __name__, srcip }} in the wire format",
        );
    }

    /// Full pipeline run: 15 source addresses with counts 10, 20, …, 150 go
    /// into the sketch; the query must return the ten largest in descending
    /// order, each prefixed with the metric name, and the Prometheus
    /// instant-vector conversion must preserve that shape.
    #[test]
    fn returns_top_k_srcips_sorted_descending_with_metric_prefix() {
        let (engine, store) = build_topk_engine();

        // NOTE(review): the three arguments look like sketch depth, sketch
        // width and heap capacity — confirm against the accumulator constructor.
        let mut sketch = CountMinSketchWithHeapAccumulator::new(3, 1024, 32);
        (1..=15u64).for_each(|i| {
            sketch.inner.update(&srcip_for(i), (i * 10) as f64);
        });

        let context = engine
            .build_query_execution_context_promql(String::from(TOPK_QUERY), QUERY_TIME)
            .expect("context should build");

        // Store the sketch under exactly the window the planner will read.
        let window = &context.store_plan.values_query;
        let precomputed =
            PrecomputedOutput::new(window.start_timestamp, window.end_timestamp, None, AGG_ID);
        store
            .insert_precomputed_output(precomputed, Box::new(sketch))
            .expect("insert should succeed");

        let results = engine
            .execute_query_pipeline(&context, true, true)
            .expect("pipeline should produce results");

        assert_eq!(results.len(), 10, "topk(10, ...) must truncate to 10 rows");

        // Each row is compared with its successor.
        for (higher, lower) in results.iter().zip(results.iter().skip(1)) {
            assert!(
                higher.value >= lower.value,
                "results must be sorted by count descending: {} then {}",
                higher.value,
                lower.value,
            );
        }

        let top = &results[0];
        assert_eq!(
            top.labels.labels,
            vec![String::from(METRIC), String::from("10.0.0.15")],
        );
        assert_eq!(top.value, 150.0);

        for entry in &results {
            let label_values = &entry.labels.labels;
            assert_eq!(
                label_values.len(),
                2,
                "PromQL top-k rows carry the metric-name prefix plus the srcip",
            );
            assert_eq!(
                label_values[0], METRIC,
                "first label value must be the metric name (PromQL formatting)",
            );
        }

        // The ten largest counts belong to indices 6 through 15.
        let expected: HashSet<String> = (6..=15u64).map(srcip_for).collect();
        let returned: HashSet<String> = results
            .iter()
            .map(|entry| entry.labels.labels[1].clone())
            .collect();
        assert_eq!(returned, expected);

        // Wire format: label names are zipped with values into Prometheus
        // instant-vector JSON.
        let output_labels = context.metadata.query_output_labels.clone();
        let query_result = QueryResult::vector(results, context.query_time);
        let prometheus_data = convert_query_result_to_prometheus(&query_result, &output_labels)
            .expect("pipeline output should convert to Prometheus instant-vector JSON");

        assert_eq!(prometheus_data["resultType"], "vector");
        let wire_rows = prometheus_data["result"]
            .as_array()
            .expect("result must be an array");
        assert_eq!(wire_rows.len(), 10);

        let top_row = &wire_rows[0];
        assert_eq!(top_row["metric"]["__name__"], METRIC);
        assert_eq!(top_row["metric"]["srcip"], "10.0.0.15");
        assert_eq!(top_row["value"][0], QUERY_TIME);
        assert_eq!(top_row["value"][1], "150");

        for row in wire_rows {
            assert_eq!(row["metric"]["__name__"], METRIC);
            assert!(row["metric"]["srcip"].is_string());
            assert!(row["value"][1].is_string());
        }

        // The wire format must keep the descending order.
        let wire_values: Vec<f64> = wire_rows
            .iter()
            .map(|row| row["value"][1].as_str().unwrap().parse::<f64>().unwrap())
            .collect();
        for pair in wire_values.windows(2) {
            assert!(pair[0] >= pair[1]);
        }
    }
}
0 commit comments