Skip to content

Commit ce30905

Browse files
GnaneshGnanikavyabhat02
authored and committed
Sketchlib KLL Integration (#257)
* Integrate sketchlib-rust for KLL quantile sketches
* Restore per-backend default constants, global default Sketchlib
* Use per-backend defaults in fidelity, configurable impl_mode in UDF templates
* UDFs: use same impl mode as QueryEngine (sketch_cms_impl, etc.)
* Simplify UDF impl mode, default all to sketchlib
* Fix black formatting in arroyo.py
1 parent 8028dba commit ce30905

11 files changed

Lines changed: 504 additions & 135 deletions

File tree

asap-common/sketch-core/report.md

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
# Sketchlib Fidelity Report
1+
# Report
22

3-
Compares the **legacy** Count-Min Sketch implementation in `sketch-core` vs the new **sketchlib-rust** backend.
3+
Compares the **legacy** sketch implementations in `sketch-core` vs the **sketchlib-rust** backends (Count-Min Sketch, Count-Min-With-Heap, KLL, HydraKLL).
44

55
## Fidelity harness
66

7-
The fidelity binary selects backends via CLI flags.
7+
The fidelity binary selects backends via CLI flags (`--cms-impl`, `--kll-impl`, `--cmwh-impl`).
88

9-
| Goal | Command |
10-
|-------------|---------------------------------------------------------------|
11-
| CMS sketchlib | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib` |
12-
| CMS legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy` |
9+
| Goal | Command |
10+
|--------------------------|--------------------------------------------------------------------------------------------------------------|
11+
| Default (all sketchlib) | `cargo run -p sketch-core --bin sketchlib_fidelity` |
12+
| All legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy --kll-impl legacy --cmwh-impl legacy` |
13+
| Legacy KLL only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib --kll-impl legacy --cmwh-impl sketchlib` |
14+
| CMS sketchlib only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib` |
15+
| CMS legacy only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy` |
1316

1417
## Unit tests
1518

@@ -68,3 +71,55 @@ The heap is maintained by local updates; recall is measured against the **true**
6871
| 2048 | 200000 | 2000 | 20 | sketchlib-rust | 1.00 | 0.9982 | 0.021 | 0.067 |
6972
| 2048 | 200000 | 2000 | 50 | Legacy | 0.40 | 0.9999983 | 5.60 | 16.49 |
7073
| 2048 | 200000 | 2000 | 50 | sketchlib-rust | 0.48 | 0.9999990 | 3.90 | 12.95 |
74+
75+
---
76+
77+
### KllSketch (quantiles, absolute rank error)
78+
79+
For each quantile \(q\), we compute the sketch estimate `est_value`, then:
80+
`abs_rank_error = |rank_fraction(exact_sorted_values, est_value) - q|`.
81+
82+
#### k=20
83+
84+
| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 |
85+
|-----------|----------------|---------|---------|---------|
86+
| 200000 | Legacy | 0.0104 | 0.0145 | 0.0028 |
87+
| 200000 | sketchlib-rust | 0.0275 | 0.0470 | 0.0061 |
88+
| 50000 | Legacy | 0.0131 | 0.0091 | 0.0054 |
89+
| 50000 | sketchlib-rust | 0.0110 | 0.0116 | 0.0031 |
90+
91+
#### k=50
92+
93+
| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 |
94+
|-----------|----------------|---------|---------|---------|
95+
| 200000 | Legacy | 0.0013 | 0.0021 | 0.0012 |
96+
| 200000 | sketchlib-rust | 0.0101 | 0.0044 | 0.0074 |
97+
98+
#### k=200
99+
100+
| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 |
101+
|-----------|----------------|---------|---------|---------|
102+
| 200000 | Legacy | 0.0021 | 0.0036 | 0.0000 |
103+
| 200000 | sketchlib-rust | 0.0015 | 0.0001 | 0.0002 |
104+
105+
---
106+
107+
### HydraKllSketch (per-key quantiles, mean/max absolute rank error across 50 keys)
108+
109+
#### rows=2, cols=64
110+
111+
| k | n | domain | Mode | q=0.5 (mean / max) | q=0.9 (mean / max) |
112+
|-----|--------|--------|----------------|--------------------|--------------------|
113+
| 20 | 200000 | 200 | Legacy | 0.0170 / 0.0546 | 0.0165 / 0.0452 |
114+
| 20 | 200000 | 200 | sketchlib-rust | 0.0254 / 0.0629 | 0.0546 / 0.0942 |
115+
116+
#### rows=3, cols=128
117+
118+
| k | n | domain | Mode | q=0.5 (mean / max) | q=0.9 (mean / max) |
119+
|-----|--------|--------|----------------|--------------------|--------------------|
120+
| 20 | 200000 | 200 | Legacy | 0.0166 / 0.0591 | 0.0114 / 0.0304 |
121+
| 20 | 200000 | 200 | sketchlib-rust | 0.0216 / 0.0534 | 0.0238 / 0.1087 |
122+
| 50 | 200000 | 200 | Legacy | 0.0099 / 0.0352 | 0.0087 / 0.0330 |
123+
| 50 | 200000 | 200 | sketchlib-rust | 0.0119 / 0.0458 | 0.0119 / 0.0296 |
124+
| 20 | 100000 | 100 | Legacy | 0.0141 / 0.0574 | 0.0149 / 0.0471 |
125+
| 20 | 100000 | 100 | sketchlib-rust | 0.0202 / 0.0621 | 0.0287 / 0.0779 |

asap-common/sketch-core/src/bin/sketchlib_fidelity.rs

Lines changed: 190 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use clap::Parser;
77
use sketch_core::config::{self, ImplMode};
88
use sketch_core::count_min::CountMinSketch;
99
use sketch_core::count_min_with_heap::CountMinSketchWithHeap;
10+
use sketch_core::hydra_kll::HydraKllSketch;
11+
use sketch_core::kll::KllSketch;
1012

1113
#[derive(Clone)]
1214
struct Lcg64 {
@@ -93,6 +95,16 @@ fn rmse_percentage(exact: &[f64], est: &[f64]) -> f64 {
9395
(sum_sq / denom).sqrt() * 100.0
9496
}
9597

98+
#[derive(Parser)]
99+
struct Args {
100+
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMS_IMPL)]
101+
cms_impl: ImplMode,
102+
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_KLL_IMPL)]
103+
kll_impl: ImplMode,
104+
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMWH_IMPL)]
105+
cmwh_impl: ImplMode,
106+
}
107+
96108
fn rank_fraction(sorted: &[f64], x: f64) -> f64 {
97109
if sorted.is_empty() {
98110
return 0.0;
@@ -210,14 +222,110 @@ fn run_countmin_with_heap_once(seed: u64, p: &CmwhParams) -> CmwhResult {
210222
}
211223
}
212224

213-
#[derive(Parser)]
214-
struct Args {
215-
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMS_IMPL)]
216-
cms_impl: ImplMode,
217-
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_KLL_IMPL)]
218-
kll_impl: ImplMode,
219-
#[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMWH_IMPL)]
220-
cmwh_impl: ImplMode,
225+
// --- KllSketch ---
226+
227+
struct KllParams {
228+
k: u16,
229+
n: usize,
230+
}
231+
232+
struct KllResult {
233+
rank_err_50: f64,
234+
rank_err_90: f64,
235+
rank_err_99: f64,
236+
}
237+
238+
fn run_kll_once(seed: u64, p: &KllParams) -> KllResult {
239+
let mut rng = Lcg64::new(seed ^ 0x1234_5678);
240+
let mut values: Vec<f64> = Vec::with_capacity(p.n);
241+
let mut sk = KllSketch::new(p.k);
242+
243+
for _ in 0..p.n {
244+
let v = rng.next_f64_0_1() * 1_000_000.0;
245+
values.push(v);
246+
sk.update(v);
247+
}
248+
249+
values.sort_by(f64::total_cmp);
250+
let qs = [0.5, 0.9, 0.99];
251+
let rank_err = |q: f64| (rank_fraction(&values, sk.get_quantile(q)) - q).abs();
252+
253+
KllResult {
254+
rank_err_50: rank_err(qs[0]),
255+
rank_err_90: rank_err(qs[1]),
256+
rank_err_99: rank_err(qs[2]),
257+
}
258+
}
259+
260+
// --- HydraKllSketch ---
261+
262+
struct HydraKllParams {
263+
rows: usize,
264+
cols: usize,
265+
k: u16,
266+
n: usize,
267+
domain: usize,
268+
eval_keys: usize,
269+
}
270+
271+
struct HydraKllResult {
272+
mean_50: f64,
273+
max_50: f64,
274+
mean_90: f64,
275+
max_90: f64,
276+
}
277+
278+
fn run_hydra_kll_once(seed: u64, p: &HydraKllParams) -> HydraKllResult {
279+
let mut rng = Lcg64::new(seed ^ 0xDEAD_BEEF);
280+
let mut hydra = HydraKllSketch::new(p.rows, p.cols, p.k);
281+
let mut exact: HashMap<String, Vec<f64>> = HashMap::new();
282+
283+
for _ in 0..p.n {
284+
let r = rng.next_u64();
285+
let key_id = if (r & 0xFF) < 200 {
286+
(r as usize) % 20
287+
} else {
288+
(r as usize) % p.domain
289+
};
290+
let key = format!("k{key_id}");
291+
let v = rng.next_f64_0_1() * 1_000_000.0;
292+
hydra.update(&key, v);
293+
exact.entry(key).or_default().push(v);
294+
}
295+
296+
let mut keys: Vec<String> = exact.keys().cloned().collect();
297+
keys.sort();
298+
keys.truncate(p.eval_keys);
299+
300+
let mut mean_50 = 0.0f64;
301+
let mut max_50 = 0.0f64;
302+
let mut mean_90 = 0.0f64;
303+
let mut max_90 = 0.0f64;
304+
let nk = keys.len() as f64;
305+
for key in &keys {
306+
let mut vals = exact.get(key).cloned().unwrap_or_default();
307+
vals.sort_by(f64::total_cmp);
308+
for (q, mean_ref, max_ref) in [
309+
(0.5, &mut mean_50, &mut max_50),
310+
(0.9, &mut mean_90, &mut max_90),
311+
] {
312+
let est = hydra.query(key, q);
313+
let err = (rank_fraction(&vals, est) - q).abs();
314+
*mean_ref += err;
315+
if err > *max_ref {
316+
*max_ref = err;
317+
}
318+
}
319+
}
320+
mean_50 /= nk;
321+
mean_90 /= nk;
322+
323+
HydraKllResult {
324+
mean_50,
325+
max_50,
326+
mean_90,
327+
max_90,
328+
}
221329
}
222330

