Skip to content

Commit f956b70

Browse files
refactor: remove Legacy ImplMode fork — all sketch UDFs use Sketchlib (#312)
Drops the dual-backend (Legacy/Sketchlib) abstraction that was introduced in PRs 207/215/255/257 from the ingest side. Every UDF template now unconditionally uses the Sketchlib path; dead legacy code, the ImplMode enum, Jinja impl_mode template variables, CLI args, and experiment plumbing are all removed. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 09e4629 commit f956b70

8 files changed

Lines changed: 121 additions & 462 deletions

File tree

asap-summary-ingest/run_arroyosketch.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,6 @@ def create_pipeline(
443443
template_path = os.path.join(udf_dir, f"{udf_name}.rs.j2")
444444
regular_path = os.path.join(udf_dir, f"{udf_name}.rs")
445445

446-
# Get parameters for this UDF (impl_mode injected in main() for sketch UDFs)
447446
params = dict(agg_function_params.get(udf_name, {}))
448447

449448
if len(params) > 0 and not os.path.exists(template_path):
@@ -943,20 +942,6 @@ def main(args):
943942
filter_metric_name,
944943
)
945944

946-
parameters = dict(parameters)
947-
if agg_function in ("countminsketch_count", "countminsketch_sum"):
948-
parameters["impl_mode"] = getattr(
949-
args, "sketch_cms_impl", "legacy"
950-
).capitalize()
951-
elif agg_function == "countminsketchwithheap_topk":
952-
parameters["impl_mode"] = getattr(
953-
args, "sketch_cmwh_impl", "legacy"
954-
).capitalize()
955-
elif agg_function in ("datasketcheskll_", "hydrakll_"):
956-
parameters["impl_mode"] = getattr(
957-
args, "sketch_kll_impl", "sketchlib"
958-
).capitalize()
959-
960945
sql_queries.append(sql_query)
961946
# if not is_labels_accumulator:
962947
agg_functions_with_params.append((agg_function, parameters))
@@ -1111,29 +1096,6 @@ def main(args):
11111096
help="Query language for schema interpretation (default: promql)",
11121097
)
11131098

1114-
# Sketch implementation mode - must match QueryEngine (--sketch-cms-impl etc.)
1115-
parser.add_argument(
1116-
"--sketch_cms_impl",
1117-
type=str,
1118-
choices=["legacy", "sketchlib"],
1119-
default="sketchlib",
1120-
help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.",
1121-
)
1122-
parser.add_argument(
1123-
"--sketch_kll_impl",
1124-
type=str,
1125-
choices=["legacy", "sketchlib"],
1126-
default="sketchlib",
1127-
help="KLL Sketch backend (legacy | sketchlib). Must match QueryEngine.",
1128-
)
1129-
parser.add_argument(
1130-
"--sketch_cmwh_impl",
1131-
type=str,
1132-
choices=["legacy", "sketchlib"],
1133-
default="sketchlib",
1134-
help="Count-Min-With-Heap backend (legacy | sketchlib). Must match QueryEngine.",
1135-
)
1136-
11371099
args = parser.parse_args()
11381100
check_args(args)
11391101
main(args)

asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2

Lines changed: 25 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,18 @@
22
[dependencies]
33
rmp-serde = "1.1"
44
serde = { version = "1.0", features = ["derive"] }
5-
twox-hash = "2.1.0"
65
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
76
*/
87

98
use arroyo_udf_plugin::udf;
109
use rmp_serde::Serializer;
1110
use serde::{Deserialize, Serialize};
12-
use twox_hash::XxHash32;
1311

1412
use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D};
1513

16-
// Count-Min Sketch parameters
1714
const DEPTH: usize = {{ depth }}; // Number of hash functions
1815
const WIDTH: usize = {{ width }}; // Number of buckets per hash function
1916

20-
// Implementation mode for Count-Min Sketch. Set at compile time; no env vars.
21-
enum ImplMode {
22-
Legacy,
23-
Sketchlib,
24-
}
25-
26-
{% set _impl_mode = impl_mode | default("Sketchlib") %}
27-
const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %};
28-
29-
fn use_sketchlib_for_cms() -> bool {
30-
matches!(IMPL_MODE, ImplMode::Sketchlib)
31-
}
32-
3317
type SketchlibCms = SketchlibCountMin<Vector2D<i64>, RegularPath>;
3418

