diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index c3bf24cd..12d6237b 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -38,7 +38,7 @@ use bigtable_rs::google::bigtable::v2::{self, mutation}; use bytes::Bytes; use futures_util::TryStreamExt; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; -use objectstore_types::range::ByteRange; +use objectstore_types::range::{ByteRange, ContentRange}; use serde::{Deserialize, Serialize}; use tonic::Code; @@ -997,7 +997,7 @@ impl HighVolumeBackend for BigTableBackend { async fn get_tiered_object( &self, id: &ObjectId, - _range: Option<ByteRange>, + range: Option<ByteRange>, ) -> Result<TieredGet> { objectstore_log::debug!("Reading from Bigtable backend"); let path = id.as_storage_path().to_string().into_bytes(); @@ -1017,11 +1017,14 @@ impl HighVolumeBackend for BigTableBackend { }), RowData::Object { metadata, payload } => { let mut metadata = metadata; + let payload = Bytes::from(payload); if metadata.size.is_none() { // If object size wasn't written into the metadata, re-compute it now metadata.size = Some(payload.len()); } - TieredGet::Object(metadata, None, crate::stream::single(payload)) + + let (content_range, payload) = apply_range(payload, range)?; + TieredGet::Object(metadata, content_range, crate::stream::single(payload)) } }) } @@ -1221,6 +1224,25 @@ fn is_retryable(error: &BigTableError) -> bool { } } +/// Resolves an optional byte range against a payload buffer, returning the +/// applicable content range and the (potentially narrowed) payload. +/// +/// When `range` is `None`, returns the full payload unchanged. Uses +/// `Bytes::slice` to avoid copying data. 
+fn apply_range(payload: Bytes, range: Option<ByteRange>) -> Result<(Option<ContentRange>, Bytes)> { + let Some(byte_range) = range else { + return Ok((None, payload)); + }; + + let total = payload.len() as u64; + let content_range = byte_range + .resolve(total) + .ok_or(Error::RangeNotSatisfiable { total })?; + + let sliced = payload.slice(content_range.start as usize..content_range.end as usize + 1); + Ok((Some(content_range), sliced)) +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -2158,4 +2180,106 @@ mod tests { Ok(()) } + + // --- Range Request Tests --- + + async fn put_range_test_object(backend: &BigTableBackend) -> Result<ObjectId> { + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + ..Default::default() + }; + let payload = b"Hello, range requests!"; + backend + .put_object(&id, &metadata, stream::single(payload.as_slice())) + .await?; + Ok(id) + } + + #[tokio::test] + async fn get_object_range_bounded() -> Result<()> { + let backend = create_test_backend().await?; + let id = put_range_test_object(&backend).await?; + + let (_, content_range, stream) = backend + .get_object(&id, Some(ByteRange::Bounded(7, 11))) + .await? + .unwrap(); + let data = stream::read_to_vec(stream).await?; + assert_eq!(&data, b"range"); + + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 7); + assert_eq!(content_range.end, 11); + assert_eq!(content_range.total, 22); + + Ok(()) + } + + #[tokio::test] + async fn get_object_range_from() -> Result<()> { + let backend = create_test_backend().await?; + let id = put_range_test_object(&backend).await?; + + let (_, content_range, stream) = backend + .get_object(&id, Some(ByteRange::From(7))) + .await? 
+ .unwrap(); + let data = stream::read_to_vec(stream).await?; + assert_eq!(&data, b"range requests!"); + + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 7); + assert_eq!(content_range.end, 21); + assert_eq!(content_range.total, 22); + + Ok(()) + } + + #[tokio::test] + async fn get_object_range_last() -> Result<()> { + let backend = create_test_backend().await?; + let id = put_range_test_object(&backend).await?; + + let (_, content_range, stream) = backend + .get_object(&id, Some(ByteRange::Last(9))) + .await? + .unwrap(); + let data = stream::read_to_vec(stream).await?; + assert_eq!(&data, b"requests!"); + + let content_range = content_range.unwrap(); + assert_eq!(content_range.start, 13); + assert_eq!(content_range.end, 21); + assert_eq!(content_range.total, 22); + + Ok(()) + } + + #[tokio::test] + async fn get_object_range_unsatisfiable() -> Result<()> { + let backend = create_test_backend().await?; + let id = put_range_test_object(&backend).await?; + + match backend.get_object(&id, Some(ByteRange::From(100))).await { + Err(Error::RangeNotSatisfiable { total }) => assert_eq!(total, 22), + Ok(_) => panic!("expected RangeNotSatisfiable, got Ok"), + Err(e) => panic!("expected RangeNotSatisfiable, got {e:?}"), + } + + Ok(()) + } + + #[tokio::test] + async fn get_object_no_range_returns_full_payload() -> Result<()> { + let backend = create_test_backend().await?; + let id = put_range_test_object(&backend).await?; + + let (_, content_range, stream) = backend.get_object(&id, None).await?.unwrap(); + let data = stream::read_to_vec(stream).await?; + assert_eq!(&data, b"Hello, range requests!"); + assert!(content_range.is_none()); + + Ok(()) + } }