Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
563 changes: 507 additions & 56 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mockito = "1"
motore-macros = "0.4.3"
murmur3 = "0.5.2"
once_cell = "1.20"
opendal = "0.55.0"
opendal = "0.56"
ordered-float = "4"
parquet = "58"
pilota = "0.11.10"
Expand Down
7 changes: 3 additions & 4 deletions crates/storage/opendal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ opendal-fs = ["opendal/services-fs"]
opendal-gcs = ["opendal/services-gcs"]
opendal-memory = ["opendal/services-memory"]
opendal-oss = ["opendal/services-oss"]
opendal-s3 = ["opendal/services-s3", "reqsign"]
opendal-s3 = ["opendal/services-s3", "reqsign-aws-v4", "reqsign-core"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -44,8 +44,8 @@ iceberg = { workspace = true }
opendal = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
reqsign = { version = "0.16.3", optional = true, default-features = false }
reqwest = { workspace = true }
reqsign-aws-v4 = { version = "3.0.0", optional = true }
reqsign-core = { version = "3.0.0", optional = true }
serde = { workspace = true }
typetag = { workspace = true }
url = { workspace = true }
Expand All @@ -54,6 +54,5 @@ futures = { workspace = true }
[dev-dependencies]
async-trait = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
reqsign = { version = "0.16.3", default-features = false }
reqwest = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
6 changes: 5 additions & 1 deletion crates/storage/opendal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,11 @@ impl Storage for OpenDalStorage {
} else {
format!("{relative_path}/")
};
Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
Ok(op
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove_all was deprecated

.delete_with(&path)
.recursive(true)
.await
.map_err(from_opendal_error)?)
}

async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
Expand Down
49 changes: 23 additions & 26 deletions crates/storage/opendal/src/resolving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use iceberg::io::{
StorageFactory,
};
use iceberg::{Error, ErrorKind, Result};
use opendal::Scheme;
use serde::{Deserialize, Serialize};
use url::Url;

Expand All @@ -52,26 +51,24 @@ pub const SCHEME_ABFS: &str = "abfs";
pub const SCHEME_WASBS: &str = "wasbs";
pub const SCHEME_WASB: &str = "wasb";

/// Parse a URL scheme string into an [`opendal::Scheme`].
fn parse_scheme(scheme: &str) -> Result<Scheme> {
/// Parse a URL scheme string.
fn parse_scheme(scheme: &str) -> Result<&'static str> {
match scheme {
SCHEME_MEMORY => Ok(Scheme::Memory),
SCHEME_FILE | "" => Ok(Scheme::Fs),
SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
SCHEME_OSS => Ok(Scheme::Oss),
SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
s => s.parse::<Scheme>().map_err(|e| {
Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported storage scheme: {s}: {e}"),
)
}),
SCHEME_MEMORY => Ok("memory"),
SCHEME_FILE | "" => Ok("file"),
SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok("s3"),
SCHEME_GS | SCHEME_GCS => Ok("gcs"),
SCHEME_OSS => Ok("oss"),
SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok("azdls"),
s => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported storage scheme: {s}"),
)),
}
}

