Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 127 additions & 3 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use bigtable_rs::google::bigtable::v2::{self, mutation};
use bytes::Bytes;
use futures_util::TryStreamExt;
use objectstore_types::metadata::{ExpirationPolicy, Metadata};
use objectstore_types::range::ByteRange;
use objectstore_types::range::{ByteRange, ContentRange};
use serde::{Deserialize, Serialize};
use tonic::Code;

Expand Down Expand Up @@ -997,7 +997,7 @@ impl HighVolumeBackend for BigTableBackend {
async fn get_tiered_object(
&self,
id: &ObjectId,
_range: Option<ByteRange>,
range: Option<ByteRange>,
) -> Result<TieredGet> {
objectstore_log::debug!("Reading from Bigtable backend");
let path = id.as_storage_path().to_string().into_bytes();
Expand All @@ -1017,11 +1017,14 @@ impl HighVolumeBackend for BigTableBackend {
}),
RowData::Object { metadata, payload } => {
let mut metadata = metadata;
let payload = Bytes::from(payload);
if metadata.size.is_none() {
// If object size wasn't written into the metadata, re-compute it now
metadata.size = Some(payload.len());
}
TieredGet::Object(metadata, None, crate::stream::single(payload))

let (content_range, payload) = apply_range(payload, range)?;
TieredGet::Object(metadata, content_range, crate::stream::single(payload))
}
})
}
Expand Down Expand Up @@ -1221,6 +1224,25 @@ fn is_retryable(error: &BigTableError) -> bool {
}
}

/// Resolves an optional byte range against a payload buffer, returning the
/// applicable content range and the (potentially narrowed) payload.
///
/// When `range` is `None`, returns the full payload unchanged. Uses
/// `Bytes::slice` to avoid copying data.
fn apply_range(payload: Bytes, range: Option<ByteRange>) -> Result<(Option<ContentRange>, Bytes)> {
let Some(byte_range) = range else {
return Ok((None, payload));
};

let total = payload.len() as u64;
let content_range = byte_range
.resolve(total)
.ok_or(Error::RangeNotSatisfiable { total })?;

let sliced = payload.slice(content_range.start as usize..content_range.end as usize + 1);
Ok((Some(content_range), sliced))
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
Expand Down Expand Up @@ -2158,4 +2180,106 @@ mod tests {

Ok(())
}

// --- Range Request Tests ---

async fn put_range_test_object(backend: &BigTableBackend) -> Result<ObjectId> {
let id = make_id();
let metadata = Metadata {
content_type: "text/plain".into(),
..Default::default()
};
let payload = b"Hello, range requests!";
backend
.put_object(&id, &metadata, stream::single(payload.as_slice()))
.await?;
Ok(id)
}

#[tokio::test]
async fn get_object_range_bounded() -> Result<()> {
let backend = create_test_backend().await?;
let id = put_range_test_object(&backend).await?;

let (_, content_range, stream) = backend
.get_object(&id, Some(ByteRange::Bounded(7, 11)))
.await?
.unwrap();
let data = stream::read_to_vec(stream).await?;
assert_eq!(&data, b"range");

let content_range = content_range.unwrap();
assert_eq!(content_range.start, 7);
assert_eq!(content_range.end, 11);
assert_eq!(content_range.total, 22);

Ok(())
}

#[tokio::test]
async fn get_object_range_from() -> Result<()> {
let backend = create_test_backend().await?;
let id = put_range_test_object(&backend).await?;

let (_, content_range, stream) = backend
.get_object(&id, Some(ByteRange::From(7)))
.await?
.unwrap();
let data = stream::read_to_vec(stream).await?;
assert_eq!(&data, b"range requests!");

let content_range = content_range.unwrap();
assert_eq!(content_range.start, 7);
assert_eq!(content_range.end, 21);
assert_eq!(content_range.total, 22);

Ok(())
}

#[tokio::test]
async fn get_object_range_last() -> Result<()> {
let backend = create_test_backend().await?;
let id = put_range_test_object(&backend).await?;

let (_, content_range, stream) = backend
.get_object(&id, Some(ByteRange::Last(9)))
.await?
.unwrap();
let data = stream::read_to_vec(stream).await?;
assert_eq!(&data, b"requests!");

let content_range = content_range.unwrap();
assert_eq!(content_range.start, 13);
assert_eq!(content_range.end, 21);
assert_eq!(content_range.total, 22);

Ok(())
}

#[tokio::test]
async fn get_object_range_unsatisfiable() -> Result<()> {
let backend = create_test_backend().await?;
let id = put_range_test_object(&backend).await?;

match backend.get_object(&id, Some(ByteRange::From(100))).await {
Err(Error::RangeNotSatisfiable { total }) => assert_eq!(total, 22),
Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"),
Err(e) => panic!("expected RangeNotSatisfiable, got {e:?}"),
}

Ok(())
}

#[tokio::test]
async fn get_object_no_range_returns_full_payload() -> Result<()> {
let backend = create_test_backend().await?;
let id = put_range_test_object(&backend).await?;

let (_, content_range, stream) = backend.get_object(&id, None).await?.unwrap();
let data = stream::read_to_vec(stream).await?;
assert_eq!(&data, b"Hello, range requests!");
assert!(content_range.is_none());

Ok(())
}
}
Loading