diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java index 7174b42d5d..59e2c142de 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java @@ -81,16 +81,18 @@ public void readFully(byte[] bytes, int start, int len) throws IOException { @Override public int read(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; - int code = read(buffer); - buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); - return code; + int bytesRead = randomAccessFile.read(buffer); + if (bytesRead > 0) { + buf.put(buffer, 0, bytesRead); + } + return bytesRead; } @Override public void readFully(ByteBuffer buf) throws IOException { byte[] buffer = new byte[buf.remaining()]; readFully(buffer); - buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); + buf.put(buffer); } @Override diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java index 84d728f14e..6e9217717a 100644 --- a/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java +++ b/parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java @@ -18,11 +18,14 @@ */ package org.apache.parquet.io; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Path; import java.nio.file.Paths; @@ -89,4 +92,144 @@ private File createTempFile() throws IOException { tmp.delete(); return tmp; } + + @Test + public void readFullyIntoHeapByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {1, 2, 3, 4, 5}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(5); + stream.readFully(buf); + assertEquals(5, buf.position()); + buf.flip(); + byte[] out = new byte[5]; + buf.get(out); + assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, out); + } + } + + @Test + public void readFullyIntoHeapByteBufferWithNonZeroPosition() throws IOException { + Path path = writeBytes(new byte[] {10, 20, 30, 40}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(6); + buf.put(new byte[] {99, 99}); // advance position to 2 + stream.readFully(buf); + assertEquals(6, buf.position()); + buf.flip(); + byte[] out = new byte[6]; + buf.get(out); + assertArrayEquals(new byte[] {99, 99, 10, 20, 30, 40}, out); + } + } + + @Test + public void readFullyIntoDirectByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocateDirect(3); + stream.readFully(buf); + assertEquals(3, buf.position()); + buf.flip(); + byte[] out = new byte[3]; + buf.get(out); + assertArrayEquals(new byte[] {7, 8, 9}, out); + } + } + + @Test + public void readFullyIntoReadOnlyByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer backing = ByteBuffer.allocate(3); + ByteBuffer buf = backing.asReadOnlyBuffer(); + assertThrows(java.nio.ReadOnlyBufferException.class, () -> stream.readFully(buf)); + } + } + + @Test + public void readIntoHeapByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {1, 2, 3, 4}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(4); + int read = stream.read(buf); + assertEquals(4, read); + assertEquals(4, buf.position()); + buf.flip(); + byte[] out = new byte[4]; + buf.get(out); + assertArrayEquals(new byte[] {1, 2, 3, 4}, out); + } + } + + @Test + public void readIntoByteBufferAdvancesPositionByBytesRead() throws IOException { + Path path = writeBytes(new byte[] {1, 2, 3}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(10); + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(3, buf.position()); + } + } + + @Test + public void readIntoByteBufferReturnsMinusOneAtEof() throws IOException { + Path path = writeBytes(new byte[] {1}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + assertEquals(1, stream.read()); + ByteBuffer buf = ByteBuffer.allocate(4); + int read = stream.read(buf); + assertEquals(-1, read); + assertEquals(0, buf.position()); + } + } + + @Test + public void readIntoDirectByteBuffer() throws IOException { + Path path = writeBytes(new byte[] {7, 8, 9}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocateDirect(3); + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(3, buf.position()); + buf.flip(); + byte[] out = new byte[3]; + buf.get(out); + assertArrayEquals(new byte[] {7, 8, 9}, out); + } + } + + @Test + public void readIntoByteBufferWithNonZeroPosition() throws IOException { + Path path = writeBytes(new byte[] {10, 20, 30}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(5); + buf.put(new byte[] {99, 99}); // advance position to 2 + int read = stream.read(buf); + assertEquals(3, read); + assertEquals(5, buf.position()); + buf.flip(); + byte[] out = new byte[5]; + buf.get(out); + assertArrayEquals(new byte[] {99, 99, 10, 20, 30}, out); + } + } + + @Test + public void readFullyThrowsEofWhenStreamTooShort() throws IOException { + Path path = writeBytes(new byte[] {1, 2}); + try (SeekableInputStream stream = new LocalInputFile(path).newStream()) { + ByteBuffer buf = ByteBuffer.allocate(10); + assertThrows(EOFException.class, () -> stream.readFully(buf)); + } + } + + private Path writeBytes(byte[] data) throws IOException { + Path path = Paths.get(createTempFile().getPath()); + OutputFile write = new LocalOutputFile(path); + try (PositionOutputStream stream = write.createOrOverwrite(512)) { + stream.write(data); + } + return path; + } }