diff --git a/Cargo.lock b/Cargo.lock index 874be913..4af47f78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,7 +220,7 @@ dependencies = [ "argh_shared", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -263,7 +263,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -274,7 +274,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -774,7 +774,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -798,7 +798,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.117", ] [[package]] @@ -809,7 +809,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -849,6 +849,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive-error-kind" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a422c2552b72ab3fc01435089a5d9f5337e42383543e885275699ef0f539ba" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "2.1.1" @@ -868,7 +879,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.117", "unicode-xid", ] @@ -902,7 +913,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1208,7 +1219,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1976,7 +1987,7 @@ dependencies = [ "quote", "rustc_version", "simd_cesu8", - "syn", + "syn 2.0.117", ] [[package]] @@ -2001,7 +2012,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" dependencies = [ "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2662,6 +2673,7 @@ dependencies = [ "bigtable_rs", "bytes", "chrono", + "derive-error-kind", "futures", "futures-util", "gcp_auth", @@ -2743,7 +2755,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2866,7 +2878,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2922,7 +2934,7 @@ checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3012,7 +3024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.117", ] [[package]] @@ -3041,7 +3053,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "version_check", "yansi", ] @@ -3073,7 +3085,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn", + "syn 2.0.117", "tempfile", ] @@ -3087,7 +3099,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3407,7 +3419,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3992,7 +4004,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4059,7 +4071,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4250,6 +4262,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.117" @@ -4278,7 +4301,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4347,7 +4370,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4358,7 +4381,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4471,7 +4494,7 @@ checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4558,7 +4581,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4583,7 +4606,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn", + "syn 2.0.117", "tempfile", "tonic-build", ] @@ -4660,7 +4683,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4764,7 +4787,7 @@ checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5006,7 +5029,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasm-bindgen-shared", ] @@ -5153,7 +5176,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5164,7 +5187,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5539,7 +5562,7 @@ dependencies = [ "heck", "indexmap 2.13.0", "prettyplease", - "syn", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -5555,7 +5578,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -5628,7 +5651,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5649,7 +5672,7 @@ checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5669,7 +5692,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5709,7 +5732,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7908086a..d871ee3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ stresstest = { path = "stresstest" } anyhow = "1.0.69" async-trait = "0.1.88" bytes = "1.11.1" +derive-error-kind = "0.1.0" futures = "0.3.31" futures-util = "0.3.31" http = "1.3.1" diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 06b6014e..c6f3c928 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -6,7 +6,7 @@ use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use http::HeaderValue; -use objectstore_service::error::Error as ServiceError; +use objectstore_service::error::{Error as ServiceError, ErrorKind as ServiceErrorKind}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -92,18 +92,21 @@ impl ApiError { StatusCode::INTERNAL_SERVER_ERROR } - ApiError::Service(ServiceError::Client(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::Metadata(_)) => StatusCode::BAD_REQUEST, ApiError::Service(ServiceError::RangeNotSatisfiable { .. }) => { StatusCode::RANGE_NOT_SATISFIABLE } - ApiError::Service(ServiceError::InvalidUploadId(_)) => StatusCode::BAD_REQUEST, - ApiError::Service(ServiceError::AtCapacity) => StatusCode::TOO_MANY_REQUESTS, - ApiError::Service(ServiceError::NotImplemented) => StatusCode::NOT_IMPLEMENTED, - ApiError::Service(_) => { - objectstore_log::error!(!!self, "error handling request"); - StatusCode::INTERNAL_SERVER_ERROR - } + ApiError::Service(error) => match error.kind() { + ServiceErrorKind::ClientStream | ServiceErrorKind::BadRequest => { + StatusCode::BAD_REQUEST + } + ServiceErrorKind::TooManyRequests => StatusCode::TOO_MANY_REQUESTS, + ServiceErrorKind::Transient => StatusCode::SERVICE_UNAVAILABLE, + ServiceErrorKind::NotImplemented => StatusCode::NOT_IMPLEMENTED, + ServiceErrorKind::Internal => { + objectstore_log::error!(!!self, "error handling request"); + StatusCode::INTERNAL_SERVER_ERROR + } + }, ApiError::Internal(_) => { objectstore_log::error!(!!self, "internal error"); diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index c9d40d26..8e50c3ed 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -95,7 +95,8 @@ async fn initiate_inner( id: ObjectId, headers: HeaderMap, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; // TODO: Do this in `complete` instead, when we have a Service API to mutate metadata. metadata.time_created = Some(SystemTime::now()); diff --git a/objectstore-server/src/endpoints/objects.rs b/objectstore-server/src/endpoints/objects.rs index 8f172e31..ebc0d625 100644 --- a/objectstore-server/src/endpoints/objects.rs +++ b/objectstore-server/src/endpoints/objects.rs @@ -45,7 +45,8 @@ async fn objects_post( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; metadata.time_created = Some(SystemTime::now()); state @@ -91,7 +92,9 @@ async fn object_get( }; let stream = state.meter_stream(stream, &context); - let metadata_headers = metadata.to_headers("").map_err(ServiceError::from)?; + let metadata_headers = metadata + .to_headers("") + .map_err(|cause| ServiceError::metadata("failed to serialize object metadata", cause))?; let mut response = match content_range { Some(ref content_range) => { @@ -122,7 +125,9 @@ async fn object_head(service: AuthAwareService, Xt(id): Xt) -> ApiResu return Ok(StatusCode::NOT_FOUND.into_response()); }; - let headers = metadata.to_headers("").map_err(ServiceError::from)?; + let headers = metadata + .to_headers("") + .map_err(|cause| ServiceError::metadata("failed to serialize object metadata", cause))?; let mut response = (StatusCode::NO_CONTENT, headers).into_response(); insert_accept_ranges(&mut response); @@ -136,7 +141,8 @@ async fn object_put( headers: HeaderMap, MeteredBody(body): MeteredBody, ) -> ApiResult { - let mut metadata = Metadata::from_headers(&headers, "").map_err(ServiceError::from)?; + let mut metadata = Metadata::from_headers(&headers, "") + .map_err(|cause| ServiceError::metadata_client("invalid object metadata headers", cause))?; metadata.time_created = Some(SystemTime::now()); let ObjectId { context, key } = id; diff --git a/objectstore-server/src/extractors/body.rs b/objectstore-server/src/extractors/body.rs index b4d3d772..6411a1dc 100644 --- a/objectstore-server/src/extractors/body.rs +++ b/objectstore-server/src/extractors/body.rs @@ -5,7 +5,7 @@ use std::convert::Infallible; use axum::extract::{FromRequest, FromRequestParts, Path, Request}; use futures_util::{StreamExt, TryStreamExt}; use objectstore_service::id::ObjectContext; -use objectstore_service::stream::{ClientError, ClientStream}; +use objectstore_service::stream::{ClientStream, ClientStreamError}; use super::id::ContextParams; use crate::state::ServiceState; @@ -37,7 +37,10 @@ impl FromRequest for MeteredBody { usecase: params.usecase, scopes: params.scopes, }; - let stream = body.into_data_stream().map_err(ClientError::new).boxed(); + let stream = body + .into_data_stream() + .map_err(ClientStreamError::new) + .boxed(); let stream = state.meter_stream(stream, &context).boxed(); Ok(Self(stream)) } diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index e4434d42..42076189 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -16,6 +16,7 @@ base64 = "0.22" bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" } chrono = "0.4" bytes = { workspace = true } +derive-error-kind = { workspace = true } futures-util = { workspace = true } gcp_auth = "0.12.3" humantime = { workspace = true } diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index f9e1e5f4..30e7517b 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -155,6 +155,11 @@ The service applies backpressure to protect backends from overload. Rather than queueing work when capacity is exhausted, the service rejects operations immediately so the caller can shed load or retry. +# Error Classification + +The service error type exposes a coarse [`ErrorKind`](error::ErrorKind) so API +layers can map failures without matching every specific variant. + ## Concurrency Limit A semaphore caps the total number of in-flight backend operations across all diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index 70f7b73a..56fd90d4 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -27,7 +27,7 @@ use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, ListPartsResponse, PartNumber, UploadId, UploadPartResponse, }; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`GcsBackend`]. /// @@ -185,13 +185,28 @@ impl GcsObject { .metadata .remove(&GcsMetaKey::Expiration) .map(|s| s.parse()) - .transpose()? + .transpose() + .map_err(|cause| { + Error::metadata( + "GCS: failed to parse expiration policy from object metadata", + cause, + ) + })? .unwrap_or_default(); let origin = self.metadata.remove(&GcsMetaKey::Origin); let content_type = self.content_type; - let compression = self.content_encoding.map(|s| s.parse()).transpose()?; + let compression = self + .content_encoding + .map(|s| s.parse()) + .transpose() + .map_err(|cause| { + Error::metadata( + "GCS: failed to parse compression from object metadata", + cause, + ) + })?; let size = self .size .map(|size| size.parse()) @@ -363,12 +378,15 @@ fn insert_gcs_meta_header( /// Returns `true` if the error is a transient reqwest failure worth retrying. fn is_retryable(error: &Error) -> bool { - let Error::Reqwest { cause, .. } = error else { + let Error::Reqwest(error) = error else { return false; }; + let cause = error.cause(); + if cause.is_timeout() || cause.is_connect() || cause.is_request() { return true; } + let Some(status) = cause.status() else { return false; }; @@ -617,10 +635,8 @@ impl Backend for GcsBackend { // NB: Ensure the order of these fields and that a content-type is attached to them. Both // are required by the GCS API. - let metadata_json = serde_json::to_string(&gcs_metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata for GCS upload".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&gcs_metadata) + .map_err(|cause| Error::serde("GCS: failed to serialize metadata", cause))?; let multipart = multipart::Form::new() .part( @@ -651,10 +667,7 @@ impl Backend for GcsBackend { .send() .await .and_then(|r| r.error_for_status()) - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - _ => Error::reqwest("GCS: upload object", e), - })?; + .map_err(|e| Error::reqwest("GCS: upload object", e))?; Ok(()) } diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs index 2a221a79..8529c31b 100644 --- a/objectstore-service/src/backend/local_fs.rs +++ b/objectstore-service/src/backend/local_fs.rs @@ -22,7 +22,7 @@ use crate::multipart::{ AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse, ListPartsResponse, Part, PartNumber, UploadId, UploadPartResponse, }; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`LocalFsBackend`]. /// @@ -97,19 +97,12 @@ impl Backend for LocalFsBackend { let mut reader = pin!(StreamReader::new(stream)); let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; - tokio::io::copy(&mut reader, &mut writer) - .await - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), - })?; + tokio::io::copy(&mut reader, &mut writer).await?; writer.flush().await?; let file = writer.into_inner(); @@ -137,11 +130,8 @@ impl Backend for LocalFsBackend { let mut metadata_line = String::new(); reader.read_line(&mut metadata_line).await?; let file_len = reader.get_ref().metadata().await?.len(); - let mut metadata: Metadata = - serde_json::from_str(metadata_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize metadata".to_string(), - cause, - })?; + let mut metadata: Metadata = serde_json::from_str(metadata_line.trim_end()) + .map_err(|cause| Error::serde("local_fs: failed to deserialize metadata", cause))?; let payload_size = file_len .checked_sub(metadata_line.len() as u64) .ok_or_else(|| Error::generic("local-fs file corrupted: shorter than header"))?; @@ -201,10 +191,8 @@ impl MultipartUploadBackend for LocalFsBackend { tokio::fs::create_dir_all(&dir).await?; let meta_path = dir.join("metadata.json"); - let metadata_json = serde_json::to_string(metadata).map_err(|cause| Error::Serde { - context: "failed to serialize multipart metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(metadata) + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; tokio::fs::write(meta_path, metadata_json).await?; Ok(upload_id) @@ -231,10 +219,8 @@ impl MultipartUploadBackend for LocalFsBackend { "uploaded_at": SystemTime::now(), "size": content_length, }); - let header_line = serde_json::to_string(&header).map_err(|cause| Error::Serde { - context: "failed to serialize part header".to_string(), - cause, - })?; + let header_line = serde_json::to_string(&header) + .map_err(|cause| Error::serde("local_fs: failed to serialize part header", cause))?; let part_path = dir.join(format!("{part_number}.part")); let file = OpenOptions::new() @@ -249,12 +235,7 @@ impl MultipartUploadBackend for LocalFsBackend { writer.write_all(header_line.as_bytes()).await?; writer.write_all(b"\n").await?; - let _bytes_copied = tokio::io::copy(&mut reader, &mut writer) - .await - .map_err(|e| match stream::unpack_client_error(&e) { - Some(ce) => Error::Client(ce), - None => e.into(), - })?; + let _bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?; // TODO: validate bytes_copied against content_length and return a BadRequest-style // error. Needs a service-layer error variant that maps to HTTP 400 without abusing @@ -302,9 +283,8 @@ impl MultipartUploadBackend for LocalFsBackend { let mut header_line = String::new(); reader.read_line(&mut header_line).await?; let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, + serde_json::from_str(header_line.trim_end()).map_err(|cause| { + Error::serde("local_fs: failed to deserialize part header", cause) })?; parts.push(Part { @@ -361,11 +341,8 @@ impl MultipartUploadBackend for LocalFsBackend { // Read metadata let meta_path = dir.join("metadata.json"); let meta_bytes = tokio::fs::read(&meta_path).await?; - let metadata: Metadata = - serde_json::from_slice(&meta_bytes).map_err(|cause| Error::Serde { - context: "failed to deserialize multipart metadata".to_string(), - cause, - })?; + let metadata: Metadata = serde_json::from_slice(&meta_bytes) + .map_err(|cause| Error::serde("local_fs: failed to deserialize metadata", cause))?; // TODO: validate that parts are in ascending part_number order and reject with // InvalidPartOrder if not (matches S3/GCS behavior). Needs a proper client error variant. @@ -385,9 +362,8 @@ impl MultipartUploadBackend for LocalFsBackend { let mut header_line = String::new(); reader.read_line(&mut header_line).await?; let header: serde_json::Value = - serde_json::from_str(header_line.trim_end()).map_err(|cause| Error::Serde { - context: "failed to deserialize part header".to_string(), - cause, + serde_json::from_str(header_line.trim_end()).map_err(|cause| { + Error::serde("local_fs: failed to deserialize part header", cause) })?; let stored_etag = header["etag"].as_str().unwrap_or(""); @@ -413,10 +389,8 @@ impl MultipartUploadBackend for LocalFsBackend { .await?; let mut writer = BufWriter::new(file); - let metadata_json = serde_json::to_string(&metadata).map_err(|cause| Error::Serde { - context: "failed to serialize metadata".to_string(), - cause, - })?; + let metadata_json = serde_json::to_string(&metadata) + .map_err(|cause| Error::serde("local_fs: failed to serialize metadata", cause))?; writer.write_all(metadata_json.as_bytes()).await?; writer.write_all(b"\n").await?; diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 9818597c..62ed7b82 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -14,7 +14,7 @@ use crate::backend::common::{ }; use crate::error::{Error, Result}; use crate::id::ObjectId; -use crate::stream::{self, ClientStream}; +use crate::stream::ClientStream; /// Configuration for [`S3CompatibleBackend`]. /// @@ -172,10 +172,10 @@ where if let Some(r) = range { builder = builder.header(reqwest::header::RANGE, r.to_header_value()); } - let response = builder.send().await.map_err(|cause| Error::Reqwest { - context: "S3: failed to send request".to_string(), - cause, - })?; + let response = builder + .send() + .await + .map_err(|cause| Error::reqwest("S3: failed to send request", cause))?; if response.status() == StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); @@ -201,13 +201,11 @@ where let response = response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to get object".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to get object", cause))?; let headers = response.headers(); - let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; + let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX) + .map_err(|cause| Error::metadata("S3: failed to parse object metadata", cause))?; let content_range = if response.status() == StatusCode::PARTIAL_CONTENT { let range = headers @@ -259,17 +257,23 @@ where format!("/{}/{}", self.bucket, id.as_storage_path()), ) .header("x-goog-metadata-directive", "REPLACE") - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) + .headers( + metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX).map_err(|cause| { + Error::metadata( + "S3: failed to serialize object metadata for TTI update", + cause, + ) + })?, + ) .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send TTI update request".to_string(), - cause, - })? + .map_err(|cause| Error::reqwest("S3: failed to send TTI update request", cause))? .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to update expiration time for object with TTI".to_string(), - cause, + .map_err(|cause| { + Error::reqwest( + "S3: failed to update expiration time for object with TTI", + cause, + ) })?; Ok(()) @@ -314,18 +318,16 @@ impl Backend for S3CompatibleBackend { objectstore_log::debug!("Writing to s3_compatible backend"); self.request(Method::PUT, self.object_url(id)) .await? - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) + .headers( + metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX).map_err(|cause| { + Error::metadata("S3: failed to serialize object metadata for upload", cause) + })?, + ) .body(Body::wrap_stream(stream)) .send() .await .and_then(|response| response.error_for_status()) - .map_err(|cause| match stream::unpack_client_error(&cause) { - Some(ce) => Error::Client(ce), - _ => Error::Reqwest { - context: "S3: failed to put object".to_string(), - cause, - }, - })?; + .map_err(|cause| Error::reqwest("S3: failed to put object", cause))?; Ok(()) } @@ -359,20 +361,14 @@ impl Backend for S3CompatibleBackend { .await? .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send delete request".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to send delete request", cause))?; // Do not error for objects that do not exist. if response.status() != StatusCode::NOT_FOUND { objectstore_log::debug!("Object not found"); response .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to delete object".to_string(), - cause, - })?; + .map_err(|cause| Error::reqwest("S3: failed to delete object", cause))?; } Ok(()) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 93c18818..f7a83d30 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -3,60 +3,75 @@ //! [`Error`] covers I/O, serialization, HTTP, metadata, authentication, //! and backend-specific failures. [`Result`] is the corresponding alias. +// Needed as the `.kind()` methods generated by `derive(ErrorKind)` currently lack documentation. +#![allow(missing_docs)] + use std::any::Any; +use std::borrow::Cow; use objectstore_log::Level; +use reqwest::StatusCode; use thiserror::Error as ThisError; -use crate::stream::ClientError; +use crate::stream::{self, ClientStreamError}; + +/// The category of a service error. +/// +/// Users should rely on the kind for classification and handling, rather than matching on +/// specific variants. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ErrorKind { + /// Error originating from a client-supplied request body stream. + ClientStream, + /// Transient failure that may succeed if retried. + Transient, + /// Malformed client input. + BadRequest, + /// Operation unsupported by this service instance. + NotImplemented, + /// Service or upstream capacity limit. + TooManyRequests, + /// Internal failure. + Internal, +} /// Error type for service operations. -#[derive(Debug, ThisError)] +#[derive(Debug, ThisError, derive_error_kind::ErrorKind)] +#[error_kind(ErrorKind)] pub enum Error { /// IO errors related to payload streaming or file operations. #[error("i/o error: {0}")] - Io(#[from] std::io::Error), + #[error_kind(ErrorKind, Internal)] + Io(std::io::Error), /// Error originating from a client-supplied input stream. - /// - /// Indicates the client is at fault (e.g. dropped connection mid-upload) and should - /// map to a 4xx response rather than a 5xx. #[error("error reading client stream: {0}")] - Client(#[from] ClientError), + #[error_kind(ErrorKind, ClientStream)] + ClientStream(#[from] ClientStreamError), /// Errors related to de/serialization. - #[error("serde error: {context}")] - Serde { - /// Context describing what was being serialized/deserialized. - context: String, - /// The underlying serde error. - #[source] - cause: serde_json::Error, - }, + #[error(transparent)] + #[error_kind(transparent)] + Serde(#[from] SerdeError), - /// All errors stemming from the reqwest client, used in multiple backends to send requests to - /// e.g. GCP APIs. - /// These can be network errors encountered when sending the requests, but can also indicate - /// errors returned by the API itself. - #[error("reqwest error: {context}")] - Reqwest { - /// Context describing the request that failed. - context: String, - /// The underlying reqwest error. - #[source] - cause: reqwest::Error, - }, + /// Reqwest errors from storage backend HTTP calls. + #[error(transparent)] + #[error_kind(transparent)] + Reqwest(#[from] ReqwestError), /// Errors related to de/serialization and parsing of object metadata. - #[error("metadata error: {0}")] - Metadata(#[from] objectstore_types::metadata::Error), + #[error(transparent)] + #[error_kind(transparent)] + Metadata(#[from] MetadataError), /// Errors encountered when attempting to authenticate with GCP. #[error("GCP authentication error: {0}")] + #[error_kind(ErrorKind, Internal)] GcpAuth(#[from] gcp_auth::Error), /// A spawned service task panicked. #[error("service task failed: {0}")] + #[error_kind(ErrorKind, Internal)] Panic(String), /// A spawned service task was dropped before it could deliver its result. @@ -64,6 +79,7 @@ pub enum Error { /// This is an unexpected condition that can occur when the runtime drops the task for unknown /// reasons. #[error("task dropped")] + #[error_kind(ErrorKind, Internal)] Dropped, /// A redirect tombstone was encountered at a place where it is not supported. @@ -71,10 +87,12 @@ pub enum Error { /// This indicates a caller bug — tombstone-aware reads must go through the /// [`HighVolumeBackend`](crate::backend::common::HighVolumeBackend) methods. #[error("unexpected tombstone")] + #[error_kind(ErrorKind, Internal)] UnexpectedTombstone, /// The requested byte range is not satisfiable for the object's size. #[error("range not satisfiable (object size: {total} bytes)")] + #[error_kind(ErrorKind, BadRequest)] RangeNotSatisfiable { /// Total size of the object in bytes. total: u64, @@ -82,11 +100,13 @@ pub enum Error { /// The service has reached its concurrency limit and cannot accept more operations. #[error("concurrency limit reached")] + #[error_kind(ErrorKind, TooManyRequests)] AtCapacity, /// Any other error stemming from one of the storage backends, which might be specific to that /// backend or to a certain operation. #[error("storage backend error: {context}")] + #[error_kind(ErrorKind, Internal)] Generic { /// Context describing the operation that failed. context: String, @@ -97,10 +117,12 @@ pub enum Error { /// The functionality is not implemented by this instance of the service. #[error("not implemented")] + #[error_kind(ErrorKind, NotImplemented)] NotImplemented, - /// Invalid upload ID (e.g. path traversal attempt). + /// Invalid upload ID for a multipart upload. #[error(transparent)] + #[error_kind(ErrorKind, BadRequest)] InvalidUploadId(#[from] objectstore_types::multipart::InvalidUploadId), } @@ -117,20 +139,72 @@ impl Error { Self::Panic(msg) } - /// Creates an [`Error::Reqwest`] from a reqwest error with context. - pub fn reqwest(context: impl Into, cause: reqwest::Error) -> Self { - Self::Reqwest { + /// Creates an [`Error`] from a reqwest error with context, categorizing it as an internal + /// error. + pub fn reqwest(context: impl Into>, cause: reqwest::Error) -> Self { + if let Some(client_error) = stream::unpack_client_error(&cause) { + return Self::ClientStream(client_error); + } + + Self::Reqwest(ReqwestError::internal(context, cause)) + } + + /// Creates an [`Error`] from a reqwest error, categorizing it according to its status code. + pub fn reqwest_transparent( + context: impl Into>, + cause: reqwest::Error, + ) -> Self { + if let Some(client_error) = stream::unpack_client_error(&cause) { + return Self::ClientStream(client_error); + } + + Self::Reqwest(ReqwestError::transparent(context, cause)) + } + + /// Creates an [`Error::Serde`] from a serde error with context, categorizing it as an internal + /// error. + pub fn serde(context: impl Into>, cause: serde_json::Error) -> Self { + Self::Serde(SerdeError { + kind: ErrorKind::Internal, context: context.into(), cause, - } + }) } - /// Creates an [`Error::Serde`] from a serde error with context. - pub fn serde(context: impl Into, cause: serde_json::Error) -> Self { - Self::Serde { + /// Creates an [`Error::Serde`] from a serde error with context, categorizing it as a client + /// error. + pub fn serde_client(context: impl Into>, cause: serde_json::Error) -> Self { + Self::Serde(SerdeError { + kind: ErrorKind::BadRequest, context: context.into(), cause, - } + }) + } + + /// Creates an [`Error::Metadata`] from a metadata error with context, categorizing it as an + /// internal error. + pub fn metadata( + context: impl Into>, + cause: objectstore_types::metadata::Error, + ) -> Self { + Self::Metadata(MetadataError { + kind: ErrorKind::Internal, + context: context.into(), + cause, + }) + } + + /// Creates an [`Error::Metadata`] from a metadata error with context, categorizing it as a + /// client error. + pub fn metadata_client( + context: impl Into>, + cause: objectstore_types::metadata::Error, + ) -> Self { + Self::Metadata(MetadataError { + kind: ErrorKind::BadRequest, + context: context.into(), + cause, + }) } /// Creates an [`Error::Generic`] with a context string and no cause. @@ -143,26 +217,120 @@ impl Error { /// Returns the appropriate log level for this error. pub fn level(&self) -> Level { - match self { - // Malformed client input at DEBUG level - Self::Client(_) => Level::DEBUG, - Self::Metadata(_) => Level::DEBUG, - Self::RangeNotSatisfiable { .. } => Level::DEBUG, - // Like rate limits, we treat capacity errors as warnings - Self::AtCapacity => Level::WARN, - // All other errors are service or backend failures - Self::Io(_) => Level::ERROR, - Self::Serde { .. } => Level::ERROR, - Self::Reqwest { .. } => Level::ERROR, - Self::GcpAuth(_) => Level::ERROR, - Self::Panic(_) => Level::ERROR, - Self::Dropped => Level::ERROR, - Self::UnexpectedTombstone => Level::ERROR, - Self::NotImplemented => Level::ERROR, - Self::InvalidUploadId(_) => Level::DEBUG, - Self::Generic { .. } => Level::ERROR, + match self.kind() { + ErrorKind::ClientStream | ErrorKind::BadRequest => Level::DEBUG, + ErrorKind::Transient | ErrorKind::TooManyRequests => Level::WARN, + ErrorKind::NotImplemented | ErrorKind::Internal => Level::ERROR, + } + } +} + +impl From for Error { + fn from(cause: std::io::Error) -> Self { + match stream::unpack_client_error(&cause) { + Some(client_error) => Self::ClientStream(client_error), + None => Self::Io(cause), + } + } +} + +/// Reqwest error with context and kind. +#[derive(Debug, ThisError)] +#[error("reqwest error: {context}")] +pub struct ReqwestError { + kind: ErrorKind, + context: Cow<'static, str>, + #[source] + cause: reqwest::Error, +} + +impl ReqwestError { + fn internal(context: impl Into>, cause: reqwest::Error) -> Self { + Self { + kind: ErrorKind::Internal, + context: context.into(), + cause, } } + + fn transparent(context: impl Into>, cause: reqwest::Error) -> Self { + let kind = if let Some(status) = cause.status() { + match status { + StatusCode::TOO_MANY_REQUESTS => ErrorKind::TooManyRequests, + StatusCode::REQUEST_TIMEOUT + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => ErrorKind::Transient, + status if status.is_client_error() => ErrorKind::BadRequest, + _ => ErrorKind::Internal, + } + } else if cause.is_timeout() || cause.is_connect() || cause.is_request() { + ErrorKind::Transient + } else { + ErrorKind::Internal + }; + + Self { + kind, + context: context.into(), + cause, + } + } + + /// Returns the service-level category for this reqwest error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying reqwest error. + pub fn cause(&self) -> &reqwest::Error { + &self.cause + } +} + +/// Serde error with context and kind. +#[derive(Debug, ThisError)] +#[error("serde error: {context}")] +pub struct SerdeError { + kind: ErrorKind, + context: Cow<'static, str>, + #[source] + cause: serde_json::Error, +} + +impl SerdeError { + /// Returns the service-level category for this serde error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying serde error. + pub fn cause(&self) -> &serde_json::Error { + &self.cause + } +} + +/// Metadata error with context and kind. +#[derive(Debug, ThisError)] +#[error("metadata error: {context}")] +pub struct MetadataError { + kind: ErrorKind, + context: Cow<'static, str>, + #[source] + cause: objectstore_types::metadata::Error, +} + +impl MetadataError { + /// Returns the service-level category for this metadata error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + + /// Returns the underlying metadata error. + pub fn cause(&self) -> &objectstore_types::metadata::Error { + &self.cause + } } /// Result type for service operations. diff --git a/objectstore-service/src/stream.rs b/objectstore-service/src/stream.rs index efaf9c43..16be5860 100644 --- a/objectstore-service/src/stream.rs +++ b/objectstore-service/src/stream.rs @@ -4,7 +4,7 @@ //! cover the two directions of data flow: //! //! - [`ClientStream`] — incoming data from a client PUT request body. Uses -//! [`ClientError`] as the error type so backends can distinguish a broken +//! [`ClientStreamError`] as the error type so backends can distinguish a broken //! client connection from a backend I/O failure (400 vs 500). //! - [`PayloadStream`] — outgoing data returned from //! [`Backend::get_object`](crate::backend::common::Backend::get_object). @@ -30,10 +30,10 @@ pub type PayloadStream = BoxStream<'static, io::Result>; /// [`unpack_client_error`] to return a 4xx response rather than treating /// it as a 5xx backend failure. #[derive(Clone, Debug)] -pub struct ClientError(Arc); +pub struct ClientStreamError(Arc); -impl ClientError { - /// Creates a new [`ClientError`] wrapping `err`. +impl ClientStreamError { + /// Creates a new [`ClientStreamError`] wrapping `err`. pub fn new(err: E) -> Self where E: Error + Send + Sync + 'static, @@ -42,47 +42,47 @@ impl ClientError { } } -impl fmt::Display for ClientError { +impl fmt::Display for ClientStreamError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } -impl Error for ClientError { +impl Error for ClientStreamError { fn source(&self) -> Option<&(dyn Error + 'static)> { self.0.source() } } /// Required by [`tokio_util::io::StreamReader`] in the local filesystem backend. -impl From for io::Error { - fn from(err: ClientError) -> Self { +impl From for io::Error { + fn from(err: ClientStreamError) -> Self { io::Error::other(err) } } /// Incoming byte stream from a client PUT request body. /// -/// Uses [`ClientError`] as the error type so that a dropped or interrupted +/// Uses [`ClientStreamError`] as the error type so that a dropped or interrupted /// client connection is distinguishable from a backend I/O failure. Backends -/// that detect a [`ClientError`] (via [`unpack_client_error`]) can surface it -/// as [`crate::error::Error::Client`], which the server maps to HTTP 400 rather -/// than 500. +/// that detect a [`ClientStreamError`] (via [`unpack_client_error`]) can surface +/// it as [`crate::error::Error::ClientStream`], which the server maps to HTTP +/// 400 rather than 500. /// /// Use [`single`] to construct a single-chunk `ClientStream` from an owned value. -pub type ClientStream = BoxStream<'static, Result>; +pub type ClientStream = BoxStream<'static, Result>; -/// Walks the source chain of `err` looking for a [`ClientError`]. +/// Walks the source chain of `err` looking for a [`ClientStreamError`]. /// /// At each step, two locations are checked: /// -/// - **Direct**: the error itself is a `ClientError`. +/// - **Direct**: the error itself is a `ClientStreamError`. /// - **Packed in `io::Error`**: the error is an `io::Error` whose custom inner -/// value is a `ClientError`. +/// value is a `ClientStreamError`. /// /// Use this in `put_object` implementations to reclassify body-stream errors -/// as [`crate::error::Error::Client`] instead of an opaque server error. -pub fn unpack_client_error(err: &E) -> Option +/// as [`crate::error::Error::ClientStream`] instead of an opaque server error. +pub fn unpack_client_error(err: &E) -> Option where E: Error + 'static, { @@ -96,7 +96,7 @@ where None => s, }; - if let Some(client_error) = target.downcast_ref::() { + if let Some(client_error) = target.downcast_ref::() { return Some(client_error.clone()); }