Skip to content

Commit 3bee584

Browse files
committed
Simplify UDF impl mode, default CMS to sketchlib
1 parent 81126b1 commit 3bee584

2 files changed

Lines changed: 17 additions & 31 deletions

File tree

asap-summary-ingest/run_arroyosketch.py

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -380,16 +380,6 @@ def delete_connection_table(args, table_name):
380380
)
381381

382382

383-
# Map UDF names to the sketch impl CLI arg. UDFs use same impl mode as QueryEngine.
384-
_UDF_IMPL_MODE_ARG = {
385-
"countminsketch_count": "sketch_cms_impl",
386-
"countminsketch_sum": "sketch_cms_impl",
387-
"countminsketchwithheap_topk": "sketch_cmwh_impl",
388-
"datasketcheskll_": "sketch_kll_impl",
389-
"hydrakll_": "sketch_kll_impl",
390-
}
391-
392-
393383
def create_pipeline(
394384
args: argparse.Namespace,
395385
sql_queries: List[str],
@@ -453,15 +443,9 @@ def create_pipeline(
453443
template_path = os.path.join(udf_dir, f"{udf_name}.rs.j2")
454444
regular_path = os.path.join(udf_dir, f"{udf_name}.rs")
455445

456-
# Get parameters for this UDF
446+
# Get parameters for this UDF (impl_mode injected in main() for sketch UDFs)
457447
params = dict(agg_function_params.get(udf_name, {}))
458448

459-
# Inject impl_mode from CLI so UDFs use same backend as QueryEngine
460-
impl_arg = _UDF_IMPL_MODE_ARG.get(udf_name)
461-
if impl_arg:
462-
impl_val = getattr(args, impl_arg, "legacy")
463-
params["impl_mode"] = impl_val.capitalize() # "Legacy" or "Sketchlib"
464-
465449
if len(params) > 0 and not os.path.exists(template_path):
466450
raise ValueError(
467451
f"UDF {udf_name} requires parameters {params} but no template found at {template_path}"
@@ -480,18 +464,6 @@ def create_pipeline(
480464
template_source, udf_template.environment
481465
)
482466

483-
# Per-UDF impl mode defaults (aligned with sketch-core config)
484-
UDF_IMPL_DEFAULTS = {
485-
"countminsketch_count": "Sketchlib",
486-
"countminsketch_sum": "Sketchlib",
487-
"countminsketchwithheap_topk": "Sketchlib",
488-
"datasketcheskll_": "Sketchlib",
489-
"hydrakll_": "Sketchlib",
490-
}
491-
params.setdefault(
492-
"impl_mode", UDF_IMPL_DEFAULTS.get(udf_name, "Sketchlib")
493-
)
494-
495467
# Handle config key mapping (K -> k for KLL)
496468
if "K" in params and "k" in required_params:
497469
params["k"] = params["K"]
@@ -974,6 +946,20 @@ def main(args):
974946
filter_metric_name,
975947
)
976948

949+
parameters = dict(parameters)
950+
if agg_function in ("countminsketch_count", "countminsketch_sum"):
951+
parameters["impl_mode"] = getattr(
952+
args, "sketch_cms_impl", "legacy"
953+
).capitalize()
954+
elif agg_function == "countminsketchwithheap_topk":
955+
parameters["impl_mode"] = getattr(
956+
args, "sketch_cmwh_impl", "legacy"
957+
).capitalize()
958+
elif agg_function in ("datasketcheskll_", "hydrakll_"):
959+
parameters["impl_mode"] = getattr(
960+
args, "sketch_kll_impl", "legacy"
961+
).capitalize()
962+
977963
sql_queries.append(sql_query)
978964
# if not is_labels_accumulator:
979965
agg_functions_with_params.append((agg_function, parameters))
@@ -1133,7 +1119,7 @@ def main(args):
11331119
"--sketch_cms_impl",
11341120
type=str,
11351121
choices=["legacy", "sketchlib"],
1136-
default="legacy",
1122+
default="sketchlib",
11371123
help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.",
11381124
)
11391125
parser.add_argument(

asap-tools/experiments/experiment_utils/services/arroyo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def run_arroyosketch(
105105
use_kafka_ingest: bool = False,
106106
enable_optimized_remote_write: bool = False,
107107
avoid_long_ssh: bool = False,
108-
sketch_cms_impl: str = "legacy",
108+
sketch_cms_impl: str = "sketchlib",
109109
sketch_kll_impl: str = "legacy",
110110
sketch_cmwh_impl: str = "legacy",
111111
) -> str:

0 commit comments

Comments
 (0)