From 1f6beace1ad8003d9adff4686c2e58e2c532b56f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Mon, 4 May 2026 10:51:11 +0200 Subject: [PATCH 1/2] Fix LocalInputFile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary Root cause (parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java) Both read(ByteBuffer) and readFully(ByteBuffer) called: buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); Two independent bugs: 1. Wrong put overload semantics. ByteBuffer.put(byte[] src, int offset, int length) reads src[offset..offset+length]. The code passed buf.position() + buf.arrayOffset() as the source offset, but the source is the freshly-allocated buffer array whose indices have no relationship to buf.position() or buf.arrayOffset(). On heap buffers with position=0 and arrayOffset=0 this happens to work; any other state reads from the wrong offset (or throws ArrayIndexOutOfBoundsException). 2. arrayOffset() is not universally defined. Direct buffers, memory-mapped buffers, and read-only views all throw UnsupportedOperationException from arrayOffset(). Any Parquet consumer passing such a buffer (which is exactly what the steampipe integration test triggers via ParquetFileReader.readFooter) blows up at the first call. 3. Partial-read accounting. read(ByteBuffer) unconditionally copied buf.remaining() bytes regardless of how many read(byte[]) actually returned, corrupting the destination buffer on short reads / EOF. Fix - readFully(ByteBuffer) → buf.put(buffer) (straight array-to-buffer copy, no arrayOffset()). - read(ByteBuffer) → only put read bytes when read > 0, return read (including -1 at EOF). Tests added TestLocalInputOutput gained 8 regression cases in parquet-common/src/test/java/org/apache/parquet/io/: - readFullyIntoHeapByteBuffer — baseline happy path. - readFullyIntoHeapByteBufferWithNonZeroPosition — exercises the old wrong-source-offset bug. - readFullyIntoDirectByteBuffer — direct buffers; arrayOffset() would have thrown. - readFullyIntoReadOnlyByteBuffer — read-only views; asserts ReadOnlyBufferException (correct JDK behaviour after the fix). - readIntoHeapByteBuffer — baseline. - readIntoByteBufferAdvancesPositionByBytesRead — exercises the partial-read accounting bug. - readIntoByteBufferReturnsMinusOneAtEof — EOF signalling. - readIntoDirectByteBuffer — direct buffer read path. --- .../org/apache/parquet/io/LocalInputFile.java | 10 +- .../parquet/io/TestLocalInputOutput.java | 152 ++++++++++++++++++ 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 7174b42d5d..59e2c142de 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -81,16 +81,18 @@ public void readFully(byte[] bytes, int start, int len) throws IOException { @Override public int read(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; - int code = read(buffer); - buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); - return code; + int bytesRead = randomAccessFile.read(buffer); + if (bytesRead > 0) { + buf.put(buffer, 0, bytesRead); + } + return bytesRead; } @Override public void readFully(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; readFully(buffer); - buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + buf.put(buffer); } @Override diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index 84d728f14e..2b55cadb82 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -18,11 +18,14 @@ */ package org.apache.parquet.io; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Path; import java.nio.file.Paths; @@ -89,4 +92,153 @@ private File createTempFile() throws IOException { tmp.delete(); return tmp; } + + @Test + public void readFullyIntoHeapByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {1, 2, 3, 4, 5}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(5); + stream.readFully(buf); + assertEquals(5, buf.position()); + buf.flip(); + byte[] out = new byte[5]; + buf.get(out); + assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, out); + } + } + + @Test + public void readFullyIntoHeapByteBufferWithNonZeroPosition() throws IOException { + // Regression: the buggy implementation passed buf.position() as the src offset to + // ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array. + Path path = writeBytes(new byte[] {10, 20, 30, 40}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(6); + buf.put(new byte[] {99, 99}); // advance position to 2 + stream.readFully(buf); + assertEquals(6, buf.position()); + buf.flip(); + byte[] out = new byte[6]; + buf.get(out); + assertArrayEquals(new byte[] {99, 99, 10, 20, 30, 40}, out); + } + } + + @Test + public void readFullyIntoDirectByteBuffer() throws IOException { + // Regression: the buggy implementation called arrayOffset() which throws + // UnsupportedOperationException on direct buffers. + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocateDirect(3); + stream.readFully(buf); + assertEquals(3, buf.position()); + buf.flip(); + byte[] out = new byte[3]; + buf.get(out); + assertArrayEquals(new byte[] {7, 8, 9}, out); + } + } + + @Test + public void readFullyIntoReadOnlyByteBuffer() throws IOException { + // Read-only views also throw from arrayOffset(). + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer backing = ByteBuffer.allocate(3); + ByteBuffer buf = backing.asReadOnlyBuffer(); + assertThrows(java.nio.ReadOnlyBufferException.class, () -> stream.readFully(buf)); + } + } + + @Test + public void readIntoHeapByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {1, 2, 3, 4}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(4); + int read = stream.read(buf); + assertEquals(4, read); + assertEquals(4, buf.position()); + buf.flip(); + byte[] out = new byte[4]; + buf.get(out); + assertArrayEquals(new byte[] {1, 2, 3, 4}, out); + } + } + + @Test + public void readIntoByteBufferAdvancesPositionByBytesRead() throws IOException { + // Regression: the buggy implementation always advanced by buf.remaining() regardless of how + // many bytes were actually read, leaving the destination buffer inconsistent on partial reads. + Path path = writeBytes(new byte[] {1, 2, 3}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(10); + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(3, buf.position()); + } + } + + @Test + public void readIntoByteBufferReturnsMinusOneAtEof() throws IOException { + Path path = writeBytes(new byte[] {1}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + assertEquals(1, stream.read()); + ByteBuffer buf = ByteBuffer.allocate(4); + int read = stream.read(buf); + assertEquals(-1, read); + assertEquals(0, buf.position()); + } + } + + @Test + public void readIntoDirectByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocateDirect(3); + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(3, buf.position()); + buf.flip(); + byte[] out = new byte[3]; + buf.get(out); + assertArrayEquals(new byte[] {7, 8, 9}, out); + } + } + + @Test + public void readIntoByteBufferWithNonZeroPosition() throws IOException { + // Regression: the buggy implementation passed buf.position() as the src offset to + // ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array. + Path path = writeBytes(new byte[] {10, 20, 30}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(5); + buf.put(new byte[] {99, 99}); // advance position to 2 + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(5, buf.position()); + buf.flip(); + byte[] out = new byte[5]; + buf.get(out); + assertArrayEquals(new byte[] {99, 99, 10, 20, 30}, out); + } + } + + @Test + public void readFullyThrowsEofWhenStreamTooShort() throws IOException { + Path path = writeBytes(new byte[] {1, 2}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(10); + assertThrows(EOFException.class, () -> stream.readFully(buf)); + } + } + + private Path writeBytes(byte[] data) throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(data); + } + return path; + } } From 7420a2b87ebf94eaa086c827e67c1f02c9a9777f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Mon, 4 May 2026 11:14:07 +0200 Subject: [PATCH 2/2] Update TestLocalInputOutput.java --- .../java/org/apache/parquet/io/TestLocalInputOutput.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index 2b55cadb82..6e9217717a 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -109,8 +109,6 @@ public void readFullyIntoHeapByteBuffer() throws IOException { @Test public void readFullyIntoHeapByteBufferWithNonZeroPosition() throws IOException { - // Regression: the buggy implementation passed buf.position() as the src offset to - // ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array. Path path = writeBytes(new byte[] {10, 20, 30, 40}); try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { ByteBuffer buf = ByteBuffer.allocate(6); @@ -126,8 +124,6 @@ public void readFullyIntoHeapByteBufferWithNonZeroPosition() throws IOException @Test public void readFullyIntoDirectByteBuffer() throws IOException { - // Regression: the buggy implementation called arrayOffset() which throws - // UnsupportedOperationException on direct buffers. Path path = writeBytes(new byte[] {7, 8, 9}); try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { ByteBuffer buf = ByteBuffer.allocateDirect(3); @@ -142,7 +138,6 @@ public void readFullyIntoDirectByteBuffer() throws IOException { @Test public void readFullyIntoReadOnlyByteBuffer() throws IOException { - // Read-only views also throw from arrayOffset(). Path path = writeBytes(new byte[] {7, 8, 9}); try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { ByteBuffer backing = ByteBuffer.allocate(3); @@ -168,8 +163,6 @@ public void readIntoHeapByteBuffer() throws IOException { @Test public void readIntoByteBufferAdvancesPositionByBytesRead() throws IOException { - // Regression: the buggy implementation always advanced by buf.remaining() regardless of how - // many bytes were actually read, leaving the destination buffer inconsistent on partial reads. Path path = writeBytes(new byte[] {1, 2, 3}); try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { ByteBuffer buf = ByteBuffer.allocate(10); @@ -208,8 +201,6 @@ public void readIntoDirectByteBuffer() throws IOException { @Test public void readIntoByteBufferWithNonZeroPosition() throws IOException { - // Regression: the buggy implementation passed buf.position() as the src offset to - // ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array. Path path = writeBytes(new byte[] {10, 20, 30}); try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { ByteBuffer buf = ByteBuffer.allocate(5);