diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs index e20818bf968a..8df76c4b15f0 100644 --- a/arrow-avro/src/reader/block.rs +++ b/arrow-avro/src/reader/block.rs @@ -96,7 +96,13 @@ impl BlockDecoder { AvroError::ParseError(format!("Block size cannot be negative, got {c}")) })?; - self.in_progress.data.reserve(self.bytes_remaining); + // Only reserve what the current input backs: the block size is + // input specified so could be an extreme value (e.g. i64::MAX) + // in case of corrupted/malicious input. The rest is reserved + // lazily by `extend_from_slice` below as data arrives. + self.in_progress + .data + .reserve(self.bytes_remaining.min(buf.len())); self.state = BlockDecoderState::Data; } } @@ -148,3 +154,72 @@ impl BlockDecoder { self.bytes_remaining } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Zig-zag encode `value` as an Avro `long` (variable-length integer). + fn encode_long(value: i64, out: &mut Vec) { + let mut n = ((value << 1) ^ (value >> 63)) as u64; + while n >= 0x80 { + out.push((n as u8) | 0x80); + n >>= 7; + } + out.push(n as u8); + } + + #[test] + fn test_oversized_block_size_bounds_reserve() { + // A block advertising `i64::MAX` bytes must not reserve that up front when only + // a few payload bytes are present, or a crafted OCF aborts on a huge alloc (#10234). + let mut buf = Vec::new(); + encode_long(1, &mut buf); // object count + encode_long(i64::MAX, &mut buf); // attacker-controlled block size + buf.extend_from_slice(&[0u8; 8]); // a handful of real bytes + + let mut decoder = BlockDecoder::default(); + let read = decoder.decode(&buf).unwrap(); + + assert_eq!(read, buf.len(), "all available input should be consumed"); + assert!( + decoder.in_progress.data.capacity() <= buf.len(), + "capacity {} must stay bounded by available input {}, not the advertised i64::MAX", + decoder.in_progress.data.capacity(), + buf.len(), + ); + } + + #[test] + fn test_negative_block_size_errors() { + let mut buf = Vec::new(); + encode_long(1, &mut buf); // object count + encode_long(-1, &mut buf); // invalid (negative) block size + + let mut decoder = BlockDecoder::default(); + let err = decoder.decode(&buf).unwrap_err(); + assert!( + err.to_string().contains("Block size cannot be negative"), + "unexpected error: {err}", + ); + } + + #[test] + fn test_well_formed_block_round_trips() { + // The capped reserve must not change decoding of a normal block. + let payload = [1u8, 2, 3, 4]; + let sync = [7u8; 16]; + let mut buf = Vec::new(); + encode_long(2, &mut buf); // object count + encode_long(payload.len() as i64, &mut buf); // block size + buf.extend_from_slice(&payload); + buf.extend_from_slice(&sync); + + let mut decoder = BlockDecoder::default(); + assert_eq!(decoder.decode(&buf).unwrap(), buf.len()); + let block = decoder.flush().expect("a complete block"); + assert_eq!(block.count, 2); + assert_eq!(block.data, payload); + assert_eq!(block.sync, sync); + } +} diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 306c77718234..2a29ba127e4d 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -2286,35 +2286,58 @@ fn process_blockwise( match block_count.cmp(&0) { Ordering::Equal => break, Ordering::Less => { - let count = (-block_count) as usize; + // `unsigned_abs` avoids overflowing `-block_count` for `i64::MIN` (#10235) + let count = block_count.unsigned_abs() as usize; // A negative count is followed by a long of the size in bytes - let size_in_bytes = buf.get_long()? as usize; + let raw_size = buf.get_long()?; + let size_in_bytes = usize::try_from(raw_size).map_err(|_| { + AvroError::ParseError(format!("Block size cannot be negative, got {raw_size}")) + })?; match negative_behavior { NegativeBlockBehavior::ProcessItems => { // Process items one-by-one after reading size - for _ in 0..count { - on_item(buf)?; - } + total = process_block_items(buf, count, total, &mut on_item)?; } NegativeBlockBehavior::SkipBySize => { // Skip the entire block payload at once let _ = buf.get_fixed(size_in_bytes)?; + total = total.saturating_add(count); } } - total += count; } Ordering::Greater => { let count = block_count as usize; - for _ in 0..count { - on_item(buf)?; - } - total += count; + total = process_block_items(buf, count, total, &mut on_item)?; } } } Ok(total) } +/// Decode `count` items, capping the running total at `i32::MAX` (the largest index +/// an Arrow list/map offset holds). Otherwise a crafted `i64::MAX` count of a zero-byte +/// item like `null` spins the loop forever (#10235); byte-consuming items self-terminate +/// on cursor exhaustion, so valid blocks (including `array`) are unaffected. +#[inline] +fn process_block_items( + buf: &mut AvroCursor, + count: usize, + total: usize, + on_item: &mut impl FnMut(&mut AvroCursor) -> Result<(), AvroError>, +) -> Result { + let new_total = total.checked_add(count).filter(|&t| t <= i32::MAX as usize); + let Some(new_total) = new_total else { + return Err(AvroError::ParseError(format!( + "Array/map item count {count} exceeds the maximum {} addressable by 32-bit offsets", + i32::MAX + ))); + }; + for _ in 0..count { + on_item(buf)?; + } + Ok(new_total) +} + #[inline] fn flush_values(values: &mut Vec) -> Vec { std::mem::replace(values, Vec::with_capacity(DEFAULT_CAPACITY)) @@ -3434,6 +3457,62 @@ mod tests { assert_eq!(values.value(2), 3); } + /// Zig-zag + unsigned-LEB128 encode, correct for all `i64` including `MIN`/`MAX` + /// (`encode_avro_long` loops forever on those two values). + fn encode_avro_long_extreme(value: i64) -> Vec { + let mut n = ((value << 1) ^ (value >> 63)) as u64; + let mut out = Vec::new(); + while n >= 0x80 { + out.push((n as u8) | 0x80); + n >>= 7; + } + out.push(n as u8); + out + } + + // `array` is the worst case: items consume no bytes, so an unbounded + // `block_count` spins the item loop without ever advancing the cursor (#10235). + fn array_of_null_decoder() -> Decoder { + let list_dt = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Null)))); + Decoder::try_new(&list_dt).unwrap() + } + + #[test] + fn test_array_of_null_decodes() { + // A valid `array` still decodes: zero-byte items make the count exceed the + // bytes left, so the bound must not reject it (#10235 review). + let mut decoder = array_of_null_decoder(); + let mut data = encode_avro_long(3); // three null items + data.extend_from_slice(&encode_avro_long(0)); // empty-block terminator + decoder.decode(&mut AvroCursor::new(&data)).unwrap(); + } + + #[test] + fn test_array_block_count_i64_max_errors() { + // A positive `i64::MAX` block count must error rather than spin the item loop. + let mut decoder = array_of_null_decoder(); + let mut data = encode_avro_long_extreme(i64::MAX); // item count + data.extend_from_slice(&encode_avro_long(0)); // empty-block terminator + let err = decoder.decode(&mut AvroCursor::new(&data)).unwrap_err(); + assert!( + err.to_string().contains("exceeds the maximum"), + "unexpected error: {err}", + ); + } + + #[test] + fn test_array_block_count_i64_min_errors() { + // `i64::MIN` previously overflowed `-block_count` before spinning the loop. + let mut decoder = array_of_null_decoder(); + let mut data = encode_avro_long_extreme(i64::MIN); // negative item count + data.extend_from_slice(&encode_avro_long(0)); // block size in bytes + let err = decoder.decode(&mut AvroCursor::new(&data)).unwrap_err(); + assert!( + err.to_string().contains("exceeds the maximum"), + "unexpected error: {err}", + ); + } + #[test] fn test_nested_array_decoding() { let inner_ty = avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));