Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions client/src/main/java/com/hedera/bucky/S3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,20 @@ public String downloadTextFile(@NonNull final String key) throws S3ResponseExcep
return withRetry(new DownloadTextFileOperation(this, key));
}

/**
* Opens a streaming download of an object from S3.
* The caller is responsible for closing the returned InputStream.
*
* @param key the key for the object in S3 (e.g., "my_folder/my_file.txt"), cannot be blank
* @return an InputStream to read the object content, or null if the object doesn't exist
* @throws S3ResponseException if a non-200/404 response is received from S3
* @throws IOException if an error occurs while reading the response body on failure
*/
public InputStream openObjectStream(@NonNull final String key) throws S3ResponseException, IOException {
Preconditions.requireNotBlank(key);
return withRetry(new OpenObjectStreamOperation(this, key));
}

/**
* Uploads a file to S3 using multipart upload.
*
Expand Down Expand Up @@ -1343,6 +1357,46 @@ public Result<byte[]> execute() {
}
}

private static final class OpenObjectStreamOperation implements S3Operation<InputStream> {
private final S3Client client;
private final String key;

OpenObjectStreamOperation(final S3Client client, final String key) {
this.client = client;
this.key = key;
}

@Override
public Result<InputStream> execute() {
Result<InputStream> result;
try {
final String url = client.endpoint + client.bucketName + "/" + urlEncode(key, true);
final HttpResponse<InputStream> response =
client.request(url, GET, Collections.emptyMap(), null, BodyHandlers.ofInputStream());
final int responseStatusCode = response.statusCode();
if (responseStatusCode == 404) {
response.body().close();
result = new Result.Success<>(null);
} else if (responseStatusCode != 200) {
try (final InputStream in = response.body()) {
final byte[] responseBody = in.readNBytes(ERROR_BODY_MAX_LENGTH);
final HttpHeaders responseHeaders = response.headers();
final String message = "Failed to open stream for object: key=%s".formatted(key);
result = new Result.ResponseFailure<>(
responseStatusCode, responseBody, responseHeaders, message);
}
} else {
result = new Result.Success<>(response.body());
}
} catch (final UncheckedIOException e) {
result = new Result.NetworkFailure<>(e.getCause());
} catch (final IOException e) {
result = new Result.NetworkFailure<>(e);
}
return result;
}
}

// -------------------------------------------------------------------------
// HTTP layer
// -------------------------------------------------------------------------
Expand Down
52 changes: 52 additions & 0 deletions client/src/test/java/com/hedera/bucky/S3ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -463,6 +464,44 @@ void testFetchNonExistentObject() throws Exception {
}
}

/**
* This test aims to verify that the {@link S3Client#openObjectStream(String)}
* method returns an InputStream whose content matches the uploaded object.
*/
@Test
@DisplayName("openObjectStream() returns an InputStream with the correct content")
void testOpenObjectStream() throws Exception {
// Setup
final String key = "openObjectStreamTest.bin";
final Random random = new Random(98765432101234L);
final byte[] expected = new byte[2 * 1024 * 1024 + 137];
random.nextBytes(expected);
minioClient.putObject(PutObjectArgs.builder().bucket(BUCKET_NAME).object(key).stream(
new ByteArrayInputStream(expected), expected.length, -1)
.build());
try (final S3Client s3Client = client()) {
// Call
try (final InputStream stream = s3Client.openObjectStream(key)) {
// Assert
assertThat(stream).isNotNull();
final byte[] actual = stream.readAllBytes();
assertThat(actual).isEqualTo(expected);
}
}
}

/**
* This test aims to verify that the {@link S3Client#openObjectStream(String)}
* method returns null when trying to open a stream for a non-existent object.
*/
@Test
@DisplayName("openObjectStream() returns null for a non-existent key")
void testOpenObjectStreamNonExistentObject() throws Exception {
try (final S3Client s3Client = client()) {
assertNull(s3Client.openObjectStream("non-existent-stream-object.txt"));
}
}

// -----------------------------------------------------------------------
// Constructor — precondition validation (no network calls required)
// -----------------------------------------------------------------------
Expand Down Expand Up @@ -551,6 +590,19 @@ void testDownloadTextFileRejectsBlankKey() throws S3ClientInitializationExceptio
}
}

/**
* Verifies that {@link S3Client#openObjectStream(String)} throws
* {@link IllegalArgumentException} for a blank key.
*/
@Test
@DisplayName("openObjectStream() throws IllegalArgumentException for a blank key")
void testOpenObjectStreamRejectsBlankKey() throws S3ClientInitializationException {
try (final S3Client s3Client = client()) {
assertThatThrownBy(() -> s3Client.openObjectStream("")).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> s3Client.openObjectStream(" ")).isInstanceOf(IllegalArgumentException.class);
}
}

/**
* Verifies that {@link S3Client#abortMultipartUpload(String, String)}
* throws {@link IllegalArgumentException} for a blank key or upload ID.
Expand Down
Loading