Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion arrow-avro/src/reader/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ impl BlockDecoder {
AvroError::ParseError(format!("Block size cannot be negative, got {c}"))
})?;

self.in_progress.data.reserve(self.bytes_remaining);
// Only reserve what the current input backs: the block size is
// input specified so could be an extreme value (e.g. i64::MAX)
// in case of corrupted/malicious input. The rest is reserved
// lazily by `extend_from_slice` below as data arrives.
Comment thread
miniex marked this conversation as resolved.
self.in_progress
.data
.reserve(self.bytes_remaining.min(buf.len()));
self.state = BlockDecoderState::Data;
}
}
Expand Down Expand Up @@ -148,3 +154,72 @@ impl BlockDecoder {
self.bytes_remaining
}
}

#[cfg(test)]
mod tests {
use super::*;

/// Zig-zag encode `value` as an Avro `long` (variable-length integer).
fn encode_long(value: i64, out: &mut Vec<u8>) {
let mut n = ((value << 1) ^ (value >> 63)) as u64;
while n >= 0x80 {
out.push((n as u8) | 0x80);
n >>= 7;
}
out.push(n as u8);
}

#[test]
fn test_oversized_block_size_bounds_reserve() {
// A block advertising `i64::MAX` bytes must not reserve that up front when only
// a few payload bytes are present, or a crafted OCF aborts on a huge alloc (#10234).
let mut buf = Vec::new();
encode_long(1, &mut buf); // object count
encode_long(i64::MAX, &mut buf); // attacker-controlled block size
buf.extend_from_slice(&[0u8; 8]); // a handful of real bytes

let mut decoder = BlockDecoder::default();
let read = decoder.decode(&buf).unwrap();

assert_eq!(read, buf.len(), "all available input should be consumed");
assert!(
decoder.in_progress.data.capacity() <= buf.len(),
"capacity {} must stay bounded by available input {}, not the advertised i64::MAX",
decoder.in_progress.data.capacity(),
buf.len(),
);
}

#[test]
fn test_negative_block_size_errors() {
let mut buf = Vec::new();
encode_long(1, &mut buf); // object count
encode_long(-1, &mut buf); // invalid (negative) block size

let mut decoder = BlockDecoder::default();
let err = decoder.decode(&buf).unwrap_err();
assert!(
err.to_string().contains("Block size cannot be negative"),
"unexpected error: {err}",
);
}

#[test]
fn test_well_formed_block_round_trips() {
// The capped reserve must not change decoding of a normal block.
let payload = [1u8, 2, 3, 4];
let sync = [7u8; 16];
let mut buf = Vec::new();
encode_long(2, &mut buf); // object count
encode_long(payload.len() as i64, &mut buf); // block size
buf.extend_from_slice(&payload);
buf.extend_from_slice(&sync);

let mut decoder = BlockDecoder::default();
assert_eq!(decoder.decode(&buf).unwrap(), buf.len());
let block = decoder.flush().expect("a complete block");
assert_eq!(block.count, 2);
assert_eq!(block.data, payload);
assert_eq!(block.sync, sync);
}
}
99 changes: 89 additions & 10 deletions arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2286,35 +2286,58 @@ fn process_blockwise(
match block_count.cmp(&0) {
Ordering::Equal => break,
Ordering::Less => {
let count = (-block_count) as usize;
// `unsigned_abs` avoids overflowing `-block_count` for `i64::MIN` (#10235)
let count = block_count.unsigned_abs() as usize;
// A negative count is followed by a long of the size in bytes
let size_in_bytes = buf.get_long()? as usize;
let raw_size = buf.get_long()?;
let size_in_bytes = usize::try_from(raw_size).map_err(|_| {
AvroError::ParseError(format!("Block size cannot be negative, got {raw_size}"))
})?;
match negative_behavior {
NegativeBlockBehavior::ProcessItems => {
// Process items one-by-one after reading size
for _ in 0..count {
on_item(buf)?;
}
total = process_block_items(buf, count, total, &mut on_item)?;
}
NegativeBlockBehavior::SkipBySize => {
// Skip the entire block payload at once
let _ = buf.get_fixed(size_in_bytes)?;
total = total.saturating_add(count);
}
}
total += count;
}
Ordering::Greater => {
let count = block_count as usize;
for _ in 0..count {
on_item(buf)?;
}
total += count;
total = process_block_items(buf, count, total, &mut on_item)?;
}
}
}
Ok(total)
}

