From 445c724523fd963a4027e68ef3d79eb4737739da Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:38:29 +0200 Subject: [PATCH 01/20] ref(service): Classify service errors by kind Add service-level error kinds so API handlers can map failures without matching every backend-specific variant. Keep client metadata errors distinct from internal metadata failures and classify transient backend failures as retryable/service-unavailable paths. Co-Authored-By: OpenAI Codex --- Cargo.lock | 99 ++-- Cargo.toml | 1 + objectstore-server/docs/architecture.md | 14 + objectstore-server/src/endpoints/common.rs | 51 +- objectstore-server/src/endpoints/multipart.rs | 4 +- objectstore-server/src/endpoints/objects.rs | 14 +- objectstore-server/tests/range_requests.rs | 23 + objectstore-service/Cargo.toml | 1 + objectstore-service/docs/architecture.md | 9 + objectstore-service/src/backend/gcs.rs | 33 +- objectstore-service/src/backend/local_fs.rs | 68 +-- .../src/backend/s3_compatible.rs | 49 +- objectstore-service/src/error.rs | 513 ++++++++++++++++-- 13 files changed, 679 insertions(+), 200 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 874be913..4af47f78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,7 +220,7 @@ dependencies = [ "argh_shared", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -263,7 +263,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -274,7 +274,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -774,7 +774,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -798,7 +798,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.117", ] [[package]] @@ -809,7 +809,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -849,6 +849,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive-error-kind" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a422c2552b72ab3fc01435089a5d9f5337e42383543e885275699ef0f539ba" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "2.1.1" @@ -868,7 +879,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.117", "unicode-xid", ] @@ -902,7 +913,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1208,7 +1219,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1976,7 +1987,7 @@ dependencies = [ "quote", "rustc_version", "simd_cesu8", - "syn", + "syn 2.0.117", ] [[package]] @@ -2001,7 +2012,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" dependencies = [ "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2662,6 +2673,7 @@ dependencies = [ "bigtable_rs", "bytes", "chrono", + "derive-error-kind", "futures", "futures-util", "gcp_auth", @@ -2743,7 +2755,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2866,7 +2878,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2922,7 +2934,7 @@ checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3012,7 +3024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.117", ] [[package]] @@ -3041,7 +3053,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "version_check", "yansi", ] @@ -3073,7 +3085,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn", + "syn 2.0.117", "tempfile", ] @@ -3087,7 +3099,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3407,7 +3419,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3992,7 +4004,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4059,7 +4071,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4250,6 +4262,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.117" @@ -4278,7 +4301,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4347,7 +4370,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4358,7 +4381,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4471,7 +4494,7 @@ checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4558,7 +4581,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4583,7 +4606,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn", + "syn 2.0.117", "tempfile", "tonic-build", ] @@ -4660,7 +4683,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4764,7 +4787,7 @@ checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5006,7 +5029,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasm-bindgen-shared", ] @@ -5153,7 +5176,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5164,7 +5187,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5539,7 +5562,7 @@ dependencies = [ "heck", "indexmap 2.13.0", "prettyplease", - "syn", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -5555,7 +5578,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -5628,7 +5651,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5649,7 +5672,7 @@ checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5669,7 +5692,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5709,7 +5732,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7908086a..d871ee3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ stresstest = { path = "stresstest" } anyhow = "1.0.69" async-trait = "0.1.88" bytes = "1.11.1" +derive-error-kind = "0.1.0" futures = "0.3.31" futures-util = "0.3.31" http = "1.3.1" diff --git a/objectstore-server/docs/architecture.md b/objectstore-server/docs/architecture.md index 08bb7496..24f82434 100644 --- a/objectstore-server/docs/architecture.md +++ b/objectstore-server/docs/architecture.md @@ -76,6 +76,20 @@ A request flows through several layers before reaching the storage service: [`objectstore-types` docs](objectstore_types) for the header mapping) and the payload is streamed back. +### Service Error Mapping + +Endpoint handlers map [`objectstore_service::error::Error`] to HTTP status +codes via its coarse error kind: + +- Client stream and bad request errors return HTTP 400. +- Service or upstream capacity errors return HTTP 429. +- Transient service errors return HTTP 503. +- Unsupported operations return HTTP 501. +- Internal errors return HTTP 500. + +Unsatisfiable byte ranges are a special case: `GET` returns HTTP 416 and +preserves the required `Content-Range: bytes */{total}` header. + ## Authentication & Authorization Objectstore uses **JWT tokens with EdDSA signatures** (Ed25519) for diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 06b6014e..2d9e49f1 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -6,7 +6,7 @@ use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use http::HeaderValue; -use objectstore_service::error::Error as ServiceError; +use objectstore_service::error::{Error as ServiceError, ErrorKind as ServiceErrorKind}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -92,18 +92,21 @@ impl ApiError { StatusCode::INTERNAL_SERVER_ERROR } - ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::RangeNotSatisfiable { .. }) => { StatusCode::RANGE_NOT_SATISFIABLE } - ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, - ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, - ApiError::Service(_) => { - objectstore_log::error!(!!self, "error handling request"); - StatusCode::INTERNAL_SERVER_ERROR - } + ApiError::Service(error) => match error.kind() { + ServiceErrorKind::ClientStream | ServiceErrorKind::BadRequest => { + StatusCode::BAD_REQUEST + } + ServiceErrorKind::TooManyRequests => StatusCode::TOO_MANY_REQUESTS, + ServiceErrorKind::Transient => StatusCode::SERVICE_UNAVAILABLE, + ServiceErrorKind::NotImplemented => StatusCode::NOT_IMPLEMENTED, + ServiceErrorKind::Internal => { + objectstore_log::error!(!!self, "error handling request"); + StatusCode::INTERNAL_SERVER_ERROR + } + }, ApiError::Internal(_) => { objectstore_log::error!(!!self, "internal error"); @@ -127,3 +130,31 @@ pub fn insert_accept_ranges(response: &mut Response) { HeaderValue::from_static("bytes"), ); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn service_capacity_errors_return_429() { + assert_eq!( + ApiError::Service(ServiceError::AtCapacity).status(), + StatusCode::TOO_MANY_REQUESTS + ); + } + + #[tokio::test] + async fn transient_service_errors_return_503() { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + drop(listener); + + let cause = reqwest::get(format!("http://{addr}")).await.unwrap_err(); + let error = ServiceError::reqwest_transparent("backend request", cause); + + assert_eq!( + ApiError::Service(error).status(), + StatusCode::SERVICE_UNAVAILABLE + ); + } +} diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index c9d40d26..5f7d93bf 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -95,7 +95,9 @@ async fn initiate_inner( id: ObjectId, headers: HeaderMap, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "").map_err(|cause| { + ServiceError::metadata_client("invalid multipart object metadata headers", cause) + })?; // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. metadata.time_created = Some(SystemTime::now()); diff --git a/objectstore-server/src/endpoints/objects.rs b/objectstore-server/src/endpoints/objects.rs index 8f172e31..ebc0d625 100644 --- a/objectstore-server/src/endpoints/objects.rs +++ b/objectstore-server/src/endpoints/objects.rs @@ -45,7 +45,8 @@ async fn objects_post( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; metadata.time_created = Some(SystemTime::now()); state @@ -91,7 +92,9 @@ async fn object_get( }; let stream = state.meter_stream(stream, &context); - let metadata_headers = metadata.to_headers("").map_err(ServiceError::from)?; + let metadata_headers = metadata + .to_headers("") + .map_err(|cause| ServiceError::metadata("failed to serialize object metadata", cause))?; let mut response = match content_range { Some(ref content_range) => { @@ -122,7 +125,9 @@ async fn object_head(service: AuthAwareService, Xt(id): Xt) -> ApiResu return Ok(StatusCode::NOT_FOUND.into_response()); }; - let headers = metadata.to_headers("").map_err(ServiceError::from)?; + let headers = metadata + .to_headers("") + .map_err(|cause| ServiceError::metadata("failed to serialize object metadata", cause))?; let mut response = (StatusCode::NO_CONTENT, headers).into_response(); insert_accept_ranges(&mut response); @@ -136,7 +141,8 @@ async fn object_put( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; metadata.time_created = Some(SystemTime::now()); let ObjectId { context, key } = id; diff --git a/objectstore-server/tests/range_requests.rs b/objectstore-server/tests/range_requests.rs index ca5afd4d..2351a1c5 100644 --- a/objectstore-server/tests/range_requests.rs +++ b/objectstore-server/tests/range_requests.rs @@ -233,3 +233,26 @@ async fn range_on_nonexistent_object_returns_404() -> Result<()> { assert_eq!(resp.status(), reqwest::StatusCode::NOT_FOUND); Ok(()) } + +#[tokio::test] +async fn invalid_metadata_request_headers_return_400() -> Result<()> { + let server = TestServer::with_config(Config { + auth: AuthZ { + enforce: false, + ..Default::default() + }, + ..Default::default() + }) + .await; + let client = reqwest::Client::new(); + + let resp = client + .put(server.url("/v1/objects/test/org=1/invalid-metadata")) + .header("x-sn-expiration", "garbage") + .body("payload") + .send() + .await?; + + assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST); + Ok(()) +} diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index e4434d42..42076189 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -16,6 +16,7 @@ base64 = "0.22" bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" } chrono = "0.4" bytes = { workspace = true } +derive-error-kind = { workspace = true } futures-util = { workspace = true } gcp_auth = "0.12.3" humantime = { workspace = true } diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index f9e1e5f4..ed25de95 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -155,6 +155,15 @@ The service applies backpressure to protect backends from overload. Rather than queueing work when capacity is exhausted, the service rejects operations immediately so the caller can shed load or retry. +# Error Classification + +The service error type exposes a coarse [`ErrorKind`](error::ErrorKind) so API +layers can map failures without matching every backend-specific variant. Client +input and client stream errors are separated from internal failures, capacity +errors, and unsupported operations. Backend HTTP failures default to internal +classification; transparent reqwest status classification is available for call +sites that explicitly opt in. + ## Concurrency Limit A semaphore caps the total number of in-flight backend operations across all diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 70f7b73a..5e50f24b 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -27,7 +27,7 @@ use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, ListPartsResponse, PartNumber, UploadId, UploadPartResponse, }; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`GcsBackend`]. /// @@ -363,25 +363,7 @@ fn insert_gcs_meta_header( /// Returns `true` if the error is a transient reqwest failure worth retrying. fn is_retryable(error: &Error) -> bool { - let Error::Reqwest { cause, .. } = error else { - return false; - }; - if cause.is_timeout() || cause.is_connect() || cause.is_request() { - return true; - } - let Some(status) = cause.status() else { - return false; - }; - // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes - matches!( - status, - StatusCode::REQUEST_TIMEOUT - | StatusCode::TOO_MANY_REQUESTS - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT - ) + matches!(error, Error::Reqwest(error) if error.is_retryable()) } /// GCS JSON API backend for long-term storage of large objects. @@ -617,10 +599,8 @@ impl Backend for GcsBackend { // NB: Ensure the order of these fields and that a content-type is attached to them. Both // are required by the GCS API. - let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata for GCS upload".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&gcs_metadata) + .map_err(|cause| Error::serde("failed to serialize metadata for GCS upload", cause))?; let multipart = multipart::Form::new() .part( @@ -651,10 +631,7 @@ impl Backend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - _ => Error::reqwest("GCS: upload object", e), - })?; + .map_err(|e| Error::reqwest("GCS: upload object", e))?; Ok(()) } diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 2a221a79..a1dbb441 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -22,7 +22,7 @@ use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, }; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`LocalFsBackend`]. /// @@ -97,19 +97,12 @@ impl Backend for LocalFsBackend { let mut reader = pin!(StreamReader::new(stream)); let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|cause| Error::serde("failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; - tokio::io::copy(&mut reader, &mut writer) - .await - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), - })?; + tokio::io::copy(&mut reader, &mut writer).await?; writer.flush().await?; let file = writer.into_inner(); @@ -137,11 +130,8 @@ impl Backend for LocalFsBackend { let mut metadata_line = String::new(); reader.read_line(&mut metadata_line).await?; let file_len = reader.get_ref().metadata().await?.len(); - let mut metadata: Metadata = - serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize metadata".to_string(), - cause, - })?; + let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end()) + .map_err(|cause| Error::serde("failed to deserialize metadata", cause))?; let payload_size = file_len .checked_sub(metadata_line.len() as u64) .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?; @@ -201,10 +191,8 @@ impl MultipartUploadBackend for LocalFsBackend { tokio::fs::create_dir_all(&dir).await?; let meta_path = dir.join("metadata.json"); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize multipart metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|cause| Error::serde("failed to serialize multipart metadata", cause))?; tokio::fs::write(meta_path, metadata_json).await?; Ok(upload_id) @@ -231,10 +219,8 @@ impl MultipartUploadBackend for LocalFsBackend { "uploaded_at": SystemTime::now(), "size": content_length, }); - let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { - context: "failed to serialize part header".to_string(), - cause, - })?; + let header_line = serde_json::to_string(&header) + .map_err(|cause| Error::serde("failed to serialize part header", cause))?; let part_path = dir.join(format!("{part_number}.part")); let file = OpenOptions::new() @@ -249,12 +235,7 @@ impl MultipartUploadBackend for LocalFsBackend { writer.write_all(header_line.as_bytes()).await?; writer.write_all(b"\n").await?; - let _bytes_copied = tokio::io::copy(&mut reader, &mut writer) - .await - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), - })?; + let _bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?; // TODO: validate bytes_copied against content_length and return a BadRequest-style // error. Needs a service-layer error variant that maps to HTTP 400 without abusing @@ -301,11 +282,8 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, - })?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) + .map_err(|cause| Error::serde("failed to deserialize part header", cause))?; parts.push(Part { part_number: pn, @@ -361,11 +339,8 @@ impl MultipartUploadBackend for LocalFsBackend { // Read metadata let meta_path = dir.join("metadata.json"); let meta_bytes = tokio::fs::read(&meta_path).await?; - let metadata: Metadata = - serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde { - context: "failed to deserialize multipart metadata".to_string(), - cause, - })?; + let metadata: Metadata = serde_json::from_slice(&meta_bytes) + .map_err(|cause| Error::serde("failed to deserialize multipart metadata", cause))?; // TODO: validate that parts are in ascending part_number order and reject with // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. @@ -384,11 +359,8 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, - })?; + let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) + .map_err(|cause| Error::serde("failed to deserialize part header", cause))?; let stored_etag = header["etag"].as_str().unwrap_or(""); if stored_etag != completed.etag { @@ -413,10 +385,8 @@ impl MultipartUploadBackend for LocalFsBackend { .await?; let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&metadata) + .map_err(|cause| Error::serde("failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 9818597c..6c8d761b 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -14,7 +14,7 @@ use crate::backend::common::{ }; use crate::error::{Error, Result}; use crate::id::ObjectId; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`S3CompatibleBackend`]. /// @@ -172,10 +172,10 @@ where if let Some(r) = range { builder = builder.header(reqwest::header::RANGE, r.to_header_value()); } - let response = builder.send().await.map_err(|cause| Error::Reqwest { - context: "S3: failed to send request".to_string(), - cause, - })?; + let response = builder + .send() + .await + .map_err(|cause| Error::reqwest("S3: failed to send request", cause))?; if response.status() == StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); @@ -201,13 +201,11 @@ where let response = response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to get object".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to get object", cause))?; let headers = response.headers(); - let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; + let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX) + .map_err(|cause| Error::metadata("S3: failed to parse object metadata", cause))?; let content_range = if response.status() == StatusCode::PARTIAL_CONTENT { let range = headers @@ -262,14 +260,13 @@ where .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send TTI update request".to_string(), - cause, - })? + .map_err(|cause| Error::reqwest("S3: failed to send TTI update request", cause))? .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to update expiration time for object with TTI".to_string(), - cause, + .map_err(|cause| { + Error::reqwest( + "S3: failed to update expiration time for object with TTI", + cause, + ) })?; Ok(()) @@ -319,13 +316,7 @@ impl Backend for S3CompatibleBackend { .send() .await .and_then(|response| response.error_for_status()) - .map_err(|cause| match stream::unpack_client_error(&cause) { - Some(ce) => Error::Client(ce), - _ => Error::Reqwest { - context: "S3: failed to put object".to_string(), - cause, - }, - })?; + .map_err(|cause| Error::reqwest("S3: failed to put object", cause))?; Ok(()) } @@ -359,20 +350,14 @@ impl Backend for S3CompatibleBackend { .await? .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send delete request".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to send delete request", cause))?; // Do not error for objects that do not exist. if response.status() != StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to delete object".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to delete object", cause))?; } Ok(()) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 93c18818..32ff2a05 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -3,60 +3,193 @@ //! [`Error`] covers I/O, serialization, HTTP, metadata, authentication, //! and backend-specific failures. [`Result`] is the corresponding alias. +#![allow(missing_docs)] + use std::any::Any; use objectstore_log::Level; +use reqwest::StatusCode; use thiserror::Error as ThisError; -use crate::stream::ClientError; +use crate::stream::{self, ClientError}; -/// Error type for service operations. +/// Coarse categories for service errors. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ErrorKind { + /// Error originating from a client-supplied request body stream. + ClientStream, + /// Backend or network failure that may succeed if retried. + Transient, + /// Malformed or unsupported client input. + BadRequest, + /// Operation unsupported by this service instance. + NotImplemented, + /// Service or upstream capacity limit. + TooManyRequests, + /// Internal service or backend failure. + Internal, +} + +/// Reqwest failure with service-level classification. #[derive(Debug, ThisError)] +#[error("reqwest error: {context}")] +pub struct ReqwestError { + kind: ErrorKind, + context: String, + #[source] + cause: reqwest::Error, +} + +impl ReqwestError { + fn transparent(context: impl Into, cause: reqwest::Error) -> Self { + let kind = classify_reqwest(&cause); + Self { + kind, + context: context.into(), + cause, + } + } + + fn internal(context: impl Into, cause: reqwest::Error) -> Self { + Self { + kind: ErrorKind::Internal, + context: context.into(), + cause, + } + } + + /// Returns the service-level classification for this reqwest error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying reqwest error. + pub fn cause(&self) -> &reqwest::Error { + &self.cause + } + + /// Returns whether the underlying reqwest failure is worth retrying. + pub fn is_retryable(&self) -> bool { + is_retryable_reqwest(&self.cause) + } +} + +/// Serde failure with service-level classification. +#[derive(Debug, ThisError)] +#[error("serde error: {context}")] +pub struct SerdeError { + kind: ErrorKind, + context: String, + #[source] + cause: serde_json::Error, +} + +impl SerdeError { + fn internal(context: impl Into, cause: serde_json::Error) -> Self { + Self { + kind: ErrorKind::Internal, + context: context.into(), + cause, + } + } + + fn client(context: impl Into, cause: serde_json::Error) -> Self { + Self { + kind: ErrorKind::BadRequest, + context: context.into(), + cause, + } + } + + /// Returns the service-level classification for this serde error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying serde error. + pub fn cause(&self) -> &serde_json::Error { + &self.cause + } +} + +/// Metadata failure with service-level classification. +#[derive(Debug, ThisError)] +#[error("metadata error: {context}")] +pub struct MetadataError { + kind: ErrorKind, + context: String, + #[source] + cause: objectstore_types::metadata::Error, +} + +impl MetadataError { + fn internal(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { + Self { + kind: ErrorKind::Internal, + context: context.into(), + cause, + } + } + + fn client(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { + Self { + kind: ErrorKind::BadRequest, + context: context.into(), + cause, + } + } + + /// Returns the service-level classification for this metadata error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying metadata error. + pub fn cause(&self) -> &objectstore_types::metadata::Error { + &self.cause + } +} + +/// Error type for service operations. +#[derive(Debug, ThisError, derive_error_kind::ErrorKind)] +#[error_kind(ErrorKind)] pub enum Error { /// IO errors related to payload streaming or file operations. #[error("i/o error: {0}")] - Io(#[from] std::io::Error), + #[error_kind(ErrorKind, Internal)] + Io(std::io::Error), /// Error originating from a client-supplied input stream. /// /// Indicates the client is at fault (e.g. dropped connection mid-upload) and should /// map to a 4xx response rather than a 5xx. #[error("error reading client stream: {0}")] + #[error_kind(ErrorKind, ClientStream)] Client(#[from] ClientError), - /// Errors related to de/serialization. - #[error("serde error: {context}")] - Serde { - /// Context describing what was being serialized/deserialized. - context: String, - /// The underlying serde error. - #[source] - cause: serde_json::Error, - }, + /// Serde errors. + #[error(transparent)] + #[error_kind(transparent)] + Serde(#[from] SerdeError), - /// All errors stemming from the reqwest client, used in multiple backends to send requests to - /// e.g. GCP APIs. - /// These can be network errors encountered when sending the requests, but can also indicate - /// errors returned by the API itself. - #[error("reqwest error: {context}")] - Reqwest { - /// Context describing the request that failed. - context: String, - /// The underlying reqwest error. - #[source] - cause: reqwest::Error, - }, + /// Reqwest errors from backend HTTP calls. + #[error(transparent)] + #[error_kind(transparent)] + Reqwest(#[from] ReqwestError), - /// Errors related to de/serialization and parsing of object metadata. - #[error("metadata error: {0}")] - Metadata(#[from] objectstore_types::metadata::Error), + /// Metadata errors. + #[error(transparent)] + #[error_kind(transparent)] + Metadata(#[from] MetadataError), /// Errors encountered when attempting to authenticate with GCP. #[error("GCP authentication error: {0}")] + #[error_kind(ErrorKind, Internal)] GcpAuth(#[from] gcp_auth::Error), /// A spawned service task panicked. #[error("service task failed: {0}")] + #[error_kind(ErrorKind, Internal)] Panic(String), /// A spawned service task was dropped before it could deliver its result. @@ -64,6 +197,7 @@ pub enum Error { /// This is an unexpected condition that can occur when the runtime drops the task for unknown /// reasons. #[error("task dropped")] + #[error_kind(ErrorKind, Internal)] Dropped, /// A redirect tombstone was encountered at a place where it is not supported. @@ -71,10 +205,12 @@ pub enum Error { /// This indicates a caller bug — tombstone-aware reads must go through the /// [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) methods. #[error("unexpected tombstone")] + #[error_kind(ErrorKind, Internal)] UnexpectedTombstone, /// The requested byte range is not satisfiable for the object's size. #[error("range not satisfiable (object size: {total} bytes)")] + #[error_kind(ErrorKind, BadRequest)] RangeNotSatisfiable { /// Total size of the object in bytes. total: u64, @@ -82,11 +218,13 @@ pub enum Error { /// The service has reached its concurrency limit and cannot accept more operations. #[error("concurrency limit reached")] + #[error_kind(ErrorKind, TooManyRequests)] AtCapacity, /// Any other error stemming from one of the storage backends, which might be specific to that /// backend or to a certain operation. #[error("storage backend error: {context}")] + #[error_kind(ErrorKind, Internal)] Generic { /// Context describing the operation that failed. context: String, @@ -97,10 +235,12 @@ pub enum Error { /// The functionality is not implemented by this instance of the service. #[error("not implemented")] + #[error_kind(ErrorKind, NotImplemented)] NotImplemented, /// Invalid upload ID (e.g. path traversal attempt). #[error(transparent)] + #[error_kind(ErrorKind, BadRequest)] InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), } @@ -117,20 +257,45 @@ impl Error { Self::Panic(msg) } - /// Creates an [`Error::Reqwest`] from a reqwest error with context. + /// Creates an internal [`Error`] from a reqwest error with context. pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { - Self::Reqwest { - context: context.into(), - cause, + if let Some(client_error) = stream::unpack_client_error(&cause) { + return Self::Client(client_error); } + + Self::Reqwest(ReqwestError::internal(context, cause)) + } + + /// Creates an [`Error`] from a reqwest error, preserving its backend status classification. + pub fn reqwest_transparent(context: impl Into, cause: reqwest::Error) -> Self { + if let Some(client_error) = stream::unpack_client_error(&cause) { + return Self::Client(client_error); + } + + Self::Reqwest(ReqwestError::transparent(context, cause)) } /// Creates an [`Error::Serde`] from a serde error with context. pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { - Self::Serde { - context: context.into(), - cause, - } + Self::Serde(SerdeError::internal(context, cause)) + } + + /// Creates a client-classified [`Error::Serde`] from a serde error with context. + pub fn serde_client(context: impl Into, cause: serde_json::Error) -> Self { + Self::Serde(SerdeError::client(context, cause)) + } + + /// Creates an [`Error::Metadata`] from a metadata error with context. + pub fn metadata(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { + Self::Metadata(MetadataError::internal(context, cause)) + } + + /// Creates a client-classified [`Error::Metadata`] from a metadata error with context. + pub fn metadata_client( + context: impl Into, + cause: objectstore_types::metadata::Error, + ) -> Self { + Self::Metadata(MetadataError::client(context, cause)) } /// Creates an [`Error::Generic`] with a context string and no cause. @@ -146,14 +311,35 @@ impl Error { match self { // Malformed client input at DEBUG level Self::Client(_) => Level::DEBUG, - Self::Metadata(_) => Level::DEBUG, Self::RangeNotSatisfiable { .. } => Level::DEBUG, // Like rate limits, we treat capacity errors as warnings Self::AtCapacity => Level::WARN, // All other errors are service or backend failures Self::Io(_) => Level::ERROR, - Self::Serde { .. } => Level::ERROR, - Self::Reqwest { .. } => Level::ERROR, + Self::Serde(error) => match error.kind() { + ErrorKind::BadRequest => Level::DEBUG, + ErrorKind::ClientStream + | ErrorKind::Transient + | ErrorKind::NotImplemented + | ErrorKind::TooManyRequests + | ErrorKind::Internal => Level::ERROR, + }, + Self::Reqwest(error) => match error.kind() { + ErrorKind::BadRequest => Level::DEBUG, + ErrorKind::TooManyRequests => Level::WARN, + ErrorKind::ClientStream + | ErrorKind::Transient + | ErrorKind::NotImplemented + | ErrorKind::Internal => Level::ERROR, + }, + Self::Metadata(error) => match error.kind() { + ErrorKind::BadRequest => Level::DEBUG, + ErrorKind::ClientStream + | ErrorKind::Transient + | ErrorKind::NotImplemented + | ErrorKind::TooManyRequests + | ErrorKind::Internal => Level::ERROR, + }, Self::GcpAuth(_) => Level::ERROR, Self::Panic(_) => Level::ERROR, Self::Dropped => Level::ERROR, @@ -165,5 +351,256 @@ impl Error { } } +fn classify_reqwest(cause: &reqwest::Error) -> ErrorKind { + if let Some(status) = cause.status() { + return match status { + StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, + StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, + status if status.is_client_error() => ErrorKind::BadRequest, + _ => ErrorKind::Internal, + }; + } + + if cause.is_timeout() || cause.is_connect() || cause.is_request() { + ErrorKind::Transient + } else { + ErrorKind::Internal + } +} + +fn is_retryable_reqwest(cause: &reqwest::Error) -> bool { + if let Some(status) = cause.status() { + return matches!( + status, + StatusCode::TOO_MANY_REQUESTS + | StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + ); + } + + cause.is_timeout() || cause.is_connect() || cause.is_request() +} + +impl From for Error { + fn from(cause: std::io::Error) -> Self { + match stream::unpack_client_error(&cause) { + Some(client_error) => Self::Client(client_error), + None => Self::Io(cause), + } + } +} + +impl From for Error { + fn from(cause: objectstore_types::metadata::Error) -> Self { + Self::metadata("metadata operation failed", cause) + } +} + /// Result type for service operations. pub type Result = std::result::Result; + +#[cfg(test)] +mod tests { + use std::io; + + use tokio::io::AsyncWriteExt; + + use super::*; + + fn serde_error() -> serde_json::Error { + serde_json::from_str::("{").unwrap_err() + } + + fn metadata_error() -> objectstore_types::metadata::Error { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + objectstore_types::metadata::HEADER_EXPIRATION, + "garbage".parse().unwrap(), + ); + objectstore_types::metadata::Metadata::from_headers(&headers, "").unwrap_err() + } + + async fn reqwest_status_error(status: StatusCode) -> reqwest::Error { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let response = format!( + "HTTP/1.1 {} test\r\nContent-Length: 0\r\nConnection: close\r\n\r\n", + status.as_u16() + ); + tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + socket.write_all(response.as_bytes()).await.unwrap(); + }); + + reqwest::get(format!("http://{addr}")) + .await + .unwrap() + .error_for_status() + .unwrap_err() + } + + async fn reqwest_decode_error() -> reqwest::Error { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + socket + .write_all( + b"HTTP/1.1 200 OK\r\nContent-Length: 8\r\nConnection: close\r\n\r\nnot-json", + ) + .await + .unwrap(); + }); + + reqwest::get(format!("http://{addr}")) + .await + .unwrap() + .json::() + .await + .unwrap_err() + } + + #[test] + fn direct_variant_kinds() { + assert_eq!( + Error::Client(ClientError::new(io::Error::other("client"))).kind(), + ErrorKind::ClientStream + ); + assert_eq!(Error::AtCapacity.kind(), ErrorKind::TooManyRequests); + assert_eq!(Error::NotImplemented.kind(), ErrorKind::NotImplemented); + assert_eq!( + Error::RangeNotSatisfiable { total: 10 }.kind(), + ErrorKind::BadRequest + ); + assert_eq!(Error::generic("backend").kind(), ErrorKind::Internal); + } + + #[test] + fn serde_helpers_have_distinct_kinds() { + assert_eq!( + Error::serde("stored json", serde_error()).kind(), + ErrorKind::Internal + ); + assert_eq!( + Error::serde_client("request json", serde_error()).kind(), + ErrorKind::BadRequest + ); + } + + #[test] + fn metadata_helpers_have_distinct_kinds() { + assert_eq!( + Error::metadata("stored metadata", metadata_error()).kind(), + ErrorKind::Internal + ); + assert_eq!( + Error::metadata_client("request metadata", metadata_error()).kind(), + ErrorKind::BadRequest + ); + } + + #[tokio::test] + async fn reqwest_helper_classifies_reqwest_failures_as_internal() { + assert_eq!( + Error::reqwest( + "bad request", + reqwest_status_error(StatusCode::BAD_REQUEST).await + ) + .kind(), + ErrorKind::Internal + ); + assert_eq!( + Error::reqwest( + "too many requests", + reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await + ) + .kind(), + ErrorKind::Internal + ); + assert_eq!( + Error::reqwest( + "internal server error", + reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await + ) + .kind(), + ErrorKind::Internal + ); + } + + #[tokio::test] + async fn reqwest_helper_preserves_retryability() { + let Error::Reqwest(error) = Error::reqwest( + "too many requests", + reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await, + ) else { + panic!("expected reqwest error"); + }; + assert!(error.is_retryable()); + + let Error::Reqwest(error) = Error::reqwest( + "internal server error", + reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await, + ) else { + panic!("expected reqwest error"); + }; + assert!(error.is_retryable()); + + let Error::Reqwest(error) = Error::reqwest( + "bad request", + reqwest_status_error(StatusCode::BAD_REQUEST).await, + ) else { + panic!("expected reqwest error"); + }; + assert!(!error.is_retryable()); + } + + #[tokio::test] + async fn reqwest_transparent_helper_classifies_statuses() { + assert_eq!( + Error::reqwest_transparent( + "bad request", + reqwest_status_error(StatusCode::BAD_REQUEST).await + ) + .kind(), + ErrorKind::BadRequest + ); + assert_eq!( + Error::reqwest_transparent( + "too many requests", + reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await + ) + .kind(), + ErrorKind::TooManyRequests + ); + assert_eq!( + Error::reqwest_transparent( + "internal server error", + reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await + ) + .kind(), + ErrorKind::Transient + ); + } + + #[tokio::test] + async fn reqwest_transparent_helper_classifies_no_status_decode_error_as_internal() { + assert_eq!( + Error::reqwest_transparent("decode", reqwest_decode_error().await).kind(), + ErrorKind::Internal + ); + } + + #[test] + fn io_helper_preserves_wrapped_client_error() { + let client_error = ClientError::new(io::Error::other("client disconnected")); + let error = Error::from(io::Error::other(client_error)); + assert_eq!(error.kind(), ErrorKind::ClientStream); + } +} From 6b1fcd8d8dbf0c047bde9c0a0091848fa03765f2 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:30:42 +0200 Subject: [PATCH 02/20] improve --- objectstore-server/src/extractors/body.rs | 7 +- objectstore-service/src/error.rs | 219 +++++++++++----------- objectstore-service/src/stream.rs | 18 +- 3 files changed, 124 insertions(+), 120 deletions(-) diff --git a/objectstore-server/src/extractors/body.rs b/objectstore-server/src/extractors/body.rs index b4d3d772..6411a1dc 100644 --- a/objectstore-server/src/extractors/body.rs +++ b/objectstore-server/src/extractors/body.rs @@ -5,7 +5,7 @@ use std::convert::Infallible; use axum::extract::{FromRequest, FromRequestParts, Path, Request}; use futures_util::{StreamExt, TryStreamExt}; use objectstore_service::id::ObjectContext; -use objectstore_service::stream::{ClientError, ClientStream}; +use objectstore_service::stream::{ClientStream, ClientStreamError}; use super::id::ContextParams; use crate::state::ServiceState; @@ -37,7 +37,10 @@ impl FromRequest for MeteredBody { usecase: params.usecase, scopes: params.scopes, }; - let stream = body.into_data_stream().map_err(ClientError::new).boxed(); + let stream = body + .into_data_stream() + .map_err(ClientStreamError::new) + .boxed(); let stream = state.meter_stream(stream, &context).boxed(); Ok(Self(stream)) } diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 32ff2a05..35e2946a 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -3,6 +3,7 @@ //! [`Error`] covers I/O, serialization, HTTP, metadata, authentication, //! and backend-specific failures. [`Result`] is the corresponding alias. +// Needed as the `.kind()` methods generated by `derive(ErrorKind)` currently lack documentation. #![allow(missing_docs)] use std::any::Any; @@ -11,26 +12,120 @@ use objectstore_log::Level; use reqwest::StatusCode; use thiserror::Error as ThisError; -use crate::stream::{self, ClientError}; +use crate::stream::{self, ClientStreamError}; -/// Coarse categories for service errors. +/// The category of a service error. +/// +/// Users should rely on the kind for classification, rather than matching on specific error +/// variants. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ErrorKind { /// Error originating from a client-supplied request body stream. ClientStream, - /// Backend or network failure that may succeed if retried. + /// Transient failure that may succeed if retried. Transient, - /// Malformed or unsupported client input. + /// Malformed client input. BadRequest, /// Operation unsupported by this service instance. NotImplemented, /// Service or upstream capacity limit. TooManyRequests, - /// Internal service or backend failure. + /// Internal failure. Internal, } -/// Reqwest failure with service-level classification. +/// Error type for service operations. +#[derive(Debug, ThisError, derive_error_kind::ErrorKind)] +#[error_kind(ErrorKind)] +pub enum Error { + /// IO errors related to payload streaming or file operations. + #[error("i/o error: {0}")] + #[error_kind(ErrorKind, Internal)] + Io(std::io::Error), + + /// Error originating from a client-supplied input stream. + #[error("error reading client stream: {0}")] + #[error_kind(ErrorKind, ClientStream)] + ClientStream(#[from] ClientStreamError), + + /// Errors related to de/serialization. + #[error(transparent)] + #[error_kind(transparent)] + Serde(#[from] SerdeError), + + /// Reqwest errors from storage backend HTTP calls. + #[error(transparent)] + #[error_kind(transparent)] + Reqwest(#[from] ReqwestError), + + /// Errors related to de/serialization and parsing of object metadata. + #[error(transparent)] + #[error_kind(transparent)] + Metadata(#[from] MetadataError), + + /// Errors encountered when attempting to authenticate with GCP. + #[error("GCP authentication error: {0}")] + #[error_kind(ErrorKind, Internal)] + GcpAuth(#[from] gcp_auth::Error), + + /// A spawned service task panicked. + #[error("service task failed: {0}")] + #[error_kind(ErrorKind, Internal)] + Panic(String), + + /// A spawned service task was dropped before it could deliver its result. + /// + /// This is an unexpected condition that can occur when the runtime drops the task for unknown + /// reasons. + #[error("task dropped")] + #[error_kind(ErrorKind, Internal)] + Dropped, + + /// A redirect tombstone was encountered at a place where it is not supported. + /// + /// This indicates a caller bug — tombstone-aware reads must go through the + /// [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) methods. + #[error("unexpected tombstone")] + #[error_kind(ErrorKind, Internal)] + UnexpectedTombstone, + + /// The requested byte range is not satisfiable for the object's size. + #[error("range not satisfiable (object size: {total} bytes)")] + #[error_kind(ErrorKind, BadRequest)] + RangeNotSatisfiable { + /// Total size of the object in bytes. + total: u64, + }, + + /// The service has reached its concurrency limit and cannot accept more operations. + #[error("concurrency limit reached")] + #[error_kind(ErrorKind, TooManyRequests)] + AtCapacity, + + /// Any other error stemming from one of the storage backends, which might be specific to that + /// backend or to a certain operation. + #[error("storage backend error: {context}")] + #[error_kind(ErrorKind, Internal)] + Generic { + /// Context describing the operation that failed. + context: String, + /// The underlying error, if available. + #[source] + cause: Option>, + }, + + /// The functionality is not implemented by this instance of the service. + #[error("not implemented")] + #[error_kind(ErrorKind, NotImplemented)] + NotImplemented, + + /// Invalid upload ID for a multipart upload. + #[error(transparent)] + #[error_kind(ErrorKind, BadRequest)] + InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), +} + +/// Reqwest error with context and kind. #[derive(Debug, ThisError)] #[error("reqwest error: {context}")] pub struct ReqwestError { @@ -74,7 +169,7 @@ impl ReqwestError { } } -/// Serde failure with service-level classification. +/// Serde error with context and kind. #[derive(Debug, ThisError)] #[error("serde error: {context}")] pub struct SerdeError { @@ -101,7 +196,7 @@ impl SerdeError { } } - /// Returns the service-level classification for this serde error. + /// Returns the service-level category for this serde error. pub fn kind(&self) -> ErrorKind { self.kind } @@ -112,7 +207,7 @@ impl SerdeError { } } -/// Metadata failure with service-level classification. +/// Metadata error with context and kind. #[derive(Debug, ThisError)] #[error("metadata error: {context}")] pub struct MetadataError { @@ -150,100 +245,6 @@ impl MetadataError { } } -/// Error type for service operations. -#[derive(Debug, ThisError, derive_error_kind::ErrorKind)] -#[error_kind(ErrorKind)] -pub enum Error { - /// IO errors related to payload streaming or file operations. - #[error("i/o error: {0}")] - #[error_kind(ErrorKind, Internal)] - Io(std::io::Error), - - /// Error originating from a client-supplied input stream. - /// - /// Indicates the client is at fault (e.g. dropped connection mid-upload) and should - /// map to a 4xx response rather than a 5xx. - #[error("error reading client stream: {0}")] - #[error_kind(ErrorKind, ClientStream)] - Client(#[from] ClientError), - - /// Serde errors. - #[error(transparent)] - #[error_kind(transparent)] - Serde(#[from] SerdeError), - - /// Reqwest errors from backend HTTP calls. - #[error(transparent)] - #[error_kind(transparent)] - Reqwest(#[from] ReqwestError), - - /// Metadata errors. - #[error(transparent)] - #[error_kind(transparent)] - Metadata(#[from] MetadataError), - - /// Errors encountered when attempting to authenticate with GCP. - #[error("GCP authentication error: {0}")] - #[error_kind(ErrorKind, Internal)] - GcpAuth(#[from] gcp_auth::Error), - - /// A spawned service task panicked. - #[error("service task failed: {0}")] - #[error_kind(ErrorKind, Internal)] - Panic(String), - - /// A spawned service task was dropped before it could deliver its result. - /// - /// This is an unexpected condition that can occur when the runtime drops the task for unknown - /// reasons. - #[error("task dropped")] - #[error_kind(ErrorKind, Internal)] - Dropped, - - /// A redirect tombstone was encountered at a place where it is not supported. - /// - /// This indicates a caller bug — tombstone-aware reads must go through the - /// [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) methods. - #[error("unexpected tombstone")] - #[error_kind(ErrorKind, Internal)] - UnexpectedTombstone, - - /// The requested byte range is not satisfiable for the object's size. - #[error("range not satisfiable (object size: {total} bytes)")] - #[error_kind(ErrorKind, BadRequest)] - RangeNotSatisfiable { - /// Total size of the object in bytes. - total: u64, - }, - - /// The service has reached its concurrency limit and cannot accept more operations. - #[error("concurrency limit reached")] - #[error_kind(ErrorKind, TooManyRequests)] - AtCapacity, - - /// Any other error stemming from one of the storage backends, which might be specific to that - /// backend or to a certain operation. - #[error("storage backend error: {context}")] - #[error_kind(ErrorKind, Internal)] - Generic { - /// Context describing the operation that failed. - context: String, - /// The underlying error, if available. - #[source] - cause: Option>, - }, - - /// The functionality is not implemented by this instance of the service. - #[error("not implemented")] - #[error_kind(ErrorKind, NotImplemented)] - NotImplemented, - - /// Invalid upload ID (e.g. path traversal attempt). - #[error(transparent)] - #[error_kind(ErrorKind, BadRequest)] - InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), -} - impl Error { /// Creates an [`Error::Panic`] from a panic payload, extracting the message. pub fn panic(payload: Box) -> Self { @@ -260,7 +261,7 @@ impl Error { /// Creates an internal [`Error`] from a reqwest error with context. pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { - return Self::Client(client_error); + return Self::ClientStream(client_error); } Self::Reqwest(ReqwestError::internal(context, cause)) @@ -269,7 +270,7 @@ impl Error { /// Creates an [`Error`] from a reqwest error, preserving its backend status classification. pub fn reqwest_transparent(context: impl Into, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { - return Self::Client(client_error); + return Self::ClientStream(client_error); } Self::Reqwest(ReqwestError::transparent(context, cause)) @@ -310,7 +311,7 @@ impl Error { pub fn level(&self) -> Level { match self { // Malformed client input at DEBUG level - Self::Client(_) => Level::DEBUG, + Self::ClientStream(_) => Level::DEBUG, Self::RangeNotSatisfiable { .. } => Level::DEBUG, // Like rate limits, we treat capacity errors as warnings Self::AtCapacity => Level::WARN, @@ -391,7 +392,7 @@ fn is_retryable_reqwest(cause: &reqwest::Error) -> bool { impl From for Error { fn from(cause: std::io::Error) -> Self { match stream::unpack_client_error(&cause) { - Some(client_error) => Self::Client(client_error), + Some(client_error) => Self::ClientStream(client_error), None => Self::Io(cause), } } @@ -470,7 +471,7 @@ mod tests { #[test] fn direct_variant_kinds() { assert_eq!( - Error::Client(ClientError::new(io::Error::other("client"))).kind(), + Error::ClientStream(ClientStreamError::new(io::Error::other("client"))).kind(), ErrorKind::ClientStream ); assert_eq!(Error::AtCapacity.kind(), ErrorKind::TooManyRequests); @@ -599,7 +600,7 @@ mod tests { #[test] fn io_helper_preserves_wrapped_client_error() { - let client_error = ClientError::new(io::Error::other("client disconnected")); + let client_error = ClientStreamError::new(io::Error::other("client disconnected")); let error = Error::from(io::Error::other(client_error)); assert_eq!(error.kind(), ErrorKind::ClientStream); } diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs index efaf9c43..77f87e67 100644 --- a/objectstore-service/src/stream.rs +++ b/objectstore-service/src/stream.rs @@ -30,9 +30,9 @@ pub type PayloadStream = BoxStream<'static, io::Result>; /// [`unpack_client_error`] to return a 4xx response rather than treating /// it as a 5xx backend failure. #[derive(Clone, Debug)] -pub struct ClientError(Arc); +pub struct ClientStreamError(Arc); -impl ClientError { +impl ClientStreamError { /// Creates a new [`ClientError`] wrapping `err`. pub fn new(err: E) -> Self where @@ -42,21 +42,21 @@ impl ClientError { } } -impl fmt::Display for ClientError { +impl fmt::Display for ClientStreamError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } -impl Error for ClientError { +impl Error for ClientStreamError { fn source(&self) -> Option<&(dyn Error + 'static)> { self.0.source() } } /// Required by [`tokio_util::io::StreamReader`] in the local filesystem backend. -impl From for io::Error { - fn from(err: ClientError) -> Self { +impl From for io::Error { + fn from(err: ClientStreamError) -> Self { io::Error::other(err) } } @@ -70,7 +70,7 @@ impl From for io::Error { /// than 500. /// /// Use [`single`] to construct a single-chunk `ClientStream` from an owned value. -pub type ClientStream = BoxStream<'static, Result>; +pub type ClientStream = BoxStream<'static, Result>; /// Walks the source chain of `err` looking for a [`ClientError`]. /// @@ -82,7 +82,7 @@ pub type ClientStream = BoxStream<'static, Result>; /// /// Use this in `put_object` implementations to reclassify body-stream errors /// as [`crate::error::Error::Client`] instead of an opaque server error. -pub fn unpack_client_error(err: &E) -> Option +pub fn unpack_client_error(err: &E) -> Option where E: Error + 'static, { @@ -96,7 +96,7 @@ where None => s, }; - if let Some(client_error) = target.downcast_ref::() { + if let Some(client_error) = target.downcast_ref::() { return Some(client_error.clone()); } From a5f7a74473671e910ac80a62ab6fe5b7e535020d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:33:03 +0200 Subject: [PATCH 03/20] ref(service): Inline error wrapper construction Remove private wrapper constructor helpers that only duplicated the public Error helpers. Keep classification policy in the public constructors so call sites have fewer names to reason about. Co-Authored-By: OpenAI Codex --- objectstore-service/src/error.rs | 86 ++++++++++++-------------------- 1 file changed, 31 insertions(+), 55 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 35e2946a..d9cfa760 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -136,23 +136,6 @@ pub struct ReqwestError { } impl ReqwestError { - fn transparent(context: impl Into, cause: reqwest::Error) -> Self { - let kind = classify_reqwest(&cause); - Self { - kind, - context: context.into(), - cause, - } - } - - fn internal(context: impl Into, cause: reqwest::Error) -> Self { - Self { - kind: ErrorKind::Internal, - context: context.into(), - cause, - } - } - /// Returns the service-level classification for this reqwest error. pub fn kind(&self) -> ErrorKind { self.kind @@ -180,22 +163,6 @@ pub struct SerdeError { } impl SerdeError { - fn internal(context: impl Into, cause: serde_json::Error) -> Self { - Self { - kind: ErrorKind::Internal, - context: context.into(), - cause, - } - } - - fn client(context: impl Into, cause: serde_json::Error) -> Self { - Self { - kind: ErrorKind::BadRequest, - context: context.into(), - cause, - } - } - /// Returns the service-level category for this serde error. pub fn kind(&self) -> ErrorKind { self.kind @@ -218,22 +185,6 @@ pub struct MetadataError { } impl MetadataError { - fn internal(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { - Self { - kind: ErrorKind::Internal, - context: context.into(), - cause, - } - } - - fn client(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { - Self { - kind: ErrorKind::BadRequest, - context: context.into(), - cause, - } - } - /// Returns the service-level classification for this metadata error. pub fn kind(&self) -> ErrorKind { self.kind @@ -264,7 +215,11 @@ impl Error { return Self::ClientStream(client_error); } - Self::Reqwest(ReqwestError::internal(context, cause)) + Self::Reqwest(ReqwestError { + kind: ErrorKind::Internal, + context: context.into(), + cause, + }) } /// Creates an [`Error`] from a reqwest error, preserving its backend status classification. @@ -273,22 +228,39 @@ impl Error { return Self::ClientStream(client_error); } - Self::Reqwest(ReqwestError::transparent(context, cause)) + let kind = classify_reqwest(&cause); + Self::Reqwest(ReqwestError { + kind, + context: context.into(), + cause, + }) } /// Creates an [`Error::Serde`] from a serde error with context. pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { - Self::Serde(SerdeError::internal(context, cause)) + Self::Serde(SerdeError { + kind: ErrorKind::Internal, + context: context.into(), + cause, + }) } /// Creates a client-classified [`Error::Serde`] from a serde error with context. pub fn serde_client(context: impl Into, cause: serde_json::Error) -> Self { - Self::Serde(SerdeError::client(context, cause)) + Self::Serde(SerdeError { + kind: ErrorKind::BadRequest, + context: context.into(), + cause, + }) } /// Creates an [`Error::Metadata`] from a metadata error with context. pub fn metadata(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { - Self::Metadata(MetadataError::internal(context, cause)) + Self::Metadata(MetadataError { + kind: ErrorKind::Internal, + context: context.into(), + cause, + }) } /// Creates a client-classified [`Error::Metadata`] from a metadata error with context. @@ -296,7 +268,11 @@ impl Error { context: impl Into, cause: objectstore_types::metadata::Error, ) -> Self { - Self::Metadata(MetadataError::client(context, cause)) + Self::Metadata(MetadataError { + kind: ErrorKind::BadRequest, + context: context.into(), + cause, + }) } /// Creates an [`Error::Generic`] with a context string and no cause. From 31388db15425dc107302de010045556bf72e2e23 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:42:11 +0200 Subject: [PATCH 04/20] improve --- objectstore-service/src/error.rs | 388 ++++++++----------------------- 1 file changed, 97 insertions(+), 291 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index d9cfa760..313d9cbf 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -125,77 +125,6 @@ pub enum Error { InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), } -/// Reqwest error with context and kind. -#[derive(Debug, ThisError)] -#[error("reqwest error: {context}")] -pub struct ReqwestError { - kind: ErrorKind, - context: String, - #[source] - cause: reqwest::Error, -} - -impl ReqwestError { - /// Returns the service-level classification for this reqwest error. - pub fn kind(&self) -> ErrorKind { - self.kind - } - - /// Returns the underlying reqwest error. - pub fn cause(&self) -> &reqwest::Error { - &self.cause - } - - /// Returns whether the underlying reqwest failure is worth retrying. - pub fn is_retryable(&self) -> bool { - is_retryable_reqwest(&self.cause) - } -} - -/// Serde error with context and kind. -#[derive(Debug, ThisError)] -#[error("serde error: {context}")] -pub struct SerdeError { - kind: ErrorKind, - context: String, - #[source] - cause: serde_json::Error, -} - -impl SerdeError { - /// Returns the service-level category for this serde error. - pub fn kind(&self) -> ErrorKind { - self.kind - } - - /// Returns the underlying serde error. - pub fn cause(&self) -> &serde_json::Error { - &self.cause - } -} - -/// Metadata error with context and kind. -#[derive(Debug, ThisError)] -#[error("metadata error: {context}")] -pub struct MetadataError { - kind: ErrorKind, - context: String, - #[source] - cause: objectstore_types::metadata::Error, -} - -impl MetadataError { - /// Returns the service-level classification for this metadata error. - pub fn kind(&self) -> ErrorKind { - self.kind - } - - /// Returns the underlying metadata error. - pub fn cause(&self) -> &objectstore_types::metadata::Error { - &self.cause - } -} - impl Error { /// Creates an [`Error::Panic`] from a panic payload, extracting the message. pub fn panic(payload: Box) -> Self { @@ -209,7 +138,8 @@ impl Error { Self::Panic(msg) } - /// Creates an internal [`Error`] from a reqwest error with context. + /// Creates an [`Error`] from a reqwest error with context, classifying it as an internal + /// error. pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { return Self::ClientStream(client_error); @@ -222,7 +152,7 @@ impl Error { }) } - /// Creates an [`Error`] from a reqwest error, preserving its backend status classification. + /// Creates an [`Error`] from a reqwest error, classifying it according to its status code. pub fn reqwest_transparent(context: impl Into, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { return Self::ClientStream(client_error); @@ -236,7 +166,8 @@ impl Error { }) } - /// Creates an [`Error::Serde`] from a serde error with context. + /// Creates an [`Error::Serde`] from a serde error with context, classifying it as an internal + /// error. pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { kind: ErrorKind::Internal, @@ -245,7 +176,8 @@ impl Error { }) } - /// Creates a client-classified [`Error::Serde`] from a serde error with context. + /// Creates an [`Error::Serde`] from a serde error with context, classifying it as a client + /// error. pub fn serde_client(context: impl Into, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { kind: ErrorKind::BadRequest, @@ -254,7 +186,8 @@ impl Error { }) } - /// Creates an [`Error::Metadata`] from a metadata error with context. + /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as an + /// internal error. pub fn metadata(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { Self::Metadata(MetadataError { kind: ErrorKind::Internal, @@ -263,7 +196,8 @@ impl Error { }) } - /// Creates a client-classified [`Error::Metadata`] from a metadata error with context. + /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as a + /// client error. pub fn metadata_client( context: impl Into, cause: objectstore_types::metadata::Error, @@ -328,43 +262,6 @@ impl Error { } } -fn classify_reqwest(cause: &reqwest::Error) -> ErrorKind { - if let Some(status) = cause.status() { - return match status { - StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, - StatusCode::REQUEST_TIMEOUT - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, - status if status.is_client_error() => ErrorKind::BadRequest, - _ => ErrorKind::Internal, - }; - } - - if cause.is_timeout() || cause.is_connect() || cause.is_request() { - ErrorKind::Transient - } else { - ErrorKind::Internal - } -} - -fn is_retryable_reqwest(cause: &reqwest::Error) -> bool { - if let Some(status) = cause.status() { - return matches!( - status, - StatusCode::TOO_MANY_REQUESTS - | StatusCode::REQUEST_TIMEOUT - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT - ); - } - - cause.is_timeout() || cause.is_connect() || cause.is_request() -} - impl From for Error { fn from(cause: std::io::Error) -> Self { match stream::unpack_client_error(&cause) { @@ -376,208 +273,117 @@ impl From for Error { impl From for Error { fn from(cause: objectstore_types::metadata::Error) -> Self { - Self::metadata("metadata operation failed", cause) + Self::metadata("metadata de/serialization failed", cause) } } -/// Result type for service operations. -pub type Result = std::result::Result; - -#[cfg(test)] -mod tests { - use std::io; - - use tokio::io::AsyncWriteExt; - - use super::*; +/// Reqwest error with context and kind. +#[derive(Debug, ThisError)] +#[error("reqwest error: {context}")] +pub struct ReqwestError { + kind: ErrorKind, + context: String, + #[source] + cause: reqwest::Error, +} - fn serde_error() -> serde_json::Error { - serde_json::from_str::("{").unwrap_err() +impl ReqwestError { + /// Returns the service-level classification for this reqwest error. + pub fn kind(&self) -> ErrorKind { + self.kind } - fn metadata_error() -> objectstore_types::metadata::Error { - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - objectstore_types::metadata::HEADER_EXPIRATION, - "garbage".parse().unwrap(), - ); - objectstore_types::metadata::Metadata::from_headers(&headers, "").unwrap_err() + /// Returns the underlying reqwest error. + pub fn cause(&self) -> &reqwest::Error { + &self.cause } - async fn reqwest_status_error(status: StatusCode) -> reqwest::Error { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let response = format!( - "HTTP/1.1 {} test\r\nContent-Length: 0\r\nConnection: close\r\n\r\n", - status.as_u16() - ); - tokio::spawn(async move { - let (mut socket, _) = listener.accept().await.unwrap(); - socket.write_all(response.as_bytes()).await.unwrap(); - }); - - reqwest::get(format!("http://{addr}")) - .await - .unwrap() - .error_for_status() - .unwrap_err() + /// Returns whether the underlying reqwest failure is worth retrying. + pub fn is_retryable(&self) -> bool { + is_retryable_reqwest(&self.cause) } +} - async fn reqwest_decode_error() -> reqwest::Error { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - tokio::spawn(async move { - let (mut socket, _) = listener.accept().await.unwrap(); - socket - .write_all( - b"HTTP/1.1 200 OK\r\nContent-Length: 8\r\nConnection: close\r\n\r\nnot-json", - ) - .await - .unwrap(); - }); - - reqwest::get(format!("http://{addr}")) - .await - .unwrap() - .json::() - .await - .unwrap_err() - } +/// Serde error with context and kind. +#[derive(Debug, ThisError)] +#[error("serde error: {context}")] +pub struct SerdeError { + kind: ErrorKind, + context: String, + #[source] + cause: serde_json::Error, +} - #[test] - fn direct_variant_kinds() { - assert_eq!( - Error::ClientStream(ClientStreamError::new(io::Error::other("client"))).kind(), - ErrorKind::ClientStream - ); - assert_eq!(Error::AtCapacity.kind(), ErrorKind::TooManyRequests); - assert_eq!(Error::NotImplemented.kind(), ErrorKind::NotImplemented); - assert_eq!( - Error::RangeNotSatisfiable { total: 10 }.kind(), - ErrorKind::BadRequest - ); - assert_eq!(Error::generic("backend").kind(), ErrorKind::Internal); +impl SerdeError { + /// Returns the service-level category for this serde error. + pub fn kind(&self) -> ErrorKind { + self.kind } - #[test] - fn serde_helpers_have_distinct_kinds() { - assert_eq!( - Error::serde("stored json", serde_error()).kind(), - ErrorKind::Internal - ); - assert_eq!( - Error::serde_client("request json", serde_error()).kind(), - ErrorKind::BadRequest - ); + /// Returns the underlying serde error. + pub fn cause(&self) -> &serde_json::Error { + &self.cause } +} - #[test] - fn metadata_helpers_have_distinct_kinds() { - assert_eq!( - Error::metadata("stored metadata", metadata_error()).kind(), - ErrorKind::Internal - ); - assert_eq!( - Error::metadata_client("request metadata", metadata_error()).kind(), - ErrorKind::BadRequest - ); - } +/// Metadata error with context and kind. +#[derive(Debug, ThisError)] +#[error("metadata error: {context}")] +pub struct MetadataError { + kind: ErrorKind, + context: String, + #[source] + cause: objectstore_types::metadata::Error, +} - #[tokio::test] - async fn reqwest_helper_classifies_reqwest_failures_as_internal() { - assert_eq!( - Error::reqwest( - "bad request", - reqwest_status_error(StatusCode::BAD_REQUEST).await - ) - .kind(), - ErrorKind::Internal - ); - assert_eq!( - Error::reqwest( - "too many requests", - reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await - ) - .kind(), - ErrorKind::Internal - ); - assert_eq!( - Error::reqwest( - "internal server error", - reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await - ) - .kind(), - ErrorKind::Internal - ); +impl MetadataError { + /// Returns the service-level classification for this metadata error. + pub fn kind(&self) -> ErrorKind { + self.kind } - #[tokio::test] - async fn reqwest_helper_preserves_retryability() { - let Error::Reqwest(error) = Error::reqwest( - "too many requests", - reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await, - ) else { - panic!("expected reqwest error"); - }; - assert!(error.is_retryable()); - - let Error::Reqwest(error) = Error::reqwest( - "internal server error", - reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await, - ) else { - panic!("expected reqwest error"); - }; - assert!(error.is_retryable()); + /// Returns the underlying metadata error. + pub fn cause(&self) -> &objectstore_types::metadata::Error { + &self.cause + } +} - let Error::Reqwest(error) = Error::reqwest( - "bad request", - reqwest_status_error(StatusCode::BAD_REQUEST).await, - ) else { - panic!("expected reqwest error"); +fn classify_reqwest(cause: &reqwest::Error) -> ErrorKind { + if let Some(status) = cause.status() { + return match status { + StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, + StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, + status if status.is_client_error() => ErrorKind::BadRequest, + _ => ErrorKind::Internal, }; - assert!(!error.is_retryable()); } - #[tokio::test] - async fn reqwest_transparent_helper_classifies_statuses() { - assert_eq!( - Error::reqwest_transparent( - "bad request", - reqwest_status_error(StatusCode::BAD_REQUEST).await - ) - .kind(), - ErrorKind::BadRequest - ); - assert_eq!( - Error::reqwest_transparent( - "too many requests", - reqwest_status_error(StatusCode::TOO_MANY_REQUESTS).await - ) - .kind(), - ErrorKind::TooManyRequests - ); - assert_eq!( - Error::reqwest_transparent( - "internal server error", - reqwest_status_error(StatusCode::INTERNAL_SERVER_ERROR).await - ) - .kind(), - ErrorKind::Transient - ); + if cause.is_timeout() || cause.is_connect() || cause.is_request() { + ErrorKind::Transient + } else { + ErrorKind::Internal } +} - #[tokio::test] - async fn reqwest_transparent_helper_classifies_no_status_decode_error_as_internal() { - assert_eq!( - Error::reqwest_transparent("decode", reqwest_decode_error().await).kind(), - ErrorKind::Internal +fn is_retryable_reqwest(cause: &reqwest::Error) -> bool { + if let Some(status) = cause.status() { + return matches!( + status, + StatusCode::TOO_MANY_REQUESTS + | StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT ); } - #[test] - fn io_helper_preserves_wrapped_client_error() { - let client_error = ClientStreamError::new(io::Error::other("client disconnected")); - let error = Error::from(io::Error::other(client_error)); - assert_eq!(error.kind(), ErrorKind::ClientStream); - } + cause.is_timeout() || cause.is_connect() || cause.is_request() } + +/// Result type for service operations. +pub type Result = std::result::Result; From e70b06fc02a8958469d53cbe844ea0cb85792be7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:47:57 +0200 Subject: [PATCH 05/20] improve --- objectstore-service/src/error.rs | 43 +++----------------------------- 1 file changed, 4 insertions(+), 39 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 313d9cbf..9eb2875a 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -219,45 +219,10 @@ impl Error { /// Returns the appropriate log level for this error. pub fn level(&self) -> Level { - match self { - // Malformed client input at DEBUG level - Self::ClientStream(_) => Level::DEBUG, - Self::RangeNotSatisfiable { .. } => Level::DEBUG, - // Like rate limits, we treat capacity errors as warnings - Self::AtCapacity => Level::WARN, - // All other errors are service or backend failures - Self::Io(_) => Level::ERROR, - Self::Serde(error) => match error.kind() { - ErrorKind::BadRequest => Level::DEBUG, - ErrorKind::ClientStream - | ErrorKind::Transient - | ErrorKind::NotImplemented - | ErrorKind::TooManyRequests - | ErrorKind::Internal => Level::ERROR, - }, - Self::Reqwest(error) => match error.kind() { - ErrorKind::BadRequest => Level::DEBUG, - ErrorKind::TooManyRequests => Level::WARN, - ErrorKind::ClientStream - | ErrorKind::Transient - | ErrorKind::NotImplemented - | ErrorKind::Internal => Level::ERROR, - }, - Self::Metadata(error) => match error.kind() { - ErrorKind::BadRequest => Level::DEBUG, - ErrorKind::ClientStream - | ErrorKind::Transient - | ErrorKind::NotImplemented - | ErrorKind::TooManyRequests - | ErrorKind::Internal => Level::ERROR, - }, - Self::GcpAuth(_) => Level::ERROR, - Self::Panic(_) => Level::ERROR, - Self::Dropped => Level::ERROR, - Self::UnexpectedTombstone => Level::ERROR, - Self::NotImplemented => Level::ERROR, - Self::InvalidUploadId(_) => Level::DEBUG, - Self::Generic { .. } => Level::ERROR, + match self.kind() { + ErrorKind::ClientStream | ErrorKind::BadRequest => Level::DEBUG, + ErrorKind::Transient | ErrorKind::TooManyRequests => Level::WARN, + ErrorKind::NotImplemented | ErrorKind::Internal => Level::ERROR, } } } From d75662dbaeec213091840d0c25bf0f2145b4ef64 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:56:36 +0200 Subject: [PATCH 06/20] ref(service): Require explicit metadata error mapping Remove the service Error conversion from objectstore metadata errors. Map metadata parsing and serialization failures at each backend call site so the context and classification stay visible. Co-Authored-By: OpenAI Codex --- objectstore-service/src/backend/gcs.rs | 19 +++++++++++++++++-- .../src/backend/s3_compatible.rs | 15 +++++++++++++-- objectstore-service/src/error.rs | 6 ------ 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 5e50f24b..57c54ca9 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -185,13 +185,28 @@ impl GcsObject { .metadata .remove(&GcsMetaKey::Expiration) .map(|s| s.parse()) - .transpose()? + .transpose() + .map_err(|cause| { + Error::metadata( + "GCS: failed to parse expiration policy from object metadata", + cause, + ) + })? .unwrap_or_default(); let origin = self.metadata.remove(&GcsMetaKey::Origin); let content_type = self.content_type; - let compression = self.content_encoding.map(|s| s.parse()).transpose()?; + let compression = self + .content_encoding + .map(|s| s.parse()) + .transpose() + .map_err(|cause| { + Error::metadata( + "GCS: failed to parse compression from object metadata", + cause, + ) + })?; let size = self .size .map(|size| size.parse()) diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 6c8d761b..62ed7b82 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -257,7 +257,14 @@ where format!("/{}/{}", self.bucket, id.as_storage_path()), ) .header("x-goog-metadata-directive", "REPLACE") - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) + .headers( + metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX).map_err(|cause| { + Error::metadata( + "S3: failed to serialize object metadata for TTI update", + cause, + ) + })?, + ) .send() .await .map_err(|cause| Error::reqwest("S3: failed to send TTI update request", cause))? @@ -311,7 +318,11 @@ impl Backend for S3CompatibleBackend { objectstore_log::debug!("Writing to s3_compatible backend"); self.request(Method::PUT, self.object_url(id)) .await? - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) + .headers( + metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX).map_err(|cause| { + Error::metadata("S3: failed to serialize object metadata for upload", cause) + })?, + ) .body(Body::wrap_stream(stream)) .send() .await diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 9eb2875a..4da7a3c7 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -236,12 +236,6 @@ impl From for Error { } } -impl From for Error { - fn from(cause: objectstore_types::metadata::Error) -> Self { - Self::metadata("metadata de/serialization failed", cause) - } -} - /// Reqwest error with context and kind. #[derive(Debug, ThisError)] #[error("reqwest error: {context}")] From 2528810d61c26e65dee6367d7495df8b266f0d59 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:00:23 +0200 Subject: [PATCH 07/20] docs(service): Fix client stream error links Update stream documentation links after the client stream error rename. This keeps rustdoc clean under CI's denied warnings. Co-Authored-By: OpenAI Codex --- objectstore-service/src/stream.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs index 77f87e67..16be5860 100644 --- a/objectstore-service/src/stream.rs +++ b/objectstore-service/src/stream.rs @@ -4,7 +4,7 @@ //! cover the two directions of data flow: //! //! - [`ClientStream`] — incoming data from a client PUT request body. Uses -//! [`ClientError`] as the error type so backends can distinguish a broken +//! [`ClientStreamError`] as the error type so backends can distinguish a broken //! client connection from a backend I/O failure (400 vs 500). //! - [`PayloadStream`] — outgoing data returned from //! [`Backend::get_object`](crate::backend::common::Backend::get_object). @@ -33,7 +33,7 @@ pub type PayloadStream = BoxStream<'static, io::Result>; pub struct ClientStreamError(Arc); impl ClientStreamError { - /// Creates a new [`ClientError`] wrapping `err`. + /// Creates a new [`ClientStreamError`] wrapping `err`. pub fn new(err: E) -> Self where E: Error + Send + Sync + 'static, @@ -63,25 +63,25 @@ impl From for io::Error { /// Incoming byte stream from a client PUT request body. /// -/// Uses [`ClientError`] as the error type so that a dropped or interrupted +/// Uses [`ClientStreamError`] as the error type so that a dropped or interrupted /// client connection is distinguishable from a backend I/O failure. Backends -/// that detect a [`ClientError`] (via [`unpack_client_error`]) can surface it -/// as [`crate::error::Error::Client`], which the server maps to HTTP 400 rather -/// than 500. +/// that detect a [`ClientStreamError`] (via [`unpack_client_error`]) can surface +/// it as [`crate::error::Error::ClientStream`], which the server maps to HTTP +/// 400 rather than 500. /// /// Use [`single`] to construct a single-chunk `ClientStream` from an owned value. pub type ClientStream = BoxStream<'static, Result>; -/// Walks the source chain of `err` looking for a [`ClientError`]. +/// Walks the source chain of `err` looking for a [`ClientStreamError`]. /// /// At each step, two locations are checked: /// -/// - **Direct**: the error itself is a `ClientError`. +/// - **Direct**: the error itself is a `ClientStreamError`. /// - **Packed in `io::Error`**: the error is an `io::Error` whose custom inner -/// value is a `ClientError`. +/// value is a `ClientStreamError`. /// /// Use this in `put_object` implementations to reclassify body-stream errors -/// as [`crate::error::Error::Client`] instead of an opaque server error. +/// as [`crate::error::Error::ClientStream`] instead of an opaque server error. pub fn unpack_client_error(err: &E) -> Option where E: Error + 'static, From 1a2d51d7aabdc04e14ae17bad4005490eea1cdb5 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:07:34 +0200 Subject: [PATCH 08/20] improve --- objectstore-server/docs/architecture.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/objectstore-server/docs/architecture.md b/objectstore-server/docs/architecture.md index 24f82434..08bb7496 100644 --- a/objectstore-server/docs/architecture.md +++ b/objectstore-server/docs/architecture.md @@ -76,20 +76,6 @@ A request flows through several layers before reaching the storage service: [`objectstore-types` docs](objectstore_types) for the header mapping) and the payload is streamed back. -### Service Error Mapping - -Endpoint handlers map [`objectstore_service::error::Error`] to HTTP status -codes via its coarse error kind: - -- Client stream and bad request errors return HTTP 400. -- Service or upstream capacity errors return HTTP 429. -- Transient service errors return HTTP 503. -- Unsupported operations return HTTP 501. -- Internal errors return HTTP 500. - -Unsatisfiable byte ranges are a special case: `GET` returns HTTP 416 and -preserves the required `Content-Range: bytes */{total}` header. - ## Authentication & Authorization Objectstore uses **JWT tokens with EdDSA signatures** (Ed25519) for From 8b94e62ef6470b7290d4aed0f5937ec6e26e8f60 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:21:47 +0200 Subject: [PATCH 09/20] ref(service): Keep GCS retry policy local Move reqwest retryability out of the shared error wrapper and back next to the GCS retry loop. Let ReqwestError handle only construction and service-level classification. Co-Authored-By: OpenAI Codex --- objectstore-service/src/backend/gcs.rs | 23 ++++++- objectstore-service/src/error.rs | 88 ++++++++++---------------- 2 files changed, 57 insertions(+), 54 deletions(-) diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 57c54ca9..249f9f20 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -378,7 +378,28 @@ fn insert_gcs_meta_header( /// Returns `true` if the error is a transient reqwest failure worth retrying. fn is_retryable(error: &Error) -> bool { - matches!(error, Error::Reqwest(error) if error.is_retryable()) + let Error::Reqwest(error) = error else { + return false; + }; + let cause = error.cause(); + + if cause.is_timeout() || cause.is_connect() || cause.is_request() { + return true; + } + + let Some(status) = cause.status() else { + return false; + }; + + matches!( + status, + StatusCode::REQUEST_TIMEOUT + | StatusCode::TOO_MANY_REQUESTS + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + ) } /// GCS JSON API backend for long-term storage of large objects. diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 4da7a3c7..5fae5c7b 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -145,11 +145,7 @@ impl Error { return Self::ClientStream(client_error); } - Self::Reqwest(ReqwestError { - kind: ErrorKind::Internal, - context: context.into(), - cause, - }) + Self::Reqwest(ReqwestError::internal(context, cause)) } /// Creates an [`Error`] from a reqwest error, classifying it according to its status code. @@ -158,12 +154,7 @@ impl Error { return Self::ClientStream(client_error); } - let kind = classify_reqwest(&cause); - Self::Reqwest(ReqwestError { - kind, - context: context.into(), - cause, - }) + Self::Reqwest(ReqwestError::transparent(context, cause)) } /// Creates an [`Error::Serde`] from a serde error with context, classifying it as an internal @@ -247,6 +238,39 @@ pub struct ReqwestError { } impl ReqwestError { + fn internal(context: impl Into, cause: reqwest::Error) -> Self { + Self { + kind: ErrorKind::Internal, + context: context.into(), + cause, + } + } + + fn transparent(context: impl Into, cause: reqwest::Error) -> Self { + let kind = if let Some(status) = cause.status() { + match status { + StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, + StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, + status if status.is_client_error() => ErrorKind::BadRequest, + _ => ErrorKind::Internal, + } + } else if cause.is_timeout() || cause.is_connect() || cause.is_request() { + ErrorKind::Transient + } else { + ErrorKind::Internal + }; + + Self { + kind, + context: context.into(), + cause, + } + } + /// Returns the service-level classification for this reqwest error. pub fn kind(&self) -> ErrorKind { self.kind @@ -256,11 +280,6 @@ impl ReqwestError { pub fn cause(&self) -> &reqwest::Error { &self.cause } - - /// Returns whether the underlying reqwest failure is worth retrying. - pub fn is_retryable(&self) -> bool { - is_retryable_reqwest(&self.cause) - } } /// Serde error with context and kind. @@ -307,42 +326,5 @@ impl MetadataError { } } -fn classify_reqwest(cause: &reqwest::Error) -> ErrorKind { - if let Some(status) = cause.status() { - return match status { - StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, - StatusCode::REQUEST_TIMEOUT - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, - status if status.is_client_error() => ErrorKind::BadRequest, - _ => ErrorKind::Internal, - }; - } - - if cause.is_timeout() || cause.is_connect() || cause.is_request() { - ErrorKind::Transient - } else { - ErrorKind::Internal - } -} - -fn is_retryable_reqwest(cause: &reqwest::Error) -> bool { - if let Some(status) = cause.status() { - return matches!( - status, - StatusCode::TOO_MANY_REQUESTS - | StatusCode::REQUEST_TIMEOUT - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT - ); - } - - cause.is_timeout() || cause.is_connect() || cause.is_request() -} - /// Result type for service operations. pub type Result = std::result::Result; From 1b6d58f7c08f58bac43390738892ac0c7597f9be Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:22:29 +0200 Subject: [PATCH 10/20] improve --- objectstore-server/src/endpoints/common.rs | 28 ---------------------- 1 file changed, 28 deletions(-) diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 2d9e49f1..c6f3c928 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -130,31 +130,3 @@ pub fn insert_accept_ranges(response: &mut Response) { HeaderValue::from_static("bytes"), ); } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn service_capacity_errors_return_429() { - assert_eq!( - ApiError::Service(ServiceError::AtCapacity).status(), - StatusCode::TOO_MANY_REQUESTS - ); - } - - #[tokio::test] - async fn transient_service_errors_return_503() { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - drop(listener); - - let cause = reqwest::get(format!("http://{addr}")).await.unwrap_err(); - let error = ServiceError::reqwest_transparent("backend request", cause); - - assert_eq!( - ApiError::Service(error).status(), - StatusCode::SERVICE_UNAVAILABLE - ); - } -} From 646edff508c6baa680e16853e45a89bd8e367da4 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:31:47 +0200 Subject: [PATCH 11/20] improve --- objectstore-service/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 5fae5c7b..6d8e9812 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -16,8 +16,8 @@ use crate::stream::{self, ClientStreamError}; /// The category of a service error. /// -/// Users should rely on the kind for classification, rather than matching on specific error -/// variants. +/// Users should rely on the kind for classification and handling, rather than matching on +/// specific variants. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ErrorKind { /// Error originating from a client-supplied request body stream. From d0b8a0cc040be8b3d3d3fa98ca46a0e09739daaa Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:35:27 +0200 Subject: [PATCH 12/20] cow --- objectstore-service/src/error.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 6d8e9812..283c4cd6 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -7,6 +7,7 @@ #![allow(missing_docs)] use std::any::Any; +use std::borrow::Cow; use objectstore_log::Level; use reqwest::StatusCode; @@ -140,7 +141,7 @@ impl Error { /// Creates an [`Error`] from a reqwest error with context, classifying it as an internal /// error. - pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { + pub fn reqwest(context: impl Into>, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { return Self::ClientStream(client_error); } @@ -149,7 +150,10 @@ impl Error { } /// Creates an [`Error`] from a reqwest error, classifying it according to its status code. - pub fn reqwest_transparent(context: impl Into, cause: reqwest::Error) -> Self { + pub fn reqwest_transparent( + context: impl Into>, + cause: reqwest::Error, + ) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { return Self::ClientStream(client_error); } @@ -159,7 +163,7 @@ impl Error { /// Creates an [`Error::Serde`] from a serde error with context, classifying it as an internal /// error. - pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { + pub fn serde(context: impl Into>, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { kind: ErrorKind::Internal, context: context.into(), @@ -169,7 +173,7 @@ impl Error { /// Creates an [`Error::Serde`] from a serde error with context, classifying it as a client /// error. - pub fn serde_client(context: impl Into, cause: serde_json::Error) -> Self { + pub fn serde_client(context: impl Into>, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { kind: ErrorKind::BadRequest, context: context.into(), @@ -179,7 +183,10 @@ impl Error { /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as an /// internal error. - pub fn metadata(context: impl Into, cause: objectstore_types::metadata::Error) -> Self { + pub fn metadata( + context: impl Into>, + cause: objectstore_types::metadata::Error, + ) -> Self { Self::Metadata(MetadataError { kind: ErrorKind::Internal, context: context.into(), @@ -190,7 +197,7 @@ impl Error { /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as a /// client error. pub fn metadata_client( - context: impl Into, + context: impl Into>, cause: objectstore_types::metadata::Error, ) -> Self { Self::Metadata(MetadataError { @@ -232,13 +239,13 @@ impl From for Error { #[error("reqwest error: {context}")] pub struct ReqwestError { kind: ErrorKind, - context: String, + context: Cow<'static, str>, #[source] cause: reqwest::Error, } impl ReqwestError { - fn internal(context: impl Into, cause: reqwest::Error) -> Self { + fn internal(context: impl Into>, cause: reqwest::Error) -> Self { Self { kind: ErrorKind::Internal, context: context.into(), @@ -246,7 +253,7 @@ impl ReqwestError { } } - fn transparent(context: impl Into, cause: reqwest::Error) -> Self { + fn transparent(context: impl Into>, cause: reqwest::Error) -> Self { let kind = if let Some(status) = cause.status() { match status { StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, @@ -287,7 +294,7 @@ impl ReqwestError { #[error("serde error: {context}")] pub struct SerdeError { kind: ErrorKind, - context: String, + context: Cow<'static, str>, #[source] cause: serde_json::Error, } @@ -309,7 +316,7 @@ impl SerdeError { #[error("metadata error: {context}")] pub struct MetadataError { kind: ErrorKind, - context: String, + context: Cow<'static, str>, #[source] cause: objectstore_types::metadata::Error, } From 31fa0ca7c911c21f319da76dfced6cfaa97af210 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:37:36 +0200 Subject: [PATCH 13/20] improve --- objectstore-service/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 283c4cd6..09c2f92d 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -278,7 +278,7 @@ impl ReqwestError { } } - /// Returns the service-level classification for this reqwest error. + /// Returns the service-level category for this reqwest error. pub fn kind(&self) -> ErrorKind { self.kind } @@ -322,7 +322,7 @@ pub struct MetadataError { } impl MetadataError { - /// Returns the service-level classification for this metadata error. + /// Returns the service-level category for this metadata error. pub fn kind(&self) -> ErrorKind { self.kind } From f585e7e0a0d0db849272f5ce352d44b798e70a51 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:38:13 +0200 Subject: [PATCH 14/20] improve --- objectstore-service/src/error.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 09c2f92d..f7a83d30 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -139,7 +139,7 @@ impl Error { Self::Panic(msg) } - /// Creates an [`Error`] from a reqwest error with context, classifying it as an internal + /// Creates an [`Error`] from a reqwest error with context, categorizing it as an internal /// error. pub fn reqwest(context: impl Into>, cause: reqwest::Error) -> Self { if let Some(client_error) = stream::unpack_client_error(&cause) { @@ -149,7 +149,7 @@ impl Error { Self::Reqwest(ReqwestError::internal(context, cause)) } - /// Creates an [`Error`] from a reqwest error, classifying it according to its status code. + /// Creates an [`Error`] from a reqwest error, categorizing it according to its status code. pub fn reqwest_transparent( context: impl Into>, cause: reqwest::Error, @@ -161,7 +161,7 @@ impl Error { Self::Reqwest(ReqwestError::transparent(context, cause)) } - /// Creates an [`Error::Serde`] from a serde error with context, classifying it as an internal + /// Creates an [`Error::Serde`] from a serde error with context, categorizing it as an internal /// error. pub fn serde(context: impl Into>, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { @@ -171,7 +171,7 @@ impl Error { }) } - /// Creates an [`Error::Serde`] from a serde error with context, classifying it as a client + /// Creates an [`Error::Serde`] from a serde error with context, categorizing it as a client /// error. pub fn serde_client(context: impl Into>, cause: serde_json::Error) -> Self { Self::Serde(SerdeError { @@ -181,7 +181,7 @@ impl Error { }) } - /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as an + /// Creates an [`Error::Metadata`] from a metadata error with context, categorizing it as an /// internal error. pub fn metadata( context: impl Into>, @@ -194,7 +194,7 @@ impl Error { }) } - /// Creates an [`Error::Metadata`] from a metadata error with context, classifying it as a + /// Creates an [`Error::Metadata`] from a metadata error with context, categorizing it as a /// client error. pub fn metadata_client( context: impl Into>, From 717e1eea005b8e367d7604bd87c36a04882c55ef Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:40:11 +0200 Subject: [PATCH 15/20] improve --- objectstore-server/tests/range_requests.rs | 23 ---------------------- 1 file changed, 23 deletions(-) diff --git a/objectstore-server/tests/range_requests.rs b/objectstore-server/tests/range_requests.rs index 2351a1c5..ca5afd4d 100644 --- a/objectstore-server/tests/range_requests.rs +++ b/objectstore-server/tests/range_requests.rs @@ -233,26 +233,3 @@ async fn range_on_nonexistent_object_returns_404() -> Result<()> { assert_eq!(resp.status(), reqwest::StatusCode::NOT_FOUND); Ok(()) } - -#[tokio::test] -async fn invalid_metadata_request_headers_return_400() -> Result<()> { - let server = TestServer::with_config(Config { - auth: AuthZ { - enforce: false, - ..Default::default() - }, - ..Default::default() - }) - .await; - let client = reqwest::Client::new(); - - let resp = client - .put(server.url("/v1/objects/test/org=1/invalid-metadata")) - .header("x-sn-expiration", "garbage") - .body("payload") - .send() - .await?; - - assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST); - Ok(()) -} From c8917323de281751da5cdd26eea66f5ac756fd68 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:41:29 +0200 Subject: [PATCH 16/20] improve --- objectstore-server/src/endpoints/multipart.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index 5f7d93bf..8e50c3ed 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -95,9 +95,8 @@ async fn initiate_inner( id: ObjectId, headers: HeaderMap, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(|cause| { - ServiceError::metadata_client("invalid multipart object metadata headers", cause) - })?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. metadata.time_created = Some(SystemTime::now()); From 911a39a458f6b08fd360d8a187ad9c590f4475b1 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:44:45 +0200 Subject: [PATCH 17/20] improve --- objectstore-service/docs/architecture.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index ed25de95..30e7517b 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -158,11 +158,7 @@ immediately so the caller can shed load or retry. # Error Classification The service error type exposes a coarse [`ErrorKind`](error::ErrorKind) so API -layers can map failures without matching every backend-specific variant. Client -input and client stream errors are separated from internal failures, capacity -errors, and unsupported operations. Backend HTTP failures default to internal -classification; transparent reqwest status classification is available for call -sites that explicitly opt in. +layers can map failures without matching every specific variant. ## Concurrency Limit From 233b642f7ffd4a5346a0f2a82edf5b0d8eb10e6b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:46:32 +0200 Subject: [PATCH 18/20] improve --- objectstore-service/src/backend/gcs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 249f9f20..b3914c8b 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -390,7 +390,7 @@ fn is_retryable(error: &Error) -> bool { let Some(status) = cause.status() else { return false; }; - + // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes matches!( status, StatusCode::REQUEST_TIMEOUT From 27cdfeabc75781570442ac3c277a3068dd187fd7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:47:46 +0200 Subject: [PATCH 19/20] improve --- objectstore-service/src/backend/gcs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index b3914c8b..56fd90d4 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -636,7 +636,7 @@ impl Backend for GcsBackend { // NB: Ensure the order of these fields and that a content-type is attached to them. Both // are required by the GCS API. let metadata_json = serde_json::to_string(&gcs_metadata) - .map_err(|cause| Error::serde("failed to serialize metadata for GCS upload", cause))?; + .map_err(|cause| Error::serde("GCS: failed to serialize metadata", cause))?; let multipart = multipart::Form::new() .part( From fc3ebb5deeffc7a1f476d66ad7d04d01b6303363 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:50:25 +0200 Subject: [PATCH 20/20] improve --- objectstore-service/src/backend/local_fs.rs | 24 ++++++++++++--------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index a1dbb441..8529c31b 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -98,7 +98,7 @@ impl Backend for LocalFsBackend { let mut writer = BufWriter::new(file); let metadata_json = serde_json::to_string(metadata) - .map_err(|cause| Error::serde("failed to serialize metadata", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; @@ -131,7 +131,7 @@ impl Backend for LocalFsBackend { reader.read_line(&mut metadata_line).await?; let file_len = reader.get_ref().metadata().await?.len(); let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end()) - .map_err(|cause| Error::serde("failed to deserialize metadata", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to deserialize metadata", cause))?; let payload_size = file_len .checked_sub(metadata_line.len() as u64) .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?; @@ -192,7 +192,7 @@ impl MultipartUploadBackend for LocalFsBackend { let meta_path = dir.join("metadata.json"); let metadata_json = serde_json::to_string(metadata) - .map_err(|cause| Error::serde("failed to serialize multipart metadata", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; tokio::fs::write(meta_path, metadata_json).await?; Ok(upload_id) @@ -220,7 +220,7 @@ impl MultipartUploadBackend for LocalFsBackend { "size": content_length, }); let header_line = serde_json::to_string(&header) - .map_err(|cause| Error::serde("failed to serialize part header", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to serialize part header", cause))?; let part_path = dir.join(format!("{part_number}.part")); let file = OpenOptions::new() @@ -282,8 +282,10 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) - .map_err(|cause| Error::serde("failed to deserialize part header", cause))?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| { + Error::serde("local_fs: failed to deserialize part header", cause) + })?; parts.push(Part { part_number: pn, @@ -340,7 +342,7 @@ impl MultipartUploadBackend for LocalFsBackend { let meta_path = dir.join("metadata.json"); let meta_bytes = tokio::fs::read(&meta_path).await?; let metadata: Metadata = serde_json::from_slice(&meta_bytes) - .map_err(|cause| Error::serde("failed to deserialize multipart metadata", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to deserialize metadata", cause))?; // TODO: validate that parts are in ascending part_number order and reject with // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. @@ -359,8 +361,10 @@ impl MultipartUploadBackend for LocalFsBackend { let mut reader = BufReader::new(file); let mut header_line = String::new(); reader.read_line(&mut header_line).await?; - let header: serde_json::Value = serde_json::from_str(header_line.trim_end()) - .map_err(|cause| Error::serde("failed to deserialize part header", cause))?; + let header: serde_json::Value = + serde_json::from_str(header_line.trim_end()).map_err(|cause| { + Error::serde("local_fs: failed to deserialize part header", cause) + })?; let stored_etag = header["etag"].as_str().unwrap_or(""); if stored_etag != completed.etag { @@ -386,7 +390,7 @@ impl MultipartUploadBackend for LocalFsBackend { let mut writer = BufWriter::new(file); let metadata_json = serde_json::to_string(&metadata) - .map_err(|cause| Error::serde("failed to serialize metadata", cause))?; + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?;