From 52941dee0773c08bf7ee9a3051e276e5668f2d5e Mon Sep 17 00:00:00 2001 From: "Ivan St. Ivanov" Date: Wed, 1 Apr 2026 16:55:35 +0300 Subject: [PATCH 1/2] 25 Add a method that opens an input stream to an S3 object Signed-off-by: Ivan St. Ivanov --- .../main/java/com/hedera/bucky/S3Client.java | 32 ++++++++++++ .../java/com/hedera/bucky/S3ClientTest.java | 52 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/client/src/main/java/com/hedera/bucky/S3Client.java b/client/src/main/java/com/hedera/bucky/S3Client.java index 96e7c40..3dded20 100644 --- a/client/src/main/java/com/hedera/bucky/S3Client.java +++ b/client/src/main/java/com/hedera/bucky/S3Client.java @@ -1347,6 +1347,38 @@ public Result execute() { // HTTP layer // ------------------------------------------------------------------------- + /** + * Opens a streaming download of an object from S3. + * The caller is responsible for closing the returned InputStream. + * + * @param key the key for the object in S3 (e.g., "my_folder/my_file.txt"), cannot be blank + * @return an InputStream to read the object content, or null if the object doesn't exist + * @throws S3ResponseException if a non-200/404 response is received from S3 + * @throws IOException if an error occurs while reading the response body on failure + */ + public InputStream openObjectStream(@NonNull final String key) throws S3ResponseException, IOException { + Preconditions.requireNotBlank(key); + // build the URL for the request + final String url = endpoint + bucketName + "/" + urlEncode(key, true); + // make the request + final HttpResponse response = + request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + // check status code and return stream or throw + final int responseStatusCode = response.statusCode(); + if (responseStatusCode == 404) { + response.body().close(); + return null; + } else if (responseStatusCode != 200) { + try (final InputStream in = response.body()) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final String message = "Failed to open stream for object: key=%s".formatted(key); + throw new S3ResponseException(responseStatusCode, responseBody, response.headers(), message); + } + } + // caller is responsible for closing this stream + return response.body(); + } + /** * Performs an HTTP request to S3 to the specified URL with the given parameters. * diff --git a/client/src/test/java/com/hedera/bucky/S3ClientTest.java b/client/src/test/java/com/hedera/bucky/S3ClientTest.java index 45e6d72..fc3f5ff 100644 --- a/client/src/test/java/com/hedera/bucky/S3ClientTest.java +++ b/client/src/test/java/com/hedera/bucky/S3ClientTest.java @@ -13,6 +13,7 @@ import io.minio.MinioClient; import io.minio.PutObjectArgs; import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Iterator; @@ -463,6 +464,44 @@ void testFetchNonExistentObject() throws Exception { } } + /** + * This test aims to verify that the {@link S3Client#openObjectStream(String)} + * method returns an InputStream whose content matches the uploaded object. + */ + @Test + @DisplayName("openObjectStream() returns an InputStream with the correct content") + void testOpenObjectStream() throws Exception { + // Setup + final String key = "openObjectStreamTest.bin"; + final Random random = new Random(98765432101234L); + final byte[] expected = new byte[2 * 1024 * 1024 + 137]; + random.nextBytes(expected); + minioClient.putObject(PutObjectArgs.builder().bucket(BUCKET_NAME).object(key).stream( + new ByteArrayInputStream(expected), expected.length, -1) + .build()); + try (final S3Client s3Client = client()) { + // Call + try (final InputStream stream = s3Client.openObjectStream(key)) { + // Assert + assertThat(stream).isNotNull(); + final byte[] actual = stream.readAllBytes(); + assertThat(actual).isEqualTo(expected); + } + } + } + + /** + * This test aims to verify that the {@link S3Client#openObjectStream(String)} + * method returns null when trying to open a stream for a non-existent object. + */ + @Test + @DisplayName("openObjectStream() returns null for a non-existent key") + void testOpenObjectStreamNonExistentObject() throws Exception { + try (final S3Client s3Client = client()) { + assertNull(s3Client.openObjectStream("non-existent-stream-object.txt")); + } + } + // ----------------------------------------------------------------------- // Constructor — precondition validation (no network calls required) // ----------------------------------------------------------------------- @@ -551,6 +590,19 @@ void testDownloadTextFileRejectsBlankKey() throws S3ClientInitializationExceptio } } + /** + * Verifies that {@link S3Client#openObjectStream(String)} throws + * {@link IllegalArgumentException} for a blank key. + */ + @Test + @DisplayName("openObjectStream() throws IllegalArgumentException for a blank key") + void testOpenObjectStreamRejectsBlankKey() throws S3ClientInitializationException { + try (final S3Client s3Client = client()) { + assertThatThrownBy(() -> s3Client.openObjectStream("")).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> s3Client.openObjectStream(" ")).isInstanceOf(IllegalArgumentException.class); + } + } + /** * Verifies that {@link S3Client#abortMultipartUpload(String, String)} * throws {@link IllegalArgumentException} for a blank key or upload ID. From 4b6bae4fa67b671d81d7bca63faf521945af04da Mon Sep 17 00:00:00 2001 From: "Ivan St. Ivanov" Date: Tue, 19 May 2026 14:42:32 +0300 Subject: [PATCH 2/2] Make openObjectStream retriable Signed-off-by: Ivan St. Ivanov --- .../main/java/com/hedera/bucky/S3Client.java | 84 ++++++++++++------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/client/src/main/java/com/hedera/bucky/S3Client.java b/client/src/main/java/com/hedera/bucky/S3Client.java index 3dded20..a37a45e 100644 --- a/client/src/main/java/com/hedera/bucky/S3Client.java +++ b/client/src/main/java/com/hedera/bucky/S3Client.java @@ -278,6 +278,20 @@ public String downloadTextFile(@NonNull final String key) throws S3ResponseExcep return withRetry(new DownloadTextFileOperation(this, key)); } + /** + * Opens a streaming download of an object from S3. + * The caller is responsible for closing the returned InputStream. + * + * @param key the key for the object in S3 (e.g., "my_folder/my_file.txt"), cannot be blank + * @return an InputStream to read the object content, or null if the object doesn't exist + * @throws S3ResponseException if a non-200/404 response is received from S3 + * @throws IOException if an error occurs while reading the response body on failure + */ + public InputStream openObjectStream(@NonNull final String key) throws S3ResponseException, IOException { + Preconditions.requireNotBlank(key); + return withRetry(new OpenObjectStreamOperation(this, key)); + } + /** * Uploads a file to S3 using multipart upload. * @@ -1343,42 +1357,50 @@ public Result execute() { } } - // ------------------------------------------------------------------------- - // HTTP layer - // ------------------------------------------------------------------------- + private static final class OpenObjectStreamOperation implements S3Operation { + private final S3Client client; + private final String key; - /** - * Opens a streaming download of an object from S3. - * The caller is responsible for closing the returned InputStream. - * - * @param key the key for the object in S3 (e.g., "my_folder/my_file.txt"), cannot be blank - * @return an InputStream to read the object content, or null if the object doesn't exist - * @throws S3ResponseException if a non-200/404 response is received from S3 - * @throws IOException if an error occurs while reading the response body on failure - */ - public InputStream openObjectStream(@NonNull final String key) throws S3ResponseException, IOException { - Preconditions.requireNotBlank(key); - // build the URL for the request - final String url = endpoint + bucketName + "/" + urlEncode(key, true); - // make the request - final HttpResponse response = - request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); - // check status code and return stream or throw - final int responseStatusCode = response.statusCode(); - if (responseStatusCode == 404) { - response.body().close(); - return null; - } else if (responseStatusCode != 200) { - try (final InputStream in = response.body()) { - final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); - final String message = "Failed to open stream for object: key=%s".formatted(key); - throw new S3ResponseException(responseStatusCode, responseBody, response.headers(), message); + OpenObjectStreamOperation(final S3Client client, final String key) { + this.client = client; + this.key = key; + } + + @Override + public Result execute() { + Result result; + try { + final String url = client.endpoint + client.bucketName + "/" + urlEncode(key, true); + final HttpResponse response = + client.request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream()); + final int responseStatusCode = response.statusCode(); + if (responseStatusCode == 404) { + response.body().close(); + result = new Result.Success<>(null); + } else if (responseStatusCode != 200) { + try (final InputStream in = response.body()) { + final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH); + final HttpHeaders responseHeaders = response.headers(); + final String message = "Failed to open stream for object: key=%s".formatted(key); + result = new Result.ResponseFailure<>( + responseStatusCode, responseBody, responseHeaders, message); + } + } else { + result = new Result.Success<>(response.body()); + } + } catch (final UncheckedIOException e) { + result = new Result.NetworkFailure<>(e.getCause()); + } catch (final IOException e) { + result = new Result.NetworkFailure<>(e); } + return result; } - // caller is responsible for closing this stream - return response.body(); } + // ------------------------------------------------------------------------- + // HTTP layer + // ------------------------------------------------------------------------- + /** * Performs an HTTP request to S3 to the specified URL with the given parameters. *