3519
#[derive(Serialize, Deserialize, Clone)]
@@ -39,75 +23,37 @@ struct CountMinSketch {
3923
col_num: usize,
4024
}
4125

42-
impl CountMinSketch {
43-
fn new() -> Self {
44-
CountMinSketch {
45-
sketch: vec![vec![0.0; WIDTH]; DEPTH],
46-
row_num: DEPTH,
47-
col_num: WIDTH,
48-
}
49-
}
50-
51-
// Legacy path: update the sketch with a key-value pair using twox-hash.
52-
fn update(&mut self, key: &str, value: f64) {
53-
for i in 0..self.row_num {
54-
// already UTF-8
55-
let hash = XxHash32::oneshot(i as u32, key.as_bytes());
56-
let bucket = (hash as usize) % self.col_num;
57-
self.sketch[i][bucket] += value;
58-
}
59-
}
60-
}
61-
6226
#[udf]
6327
fn countminsketch_count(keys: Vec<&str>, values: Vec<f64>) -> Option<Vec<u8>> {
64-
if use_sketchlib_for_cms() {
65-
// asap_sketchlib backed implementation: integer counters + internal hashing.
66-
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
28+
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
6729

68-
for &key in keys.iter() {
69-
let input = DataInput::String(key.to_owned());
70-
inner.insert_many(&input, 1);
71-
}
30+
for &key in keys.iter() {
31+
let input = DataInput::String(key.to_owned());
32+
inner.insert_many(&input, 1);
33+
}
7234

73-
// Convert sketchlib storage to legacy matrix wire format.
74-
let storage: &Vector2D<i64> = inner.as_storage();
75-
let rows = storage.rows();
76-
let cols = storage.cols();
77-
let mut sketch = vec![vec![0.0; cols]; rows];
35+
let storage: &Vector2D<i64> = inner.as_storage();
36+
let rows = storage.rows();
37+
let cols = storage.cols();
38+
let mut sketch = vec![vec![0.0; cols]; rows];
7839

79-
for r in 0..rows {
80-
for c in 0..cols {
81-
if let Some(v) = storage.get(r, c) {
82-
sketch[r][c] = *v as f64;
83-
}
40+
for r in 0..rows {
41+
for c in 0..cols {
42+
if let Some(v) = storage.get(r, c) {
43+
sketch[r][c] = *v as f64;
8444
}
8545
}
86-
87-
let countminsketch = CountMinSketch {
88-
sketch,
89-
row_num: rows,
90-
col_num: cols,
91-
};
92-
93-
let mut buf = Vec::new();
94-
countminsketch
95-
.serialize(&mut Serializer::new(&mut buf))
96-
.ok()?;
97-
Some(buf)
98-
} else {
99-
// Legacy twox-hash backed implementation (unchanged).
100-
let mut countminsketch = CountMinSketch::new();
101-
102-
// Iterate through the keys and update the sketch for each entry
103-
for &key in keys.iter() {
104-
countminsketch.update(key, 1.0);
105-
}
106-
107-
let mut buf = Vec::new();
108-
countminsketch
109-
.serialize(&mut Serializer::new(&mut buf))
110-
.ok()?;
111-
Some(buf)
11246
}
47+
48+
let countminsketch = CountMinSketch {
49+
sketch,
50+
row_num: rows,
51+
col_num: cols,
52+
};
53+
54+
let mut buf = Vec::new();
55+
countminsketch
56+
.serialize(&mut Serializer::new(&mut buf))
57+
.ok()?;
58+
Some(buf)
11359
}

asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2

Lines changed: 29 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,18 @@
22
[dependencies]
33
rmp-serde = "1.1"
44
serde = { version = "1.0", features = ["derive"] }
5-
twox-hash = "2.1.0"
65
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
76
*/
87

98
use arroyo_udf_plugin::udf;
109
use rmp_serde::Serializer;
1110
use serde::{Deserialize, Serialize};
12-
use twox_hash::XxHash32;
1311

1412
use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D};
1513

16-
// Count-Min Sketch parameters
1714
const DEPTH: usize = {{ depth }}; // Number of hash functions
1815
const WIDTH: usize = {{ width }}; // Number of buckets per hash function
1916

20-
// Implementation mode for Count-Min Sketch. Set at compile time; no env vars.
21-
enum ImplMode {
22-
Legacy,
23-
Sketchlib,
24-
}
25-
26-
{% set _impl_mode = impl_mode | default("Sketchlib") %}
27-
const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %};
28-
29-
fn use_sketchlib_for_cms() -> bool {
30-
matches!(IMPL_MODE, ImplMode::Sketchlib)
31-
}
32-
3317
type SketchlibCms = SketchlibCountMin<Vector2D<i64>, RegularPath>;
3418

3519
#[derive(Serialize, Deserialize, Clone)]
@@ -39,86 +23,46 @@ struct CountMinSketch {
3923
col_num: usize,
4024
}
4125

42-
impl CountMinSketch {
43-
fn new() -> Self {
44-
CountMinSketch {
45-
sketch: vec![vec![0.0; WIDTH]; DEPTH],
46-
row_num: DEPTH,
47-
col_num: WIDTH,
48-
}
49-
}
50-
51-
// Legacy path: update the sketch with a key-value pair using twox-hash.
52-
fn update(&mut self, key: &str, value: f64) {
53-
for i in 0..self.row_num {
54-
// already UTF-8
55-
let hash = XxHash32::oneshot(i as u32, key.as_bytes());
56-
let bucket = (hash as usize) % self.col_num;
57-
self.sketch[i][bucket] += value;
58-
}
59-
}
60-
}
61-
6226
#[udf]
6327
fn countminsketch_sum(keys: Vec<&str>, values: Vec<f64>) -> Option<Vec<u8>> {
64-
// Check that keys and values have equal length
6528
if keys.len() != values.len() {
6629
return None;
6730
}
6831

69-
if use_sketchlib_for_cms() {
70-
// asap_sketchlib backed implementation: integer counters + internal hashing.
71-
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
32+
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
7233

73-
for (i, &key) in keys.iter().enumerate() {
74-
let value = values[i];
75-
// Values arrive as f64; Count-Min counters are integers.
76-
let many = value.round() as i64;
77-
if many <= 0 {
78-
continue;
79-
}
80-
let input = DataInput::String(key.to_owned());
81-
inner.insert_many(&input, many);
34+
for (i, &key) in keys.iter().enumerate() {
35+
let value = values[i];
36+
let many = value.round() as i64;
37+
if many <= 0 {
38+
continue;
8239
}
40+
let input = DataInput::String(key.to_owned());
41+
inner.insert_many(&input, many);
42+
}
8343

84-
// Convert sketchlib storage to legacy matrix wire format.
85-
let storage: &Vector2D<i64> = inner.as_storage();
86-
let rows = storage.rows();
87-
let cols = storage.cols();
88-
let mut sketch = vec![vec![0.0; cols]; rows];
44+
let storage: &Vector2D<i64> = inner.as_storage();
45+
let rows = storage.rows();
46+
let cols = storage.cols();
47+
let mut sketch = vec![vec![0.0; cols]; rows];
8948

90-
for r in 0..rows {
91-
for c in 0..cols {
92-
if let Some(v) = storage.get(r, c) {
93-
sketch[r][c] = *v as f64;
94-
}
49+
for r in 0..rows {
50+
for c in 0..cols {
51+
if let Some(v) = storage.get(r, c) {
52+
sketch[r][c] = *v as f64;
9553
}
9654
}
97-
98-
let countminsketch = CountMinSketch {
99-
sketch,
100-
row_num: rows,
101-
col_num: cols,
102-
};
103-
104-
let mut buf = Vec::new();
105-
countminsketch
106-
.serialize(&mut Serializer::new(&mut buf))
107-
.ok()?;
108-
Some(buf)
109-
} else {
110-
// Legacy twox-hash backed implementation (unchanged).
111-
let mut countminsketch = CountMinSketch::new();
112-
113-
// Iterate through the keys and values and update the sketch for each entry
114-
for (i, &key) in keys.iter().enumerate() {
115-
countminsketch.update(key, values[i]);
116-
}
117-
118-
let mut buf = Vec::new();
119-
countminsketch
120-
.serialize(&mut Serializer::new(&mut buf))
121-
.ok()?;
122-
Some(buf)
12355
}
56+
57+
let countminsketch = CountMinSketch {
58+
sketch,
59+
row_num: rows,
60+
col_num: cols,
61+
};
62+
63+
let mut buf = Vec::new();
64+
countminsketch
65+
.serialize(&mut Serializer::new(&mut buf))
66+
.ok()?;
67+
Some(buf)
12468
}

0 commit comments

Comments
 (0)