223331
fn main() {
@@ -236,6 +344,11 @@ fn main() {
236344
} else {
237345
"sketchlib-rust"
238346
};
347+
let kll_mode = if matches!(args.kll_impl, ImplMode::Legacy) {
348+
"Legacy"
349+
} else {
350+
"sketchlib-rust"
351+
};
239352

240353
// CountMinSketch: multiple (depth, width, n, domain)
241354
let cms_param_sets: Vec<CmsParams> = vec![
@@ -311,4 +424,73 @@ fn main() {
311424
p.depth, p.width, p.n, p.domain, p.heap_size, r.topk_recall, r.pearson, r.mape, r.rmse
312425
);
313426
}
427+
// KllSketch
428+
let kll_param_sets: Vec<KllParams> = vec![
429+
KllParams { k: 20, n: 200_000 },
430+
KllParams { k: 50, n: 200_000 },
431+
KllParams { k: 200, n: 200_000 },
432+
KllParams { k: 20, n: 50_000 },
433+
];
434+
435+
println!("\n## KllSketch ({kll_mode})");
436+
println!(
437+
"| k | n_updates | q=0.5 abs_rank_error | q=0.9 abs_rank_error | q=0.99 abs_rank_error |"
438+
);
439+
println!(
440+
"|---|-----------|----------------------|----------------------|-----------------------|"
441+
);
442+
for p in &kll_param_sets {
443+
let r = run_kll_once(seed, p);
444+
println!(
445+
"| {} | {} | {:.6} | {:.6} | {:.6} |",
446+
p.k, p.n, r.rank_err_50, r.rank_err_90, r.rank_err_99
447+
);
448+
}
449+
450+
// HydraKllSketch
451+
let hydra_param_sets: Vec<HydraKllParams> = vec![
452+
HydraKllParams {
453+
rows: 2,
454+
cols: 64,
455+
k: 20,
456+
n: 200_000,
457+
domain: 200,
458+
eval_keys: 50,
459+
},
460+
HydraKllParams {
461+
rows: 3,
462+
cols: 128,
463+
k: 20,
464+
n: 200_000,
465+
domain: 200,
466+
eval_keys: 50,
467+
},
468+
HydraKllParams {
469+
rows: 3,
470+
cols: 128,
471+
k: 50,
472+
n: 200_000,
473+
domain: 200,
474+
eval_keys: 50,
475+
},
476+
HydraKllParams {
477+
rows: 3,
478+
cols: 128,
479+
k: 20,
480+
n: 100_000,
481+
domain: 100,
482+
eval_keys: 50,
483+
},
484+
];
485+
486+
println!("\n## HydraKllSketch ({kll_mode})");
487+
println!("| rows | cols | k | n | domain | q=0.5 mean/max | q=0.9 mean/max |");
488+
println!("|------|------|---|-----|--------|----------------|----------------|");
489+
for p in &hydra_param_sets {
490+
let r = run_hydra_kll_once(seed, p);
491+
println!(
492+
"| {} | {} | {} | {} | {} | {:.5} / {:.5} | {:.5} / {:.5} |",
493+
p.rows, p.cols, p.k, p.n, p.domain, r.mean_50, r.max_50, r.mean_90, r.max_90
494+
);
495+
}
314496
}

asap-common/sketch-core/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ pub enum ImplMode {
1010
}
1111

1212
/// Global default when impl mode is not explicitly configured (e.g. env var parsing).
13-
pub const DEFAULT_IMPL_MODE: ImplMode = ImplMode::Legacy;
13+
pub const DEFAULT_IMPL_MODE: ImplMode = ImplMode::Sketchlib;
1414

1515
/// Per-backend defaults. Used when configure() has not been called.
1616
pub const DEFAULT_CMS_IMPL: ImplMode = ImplMode::Sketchlib;
17-
pub const DEFAULT_KLL_IMPL: ImplMode = ImplMode::Legacy;
17+
pub const DEFAULT_KLL_IMPL: ImplMode = ImplMode::Sketchlib;
1818
pub const DEFAULT_CMWH_IMPL: ImplMode = ImplMode::Sketchlib;
1919

2020
static COUNTMIN_MODE: OnceLock<ImplMode> = OnceLock::new();

0 commit comments

Comments
 (0)