/// Decode `count` items, capping the running total at `i32::MAX` (the largest index
/// an Arrow list/map offset holds). Otherwise a crafted `i64::MAX` count of a zero-byte
/// item like `null` spins the loop forever (#10235); byte-consuming items self-terminate
/// on cursor exhaustion, so valid blocks (including `array<null>`) are unaffected.
#[inline]
fn process_block_items(
buf: &mut AvroCursor,
count: usize,
total: usize,
on_item: &mut impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
) -> Result<usize, AvroError> {
let new_total = total.checked_add(count).filter(|&t| t <= i32::MAX as usize);
Comment on lines +2317 to +2328

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure about hard limiting to i32::MAX either considering we have LargeLists which have i64 offsets; i'll refer to my previous question, in wondering how other avro implementations deal with this issue?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through the decode paths of the other implementations. None of them bound the count by remaining bytes - the negative-block byte size is read and discarded everywhere - so that original check was wrong (a valid array<null> legitimately has more items than bytes). The defenses that do exist are absolute caps on the item count: Java caps at Integer.MAX_VALUE - 8 (SystemLimitException, 1.11.3) and goavro at MaxBlockCount = math.MaxInt32. So i32::MAX here matches goavro exactly.

On LargeList: fair point it is coupled to the offset type, but the reader only materializes into i32-offset List/Map (no LargeList), so i32::MAX is the exact point past which the offsets cannot represent the result anyway. And even with i64 offsets you could not allow an i64::MAX zero-byte count (still an unbounded spin), so an absolute ceiling is needed regardless of width - Java caps at ~Integer.MAX_VALUE independent of the container.

If you would rather decouple it, I can pull it into a named MAX_BLOCK_ITEM_COUNT constant and raise it if Large* variants are ever supported. Happy to go either way.

let Some(new_total) = new_total else {
return Err(AvroError::ParseError(format!(
"Array/map item count {count} exceeds the maximum {} addressable by 32-bit offsets",
i32::MAX
)));
};
for _ in 0..count {
on_item(buf)?;
}
Ok(new_total)
}

#[inline]
fn flush_values<T>(values: &mut Vec<T>) -> Vec<T> {
std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY))
Expand Down Expand Up @@ -3434,6 +3457,62 @@ mod tests {
assert_eq!(values.value(2), 3);
}

/// Zig-zag + unsigned-LEB128 encode, correct for all `i64` including `MIN`/`MAX`
/// (`encode_avro_long` loops forever on those two values).
fn encode_avro_long_extreme(value: i64) -> Vec<u8> {
let mut n = ((value << 1) ^ (value >> 63)) as u64;
let mut out = Vec::new();
while n >= 0x80 {
out.push((n as u8) | 0x80);
n >>= 7;
}
out.push(n as u8);
out
}

// `array<null>` is the worst case: items consume no bytes, so an unbounded
// `block_count` spins the item loop without ever advancing the cursor (#10235).
fn array_of_null_decoder() -> Decoder {
let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Null))));
Decoder::try_new(&list_dt).unwrap()
}

#[test]
fn test_array_of_null_decodes() {
// A valid `array<null>` still decodes: zero-byte items make the count exceed the
// bytes left, so the bound must not reject it (#10235 review).
let mut decoder = array_of_null_decoder();
let mut data = encode_avro_long(3); // three null items
data.extend_from_slice(&encode_avro_long(0)); // empty-block terminator
decoder.decode(&mut AvroCursor::new(&data)).unwrap();
}

#[test]
fn test_array_block_count_i64_max_errors() {
// A positive `i64::MAX` block count must error rather than spin the item loop.
let mut decoder = array_of_null_decoder();
let mut data = encode_avro_long_extreme(i64::MAX); // item count
data.extend_from_slice(&encode_avro_long(0)); // empty-block terminator
let err = decoder.decode(&mut AvroCursor::new(&data)).unwrap_err();
assert!(
err.to_string().contains("exceeds the maximum"),
"unexpected error: {err}",
);
}

#[test]
fn test_array_block_count_i64_min_errors() {
// `i64::MIN` previously overflowed `-block_count` before spinning the loop.
let mut decoder = array_of_null_decoder();
let mut data = encode_avro_long_extreme(i64::MIN); // negative item count
data.extend_from_slice(&encode_avro_long(0)); // block size in bytes
let err = decoder.decode(&mut AvroCursor::new(&data)).unwrap_err();
assert!(
err.to_string().contains("exceeds the maximum"),
"unexpected error: {err}",
);
}

#[test]
fn test_nested_array_decoding() {
let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
Expand Down