Skip to content

Commit f7f1afc

Browse files
authored
fix(join): fix crash caused by assume UB in hash join hashtable (#19741)
* fix(join): fix crash caused by assume UB in hash join hashtable * z
1 parent 8933e96 commit f7f1afc

3 files changed

Lines changed: 189 additions & 16 deletions

File tree

src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/fixed_keys.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,20 @@ impl<Key: FixedKey + HashtableKeyable, const MATCHED: bool, const MATCH_FIRST: b
208208
{
209209
fn advance(&mut self, res: &mut ProbedRows, max_rows: usize) -> Result<()> {
210210
while self.key_idx < self.keys.len() {
211+
if res.matched_probe.len() == max_rows {
212+
break;
213+
}
214+
215+
if !MATCHED && res.unmatched.len() >= max_rows {
216+
break;
217+
}
218+
219+
assume(res.unmatched.len() < res.unmatched.capacity());
211220
assume(res.matched_probe.len() == res.matched_build.len());
212221
assume(res.matched_build.len() < res.matched_build.capacity());
213222
assume(res.matched_probe.len() < res.matched_probe.capacity());
214223
assume(self.key_idx < self.pointers.len());
215224

216-
if res.matched_probe.len() == max_rows {
217-
break;
218-
}
219-
220225
if self.probe_entry_ptr == 0 {
221226
self.probe_entry_ptr = self.pointers[self.key_idx];
222227

@@ -328,16 +333,20 @@ impl<'a, Key: FixedKey + HashtableKeyable, const MATCHED: bool, const MATCH_FIRS
328333
while self.idx < self.selections.len() {
329334
let key_idx = self.selections[self.idx] as usize;
330335

336+
if res.matched_probe.len() == max_rows {
337+
break;
338+
}
339+
340+
if !MATCHED && res.unmatched.len() >= max_rows {
341+
break;
342+
}
343+
331344
assume(res.unmatched.len() < res.unmatched.capacity());
332345
assume(res.matched_probe.len() == res.matched_build.len());
333346
assume(res.matched_build.len() < res.matched_build.capacity());
334347
assume(res.matched_probe.len() < res.matched_probe.capacity());
335348
assume(key_idx < self.pointers.len());
336349

337-
if res.matched_probe.len() == max_rows {
338-
break;
339-
}
340-
341350
if self.probe_entry_ptr == 0 {
342351
self.probe_entry_ptr = self.pointers[key_idx];
343352

src/query/service/src/pipelines/processors/transforms/new_hash_join/hashtable/serialize_keys.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,16 +222,20 @@ impl<const MATCHED: bool, const MATCH_FIRST: bool> ProbeStream
222222
{
223223
fn advance(&mut self, res: &mut ProbedRows, max_rows: usize) -> Result<()> {
224224
while self.key_idx < self.keys.len() {
225+
if res.matched_probe.len() == max_rows {
226+
break;
227+
}
228+
229+
if !MATCHED && res.unmatched.len() >= max_rows {
230+
break;
231+
}
232+
225233
assume(res.unmatched.len() < res.unmatched.capacity());
226234
assume(res.matched_probe.len() == res.matched_build.len());
227235
assume(res.matched_build.len() < res.matched_build.capacity());
228236
assume(res.matched_probe.len() < res.matched_probe.capacity());
229237
assume(self.key_idx < self.pointers.len());
230238

231-
if res.matched_probe.len() == max_rows {
232-
break;
233-
}
234-
235239
if self.probe_entry_ptr == 0 {
236240
self.probe_entry_ptr = self.pointers[self.key_idx];
237241

@@ -350,16 +354,20 @@ impl<'a, const MATCHED: bool, const MATCH_FIRST: bool> ProbeStream
350354
while self.idx < self.selections.len() {
351355
let key_idx = self.selections[self.idx] as usize;
352356

357+
if res.matched_probe.len() == max_rows {
358+
break;
359+
}
360+
361+
if !MATCHED && res.unmatched.len() >= max_rows {
362+
break;
363+
}
364+
353365
assume(res.unmatched.len() < res.unmatched.capacity());
354366
assume(res.matched_probe.len() == res.matched_build.len());
355367
assume(res.matched_build.len() < res.matched_build.capacity());
356368
assume(res.matched_probe.len() < res.matched_probe.capacity());
357369
assume(key_idx < self.pointers.len());
358370

359-
if res.matched_probe.len() == max_rows {
360-
break;
361-
}
362-
363371
if self.probe_entry_ptr == 0 {
364372
self.probe_entry_ptr = self.pointers[key_idx];
365373

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# Regression test for: anti/left/semi/right join with probe block fully unmatched
2+
# When probe block has more rows than max_block_size and all are unmatched,
3+
# the unmatched Vec fills to capacity and the assume() before the break fires -> UB/panic.
4+
#
5+
# Key: insert data with a large block_size so the parquet block exceeds max_block_size,
6+
# then query with a smaller max_block_size. The probe block will have more rows than
7+
# ProbedRows.unmatched.capacity(), triggering the bug.
8+
9+
statement ok
10+
set disable_join_reorder = 1;
11+
12+
# ── String key ──────────────────────────────────────────────────────────────
13+
14+
statement ok
15+
drop table if exists t_build_str;
16+
17+
statement ok
18+
drop table if exists t_probe_str;
19+
20+
statement ok
21+
create table t_build_str (rid string);
22+
23+
statement ok
24+
create table t_probe_str (pks string);
25+
26+
# Insert probe data with large block_size so one parquet block has 200 rows
27+
statement ok
28+
set max_block_size = 10000;
29+
30+
statement ok
31+
insert into t_probe_str select number::string from numbers(200);
32+
33+
# Insert build data with hash collisions but no key overlap
34+
statement ok
35+
insert into t_build_str select (number + 1000000)::string from numbers(100000);
36+
37+
# Now query with small max_block_size=100: probe block has 200 rows > capacity=100
38+
statement ok
39+
set max_block_size = 100;
40+
41+
# LEFT ANTI JOIN (NOT EXISTS)
42+
query I
43+
select count(*) from t_probe_str as sou
44+
where not exists (select 1 from t_build_str as tar where tar.rid = sou.pks);
45+
----
46+
200
47+
48+
# LEFT JOIN (unmatched rows get NULL on build side)
49+
query I
50+
select count(*) from t_probe_str as sou
51+
left join t_build_str as tar on tar.rid = sou.pks
52+
where tar.rid is null;
53+
----
54+
200
55+
56+
# LEFT SEMI JOIN (EXISTS) - returns 0 since no overlap
57+
query I
58+
select count(*) from t_probe_str as sou
59+
where exists (select 1 from t_build_str as tar where tar.rid = sou.pks);
60+
----
61+
0
62+
63+
statement ok
64+
drop table t_build_str;
65+
66+
statement ok
67+
drop table t_probe_str;
68+
69+
# ── Integer key ─────────────────────────────────────────────────────────────
70+
71+
statement ok
72+
drop table if exists t_build_int;
73+
74+
statement ok
75+
drop table if exists t_probe_int;
76+
77+
statement ok
78+
create table t_build_int (rid int);
79+
80+
statement ok
81+
create table t_probe_int (pks int);
82+
83+
statement ok
84+
set max_block_size = 10000;
85+
86+
statement ok
87+
insert into t_probe_int select number from numbers(200);
88+
89+
statement ok
90+
insert into t_build_int select number + 1000000 from numbers(100000);
91+
92+
statement ok
93+
set max_block_size = 100;
94+
95+
query I
96+
select count(*) from t_probe_int as sou
97+
where not exists (select 1 from t_build_int as tar where tar.rid = sou.pks);
98+
----
99+
200
100+
101+
query I
102+
select count(*) from t_probe_int as sou
103+
left join t_build_int as tar on tar.rid = sou.pks
104+
where tar.rid is null;
105+
----
106+
200
107+
108+
statement ok
109+
drop table t_build_int;
110+
111+
statement ok
112+
drop table t_probe_int;
113+
114+
# ── Bigint key ──────────────────────────────────────────────────────────────
115+
116+
statement ok
117+
drop table if exists t_build_bigint;
118+
119+
statement ok
120+
drop table if exists t_probe_bigint;
121+
122+
statement ok
123+
create table t_build_bigint (rid bigint);
124+
125+
statement ok
126+
create table t_probe_bigint (pks bigint);
127+
128+
statement ok
129+
set max_block_size = 10000;
130+
131+
statement ok
132+
insert into t_probe_bigint select number from numbers(200);
133+
134+
statement ok
135+
insert into t_build_bigint select number + 1000000 from numbers(100000);
136+
137+
statement ok
138+
set max_block_size = 100;
139+
140+
query I
141+
select count(*) from t_probe_bigint as sou
142+
where not exists (select 1 from t_build_bigint as tar where tar.rid = sou.pks);
143+
----
144+
200
145+
146+
statement ok
147+
drop table t_build_bigint;
148+
149+
statement ok
150+
drop table t_probe_bigint;
151+
152+
statement ok
153+
unset disable_join_reorder;
154+
155+
statement ok
156+
unset max_block_size;

0 commit comments

Comments
 (0)