/// Extract the [`Scheme`] family from a path URL.
fn extract_scheme(path: &str) -> Result<Scheme> {
/// Extract the scheme from a path URL.
fn extract_scheme(path: &str) -> Result<&'static str> {
let url = Url::parse(path).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
Expand All @@ -83,44 +80,44 @@ fn extract_scheme(path: &str) -> Result<Scheme> {

/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
fn build_storage_for_scheme(
scheme: Scheme,
scheme: &'static str,
props: &HashMap<String, String>,
#[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
match scheme {
#[cfg(feature = "opendal-s3")]
Scheme::S3 => {
"s3" => {
let config = crate::s3::s3_config_parse(props.clone())?;
Ok(OpenDalStorage::S3 {
config: Arc::new(config),
customized_credential_load: customized_credential_load.clone(),
})
}
#[cfg(feature = "opendal-gcs")]
Scheme::Gcs => {
"gcs" => {
let config = crate::gcs::gcs_config_parse(props.clone())?;
Ok(OpenDalStorage::Gcs {
config: Arc::new(config),
})
}
#[cfg(feature = "opendal-oss")]
Scheme::Oss => {
"oss" => {
let config = crate::oss::oss_config_parse(props.clone())?;
Ok(OpenDalStorage::Oss {
config: Arc::new(config),
})
}
#[cfg(feature = "opendal-azdls")]
Scheme::Azdls => {
"azdls" => {
let config = crate::azdls::azdls_config_parse(props.clone())?;
Ok(OpenDalStorage::Azdls {
config: Arc::new(config),
})
}
#[cfg(feature = "opendal-fs")]
Scheme::Fs => Ok(OpenDalStorage::LocalFs),
"file" => Ok(OpenDalStorage::LocalFs),
#[cfg(feature = "opendal-memory")]
Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
"memory" => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
unsupported => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported storage scheme: {unsupported}"),
Expand Down Expand Up @@ -201,7 +198,7 @@ pub struct OpenDalResolvingStorage {
props: HashMap<String, String>,
/// Cache of scheme to storage mappings.
#[serde(skip, default)]
storages: RwLock<HashMap<Scheme, Arc<OpenDalStorage>>>,
storages: RwLock<HashMap<&'static str, Arc<OpenDalStorage>>>,
/// Custom AWS credential loader for S3 storage.
#[cfg(feature = "opendal-s3")]
#[serde(skip)]
Expand Down Expand Up @@ -286,7 +283,7 @@ impl Storage for OpenDalResolvingStorage {
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
// Group paths by scheme so each resolved storage receives a batch,
// avoiding repeated operator creation per path.
let mut grouped: HashMap<Scheme, Vec<String>> = HashMap::new();
let mut grouped: HashMap<&'static str, Vec<String>> = HashMap::new();
while let Some(path) = paths.next().await {
let scheme = extract_scheme(&path)?;
grouped.entry(scheme).or_default().push(path);
Expand Down
46 changes: 21 additions & 25 deletions crates/storage/opendal/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use iceberg::io::{
CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
Expand All @@ -28,8 +27,11 @@ use iceberg::io::{
use iceberg::{Error, ErrorKind, Result};
use opendal::services::S3Config;
use opendal::{Configurator, Operator};
pub use reqsign::{AwsCredential, AwsCredentialLoad};
use reqwest::Client;
/// AWS credentials: access key ID, secret access key, and optional session token.
pub use reqsign_aws_v4::Credential as AwsCredential;
/// Trait for types that can asynchronously supply [`AwsCredential`] to a [`CustomAwsCredentialLoader`].
pub use reqsign_core::ProvideCredential;
use reqsign_core::{ProvideCredentialChain, ProvideCredentialDyn};
use url::Url;

use crate::utils::{from_opendal_error, is_truthy};
Expand Down Expand Up @@ -143,20 +145,26 @@ pub(crate) fn s3_config_build(
// Set bucket name.
.bucket(bucket);

if let Some(customized_credential_load) = customized_credential_load {
builder = builder
.customized_credential_load(customized_credential_load.clone().into_opendal_loader());
if let Some(loader) = customized_credential_load {
let chain = ProvideCredentialChain::new().push(Arc::clone(&loader.0));
builder = builder.credential_provider_chain(chain);
}

Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
}

/// Custom AWS credential loader.
/// This can be used to load credentials from a custom source, such as the AWS SDK.
///
/// This should be set as an extension on `FileIOBuilder`.
#[derive(Clone)]
pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
/// Wraps any [`ProvideCredential`] implementation for use with the S3 storage backend.
/// Use [`CustomAwsCredentialLoader::new`] to create one, then pass it to
/// [`OpenDalStorageFactory::S3`](crate::OpenDalStorageFactory).
pub struct CustomAwsCredentialLoader(Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>);

impl Clone for CustomAwsCredentialLoader {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}

impl std::fmt::Debug for CustomAwsCredentialLoader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -166,21 +174,9 @@ impl std::fmt::Debug for CustomAwsCredentialLoader {
}

impl CustomAwsCredentialLoader {
/// Create a new custom AWS credential loader.
pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
Self(loader)
}

/// Convert this loader into an opendal compatible loader for customized AWS credentials.
pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
Box::new(self)
}
}

#[async_trait]
impl AwsCredentialLoad for CustomAwsCredentialLoader {
async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
self.0.load_credential(client).await
/// Create a new custom AWS credential loader from any [`ProvideCredential`] implementation.
pub fn new(provider: impl ProvideCredential<Credential = AwsCredential> + 'static) -> Self {
Self(Arc::new(provider) as Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>)
}
}

Expand Down
29 changes: 17 additions & 12 deletions crates/storage/opendal/tests/file_io_s3_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use futures::StreamExt;
use iceberg::io::{
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION,
S3_SECRET_ACCESS_KEY,
};
use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory};
use iceberg_storage_opendal::{
AwsCredential, CustomAwsCredentialLoader, OpenDalStorageFactory, ProvideCredential,
};
use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up};
use reqsign::{AwsCredential, AwsCredentialLoad};
use reqwest::Client;
use reqsign_core::Context;

async fn get_file_io() -> FileIO {
set_up();
Expand Down Expand Up @@ -99,6 +99,7 @@ mod tests {
}

// Mock credential loader for testing
#[derive(Debug)]
struct MockCredentialLoader {
credential: Option<AwsCredential>,
}
Expand All @@ -118,9 +119,13 @@ mod tests {
}
}

#[async_trait]
impl AwsCredentialLoad for MockCredentialLoader {
async fn load_credential(&self, _client: Client) -> anyhow::Result<Option<AwsCredential>> {
impl ProvideCredential for MockCredentialLoader {
type Credential = AwsCredential;

async fn provide_credential(
&self,
_ctx: &Context,
) -> reqsign_core::Result<Option<AwsCredential>> {
Ok(self.credential.clone())
}
}
Expand All @@ -129,7 +134,7 @@ mod tests {
fn test_custom_aws_credential_loader_instantiation() {
// Test creating CustomAwsCredentialLoader with mock loader
let mock_loader = MockCredentialLoader::new_minio();
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));
let custom_loader = CustomAwsCredentialLoader::new(mock_loader);

// Test that the loader can be used in FileIOBuilder with OpenDalStorageFactory
let _builder = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
Expand All @@ -149,7 +154,7 @@ mod tests {

// Create a mock credential loader
let mock_loader = MockCredentialLoader::new_minio();
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));
let custom_loader = CustomAwsCredentialLoader::new(mock_loader);

let minio_endpoint = get_minio_endpoint();

Expand Down Expand Up @@ -177,7 +182,7 @@ mod tests {

// Create a mock credential loader with no credentials
let mock_loader = MockCredentialLoader::new(None);
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));
let custom_loader = CustomAwsCredentialLoader::new(mock_loader);

let minio_endpoint = get_minio_endpoint();

Expand All @@ -199,8 +204,8 @@ mod tests {
),
Err(e) => {
assert!(
e.to_string()
.contains("no valid credential found and anonymous access is not allowed")
e.to_string().contains("failed to load signing credential"),
"unexpected error: {e}"
);
}
}
Expand Down
25 changes: 13 additions & 12 deletions crates/storage/opendal/tests/resolving_storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,21 @@ mod tests {
#[cfg(feature = "opendal-s3")]
#[tokio::test]
async fn test_with_custom_credential_loader() {
use async_trait::async_trait;
use iceberg_storage_opendal::CustomAwsCredentialLoader;
use reqsign::{AwsCredential, AwsCredentialLoad};
use reqwest::Client;
use iceberg_storage_opendal::{
AwsCredential, CustomAwsCredentialLoader, ProvideCredential,
};
use reqsign_core::Context;

#[derive(Debug)]
struct MinioCredentialLoader;

#[async_trait]
impl AwsCredentialLoad for MinioCredentialLoader {
async fn load_credential(
impl ProvideCredential for MinioCredentialLoader {
type Credential = AwsCredential;

async fn provide_credential(
&self,
_client: Client,
) -> anyhow::Result<Option<AwsCredential>> {
_ctx: &Context,
) -> reqsign_core::Result<Option<AwsCredential>> {
Ok(Some(AwsCredential {
access_key_id: "admin".to_string(),
secret_access_key: "password".to_string(),
Expand All @@ -282,9 +284,8 @@ mod tests {
set_up();
let minio_endpoint = get_minio_endpoint();

let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader(
CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)),
);
let factory = OpenDalResolvingStorageFactory::new()
.with_s3_credential_loader(CustomAwsCredentialLoader::new(MinioCredentialLoader));

let file_io = FileIOBuilder::new(Arc::new(factory))
.with_props(vec![
Expand Down
Loading