From 39433ef91b4520319e9d6acdc5549a9a788829c6 Mon Sep 17 00:00:00 2001 From: Xander Date: Thu, 26 Feb 2026 16:47:02 +0000 Subject: [PATCH 1/9] RFC for table encryption --- docs/rfcs/0003_table_encryption.md | 729 +++++++++++++++++++++++++++++ 1 file changed, 729 insertions(+) create mode 100644 docs/rfcs/0003_table_encryption.md diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md new file mode 100644 index 0000000000..f830377568 --- /dev/null +++ b/docs/rfcs/0003_table_encryption.md @@ -0,0 +1,729 @@ + + +# Table Encryption + +## Background + +### Iceberg Spec: Encryption + +The [Iceberg table spec](https://iceberg.apache.org/spec/#table-metadata) defines encryption +as a first-class concept. Tables may store an `encryption-keys` map in their metadata, +snapshots may reference an `encryption-key-id`, and manifest files carry optional +`key_metadata` bytes. Data files themselves can be encrypted either at the stream level +(AES-GCM envelope encryption, the "AGS1" format) or natively by the file format (e.g. +Parquet Modular Encryption). + +The Java implementation (`org.apache.iceberg.encryption`) is the reference and has been +production-tested. It defines: + +- **`EncryptionManager`** -- orchestrates encrypt/decrypt of `InputFile`/`OutputFile` +- **`KeyManagementClient`** -- pluggable KMS integration (wrap/unwrap keys) +- **`EncryptedInputFile` / `EncryptedOutputFile`** -- thin wrappers pairing a raw file handle + with its `EncryptionKeyMetadata` +- **`StandardEncryptionManager`** -- envelope encryption with key caching, AGS1 streams, + and Parquet native encryption support +- **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) +- **`AesGcmInputStream` / `AesGcmOutputStream`** -- block-based stream encryption (AGS1 format) + +### Problem Statement + +The Rust implementation currently has no encryption support. Users reading encrypted Iceberg +tables created by Java/Spark cannot do so from `iceberg-rust`. Writing encrypted tables is +likewise impossible. + +Additionally, the current `InputFile` type is a concrete struct tightly coupled to `opendal::Operator`. +This prevents cleanly representing encrypted input files -- in the Java implementation, `InputFile` +is an interface with encrypted variants (`EncryptedInputFile`, `NativeEncryptionInputFile`). + +### Relationship to Storage Trait RFC + +[RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes +converting `Storage` from an enum to a trait and removing the `Extensions` mechanism from +`FileIOBuilder`. This encryption RFC is designed to work both with the current `Extensions`-based +`FileIO` and with the future trait-based storage. Specific adaptation points are called out below. + +--- + +## High-Level Architecture + +The encryption system follows the same envelope encryption pattern as the Java implementation, +adapted to Rust's ownership and async model. + +### Component Overview + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ User / Table Scan │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ EncryptionManager │ +│ │ +│ - Orchestrates key unwrapping, caching, and encryptor creation │ +│ - Holds Arc for KMS integration │ +│ - Maintains KeyCache (LRU + TTL) to avoid redundant KMS calls │ +│ - Provides prepare_decryption() and bulk_prepare_decryption() │ +│ - Provides extract_aad_prefix() for Parquet native encryption │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌──────────────────────────┐ ┌──────────────────────────────────────────────┐ +│ KeyManagementClient │ │ KeyCache │ +│ (trait) │ │ │ +│ │ │ - LRU cache with configurable TTL │ +│ wrap_key(dek, kek_id) │ │ - Thread-safe │ +│ unwrap_key(wrapped_dek) │ │ - Caches Arc per metadata │ +└──────────────────────────┘ └──────────────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────┐ +│ KMS Implementations │ +│ │ +│ - InMemoryKms (testing) │ +│ - AWS KMS (future) │ +│ - Azure KV (future) │ +│ - GCP KMS (future) │ +└──────────────────────────┘ +``` + +### Decryption Data Flow + +``` +TableMetadata + └── encryption_keys: {key_id → EncryptedKey(key_metadata bytes)} + │ +Snapshot │ + └── encryption_key_id ──────┘ + │ + ▼ + load_manifest_list(file_io, table_metadata) + 1. Look up encryption_key_id in table_metadata.encryption_keys + 2. If found: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile + 3. If not: file_io.new_input(path) + │ + ▼ +ManifestFile + └── key_metadata: Option> + │ + load_manifest(file_io) + 1. If key_metadata present: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile + 2. If not: file_io.new_input(path) + │ + ▼ +FileScanTask + └── key_metadata: Option> + │ + ArrowReader::create_parquet_record_batch_stream_builder() + 1. If key_metadata present: + * Parquet-native encrypted → FileDecryptionProperties with IcebergKeyRetriever + 2. If not: standard Parquet read +``` + +### Crate Structure + +All encryption code lives in a new `encryption` module within the `iceberg` crate, gated +behind an `encryption` feature flag: + +``` +crates/iceberg/src/ +├── encryption/ +│ ├── mod.rs # Module re-exports +│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) +│ ├── cache.rs # KeyCache (LRU + TTL) +│ ├── key_management.rs # KeyManagementClient trait + InMemoryKms +│ ├── key_metadata.rs # EncryptionKeyMetadata trait + StandardKeyMetadata +│ ├── manager.rs # EncryptionManager (orchestrator) +│ ├── parquet_key_retriever.rs # Bridge to parquet-rs KeyRetriever +│ └── stream.rs # AesGcmFileRead (AGS1 stream decryption) +├── io/ +│ └── file_io.rs # InputFile enum + EncryptedInputFile variant +└── arrow/ + └── reader.rs # Parquet decryption integration +``` + +--- + +## Design + +### Core Cryptographic Primitives + +#### EncryptionAlgorithm + +```rust +pub enum EncryptionAlgorithm { + Aes128Gcm, + // Future: Aes256Gcm +} + +impl EncryptionAlgorithm { + pub fn key_length(&self) -> usize; // 16 for AES-128 + pub fn nonce_length(&self) -> usize; // 12 (96-bit) +} +``` + +#### SecureKey + +Wraps key material with automatic zeroization on drop via `zeroize::Zeroizing>`: + +```rust +pub struct SecureKey { + data: Zeroizing>, + algorithm: EncryptionAlgorithm, +} + +impl SecureKey { + pub fn new(data: Vec, algorithm: EncryptionAlgorithm) -> Result; + pub fn generate(algorithm: EncryptionAlgorithm) -> Self; +} +``` + +#### AesGcmEncryptor + +Performs AES-GCM encrypt/decrypt operations. Ciphertext format matches the Java implementation: +`[12-byte nonce][ciphertext][16-byte GCM tag]`. + +```rust +pub struct AesGcmEncryptor { /* ... */ } + +impl AesGcmEncryptor { + pub fn new(key: SecureKey) -> Self; + pub fn encrypt(&self, plaintext: &[u8], aad: Option<&[u8]>) -> Result>; + pub fn decrypt(&self, ciphertext: &[u8], aad: Option<&[u8]>) -> Result>; +} +``` + +### Key Management + +#### KeyManagementClient Trait + +Pluggable interface for KMS integration. Mirrors the Java `KeyManagementClient`: + +```rust +#[async_trait] +pub trait KeyManagementClient: Send + Sync { + /// Wraps a DEK using the master key identified by `master_key_id`. + async fn wrap_key(&self, dek: &[u8], master_key_id: &str) -> Result>; + + /// Unwraps a previously wrapped DEK. + async fn unwrap_key(&self, wrapped_dek: &[u8]) -> Result>; +} +``` + +Users implement this trait to integrate with their KMS of choice (AWS KMS, Azure Key Vault, +GCP KMS, HashiCorp Vault, etc.). An `InMemoryKms` is provided for testing. + +#### StandardKeyMetadata + +Avro-serialized metadata stored alongside encrypted files. Compatible with the Java +`StandardKeyMetadata` format for cross-language interoperability: + +```rust +pub struct StandardKeyMetadata { + encryption_key: Vec, // Wrapped DEK + aad_prefix: Vec, // Additional authenticated data prefix + file_length: Option, // Optional encrypted file length +} + +impl StandardKeyMetadata { + pub fn serialize(&self) -> Result>; + pub fn deserialize(bytes: &[u8]) -> Result; +} +``` + +#### KeyCache + +Thread-safe LRU cache with TTL to avoid redundant KMS round-trips: + +```rust +pub struct KeyCache { /* ... */ } + +impl KeyCache { + pub fn new(capacity: usize, ttl: Duration) -> Self; + pub async fn get(&self, key_metadata: &[u8]) -> Option>; + pub async fn insert(&self, key_metadata: &[u8], encryptor: Arc); + pub async fn evict_expired(&self); +} +``` + +### EncryptionManager + +Central orchestrator that ties together KMS, caching, and encryptor creation: + +```rust +pub struct EncryptionManager { + kms_client: Arc, + algorithm: EncryptionAlgorithm, + key_cache: Arc, +} + +impl EncryptionManager { + pub fn new( + kms_client: Arc, + algorithm: EncryptionAlgorithm, + cache_ttl: Duration, + ) -> Self; + + pub fn with_defaults(kms_client: Arc) -> Self; + + /// Unwraps a DEK from key metadata and returns a cached encryptor. + pub async fn prepare_decryption( + &self, + key_metadata: &[u8], + ) -> Result>; + + /// Batch preparation for multiple files (parallel KMS calls). + pub async fn bulk_prepare_decryption( + &self, + key_metadata_list: Vec>, + ) -> Result>>; + + /// Extracts the AAD prefix from key metadata for Parquet native encryption. + pub fn extract_aad_prefix(&self, key_metadata: &[u8]) -> Result>; +} +``` + +### AGS1 Stream Encryption + +Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. + +#### Format + +``` +┌──────────────────────────────────────────┐ +│ Header (8 bytes) │ +│ Magic: "AGS1" (4 bytes) │ +│ Plain block size: u32 LE (4 bytes) │ +│ Default: 1,048,576 (1 MiB) │ +├──────────────────────────────────────────┤ +│ Block 0 │ +│ Nonce (12 bytes) │ +│ Ciphertext (up to plain_block_size) │ +│ GCM Tag (16 bytes) │ +├──────────────────────────────────────────┤ +│ Block 1..N (same structure) │ +├──────────────────────────────────────────┤ +│ Final block (may be shorter) │ +│ Nonce (12 bytes) │ +│ Ciphertext (remaining bytes) │ +│ GCM Tag (16 bytes) │ +└──────────────────────────────────────────┘ + +Cipher block size = plain_block_size + 12 (nonce) + 16 (tag) = 1,048,604 +``` + +Each block's AAD is constructed as: `aad_prefix || block_index (4 bytes, little-endian)`. +This binds each block to its position in the stream, preventing block reordering attacks. + +#### AesGcmFileRead + +Implements the `FileRead` trait to provide transparent decryption of AGS1-encrypted files. +Supports random-access reads with an internal block cache (LRU, default 16 blocks): + +```rust +pub struct AesGcmFileRead { /* ... */ } + +impl AesGcmFileRead { + pub async fn new( + inner: Box, + encryptor: Arc, + key_metadata: &StandardKeyMetadata, + file_length: u64, + ) -> Result; + + pub async fn calculate_plaintext_length_from_file( + reader: &impl FileRead, + file_length: u64, + ) -> Result; +} + +#[async_trait] +impl FileRead for AesGcmFileRead { + async fn read(&self, range: Range) -> Result; +} +``` + +### InputFile: From Struct to Enum + +**This is a key design change.** The current `InputFile` is a concrete struct. In the Java +implementation, `InputFile` is an interface with multiple implementations including encrypted +variants. We propose converting `InputFile` to an enum to support encrypted files without +requiring a separate type at every call site: + +```rust +pub enum InputFile { + /// Standard unencrypted input file. + Plain { + op: Operator, + path: String, + relative_path_pos: usize, + }, + + /// AGS1 stream-encrypted input file. + /// The file is decrypted transparently on read. + Encrypted { + op: Operator, + path: String, + relative_path_pos: usize, + encryptor: Arc, + key_metadata: StandardKeyMetadata, + }, + + /// Parquet-native encrypted input file. + /// Decryption is handled by the Parquet reader using FileDecryptionProperties. + /// The InputFile itself reads raw (encrypted) bytes. + NativeEncrypted { + op: Operator, + path: String, + relative_path_pos: usize, + key_metadata: Vec, + }, +} +``` + +This mirrors the Java hierarchy: + +| Java | Rust | +|-------------------------------|-------------------------------| +| `InputFile` (interface) | `InputFile` (enum) | +| Regular `InputFile` impl | `InputFile::Plain` | +| `EncryptedInputFile` wrapper | `InputFile::Encrypted` | +| `NativeEncryptionInputFile` | `InputFile::NativeEncrypted` | + +Common operations delegate to the appropriate variant: + +```rust +impl InputFile { + pub fn location(&self) -> &str; + pub async fn exists(&self) -> Result; + pub async fn metadata(&self) -> Result; + pub async fn read(&self) -> Result; + pub async fn reader(&self) -> Result>; +} +``` + +For the `Encrypted` variant, `read()` and `reader()` transparently decrypt via `AesGcmFileRead`. +For the `NativeEncrypted` variant, `read()` and `reader()` return raw bytes -- the Parquet +reader handles decryption using `FileDecryptionProperties`. + +#### Adaptation for Storage Trait RFC + +Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. The enum +structure remains the same -- only the inner storage handle type changes: + +```rust +// After Storage Trait RFC merges: +pub enum InputFile { + Plain { + storage: Arc, + path: String, + }, + Encrypted { + storage: Arc, + path: String, + encryptor: Arc, + key_metadata: StandardKeyMetadata, + }, + NativeEncrypted { + storage: Arc, + path: String, + key_metadata: Vec, + }, +} +``` + +### FileIO Integration + +#### Current Approach (with Extensions) + +The `EncryptionManager` is injected into `FileIO` via the existing `Extensions` mechanism: + +```rust +let encryption_manager = EncryptionManager::with_defaults(Arc::new(kms_client)); + +let file_io = FileIOBuilder::new("s3") + .with_prop("s3.region", "us-east-1") + .with_extension(encryption_manager) + .build()?; + +// Creates an encrypted InputFile +let input = file_io.new_input(path, key_metadata).await?; +let data = input.read().await?; +``` + +#### After Storage Trait RFC + +RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` will instead be +provided through the `StorageFactory` or configured at the catalog level: + +```rust +// Option A: EncryptionManager on the catalog +let catalog = GlueCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3)) + .with_encryption_manager(encryption_manager) + .load("my_catalog", props) + .await?; + +// Option B: Wrapping StorageFactory - I'm pretty sure this is more idomatic in the new trait world. +pub struct EncryptingStorageFactory { + inner: Arc, + encryption_manager: Arc, +} + +impl StorageFactory for EncryptingStorageFactory { + fn build(&self, config: &StorageConfig) -> Result> { + let storage = self.inner.build(config)?; + Ok(Arc::new(EncryptingStorage::new(storage, self.encryption_manager.clone()))) + } +} +``` + +The exact integration point will be finalized when RFC 0002 merges. The encryption +module's internal design (crypto, key management, stream format) is unaffected. + +### Parquet Native Encryption Bridge + +For files using Parquet Modular Encryption (where the Parquet file itself contains encrypted +column chunks), we bridge Iceberg's async key management with parquet-rs's synchronous +`KeyRetriever` trait: + +```rust +pub struct IcebergKeyRetriever { + encryption_manager: Arc, + runtime: tokio::runtime::Handle, +} + +impl KeyRetriever for IcebergKeyRetriever { + fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result> { + // Bridge async → sync using the tokio runtime handle + std::thread::scope(|s| { + s.spawn(|| { + self.runtime.block_on(async { + self.encryption_manager + .prepare_decryption(key_metadata) + .await + }) + }) + .join() + }) + } +} +``` + +The Arrow reader integrates this when `key_metadata` is present on a `FileScanTask`: + +```rust +// In ArrowReader: +if let Some(key_metadata) = &task.key_metadata { + let key_retriever = Arc::new(IcebergKeyRetriever::new( + encryption_manager, + runtime_handle, + )); + let decryption_properties = FileDecryptionProperties::with_key_retriever( + key_retriever as Arc, + ) + .build()?; + builder = builder.with_file_decryption_properties(decryption_properties); +} +``` + +### Manifest & Snapshot Integration + +#### ManifestFile + +The `ManifestFile` struct gains an optional `key_metadata` field. When present, +`load_manifest()` uses encrypted I/O: + +```rust +pub struct ManifestFile { + // ... existing fields ... + pub key_metadata: Option>, +} + +impl ManifestFile { + pub async fn load_manifest(&self, file_io: &FileIO) -> Result { + let avro = match &self.key_metadata { + Some(km) => { + file_io + .new_encrypted_input(&self.manifest_path, km) + .await? + .read() + .await? + } + None => { + file_io.new_input(&self.manifest_path)?.read().await? + } + }; + // Deserialize Avro manifest... + } +} +``` + +#### Snapshot + +Snapshots reference an `encryption_key_id` that maps to a key in `TableMetadata.encryption_keys`: + +```rust +pub struct Snapshot { + // ... existing fields ... + pub encryption_key_id: Option, +} + +impl Snapshot { + pub async fn load_manifest_list( + &self, + file_io: &FileIO, + table_metadata: &TableMetadata, + ) -> Result { + let bytes = match &self.encryption_key_id { + Some(key_id) => { + let encrypted_key = table_metadata + .encryption_keys + .get(key_id) + .ok_or_else(|| /* error */)?; + file_io + .new_encrypted_input(&self.manifest_list, &encrypted_key.key_metadata) + .await? + .read() + .await? + } + None => file_io.new_input(&self.manifest_list)?.read().await?, + }; + ManifestList::parse(bytes, /* ... */) + } +} +``` + +#### FileScanTask + +Propagates per-file encryption metadata through the scan pipeline: + +```rust +pub struct FileScanTask { + // ... existing fields ... + pub key_metadata: Option>, +} +``` + +--- + +## Implementation Plan + +### Phase 1: Core Encryption (Read Path) + +- Cryptographic primitives: `EncryptionAlgorithm`, `SecureKey`, `AesGcmEncryptor` +- `KeyManagementClient` trait and `InMemoryKms` +- `StandardKeyMetadata` with Avro serialization (Java-compatible) +- `KeyCache` with LRU + TTL +- `EncryptionManager` with `prepare_decryption()` and `bulk_prepare_decryption()` +- `AesGcmFileRead` (AGS1 stream decryption implementing `FileRead`) +- `InputFile` enum conversion (`Plain`, `Encrypted`, `NativeEncrypted`) +- `FileIO::new_encrypted_input()` integration +- Manifest and snapshot decryption +- `FileScanTask.key_metadata` propagation +- `IcebergKeyRetriever` for Parquet native encryption +- Arrow reader integration with `FileDecryptionProperties` +- Feature-gated behind `encryption` feature flag +- Integration tests with `InMemoryKms` + +### Phase 2: Write Path + +- `OutputFile` enum conversion (mirroring `InputFile`) +- `AesGcmFileWrite` (AGS1 stream encryption implementing `FileWrite`) +- `EncryptionManager::prepare_encryption()` (generate DEK, wrap with KMS, create metadata) +- `FileIO::new_encrypted_output()` integration +- Parquet writer encryption support (`FileEncryptionProperties`) +- Encrypted manifest and manifest list writing +- Encrypted snapshot commit flow + +### Phase 3: Production KMS Implementations + +- AWS KMS `KeyManagementClient` implementation +- Azure Key Vault `KeyManagementClient` implementation +- GCP KMS `KeyManagementClient` implementation + + +### Phase 4: Storage Trait Adaptation + +- Adapt to RFC 0002 when it merges: + - Replace `Operator` with `Arc` in `InputFile`/`OutputFile` enum variants + - Replace `Extensions`-based `EncryptionManager` injection with the new pattern + (catalog-level or `EncryptingStorageFactory` wrapper) + - Remove any `Extensions`-specific code + +### Future Work + +- Column-level encryption policies (encrypt specific columns with different keys) +- Key rotation support (re-encrypt DEKs with new KEKs without re-encrypting data) +- Encryption metadata in `TableMetadata` write path +- AES-256-GCM support (depends on apache/arrow-rs#9203) + +--- + +## Compatibility + +### Java Interoperability + +Cross-language compatibility is a hard requirement: + +- **AGS1 format**: Byte-level compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream` + (same header, block size, nonce/tag layout, AAD construction) +- **StandardKeyMetadata**: Avro-serialized with the same schema as Java, enabling Rust to + read tables encrypted by Java/Spark and vice versa +- **Parquet native encryption**: Uses the same `KeyRetriever` interface from `parquet-rs`, + which follows the Parquet spec + +### Feature Flag + +All encryption code is gated behind `--features encryption` to avoid adding cryptographic +dependencies for users who don't need encryption. The `aes-gcm` and `zeroize` crates are only compiled when enabled. + +When the `encryption` feature is not enabled and an encrypted file is encountered, clear +error messages are returned indicating that the feature must be enabled. + +--- + +## Risks and Mitigations + +| Risk | Description | Mitigation | +| ---- | ----------- | ---------- | +| Storage trait churn | RFC 0002 may change `InputFile`/`FileIO` significantly | Design encryption module with clean boundaries; crypto/key management/stream code is independent of storage abstraction | +| Parquet async/sync bridge | `KeyRetriever` is sync but KMS calls are async | Use `std::thread::scope` + `runtime.block_on()` to bridge; document the requirement for a tokio runtime handle | +| Key metadata format drift | Java may evolve `StandardKeyMetadata` | Pin to Avro schema version; add schema version detection for forward compatibility | +| Performance: KMS latency | KMS round-trips add latency to file opens | `KeyCache` with TTL; `bulk_prepare_decryption()` for parallel unwrapping | +| `InputFile` enum breaking change | Converting from struct to enum breaks existing code | Not sure, I think we have to break this | + +## Open Questions + +1. **KMS crate structure**: Should KMS implementations live in `iceberg-encryption-{provider}` + crates, or in the existing catalog crates (since AWS KMS is often used with Glue catalog)? +2. **Write path priority**: Should Phase 2 (write path) block on Phase 3 (storage trait + adaptation), or proceed independently? + +## Conclusion + +This RFC introduces encryption support for `iceberg-rust`, following the Iceberg spec and +maintaining byte-level compatibility with the Java reference implementation. The design +separates concerns into pluggable components (KMS client, key cache, stream cipher, encryption +manager) and integrates with the existing read path through `FileIO`, manifest loading, and +the Arrow/Parquet reader. The `InputFile` type is evolved from a concrete struct to an enum +to cleanly represent encrypted file variants, mirroring Java's interface hierarchy. The +implementation is feature-gated and designed to adapt cleanly to the upcoming storage trait +refactoring from RFC 0002. From fe5f1b113af741e719eb1234d9dec0ac9f4e149f Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 27 Feb 2026 19:10:49 +0000 Subject: [PATCH 2/9] update --- docs/rfcs/0003_table_encryption.md | 645 ++++++++++------------------- 1 file changed, 227 insertions(+), 418 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index f830377568..544f06e484 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -42,16 +42,6 @@ production-tested. It defines: - **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) - **`AesGcmInputStream` / `AesGcmOutputStream`** -- block-based stream encryption (AGS1 format) -### Problem Statement - -The Rust implementation currently has no encryption support. Users reading encrypted Iceberg -tables created by Java/Spark cannot do so from `iceberg-rust`. Writing encrypted tables is -likewise impossible. - -Additionally, the current `InputFile` type is a concrete struct tightly coupled to `opendal::Operator`. -This prevents cleanly representing encrypted input files -- in the Java implementation, `InputFile` -is an interface with encrypted variants (`EncryptedInputFile`, `NativeEncryptionInputFile`). - ### Relationship to Storage Trait RFC [RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes @@ -63,8 +53,23 @@ converting `Storage` from an enum to a trait and removing the `Extensions` mecha ## High-Level Architecture -The encryption system follows the same envelope encryption pattern as the Java implementation, -adapted to Rust's ownership and async model. +The encryption system uses two-layer envelope encryption, adapted from the Java implementation +to Rust's ownership and async model. + +### Key Hierarchy + +``` +Master Key (in KMS) + └── wraps → KEK (Key Encryption Key) — stored in table metadata as EncryptedKey + └── wraps → DEK (Data Encryption Key) — stored in StandardKeyMetadata per file + └── encrypts → file content (AGS1 stream or Parquet native) +``` + +- **Master keys** live in the KMS and never leave it +- **KEKs** are wrapped by the master key and stored in `TableMetadata.encryption_keys` +- **DEKs** are wrapped by a KEK and stored per-file in `StandardKeyMetadata` +- KEKs are cached in memory (moka async cache with configurable TTL) to avoid redundant KMS calls +- KEK rotation occurs automatically when a KEK exceeds its configurable lifespan (default 730 days per NIST SP 800-57) ### Component Overview @@ -75,22 +80,26 @@ adapted to Rust's ownership and async model. │ ▼ ┌─────────────────────────────────────────────────────────────────────────────┐ -│ EncryptionManager │ -│ │ -│ - Orchestrates key unwrapping, caching, and encryptor creation │ -│ - Holds Arc for KMS integration │ -│ - Maintains KeyCache (LRU + TTL) to avoid redundant KMS calls │ -│ - Provides prepare_decryption() and bulk_prepare_decryption() │ -│ - Provides extract_aad_prefix() for Parquet native encryption │ +│ EncryptionManager (trait) │ +│ │ +│ StandardEncryptionManager: │ +│ - Two-layer envelope: Master → KEK → DEK │ +│ - KEK cache (moka async, configurable TTL) │ +│ - Automatic KEK rotation │ +│ - encrypt() / decrypt() for AGS1 stream files │ +│ - encrypt_native() for Parquet Modular Encryption │ +│ - wrap/unwrap_key_metadata() for manifest list keys │ +│ - generate_dek() with KEK management │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ ┌──────────────────────────┐ ┌──────────────────────────────────────────────┐ -│ KeyManagementClient │ │ KeyCache │ -│ (trait) │ │ │ -│ │ │ - LRU cache with configurable TTL │ -│ wrap_key(dek, kek_id) │ │ - Thread-safe │ -│ unwrap_key(wrapped_dek) │ │ - Caches Arc per metadata │ +│ KeyManagementClient │ │ KEK Cache │ +│ (trait) │ │ │ +│ │ │ - moka::future::Cache with configurable TTL │ +│ wrap_key(key, key_id) │ │ - Thread-safe async │ +│ unwrap_key(wrapped, id) │ │ - Caches plaintext KEK bytes per key ID │ +│ initialize(props) │ │ │ └──────────────────────────┘ └──────────────────────────────────────────────┘ │ ▼ @@ -104,59 +113,99 @@ adapted to Rust's ownership and async model. └──────────────────────────┘ ``` -### Decryption Data Flow +### Data Flow + +#### Read Path (Decryption) ``` TableMetadata - └── encryption_keys: {key_id → EncryptedKey(key_metadata bytes)} + └── encryption_keys: {key_id → EncryptedKey} │ Snapshot │ - └── encryption_key_id ──────┘ + └── encryption_key_id ──────┘ (V3 format only) │ ▼ load_manifest_list(file_io, table_metadata) 1. Look up encryption_key_id in table_metadata.encryption_keys - 2. If found: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile - 3. If not: file_io.new_input(path) + 2. em.unwrap_key_metadata() → plaintext key metadata + 3. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile │ ▼ ManifestFile └── key_metadata: Option> │ load_manifest(file_io) - 1. If key_metadata present: file_io.new_encrypted_input(path, key_metadata) giving a new encrypted InputFile - 2. If not: file_io.new_input(path) + 1. If key_metadata present: file_io.new_encrypted_input() → AGS1-decrypting InputFile + 2. If not: file_io.new_input() │ ▼ FileScanTask └── key_metadata: Option> │ - ArrowReader::create_parquet_record_batch_stream_builder() + ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: - * Parquet-native encrypted → FileDecryptionProperties with IcebergKeyRetriever + a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile + b. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) + c. Pass to ParquetRecordBatchStreamBuilder 2. If not: standard Parquet read ``` -### Crate Structure +#### Write Path (Encryption) + +``` +RollingFileWriter::new_output_file() + 1. If file_io.encryption_manager() is Some: + a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile + b. EncryptionManager generates DEK, wraps with KEK + c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer + d. Store key_metadata bytes on DataFile + 2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties + +SnapshotProducer::commit() + 1. Manifest writing: + a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile + b. Store key_metadata on ManifestFile entry + 2. Manifest list writing: + a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile + b. em.wrap_key_metadata() → EncryptedKey for table metadata + c. Store key_id on Snapshot.encryption_key_id + 3. Table updates include AddEncryptionKey for new KEKs +``` -All encryption code lives in a new `encryption` module within the `iceberg` crate, gated -behind an `encryption` feature flag: +### Module Structure ``` crates/iceberg/src/ ├── encryption/ -│ ├── mod.rs # Module re-exports -│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) -│ ├── cache.rs # KeyCache (LRU + TTL) -│ ├── key_management.rs # KeyManagementClient trait + InMemoryKms -│ ├── key_metadata.rs # EncryptionKeyMetadata trait + StandardKeyMetadata -│ ├── manager.rs # EncryptionManager (orchestrator) -│ ├── parquet_key_retriever.rs # Bridge to parquet-rs KeyRetriever -│ └── stream.rs # AesGcmFileRead (AGS1 stream decryption) +│ ├── mod.rs # Module re-exports +│ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) +│ ├── key_management.rs # KeyManagementClient trait +│ ├── key_metadata.rs # StandardKeyMetadata (Avro V1, Java-compatible) +│ ├── encryption_manager.rs # EncryptionManager trait + StandardEncryptionManager +│ ├── plaintext_encryption_manager.rs # No-op pass-through for unencrypted tables +│ ├── file_encryptor.rs # FileEncryptor (write-side AGS1 wrapper) +│ ├── file_decryptor.rs # FileDecryptor (read-side AGS1 wrapper) +│ ├── encrypted_io.rs # EncryptedInputFile / EncryptedOutputFile wrappers +│ ├── stream.rs # AesGcmFileRead / AesGcmFileWrite (AGS1 format) +│ ├── kms/ +│ │ ├── mod.rs +│ │ └── in_memory.rs # InMemoryKms (testing only) +│ └── integration_tests.rs # End-to-end encryption round-trip tests ├── io/ -│ └── file_io.rs # InputFile enum + EncryptedInputFile variant -└── arrow/ - └── reader.rs # Parquet decryption integration +│ └── file_io.rs # InputFile/OutputFile enums, FileIO encryption methods +├── arrow/ +│ └── reader.rs # Parquet decryption via FileDecryptionProperties +├── writer/file_writer/ +│ ├── parquet_writer.rs # Parquet FileEncryptionProperties integration +│ └── rolling_writer.rs # Encrypted output file creation + key_metadata propagation +├── transaction/ +│ └── snapshot.rs # Encrypted manifest/manifest list writing, KEK management +├── scan/ +│ ├── context.rs # key_metadata propagation from DataFile → FileScanTask +│ └── task.rs # FileScanTask.key_metadata field +└── spec/ + ├── snapshot.rs # Snapshot.encryption_key_id, load_manifest_list decryption + └── manifest_list.rs # ManifestFile.key_metadata, load_manifest decryption ``` --- @@ -170,12 +219,7 @@ crates/iceberg/src/ ```rust pub enum EncryptionAlgorithm { Aes128Gcm, - // Future: Aes256Gcm -} - -impl EncryptionAlgorithm { - pub fn key_length(&self) -> usize; // 16 for AES-128 - pub fn nonce_length(&self) -> usize; // 12 (96-bit) + // Future: Aes256Gcm (depends on parquet-rs support) } ``` @@ -197,7 +241,7 @@ impl SecureKey { #### AesGcmEncryptor -Performs AES-GCM encrypt/decrypt operations. Ciphertext format matches the Java implementation: +AES-GCM encrypt/decrypt. Ciphertext format matches the Java implementation: `[12-byte nonce][ciphertext][16-byte GCM tag]`. ```rust @@ -218,12 +262,10 @@ Pluggable interface for KMS integration. Mirrors the Java `KeyManagementClient`: ```rust #[async_trait] -pub trait KeyManagementClient: Send + Sync { - /// Wraps a DEK using the master key identified by `master_key_id`. - async fn wrap_key(&self, dek: &[u8], master_key_id: &str) -> Result>; - - /// Unwraps a previously wrapped DEK. - async fn unwrap_key(&self, wrapped_dek: &[u8]) -> Result>; +pub trait KeyManagementClient: Debug + Send + Sync { + async fn initialize(&mut self, properties: HashMap) -> Result<()>; + async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result>; + async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) -> Result>; } ``` @@ -237,69 +279,54 @@ Avro-serialized metadata stored alongside encrypted files. Compatible with the J ```rust pub struct StandardKeyMetadata { - encryption_key: Vec, // Wrapped DEK + encryption_key: Vec, // Plaintext DEK (for PME) or wrapped DEK (for AGS1) aad_prefix: Vec, // Additional authenticated data prefix file_length: Option, // Optional encrypted file length } - -impl StandardKeyMetadata { - pub fn serialize(&self) -> Result>; - pub fn deserialize(bytes: &[u8]) -> Result; -} ``` -#### KeyCache +Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with Java. -Thread-safe LRU cache with TTL to avoid redundant KMS round-trips: +### EncryptionManager -```rust -pub struct KeyCache { /* ... */ } +The `EncryptionManager` trait abstracts encryption orchestration. `StandardEncryptionManager` +implements two-layer envelope encryption with KEK caching and rotation: -impl KeyCache { - pub fn new(capacity: usize, ttl: Duration) -> Self; - pub async fn get(&self, key_metadata: &[u8]) -> Option>; - pub async fn insert(&self, key_metadata: &[u8], encryptor: Arc); - pub async fn evict_expired(&self); +```rust +#[async_trait] +pub trait EncryptionManager: Debug + Send + Sync { + /// Decrypt an AGS1 stream-encrypted file. + async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result; + + /// Encrypt a file with AGS1 stream encryption. + async fn encrypt(&self, raw_output: OutputFile) -> Result; + + /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). + async fn encrypt_native(&self, raw_output: OutputFile) -> Result; + + /// Unwrap key metadata using the table's KEK hierarchy. + async fn unwrap_key_metadata( + &self, encrypted_key: &EncryptedKey, + encryption_keys: &HashMap, + ) -> Result>; + + /// Wrap key metadata with a KEK for storage in table metadata. + async fn wrap_key_metadata( + &self, key_metadata: &[u8], + ) -> Result<(EncryptedKey, Option)>; } ``` -### EncryptionManager - -Central orchestrator that ties together KMS, caching, and encryptor creation: +`StandardEncryptionManager` configuration: ```rust -pub struct EncryptionManager { - kms_client: Arc, - algorithm: EncryptionAlgorithm, - key_cache: Arc, -} - -impl EncryptionManager { - pub fn new( - kms_client: Arc, - algorithm: EncryptionAlgorithm, - cache_ttl: Duration, - ) -> Self; - - pub fn with_defaults(kms_client: Arc) -> Self; - - /// Unwraps a DEK from key metadata and returns a cached encryptor. - pub async fn prepare_decryption( - &self, - key_metadata: &[u8], - ) -> Result>; - - /// Batch preparation for multiple files (parallel KMS calls). - pub async fn bulk_prepare_decryption( - &self, - key_metadata_list: Vec>, - ) -> Result>>; - - /// Extracts the AAD prefix from key metadata for Parquet native encryption. - pub fn extract_aad_prefix(&self, key_metadata: &[u8]) -> Result>; -} +let em = StandardEncryptionManager::new(Arc::new(kms_client)) + .with_table_key_id("master-key-1") // Master key ID in KMS + .with_aad_prefix(b"my-table".to_vec()); // AAD prefix for GCM blocks ``` +A `PlaintextEncryptionManager` is also provided as a no-op pass-through for unencrypted tables. + ### AGS1 Stream Encryption Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. @@ -321,153 +348,78 @@ Block-based stream encryption format compatible with Java's `AesGcmInputStream`/ │ Block 1..N (same structure) │ ├──────────────────────────────────────────┤ │ Final block (may be shorter) │ -│ Nonce (12 bytes) │ -│ Ciphertext (remaining bytes) │ -│ GCM Tag (16 bytes) │ └──────────────────────────────────────────┘ - -Cipher block size = plain_block_size + 12 (nonce) + 16 (tag) = 1,048,604 ``` -Each block's AAD is constructed as: `aad_prefix || block_index (4 bytes, little-endian)`. -This binds each block to its position in the stream, preventing block reordering attacks. +Each block's AAD is constructed as `aad_prefix || block_index (4 bytes LE)`, binding each +block to its position in the stream to prevent reordering attacks. -#### AesGcmFileRead +**`AesGcmFileRead`** implements the `FileRead` trait for transparent AGS1 decryption with +random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for transparent AGS1 +encryption with block buffering. -Implements the `FileRead` trait to provide transparent decryption of AGS1-encrypted files. -Supports random-access reads with an internal block cache (LRU, default 16 blocks): +### InputFile and OutputFile Enums -```rust -pub struct AesGcmFileRead { /* ... */ } - -impl AesGcmFileRead { - pub async fn new( - inner: Box, - encryptor: Arc, - key_metadata: &StandardKeyMetadata, - file_length: u64, - ) -> Result; - - pub async fn calculate_plaintext_length_from_file( - reader: &impl FileRead, - file_length: u64, - ) -> Result; -} - -#[async_trait] -impl FileRead for AesGcmFileRead { - async fn read(&self, range: Range) -> Result; -} -``` - -### InputFile: From Struct to Enum - -**This is a key design change.** The current `InputFile` is a concrete struct. In the Java -implementation, `InputFile` is an interface with multiple implementations including encrypted -variants. We propose converting `InputFile` to an enum to support encrypted files without -requiring a separate type at every call site: +`InputFile` and `OutputFile` are enums with three variants each: ```rust pub enum InputFile { - /// Standard unencrypted input file. - Plain { - op: Operator, - path: String, - relative_path_pos: usize, - }, - - /// AGS1 stream-encrypted input file. - /// The file is decrypted transparently on read. - Encrypted { - op: Operator, - path: String, - relative_path_pos: usize, - encryptor: Arc, - key_metadata: StandardKeyMetadata, - }, - - /// Parquet-native encrypted input file. - /// Decryption is handled by the Parquet reader using FileDecryptionProperties. - /// The InputFile itself reads raw (encrypted) bytes. - NativeEncrypted { - op: Operator, - path: String, - relative_path_pos: usize, - key_metadata: Vec, - }, -} -``` + /// Standard unencrypted file. + Plain { storage: Arc, path: String }, -This mirrors the Java hierarchy: + /// AGS1 stream-encrypted file. Transparent decryption on read. + Encrypted { storage: Arc, path: String, decryptor: Arc }, -| Java | Rust | -|-------------------------------|-------------------------------| -| `InputFile` (interface) | `InputFile` (enum) | -| Regular `InputFile` impl | `InputFile::Plain` | -| `EncryptedInputFile` wrapper | `InputFile::Encrypted` | -| `NativeEncryptionInputFile` | `InputFile::NativeEncrypted` | - -Common operations delegate to the appropriate variant: + /// Parquet Modular Encryption. Raw reads; Parquet reader handles decryption. + NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, +} -```rust -impl InputFile { - pub fn location(&self) -> &str; - pub async fn exists(&self) -> Result; - pub async fn metadata(&self) -> Result; - pub async fn read(&self) -> Result; - pub async fn reader(&self) -> Result>; +pub enum OutputFile { + Plain { storage: Arc, path: String }, + Encrypted { storage: Arc, path: String, encryptor: Arc }, + NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, } ``` -For the `Encrypted` variant, `read()` and `reader()` transparently decrypt via `AesGcmFileRead`. -For the `NativeEncrypted` variant, `read()` and `reader()` return raw bytes -- the Parquet -reader handles decryption using `FileDecryptionProperties`. +`NativeKeyMaterial` carries the plaintext DEK and AAD prefix for Parquet's +`FileEncryptionProperties` / `FileDecryptionProperties`. + +Common operations (`location()`, `exists()`, `read()`, `reader()`, `write()`, `writer()`) +delegate to the appropriate variant, with `Encrypted` variants transparently encrypting/decrypting +via `AesGcmFileRead`/`AesGcmFileWrite`. #### Adaptation for Storage Trait RFC -Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. The enum -structure remains the same -- only the inner storage handle type changes: - -```rust -// After Storage Trait RFC merges: -pub enum InputFile { - Plain { - storage: Arc, - path: String, - }, - Encrypted { - storage: Arc, - path: String, - encryptor: Arc, - key_metadata: StandardKeyMetadata, - }, - NativeEncrypted { - storage: Arc, - path: String, - key_metadata: Vec, - }, -} -``` +Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. This is +already the case in this implementation — the enum structure is stable. Only the underlying +`Storage` trait implementation may change. ### FileIO Integration -#### Current Approach (with Extensions) - -The `EncryptionManager` is injected into `FileIO` via the existing `Extensions` mechanism: +The `EncryptionManager` is stored as a type-safe `FileIOBuilder` extension. This integrates +naturally with catalogs that support extensions (e.g. `RestCatalog.with_file_io_extension()`): ```rust -let encryption_manager = EncryptionManager::with_defaults(Arc::new(kms_client)); - +// Via FileIOBuilder extension (works with RestCatalog and any extension-aware catalog) let file_io = FileIOBuilder::new("s3") .with_prop("s3.region", "us-east-1") .with_extension(encryption_manager) .build()?; -// Creates an encrypted InputFile -let input = file_io.new_input(path, key_metadata).await?; -let data = input.read().await?; +// Or via convenience method on FileIO +let file_io = file_io.with_encryption_manager(encryption_manager); ``` +FileIO provides encryption-aware factory methods: + +| Method | Purpose | +|--------|---------| +| `new_encrypted_input(path, key_metadata)` | AGS1 stream decryption (manifests, manifest lists) | +| `new_encrypted_output(path)` | AGS1 stream encryption | +| `new_native_encrypted_input(path, key_metadata)` | PME input (Parquet handles decryption) | +| `new_native_encrypted_output(path)` | PME output (Parquet handles encryption) | +| `encryption_manager()` | Returns the configured EncryptionManager, if any | + #### After Storage Trait RFC RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` will instead be @@ -481,7 +433,7 @@ let catalog = GlueCatalogBuilder::default() .load("my_catalog", props) .await?; -// Option B: Wrapping StorageFactory - I'm pretty sure this is more idomatic in the new trait world. +// Option B: Wrapping StorageFactory pub struct EncryptingStorageFactory { inner: Arc, encryption_manager: Arc, @@ -498,182 +450,66 @@ impl StorageFactory for EncryptingStorageFactory { The exact integration point will be finalized when RFC 0002 merges. The encryption module's internal design (crypto, key management, stream format) is unaffected. -### Parquet Native Encryption Bridge +### Parquet Modular Encryption -For files using Parquet Modular Encryption (where the Parquet file itself contains encrypted -column chunks), we bridge Iceberg's async key management with parquet-rs's synchronous -`KeyRetriever` trait: +For Parquet data files, encryption is handled natively by the Parquet reader/writer using +`FileEncryptionProperties` and `FileDecryptionProperties` from `parquet-rs`. -```rust -pub struct IcebergKeyRetriever { - encryption_manager: Arc, - runtime: tokio::runtime::Handle, -} +**Write path** (`ParquetWriter`): When the output file is `NativeEncrypted`, the writer extracts +`NativeKeyMaterial` (plaintext DEK + AAD prefix) and configures `FileEncryptionProperties` on the +`AsyncArrowWriter`. The Parquet crate handles column/page-level encryption. -impl KeyRetriever for IcebergKeyRetriever { - fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result> { - // Bridge async → sync using the tokio runtime handle - std::thread::scope(|s| { - s.spawn(|| { - self.runtime.block_on(async { - self.encryption_manager - .prepare_decryption(key_metadata) - .await - }) - }) - .join() - }) - } -} -``` +**Read path** (`ArrowReader`): When `FileScanTask.key_metadata` is present, the reader calls +`file_io.new_native_encrypted_input()` which deserializes `StandardKeyMetadata` to extract the +plaintext DEK and AAD prefix. These are used to build `FileDecryptionProperties` which are +passed to `ParquetRecordBatchStreamBuilder::new_with_options()`. -The Arrow reader integrates this when `key_metadata` is present on a `FileScanTask`: +The `ArrowFileReader::get_metadata()` implementation forwards both `file_decryption_properties` +and `metadata_options` from `ArrowReaderOptions` to `ParquetMetaDataReader`, enabling encrypted +footer parsing. -```rust -// In ArrowReader: -if let Some(key_metadata) = &task.key_metadata { - let key_retriever = Arc::new(IcebergKeyRetriever::new( - encryption_manager, - runtime_handle, - )); - let decryption_properties = FileDecryptionProperties::with_key_retriever( - key_retriever as Arc, - ) - .build()?; - builder = builder.with_file_decryption_properties(decryption_properties); -} -``` +### Catalog Integration -### Manifest & Snapshot Integration +Encryption is configured at the catalog level. The `EncryptionManager` is attached to the +catalog's `FileIO`, and all tables loaded from that catalog inherit it automatically. -#### ManifestFile - -The `ManifestFile` struct gains an optional `key_metadata` field. When present, -`load_manifest()` uses encrypted I/O: +For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`), the encryption manager +can be added directly: ```rust -pub struct ManifestFile { - // ... existing fields ... - pub key_metadata: Option>, -} - -impl ManifestFile { - pub async fn load_manifest(&self, file_io: &FileIO) -> Result { - let avro = match &self.key_metadata { - Some(km) => { - file_io - .new_encrypted_input(&self.manifest_path, km) - .await? - .read() - .await? - } - None => { - file_io.new_input(&self.manifest_path)?.read().await? - } - }; - // Deserialize Avro manifest... - } -} +rest_catalog.with_file_io_extension(encryption_manager); ``` -#### Snapshot - -Snapshots reference an `encryption_key_id` that maps to a key in `TableMetadata.encryption_keys`: +For catalogs that don't support extensions, an `EncryptedCatalog` wrapper implements the +`Catalog` trait by delegating to an inner catalog and attaching the `EncryptionManager` to +every `Table` returned: ```rust -pub struct Snapshot { - // ... existing fields ... - pub encryption_key_id: Option, -} - -impl Snapshot { - pub async fn load_manifest_list( - &self, - file_io: &FileIO, - table_metadata: &TableMetadata, - ) -> Result { - let bytes = match &self.encryption_key_id { - Some(key_id) => { - let encrypted_key = table_metadata - .encryption_keys - .get(key_id) - .ok_or_else(|| /* error */)?; - file_io - .new_encrypted_input(&self.manifest_list, &encrypted_key.key_metadata) - .await? - .read() - .await? - } - None => file_io.new_input(&self.manifest_list)?.read().await?, - }; - ManifestList::parse(bytes, /* ... */) - } -} -``` - -#### FileScanTask - -Propagates per-file encryption metadata through the scan pipeline: - -```rust -pub struct FileScanTask { - // ... existing fields ... - pub key_metadata: Option>, -} +let inner_catalog = MemoryCatalogBuilder::default().load("c", props).await?; +let encrypted = EncryptedCatalog::new(Arc::new(inner_catalog), encryption_manager); ``` ---- - -## Implementation Plan - -### Phase 1: Core Encryption (Read Path) - -- Cryptographic primitives: `EncryptionAlgorithm`, `SecureKey`, `AesGcmEncryptor` -- `KeyManagementClient` trait and `InMemoryKms` -- `StandardKeyMetadata` with Avro serialization (Java-compatible) -- `KeyCache` with LRU + TTL -- `EncryptionManager` with `prepare_decryption()` and `bulk_prepare_decryption()` -- `AesGcmFileRead` (AGS1 stream decryption implementing `FileRead`) -- `InputFile` enum conversion (`Plain`, `Encrypted`, `NativeEncrypted`) -- `FileIO::new_encrypted_input()` integration -- Manifest and snapshot decryption -- `FileScanTask.key_metadata` propagation -- `IcebergKeyRetriever` for Parquet native encryption -- Arrow reader integration with `FileDecryptionProperties` -- Feature-gated behind `encryption` feature flag -- Integration tests with `InMemoryKms` - -### Phase 2: Write Path - -- `OutputFile` enum conversion (mirroring `InputFile`) -- `AesGcmFileWrite` (AGS1 stream encryption implementing `FileWrite`) -- `EncryptionManager::prepare_encryption()` (generate DEK, wrap with KMS, create metadata) -- `FileIO::new_encrypted_output()` integration -- Parquet writer encryption support (`FileEncryptionProperties`) -- Encrypted manifest and manifest list writing -- Encrypted snapshot commit flow +The wrapper intercepts all `Table`-returning methods (`load_table`, `create_table`, +`register_table`, `update_table`) and calls `table.with_file_io(...)` to attach encryption. +All other catalog methods delegate directly. -### Phase 3: Production KMS Implementations +`Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which +stores its own `FileIO` for manifest/manifest list loading). -- AWS KMS `KeyManagementClient` implementation -- Azure Key Vault `KeyManagementClient` implementation -- GCP KMS `KeyManagementClient` implementation +### DataFusion Integration +The DataFusion integration requires **no encryption-specific code**. Encryption flows +transparently through the existing pipeline: -### Phase 4: Storage Trait Adaptation +- **`IcebergTableProvider::scan()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergTableScan` → `table.scan().to_arrow()` → `ArrowReader` decrypts via `key_metadata` +- **`IcebergTableProvider::insert_into()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergWriteExec` uses `table.file_io()` → `RollingFileWriterBuilder` detects encryption → PME-encrypted data files + AGS1-encrypted manifests +- **`IcebergCommitExec`** uses `Transaction::fast_append()` → `SnapshotProducer` writes encrypted manifests/manifest list → `AddEncryptionKey` updates persisted in table metadata -- Adapt to RFC 0002 when it merges: - - Replace `Operator` with `Arc` in `InputFile`/`OutputFile` enum variants - - Replace `Extensions`-based `EncryptionManager` injection with the new pattern - (catalog-level or `EncryptingStorageFactory` wrapper) - - Remove any `Extensions`-specific code +### Format Version Requirement -### Future Work - -- Column-level encryption policies (encrypt specific columns with different keys) -- Key rotation support (re-encrypt DEKs with new KEKs without re-encrypting data) -- Encryption metadata in `TableMetadata` write path -- AES-256-GCM support (depends on apache/arrow-rs#9203) +Snapshot-level `encryption_key_id` is serialized only in the **V3** snapshot format. V2 snapshots +do not include this field, so encrypted manifest lists cannot be read back after a V2 round-trip. +Tables using encryption must use format version V3. --- @@ -687,43 +523,16 @@ Cross-language compatibility is a hard requirement: (same header, block size, nonce/tag layout, AAD construction) - **StandardKeyMetadata**: Avro-serialized with the same schema as Java, enabling Rust to read tables encrypted by Java/Spark and vice versa -- **Parquet native encryption**: Uses the same `KeyRetriever` interface from `parquet-rs`, - which follows the Parquet spec - -### Feature Flag - -All encryption code is gated behind `--features encryption` to avoid adding cryptographic -dependencies for users who don't need encryption. The `aes-gcm` and `zeroize` crates are only compiled when enabled. - -When the `encryption` feature is not enabled and an encrypted file is encountered, clear -error messages are returned indicating that the feature must be enabled. +- **Parquet native encryption**: Uses `FileDecryptionProperties`/`FileEncryptionProperties` + from `parquet-rs`, which follows the Parquet spec --- -## Risks and Mitigations - -| Risk | Description | Mitigation | -| ---- | ----------- | ---------- | -| Storage trait churn | RFC 0002 may change `InputFile`/`FileIO` significantly | Design encryption module with clean boundaries; crypto/key management/stream code is independent of storage abstraction | -| Parquet async/sync bridge | `KeyRetriever` is sync but KMS calls are async | Use `std::thread::scope` + `runtime.block_on()` to bridge; document the requirement for a tokio runtime handle | -| Key metadata format drift | Java may evolve `StandardKeyMetadata` | Pin to Avro schema version; add schema version detection for forward compatibility | -| Performance: KMS latency | KMS round-trips add latency to file opens | `KeyCache` with TTL; `bulk_prepare_decryption()` for parallel unwrapping | -| `InputFile` enum breaking change | Converting from struct to enum breaks existing code | Not sure, I think we have to break this | - -## Open Questions - -1. **KMS crate structure**: Should KMS implementations live in `iceberg-encryption-{provider}` - crates, or in the existing catalog crates (since AWS KMS is often used with Glue catalog)? -2. **Write path priority**: Should Phase 2 (write path) block on Phase 3 (storage trait - adaptation), or proceed independently? - -## Conclusion - -This RFC introduces encryption support for `iceberg-rust`, following the Iceberg spec and -maintaining byte-level compatibility with the Java reference implementation. The design -separates concerns into pluggable components (KMS client, key cache, stream cipher, encryption -manager) and integrates with the existing read path through `FileIO`, manifest loading, and -the Arrow/Parquet reader. The `InputFile` type is evolved from a concrete struct to an enum -to cleanly represent encrypted file variants, mirroring Java's interface hierarchy. The -implementation is feature-gated and designed to adapt cleanly to the upcoming storage trait -refactoring from RFC 0002. +## Future Work + +- **Production KMS implementations**: AWS KMS, Azure Key Vault, GCP KMS +- **Column-level encryption policies**: Encrypt specific columns with different keys +- **Key rotation support**: Re-encrypt DEKs with new KEKs without re-encrypting data +- **AES-256-GCM support**: Depends on `parquet-rs` support +- **Storage Trait adaptation**: Replace `Extensions`-based `EncryptionManager` injection + with the pattern from RFC 0002 (catalog-level or `EncryptingStorageFactory` wrapper) From 975117d4e9fde5b3ad55a336da0e76a51db78305 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 27 Feb 2026 20:58:13 +0000 Subject: [PATCH 3/9] Update with kms client registration --- docs/rfcs/0003_table_encryption.md | 95 ++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 18 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index 544f06e484..c7506f21dc 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -317,16 +317,17 @@ pub trait EncryptionManager: Debug + Send + Sync { } ``` -`StandardEncryptionManager` configuration: +`StandardEncryptionManager` is typically not constructed directly by users. Instead, +`TableBuilder::build()` constructs it automatically from a `KeyManagementClient` +extension and the table's properties (see [Catalog Integration](#catalog-integration) below). +For manual construction in tests: ```rust let em = StandardEncryptionManager::new(Arc::new(kms_client)) .with_table_key_id("master-key-1") // Master key ID in KMS - .with_aad_prefix(b"my-table".to_vec()); // AAD prefix for GCM blocks + .with_encryption_keys(table_metadata.encryption_keys.clone()); ``` -A `PlaintextEncryptionManager` is also provided as a no-op pass-through for unencrypted tables. - ### AGS1 Stream Encryption Block-based stream encryption format compatible with Java's `AesGcmInputStream`/`AesGcmOutputStream`. @@ -470,32 +471,90 @@ footer parsing. ### Catalog Integration -Encryption is configured at the catalog level. The `EncryptionManager` is attached to the -catalog's `FileIO`, and all tables loaded from that catalog inherit it automatically. +#### How Java Does It + +In Java, encryption is configured through two catalog properties: + +- `encryption.kms-impl` — fully qualified class name of the `KeyManagementClient` implementation + (e.g. `org.apache.iceberg.aws.AwsKeyManagementClient`) +- `encryption.kms-type` — reserved for a future built-in registry of KMS types, but + **not yet implemented** in Java (any value throws `"Unsupported KMS type"`) + +These are mutually exclusive. `kms-impl` uses Java reflection to instantiate the KMS client +from a class name. The `table_key_id` is then read from the table's `encryption.key-id` +property, and a per-table `StandardEncryptionManager` is constructed in +`RESTTableOperations.encryption()`. The base `FileIO` is wrapped with `EncryptingFileIO.combine(io, encryption())` +to produce a per-table encrypting FileIO. + +#### How Rust Does It -For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`), the encryption manager -can be added directly: +Rust does not have Java's reflection-based class loading, so `encryption.kms-impl` (a class name +string) is not useful. Instead, the user provides a concrete `Arc` +as a `FileIOBuilder` extension on the catalog. The `table_key_id` and `encryption_keys` are +then inferred automatically from table metadata. + +**Step 1: User provides the KMS client to the catalog.** + +For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`): ```rust -rest_catalog.with_file_io_extension(encryption_manager); +let kms_client: Arc = Arc::new(my_aws_kms); + +let catalog = RestCatalogBuilder::default() + .load("rest", props) + .await? + .with_file_io_extension(kms_client); ``` -For catalogs that don't support extensions, an `EncryptedCatalog` wrapper implements the -`Catalog` trait by delegating to an inner catalog and attaching the `EncryptionManager` to -every `Table` returned: +**Step 2: `TableBuilder::build()` auto-configures encryption per table.** + +When building a `Table`, `TableBuilder::maybe_configure_encryption()` runs automatically. +This is the Rust equivalent of Java's `RESTTableOperations.io()` which calls +`EncryptingFileIO.combine(io, encryption())`. It checks: + +1. Does the `FileIO` have a `KeyManagementClient` extension? If not, return as-is. +2. Does the table metadata have an `encryption.key-id` property? If not, return as-is (unencrypted table). +3. If both are present, construct a `StandardEncryptionManager` with: + - `table_key_id` from the `encryption.key-id` table property + - `encryption_keys` from `TableMetadata.encryption_keys` (the KEK map) + - The `KeyManagementClient` from the extension +4. Attach the `EncryptionManager` to the table's `FileIO`. + +This runs on every `Table::builder().build()` call, so each table gets a correctly configured +per-table `EncryptionManager` even when a single catalog manages tables with different key IDs. ```rust -let inner_catalog = MemoryCatalogBuilder::default().load("c", props).await?; -let encrypted = EncryptedCatalog::new(Arc::new(inner_catalog), encryption_manager); +// User code — just provide the KMS client, everything else is automatic: +let table = catalog.load_table(&ident).await?; +// table.file_io() now has a StandardEncryptionManager configured with +// the correct table_key_id and encryption_keys from this table's metadata. ``` -The wrapper intercepts all `Table`-returning methods (`load_table`, `create_table`, -`register_table`, `update_table`) and calls `table.with_file_io(...)` to attach encryption. -All other catalog methods delegate directly. - `Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which stores its own `FileIO` for manifest/manifest list loading). +#### Open Decision: KMS Client Injection Mechanism + +The current approach requires the user to manually construct and provide the KMS client. +In production, the REST catalog server may return `encryption.kms-impl` or `encryption.kms-type` +in its config response. A few options for resolving this automatically in Rust: + +1. **Current approach (explicit)**: User constructs `Arc` and adds + it as a `FileIOBuilder` extension. Simple, no magic, works today. + +2. **Type registry**: A `HashMap Arc>>` + mapping `kms-type` strings (e.g. `"aws"`, `"gcp"`) to factory functions. The catalog reads + `encryption.kms-type` from properties and looks up the factory. Requires a registration step + but is closer to Java's `kms-type` intent. + +3. **Catalog-specific logic**: Each catalog implementation (REST, Glue, etc.) knows how to + create its KMS client based on the properties it receives. For example, `RestCatalog` could + detect `encryption.kms-impl = AwsKeyManagementClient` in the config response and + automatically create an `AwsKms` instance with the right endpoint and credentials. + +The right choice depends on how the upstream Iceberg spec evolves `encryption.kms-type`. For +now, option 1 (explicit) is implemented and sufficient for production use. + ### DataFusion Integration The DataFusion integration requires **no encryption-specific code**. Encryption flows From 7bb62bcfe8e044f8a8d7bbed9d0caee33df58c68 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 09:22:44 +0000 Subject: [PATCH 4/9] address comments --- docs/rfcs/0003_table_encryption.md | 111 +++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 30 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index c7506f21dc..5d028b0e17 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -53,21 +53,37 @@ converting `Storage` from an enum to a trait and removing the `Extensions` mecha ## High-Level Architecture -The encryption system uses two-layer envelope encryption, adapted from the Java implementation -to Rust's ownership and async model. +The encryption system uses envelope encryption with a chained key hierarchy, adapted from the +Java implementation to Rust's ownership and async model. KMS-managed master keys wrap KEKs, +which encrypt only manifest list key metadata. All other DEKs are protected by being stored +inside their encrypted parent files. ### Key Hierarchy ``` Master Key (in KMS) - └── wraps → KEK (Key Encryption Key) — stored in table metadata as EncryptedKey - └── wraps → DEK (Data Encryption Key) — stored in StandardKeyMetadata per file - └── encrypts → file content (AGS1 stream or Parquet native) + └── wraps → KEK (Key Encryption Key) — stored KMS-wrapped in table metadata + └── encrypts → manifest list StandardKeyMetadata (AES-GCM, KEY_TIMESTAMP as AAD) + │ + ├── manifest list DEK → encrypts manifest list file (AGS1) + │ └── manifest key_metadata (plaintext StandardKeyMetadata) stored in manifest list entries + │ └── manifest DEK → encrypts manifest file (AGS1) + │ └── data file key_metadata (plaintext StandardKeyMetadata) stored in manifest entries + │ └── data file DEK → encrypts data file (AGS1 or Parquet native) ``` - **Master keys** live in the KMS and never leave it -- **KEKs** are wrapped by the master key and stored in `TableMetadata.encryption_keys` -- **DEKs** are wrapped by a KEK and stored per-file in `StandardKeyMetadata` +- **KEKs** are wrapped by the master key via KMS (`kmsClient.wrapKey()`) and stored in + `TableMetadata.encryption_keys` with a `KEY_TIMESTAMP` property for rotation tracking +- **DEKs** are generated as plaintext random bytes and stored in `StandardKeyMetadata` per file. + DEKs are **not** individually wrapped by a KEK. Instead, they are protected by being stored + inside their encrypted parent file: + - **Manifest list DEKs**: Their `StandardKeyMetadata` is AES-GCM encrypted by a KEK + (using `KEY_TIMESTAMP` as AAD) and stored as an `EncryptedKey` in table metadata + - **Manifest DEKs**: Their `StandardKeyMetadata` is stored as plaintext `key_metadata` bytes + in manifest list entries — protected because the manifest list file itself is encrypted + - **Data file DEKs**: Their `StandardKeyMetadata` is stored as plaintext `key_metadata` bytes + in manifest entries — protected because the manifest file itself is encrypted - KEKs are cached in memory (moka async cache with configurable TTL) to avoid redundant KMS calls - KEK rotation occurs automatically when a KEK exceeds its configurable lifespan (default 730 days per NIST SP 800-57) @@ -83,13 +99,14 @@ Master Key (in KMS) │ EncryptionManager (trait) │ │ │ │ StandardEncryptionManager: │ -│ - Two-layer envelope: Master → KEK → DEK │ +│ - Envelope encryption: Master → KEK → manifest list StandardKeyMetadata │ +│ - DEKs are plaintext, protected by encrypted parent files │ │ - KEK cache (moka async, configurable TTL) │ -│ - Automatic KEK rotation │ +│ - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking) │ │ - encrypt() / decrypt() for AGS1 stream files │ │ - encrypt_native() for Parquet Modular Encryption │ -│ - wrap/unwrap_key_metadata() for manifest list keys │ -│ - generate_dek() with KEK management │ +│ - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS) │ +│ - generate_dek() for per-file plaintext DEK generation │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ @@ -127,26 +144,35 @@ Snapshot │ ▼ load_manifest_list(file_io, table_metadata) 1. Look up encryption_key_id in table_metadata.encryption_keys - 2. em.unwrap_key_metadata() → plaintext key metadata - 3. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile + → get manifest list EncryptedKey + 2. Find the KEK via EncryptedKey.encrypted_by_id + → unwrap KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id) + (KEK is cached to avoid redundant KMS calls) + 3. AES-GCM decrypt the manifest list's StandardKeyMetadata using the + unwrapped KEK, with KEY_TIMESTAMP as AAD + 4. Extract plaintext manifest list DEK from decrypted StandardKeyMetadata + 5. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile │ ▼ ManifestFile - └── key_metadata: Option> + └── key_metadata: Option> (plaintext StandardKeyMetadata, read from encrypted manifest list) │ load_manifest(file_io) - 1. If key_metadata present: file_io.new_encrypted_input() → AGS1-decrypting InputFile + 1. If key_metadata present: + a. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix + b. file_io.new_encrypted_input() → AGS1-decrypting InputFile 2. If not: file_io.new_input() │ ▼ FileScanTask - └── key_metadata: Option> + └── key_metadata: Option> (plaintext StandardKeyMetadata, read from encrypted manifest) │ ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile - b. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) - c. Pass to ParquetRecordBatchStreamBuilder + b. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix + c. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) + d. Pass to ParquetRecordBatchStreamBuilder 2. If not: standard Parquet read ``` @@ -156,20 +182,31 @@ FileScanTask RollingFileWriter::new_output_file() 1. If file_io.encryption_manager() is Some: a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile - b. EncryptionManager generates DEK, wraps with KEK + b. EncryptionManager generates random plaintext DEK + AAD prefix c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer - d. Store key_metadata bytes on DataFile + d. Store plaintext StandardKeyMetadata as key_metadata bytes on DataFile + (protected by being stored inside the encrypted parent manifest) 2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties SnapshotProducer::commit() 1. Manifest writing: - a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile - b. Store key_metadata on ManifestFile entry + a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix + b. Write manifest to AGS1-encrypting OutputFile + c. Store plaintext StandardKeyMetadata as key_metadata on ManifestFile entry + (protected by being stored inside the encrypted parent manifest list) 2. Manifest list writing: - a. file_io.new_encrypted_output(path) → AGS1-encrypting OutputFile - b. em.wrap_key_metadata() → EncryptedKey for table metadata - c. Store key_id on Snapshot.encryption_key_id - 3. Table updates include AddEncryptionKey for new KEKs + a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix + b. Write manifest list to AGS1-encrypting OutputFile + c. Get or create KEK: + - Find unexpired KEK (check KEY_TIMESTAMP, 730-day lifespan) + - If none: generate new KEK, wrap via KMS: kms_client.wrap_key(kek, table_key_id) + d. AES-GCM encrypt the manifest list's StandardKeyMetadata using the KEK, + with KEY_TIMESTAMP as AAD + e. Store as EncryptedKey (encrypted_by_id = kek_id) in encryption manager + f. Store manifest list key_id on Snapshot.encryption_key_id + 3. Table commit includes AddEncryptionKey for all new entries: + - New KEKs (encrypted_by_id = table_key_id, properties include KEY_TIMESTAMP) + - New manifest list key metadata (encrypted_by_id = kek_id) ``` ### Module Structure @@ -279,10 +316,14 @@ Avro-serialized metadata stored alongside encrypted files. Compatible with the J ```rust pub struct StandardKeyMetadata { - encryption_key: Vec, // Plaintext DEK (for PME) or wrapped DEK (for AGS1) + encryption_key: Vec, // Plaintext DEK (always plaintext — never individually wrapped) aad_prefix: Vec, // Additional authenticated data prefix file_length: Option, // Optional encrypted file length } +// Note: For manifest lists, the entire serialized StandardKeyMetadata is AES-GCM +// encrypted by a KEK before storage. For manifests and data files, the +// StandardKeyMetadata is stored as plaintext key_metadata in the parent +// encrypted file. ``` Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with Java. @@ -290,7 +331,7 @@ Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with J ### EncryptionManager The `EncryptionManager` trait abstracts encryption orchestration. `StandardEncryptionManager` -implements two-layer envelope encryption with KEK caching and rotation: +implements envelope encryption with KMS-backed KEK management, KEK caching, and rotation: ```rust #[async_trait] @@ -304,13 +345,23 @@ pub trait EncryptionManager: Debug + Send + Sync { /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). async fn encrypt_native(&self, raw_output: OutputFile) -> Result; - /// Unwrap key metadata using the table's KEK hierarchy. + /// Unwrap key metadata for a manifest list. + /// 1. Look up the manifest list's EncryptedKey by key ID + /// 2. Find the KEK via encrypted_by_id + /// 3. Unwrap the KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id) + /// 4. AES-GCM decrypt the manifest list's StandardKeyMetadata using the KEK, + /// with KEY_TIMESTAMP as AAD + /// 5. Return the decrypted StandardKeyMetadata bytes (containing plaintext DEK) async fn unwrap_key_metadata( &self, encrypted_key: &EncryptedKey, encryption_keys: &HashMap, ) -> Result>; - /// Wrap key metadata with a KEK for storage in table metadata. + /// Wrap key metadata for a manifest list with a KEK for storage in table metadata. + /// 1. Get or create a KEK (wrapping new KEK via KMS if needed) + /// 2. AES-GCM encrypt the StandardKeyMetadata using the KEK, with KEY_TIMESTAMP as AAD + /// 3. Return the manifest list EncryptedKey (encrypted_by_id = kek_id) + /// and optionally a new KEK EncryptedKey if one was created async fn wrap_key_metadata( &self, key_metadata: &[u8], ) -> Result<(EncryptedKey, Option)>; From 7a2aa5aed62768ff0690ef670c9d32a67eae47ac Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 14 Apr 2026 13:57:20 +0100 Subject: [PATCH 5/9] update rfc --- docs/rfcs/0003_table_encryption.md | 332 +++++++++++++++++------------ 1 file changed, 197 insertions(+), 135 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index 5d028b0e17..8b87969038 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -23,7 +23,7 @@ ### Iceberg Spec: Encryption -The [Iceberg table spec](https://iceberg.apache.org/spec/#table-metadata) defines encryption +The [Iceberg table spec](https://iceberg.apache.org/docs/nightly/encryption/) defines encryption as a first-class concept. Tables may store an `encryption-keys` map in their metadata, snapshots may reference an `encryption-key-id`, and manifest files carry optional `key_metadata` bytes. Data files themselves can be encrypted either at the stream level @@ -96,17 +96,18 @@ Master Key (in KMS) │ ▼ ┌─────────────────────────────────────────────────────────────────────────────┐ -│ EncryptionManager (trait) │ +│ EncryptionManager (concrete struct) │ │ │ -│ StandardEncryptionManager: │ -│ - Envelope encryption: Master → KEK → manifest list StandardKeyMetadata │ -│ - DEKs are plaintext, protected by encrypted parent files │ -│ - KEK cache (moka async, configurable TTL) │ -│ - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking) │ -│ - encrypt() / decrypt() for AGS1 stream files │ -│ - encrypt_native() for Parquet Modular Encryption │ -│ - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS) │ -│ - generate_dek() for per-file plaintext DEK generation │ +│ EncryptionManager: │ +│ - Envelope encryption: Master → KEK → manifest list StandardKeyMetadata │ +│ - DEKs are plaintext, protected by encrypted parent files │ +│ - KEK cache (moka async, configurable TTL) │ +│ - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking) │ +│ - encrypt() / decrypt() for AGS1 stream files │ +│ - encrypt_native() for Parquet Modular Encryption │ +│ - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS) │ +│ - generate_dek() for per-file plaintext DEK generation │ +│ - Constructed per-table by KmsClientFactory │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ @@ -114,9 +115,9 @@ Master Key (in KMS) │ KeyManagementClient │ │ KEK Cache │ │ (trait) │ │ │ │ │ │ - moka::future::Cache with configurable TTL │ -│ wrap_key(key, key_id) │ │ - Thread-safe async │ -│ unwrap_key(wrapped, id) │ │ - Caches plaintext KEK bytes per key ID │ -│ initialize(props) │ │ │ +│ wrap_key(key, key_id) │ │ - Thread-safe async │ +│ unwrap_key(wrapped, id) │ │ - Caches plaintext KEK bytes per key ID │ +│ generate_key(key_id) │ │ │ └──────────────────────────┘ └──────────────────────────────────────────────┘ │ ▼ @@ -124,9 +125,20 @@ Master Key (in KMS) │ KMS Implementations │ │ │ │ - InMemoryKms (testing) │ -│ - AWS KMS (future) │ +│ - AWS KMS │ │ - Azure KV (future) │ │ - GCP KMS (future) │ +└──────────────────────────┘ + │ + ▲ created by +┌──────────────────────────┐ +│ KmsClientFactory │ +│ (trait) │ +│ │ +│ create_kms_client(props)│ +│ │ +│ - AwsKmsClientFactory │ +│ - Custom factories │ └──────────────────────────┘ ``` @@ -218,16 +230,17 @@ crates/iceberg/src/ │ ├── crypto.rs # AES-GCM primitives (SecureKey, AesGcmEncryptor) │ ├── key_management.rs # KeyManagementClient trait │ ├── key_metadata.rs # StandardKeyMetadata (Avro V1, Java-compatible) -│ ├── encryption_manager.rs # EncryptionManager trait + StandardEncryptionManager -│ ├── plaintext_encryption_manager.rs # No-op pass-through for unencrypted tables +│ ├── encryption_manager.rs # EncryptionManager (concrete struct) │ ├── file_encryptor.rs # FileEncryptor (write-side AGS1 wrapper) │ ├── file_decryptor.rs # FileDecryptor (read-side AGS1 wrapper) │ ├── encrypted_io.rs # EncryptedInputFile / EncryptedOutputFile wrappers │ ├── stream.rs # AesGcmFileRead / AesGcmFileWrite (AGS1 format) │ ├── kms/ -│ │ ├── mod.rs -│ │ └── in_memory.rs # InMemoryKms (testing only) -│ └── integration_tests.rs # End-to-end encryption round-trip tests +│ │ ├── mod.rs # KmsClientFactory trait +│ │ ├── in_memory.rs # InMemoryKms (testing only) +│ │ └── aws.rs # AwsKeyManagementClient (feature-gated: kms-aws) +├── tests/ +│ └── encryption_integration.rs # End-to-end encryption round-trip tests ├── io/ │ └── file_io.rs # InputFile/OutputFile enums, FileIO encryption methods ├── arrow/ @@ -300,12 +313,36 @@ Pluggable interface for KMS integration. Mirrors the Java `KeyManagementClient`: ```rust #[async_trait] pub trait KeyManagementClient: Debug + Send + Sync { - async fn initialize(&mut self, properties: HashMap) -> Result<()>; + /// Wrap (encrypt) a key using a wrapping key managed by the KMS. async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result>; - async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) -> Result>; + + /// Unwrap (decrypt) a previously wrapped key. Returns SensitiveBytes + /// which zeroizes on drop and redacts in Debug output. + async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) -> Result; + + /// Whether this KMS supports server-side key generation. + /// If true, callers can use generate_key() for atomic key generation and wrapping. + fn supports_key_generation(&self) -> bool { false } + + /// Generate a new key and wrap it atomically on the server side. + /// Only supported when supports_key_generation() returns true. + async fn generate_key(&self, wrapping_key_id: &str) -> Result { + Err(Error::new(ErrorKind::FeatureUnsupported, "...")) + } } ``` +The `initialize()` method from the Java interface is intentionally omitted — in Rust, +KMS clients are constructed with their configuration already applied (via builder pattern +or constructor arguments), making a separate initialization step unnecessary. + +The `SensitiveBytes` return type for `unwrap_key` ensures that plaintext key material +is automatically zeroized on drop and redacted in debug output. + +`supports_key_generation()` and `generate_key()` support KMS backends like AWS KMS +that can atomically generate and wrap keys server-side via `GenerateDataKey`, which is +more secure than generating locally and wrapping separately. + Users implement this trait to integrate with their KMS of choice (AWS KMS, Azure Key Vault, GCP KMS, HashiCorp Vault, etc.). An `InMemoryKms` is provided for testing. @@ -330,51 +367,59 @@ Wire format: `[version byte 0x01][Avro binary datum]` — byte-compatible with J ### EncryptionManager -The `EncryptionManager` trait abstracts encryption orchestration. `StandardEncryptionManager` -implements envelope encryption with KMS-backed KEK management, KEK caching, and rotation: +`EncryptionManager` is a concrete struct (not a trait) that implements envelope encryption +with KMS-backed KEK management, KEK caching, and rotation. A trait is unnecessary here — +for unencrypted tables, the encryption manager is simply `None` rather than using a no-op +implementation: ```rust -#[async_trait] -pub trait EncryptionManager: Debug + Send + Sync { +pub struct EncryptionManager { + kms_client: Arc, + kek_cache: moka::future::Cache, + kek_lifespan_days: i64, + algorithm: EncryptionAlgorithm, + table_key_id: Option, + encryption_keys: HashMap, +} + +impl EncryptionManager { + pub fn new(kms_client: Arc) -> Self; + /// Decrypt an AGS1 stream-encrypted file. - async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result; + pub async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result; /// Encrypt a file with AGS1 stream encryption. - async fn encrypt(&self, raw_output: OutputFile) -> Result; + pub async fn encrypt(&self, raw_output: OutputFile) -> Result; /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). - async fn encrypt_native(&self, raw_output: OutputFile) -> Result; + pub async fn encrypt_native(&self, raw_output: OutputFile) -> Result; /// Unwrap key metadata for a manifest list. /// 1. Look up the manifest list's EncryptedKey by key ID /// 2. Find the KEK via encrypted_by_id - /// 3. Unwrap the KEK via KMS: kms_client.unwrap_key(kek.encrypted_key_metadata, table_key_id) + /// 3. Unwrap the KEK via KMS /// 4. AES-GCM decrypt the manifest list's StandardKeyMetadata using the KEK, /// with KEY_TIMESTAMP as AAD /// 5. Return the decrypted StandardKeyMetadata bytes (containing plaintext DEK) - async fn unwrap_key_metadata( + pub async fn unwrap_key_metadata( &self, encrypted_key: &EncryptedKey, encryption_keys: &HashMap, ) -> Result>; /// Wrap key metadata for a manifest list with a KEK for storage in table metadata. - /// 1. Get or create a KEK (wrapping new KEK via KMS if needed) - /// 2. AES-GCM encrypt the StandardKeyMetadata using the KEK, with KEY_TIMESTAMP as AAD - /// 3. Return the manifest list EncryptedKey (encrypted_by_id = kek_id) - /// and optionally a new KEK EncryptedKey if one was created - async fn wrap_key_metadata( + pub async fn wrap_key_metadata( &self, key_metadata: &[u8], ) -> Result<(EncryptedKey, Option)>; } ``` -`StandardEncryptionManager` is typically not constructed directly by users. Instead, -`TableBuilder::build()` constructs it automatically from a `KeyManagementClient` +`EncryptionManager` is typically not constructed directly by users. Instead, +`TableBuilder::build()` constructs it automatically from a `KmsClientFactory` extension and the table's properties (see [Catalog Integration](#catalog-integration) below). For manual construction in tests: ```rust -let em = StandardEncryptionManager::new(Arc::new(kms_client)) +let em = EncryptionManager::new(Arc::new(kms_client)) .with_table_key_id("master-key-1") // Master key ID in KMS .with_encryption_keys(table_metadata.encryption_keys.clone()); ``` @@ -388,7 +433,7 @@ Block-based stream encryption format compatible with Java's `AesGcmInputStream`/ ``` ┌──────────────────────────────────────────┐ │ Header (8 bytes) │ -│ Magic: "AGS1" (4 bytes) │ +│ Magic: "AGS1" (4 bytes) │ │ Plain block size: u32 LE (4 bytes) │ │ Default: 1,048,576 (1 MiB) │ ├──────────────────────────────────────────┤ @@ -410,56 +455,67 @@ block to its position in the stream to prevent reordering attacks. random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for transparent AGS1 encryption with block buffering. -### InputFile and OutputFile Enums +### EncryptedInputFile and EncryptedOutputFile Wrappers -`InputFile` and `OutputFile` are enums with three variants each: +Rather than adding encryption variants to `InputFile`/`OutputFile` (which would change the +public API of those core types), encryption uses dedicated wrapper types. `EncryptedInputFile` +and `EncryptedOutputFile` wrap a plain `InputFile`/`OutputFile` and add transparent +encryption/decryption. The plain `InputFile`/`OutputFile` types remain unchanged. ```rust -pub enum InputFile { - /// Standard unencrypted file. - Plain { storage: Arc, path: String }, - - /// AGS1 stream-encrypted file. Transparent decryption on read. - Encrypted { storage: Arc, path: String, decryptor: Arc }, - - /// Parquet Modular Encryption. Raw reads; Parquet reader handles decryption. - NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, +/// Wraps a plain InputFile with decryption capabilities. +pub enum EncryptedInputFile { + /// AGS1 stream-encrypted file. Decrypted transparently on read. + Encrypted { + inner: InputFile, + decryptor: Arc, + }, + /// Parquet Modular Encryption. The Parquet reader handles decryption. + NativeEncrypted { + inner: InputFile, + key_material: NativeKeyMaterial, + }, } -pub enum OutputFile { - Plain { storage: Arc, path: String }, - Encrypted { storage: Arc, path: String, encryptor: Arc }, - NativeEncrypted { storage: Arc, path: String, key_material: NativeKeyMaterial }, +/// Wraps a plain OutputFile with encryption capabilities. +pub enum EncryptedOutputFile { + /// AGS1 stream-encrypted output. Encrypted transparently on write. + Encrypted { + inner: OutputFile, + key_metadata: Box<[u8]>, + encryptor: Arc, + }, + /// Parquet Modular Encryption. The Parquet writer handles encryption. + NativeEncrypted { + inner: OutputFile, + key_metadata: Box<[u8]>, + key_material: NativeKeyMaterial, + }, } ``` +Both wrappers delegate standard operations (`location()`, `exists()`, `read()`, `reader()`, +`write()`, `writer()`) to the inner file, with `Encrypted` variants transparently +encrypting/decrypting via `AesGcmFileRead`/`AesGcmFileWrite`. `into_inner()` recovers +the underlying plain file. + `NativeKeyMaterial` carries the plaintext DEK and AAD prefix for Parquet's `FileEncryptionProperties` / `FileDecryptionProperties`. -Common operations (`location()`, `exists()`, `read()`, `reader()`, `write()`, `writer()`) -delegate to the appropriate variant, with `Encrypted` variants transparently encrypting/decrypting -via `AesGcmFileRead`/`AesGcmFileWrite`. - -#### Adaptation for Storage Trait RFC - -Once RFC 0002 merges, `InputFile` will hold `Arc` instead of `Operator`. This is -already the case in this implementation — the enum structure is stable. Only the underlying -`Storage` trait implementation may change. +This wrapper approach means `ManifestReader` and `ManifestListWriter` accept the encrypted +wrapper types (or `Box`) where encryption is needed, rather than requiring +changes to the `InputFile`/`OutputFile` enums themselves. ### FileIO Integration -The `EncryptionManager` is stored as a type-safe `FileIOBuilder` extension. This integrates -naturally with catalogs that support extensions (e.g. `RestCatalog.with_file_io_extension()`): +The `EncryptionManager` is attached to `FileIO` via a convenience method. The encryption +manager is typically created automatically by `TableBuilder::build()` using the catalog's +`KmsClientFactory`, but can also be attached directly for testing: ```rust -// Via FileIOBuilder extension (works with RestCatalog and any extension-aware catalog) -let file_io = FileIOBuilder::new("s3") - .with_prop("s3.region", "us-east-1") - .with_extension(encryption_manager) - .build()?; - -// Or via convenience method on FileIO -let file_io = file_io.with_encryption_manager(encryption_manager); +// Typically automatic via catalog + KmsClientFactory (see Catalog Integration). +// For manual/test setup: +let file_io = file_io.with_encryption_manager(Arc::new(encryption_manager)); ``` FileIO provides encryption-aware factory methods: @@ -474,33 +530,21 @@ FileIO provides encryption-aware factory methods: #### After Storage Trait RFC -RFC 0002 removes `Extensions` from `FileIOBuilder`. The `EncryptionManager` will instead be -provided through the `StorageFactory` or configured at the catalog level: +RFC 0002 removes `Extensions` from `FileIOBuilder`. The `KmsClientFactory` will be +provided at the catalog level (Option A), which is consistent with the wrapper approach +for `EncryptedInputFile`/`EncryptedOutputFile` — encryption operates at the `FileIO` level +rather than wrapping storage: ```rust -// Option A: EncryptionManager on the catalog let catalog = GlueCatalogBuilder::default() .with_storage_factory(Arc::new(OpenDalStorageFactory::S3)) - .with_encryption_manager(encryption_manager) + .with_kms_client_factory(Arc::new(AwsKmsClientFactory)) .load("my_catalog", props) .await?; - -// Option B: Wrapping StorageFactory -pub struct EncryptingStorageFactory { - inner: Arc, - encryption_manager: Arc, -} - -impl StorageFactory for EncryptingStorageFactory { - fn build(&self, config: &StorageConfig) -> Result> { - let storage = self.inner.build(config)?; - Ok(Arc::new(EncryptingStorage::new(storage, self.encryption_manager.clone()))) - } -} ``` -The exact integration point will be finalized when RFC 0002 merges. The encryption -module's internal design (crypto, key management, stream format) is unaffected. +The encryption module's internal design (crypto, key management, stream format) is unaffected +by the storage trait changes. ### Parquet Modular Encryption @@ -540,21 +584,63 @@ to produce a per-table encrypting FileIO. #### How Rust Does It Rust does not have Java's reflection-based class loading, so `encryption.kms-impl` (a class name -string) is not useful. Instead, the user provides a concrete `Arc` -as a `FileIOBuilder` extension on the catalog. The `table_key_id` and `encryption_keys` are -then inferred automatically from table metadata. +string) is not useful. Instead, the user provides a `KmsClientFactory` to the catalog builder. +The factory creates per-table `KeyManagementClient` instances from table properties, and the +`table_key_id` and `encryption_keys` are inferred automatically from table metadata. -**Step 1: User provides the KMS client to the catalog.** +##### KmsClientFactory Trait + +A `KmsClientFactory` abstracts the construction of KMS clients. This allows the catalog to +create a correctly configured KMS client for each table based on that table's properties, +without the user needing to know the details of per-table KMS configuration: + +```rust +#[async_trait] +pub trait KmsClientFactory: Send + Sync + Debug { + /// Create a KeyManagementClient from table properties. + /// + /// Called by TableBuilder::build() for each encrypted table. The factory + /// receives the table's properties (which may include KMS endpoint, region, + /// credentials, etc.) and returns a configured client. + async fn create_kms_client( + &self, + properties: &HashMap, + ) -> Result>; +} +``` -For catalogs that support `FileIOBuilder` extensions (e.g. `RestCatalog`): +Example implementations: ```rust -let kms_client: Arc = Arc::new(my_aws_kms); +/// Factory for AWS KMS clients. Reads endpoint and region from table properties. +#[derive(Debug)] +pub struct AwsKmsClientFactory; +#[async_trait] +impl KmsClientFactory for AwsKmsClientFactory { + async fn create_kms_client( + &self, + properties: &HashMap, + ) -> Result> { + let region = properties.get("kms.region").cloned(); + let endpoint = properties.get("kms.endpoint").cloned(); + let client = AwsKeyManagementClient::builder() + .with_region(region) + .with_endpoint(endpoint) + .build() + .await?; + Ok(Arc::new(client)) + } +} +``` + +**Step 1: User provides the KMS client factory to the catalog builder.** + +```rust let catalog = RestCatalogBuilder::default() + .with_kms_client_factory(Arc::new(AwsKmsClientFactory)) .load("rest", props) - .await? - .with_file_io_extension(kms_client); + .await?; ``` **Step 2: `TableBuilder::build()` auto-configures encryption per table.** @@ -563,49 +649,29 @@ When building a `Table`, `TableBuilder::maybe_configure_encryption()` runs autom This is the Rust equivalent of Java's `RESTTableOperations.io()` which calls `EncryptingFileIO.combine(io, encryption())`. It checks: -1. Does the `FileIO` have a `KeyManagementClient` extension? If not, return as-is. +1. Does the catalog have a `KmsClientFactory`? If not, return as-is. 2. Does the table metadata have an `encryption.key-id` property? If not, return as-is (unencrypted table). -3. If both are present, construct a `StandardEncryptionManager` with: - - `table_key_id` from the `encryption.key-id` table property - - `encryption_keys` from `TableMetadata.encryption_keys` (the KEK map) - - The `KeyManagementClient` from the extension +3. If both are present: + a. Call `kms_client_factory.create_kms_client(table_properties)` to get a per-table KMS client + b. Construct an `EncryptionManager` with: + - `table_key_id` from the `encryption.key-id` table property + - `encryption_keys` from `TableMetadata.encryption_keys` (the KEK map) + - The `KeyManagementClient` from the factory 4. Attach the `EncryptionManager` to the table's `FileIO`. This runs on every `Table::builder().build()` call, so each table gets a correctly configured per-table `EncryptionManager` even when a single catalog manages tables with different key IDs. ```rust -// User code — just provide the KMS client, everything else is automatic: +// User code — just provide the factory, everything else is automatic: let table = catalog.load_table(&ident).await?; -// table.file_io() now has a StandardEncryptionManager configured with +// table.file_io() now has an EncryptionManager configured with // the correct table_key_id and encryption_keys from this table's metadata. ``` `Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which stores its own `FileIO` for manifest/manifest list loading). -#### Open Decision: KMS Client Injection Mechanism - -The current approach requires the user to manually construct and provide the KMS client. -In production, the REST catalog server may return `encryption.kms-impl` or `encryption.kms-type` -in its config response. A few options for resolving this automatically in Rust: - -1. **Current approach (explicit)**: User constructs `Arc` and adds - it as a `FileIOBuilder` extension. Simple, no magic, works today. - -2. **Type registry**: A `HashMap Arc>>` - mapping `kms-type` strings (e.g. `"aws"`, `"gcp"`) to factory functions. The catalog reads - `encryption.kms-type` from properties and looks up the factory. Requires a registration step - but is closer to Java's `kms-type` intent. - -3. **Catalog-specific logic**: Each catalog implementation (REST, Glue, etc.) knows how to - create its KMS client based on the properties it receives. For example, `RestCatalog` could - detect `encryption.kms-impl = AwsKeyManagementClient` in the config response and - automatically create an `AwsKms` instance with the right endpoint and credentials. - -The right choice depends on how the upstream Iceberg spec evolves `encryption.kms-type`. For -now, option 1 (explicit) is implemented and sufficient for production use. - ### DataFusion Integration The DataFusion integration requires **no encryption-specific code**. Encryption flows @@ -640,9 +706,5 @@ Cross-language compatibility is a hard requirement: ## Future Work -- **Production KMS implementations**: AWS KMS, Azure Key Vault, GCP KMS -- **Column-level encryption policies**: Encrypt specific columns with different keys -- **Key rotation support**: Re-encrypt DEKs with new KEKs without re-encrypting data +- **Additional KMS implementations**: Azure Key Vault, GCP KMS (AWS KMS is implemented) - **AES-256-GCM support**: Depends on `parquet-rs` support -- **Storage Trait adaptation**: Replace `Extensions`-based `EncryptionManager` injection - with the pattern from RFC 0002 (catalog-level or `EncryptingStorageFactory` wrapper) From 1bfe352c39a0e3c49512c3191110fa7a077ef96f Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 24 Apr 2026 08:50:37 +0100 Subject: [PATCH 6/9] enum --- docs/rfcs/0003_table_encryption.md | 136 +++++++++++++++-------------- 1 file changed, 72 insertions(+), 64 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index 8b87969038..76cff96710 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -35,8 +35,8 @@ production-tested. It defines: - **`EncryptionManager`** -- orchestrates encrypt/decrypt of `InputFile`/`OutputFile` - **`KeyManagementClient`** -- pluggable KMS integration (wrap/unwrap keys) -- **`EncryptedInputFile` / `EncryptedOutputFile`** -- thin wrappers pairing a raw file handle - with its `EncryptionKeyMetadata` +- **`EncryptedInputFile` / `NativeEncryptedInputFile` / `EncryptedOutputFile` / `NativeEncryptedOutputFile`** -- + thin wrappers pairing a raw file handle with its encryption key material - **`StandardEncryptionManager`** -- envelope encryption with key caching, AGS1 streams, and Parquet native encryption support - **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) @@ -181,9 +181,9 @@ FileScanTask │ ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: - a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncrypted InputFile - b. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix - c. Build FileDecryptionProperties from NativeKeyMaterial (DEK + AAD prefix) + a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncryptedInputFile + b. Extract NativeKeyMaterial (plaintext DEK + AAD prefix) + c. Build FileDecryptionProperties from NativeKeyMaterial d. Pass to ParquetRecordBatchStreamBuilder 2. If not: standard Parquet read ``` @@ -193,12 +193,12 @@ FileScanTask ``` RollingFileWriter::new_output_file() 1. If file_io.encryption_manager() is Some: - a. file_io.new_native_encrypted_output(path) → EncryptedOutputFile + a. file_io.new_native_encrypted_output(path) → NativeEncryptedOutputFile b. EncryptionManager generates random plaintext DEK + AAD prefix - c. OutputFile::NativeEncrypted carries NativeKeyMaterial for Parquet writer + c. NativeEncryptedOutputFile carries NativeKeyMaterial for Parquet writer d. Store plaintext StandardKeyMetadata as key_metadata bytes on DataFile (protected by being stored inside the encrypted parent manifest) - 2. ParquetWriter detects NativeEncrypted, configures FileEncryptionProperties + 2. ParquetWriter extracts NativeKeyMaterial, configures FileEncryptionProperties SnapshotProducer::commit() 1. Manifest writing: @@ -233,7 +233,7 @@ crates/iceberg/src/ │ ├── encryption_manager.rs # EncryptionManager (concrete struct) │ ├── file_encryptor.rs # FileEncryptor (write-side AGS1 wrapper) │ ├── file_decryptor.rs # FileDecryptor (read-side AGS1 wrapper) -│ ├── encrypted_io.rs # EncryptedInputFile / EncryptedOutputFile wrappers +│ ├── encrypted_io.rs # Encrypted / NativeEncrypted InputFile / OutputFile structs │ ├── stream.rs # AesGcmFileRead / AesGcmFileWrite (AGS1 format) │ ├── kms/ │ │ ├── mod.rs # KmsClientFactory trait @@ -386,13 +386,16 @@ impl EncryptionManager { pub fn new(kms_client: Arc) -> Self; /// Decrypt an AGS1 stream-encrypted file. - pub async fn decrypt(&self, encrypted: EncryptedInputFile) -> Result; + pub async fn decrypt(&self, input: InputFile, key_metadata: &[u8]) -> Result; /// Encrypt a file with AGS1 stream encryption. pub async fn encrypt(&self, raw_output: OutputFile) -> Result; /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). - pub async fn encrypt_native(&self, raw_output: OutputFile) -> Result; + pub async fn encrypt_native(&self, raw_output: OutputFile) -> Result; + + /// Decrypt key metadata for a Parquet Modular Encryption (PME) file. + pub fn decrypt_native(&self, raw_input: InputFile, key_metadata: &[u8]) -> Result; /// Unwrap key metadata for a manifest list. /// 1. Look up the manifest list's EncryptedKey by key ID @@ -455,56 +458,61 @@ block to its position in the stream to prevent reordering attacks. random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for transparent AGS1 encryption with block buffering. -### EncryptedInputFile and EncryptedOutputFile Wrappers +### Encrypted File Wrappers Rather than adding encryption variants to `InputFile`/`OutputFile` (which would change the -public API of those core types), encryption uses dedicated wrapper types. `EncryptedInputFile` -and `EncryptedOutputFile` wrap a plain `InputFile`/`OutputFile` and add transparent -encryption/decryption. The plain `InputFile`/`OutputFile` types remain unchanged. +public API of those core types), encryption uses dedicated wrapper structs. These wrap a +plain `InputFile`/`OutputFile` and add encryption/decryption capabilities. The plain +`InputFile`/`OutputFile` types remain unchanged. + +AGS1 stream encryption and Parquet Modular Encryption (PME) are separate structs rather +than enum variants because they are used in entirely different code paths — AGS1 for +manifests and manifest lists, PME for data files. Separate types give compile-time safety: +a function handling PME takes `NativeEncryptedInputFile` and cannot accidentally receive +an AGS1 file. ```rust -/// Wraps a plain InputFile with decryption capabilities. -pub enum EncryptedInputFile { - /// AGS1 stream-encrypted file. Decrypted transparently on read. - Encrypted { - inner: InputFile, - decryptor: Arc, - }, - /// Parquet Modular Encryption. The Parquet reader handles decryption. - NativeEncrypted { - inner: InputFile, - key_material: NativeKeyMaterial, - }, +/// AGS1 stream-encrypted input file. Decrypted transparently on read. +pub struct EncryptedInputFile { + inner: InputFile, + decryptor: Arc, } -/// Wraps a plain OutputFile with encryption capabilities. -pub enum EncryptedOutputFile { - /// AGS1 stream-encrypted output. Encrypted transparently on write. - Encrypted { - inner: OutputFile, - key_metadata: Box<[u8]>, - encryptor: Arc, - }, - /// Parquet Modular Encryption. The Parquet writer handles encryption. - NativeEncrypted { - inner: OutputFile, - key_metadata: Box<[u8]>, - key_material: NativeKeyMaterial, - }, +/// Parquet Modular Encryption input file. +/// The Parquet reader handles decryption at the column/page level. +pub struct NativeEncryptedInputFile { + inner: InputFile, + key_material: NativeKeyMaterial, } -``` -Both wrappers delegate standard operations (`location()`, `exists()`, `read()`, `reader()`, -`write()`, `writer()`) to the inner file, with `Encrypted` variants transparently -encrypting/decrypting via `AesGcmFileRead`/`AesGcmFileWrite`. `into_inner()` recovers -the underlying plain file. +/// AGS1 stream-encrypted output file. Encrypted transparently on write. +pub struct EncryptedOutputFile { + inner: OutputFile, + key_metadata: Box<[u8]>, + encryptor: Arc, +} + +/// Parquet Modular Encryption output file. +/// The Parquet writer handles encryption at the column/page level. +pub struct NativeEncryptedOutputFile { + inner: OutputFile, + key_metadata: Box<[u8]>, + key_material: NativeKeyMaterial, +} +``` -`NativeKeyMaterial` carries the plaintext DEK and AAD prefix for Parquet's -`FileEncryptionProperties` / `FileDecryptionProperties`. +`EncryptedInputFile` and `EncryptedOutputFile` delegate standard operations (`location()`, +`read()`, `reader()`, `writer()`) to the inner file, transparently encrypting/decrypting +via `AesGcmFileRead`/`AesGcmFileWrite`. `NativeEncryptedInputFile` and +`NativeEncryptedOutputFile` carry `NativeKeyMaterial` (plaintext DEK and AAD prefix) +for the Parquet reader/writer to configure `FileDecryptionProperties` / +`FileEncryptionProperties`. All four types provide `into_inner()` to recover the +underlying plain file. -This wrapper approach means `ManifestReader` and `ManifestListWriter` accept the encrypted -wrapper types (or `Box`) where encryption is needed, rather than requiring -changes to the `InputFile`/`OutputFile` enums themselves. +This wrapper approach means `ManifestReader` and `ManifestListWriter` accept the AGS1 +wrapper types (or `Box`) where encryption is needed, while the Parquet +writer accepts `NativeEncryptedOutputFile`, rather than requiring changes to the +`InputFile`/`OutputFile` types themselves. ### FileIO Integration @@ -520,20 +528,20 @@ let file_io = file_io.with_encryption_manager(Arc::new(encryption_manager)); FileIO provides encryption-aware factory methods: -| Method | Purpose | -|--------|---------| -| `new_encrypted_input(path, key_metadata)` | AGS1 stream decryption (manifests, manifest lists) | -| `new_encrypted_output(path)` | AGS1 stream encryption | -| `new_native_encrypted_input(path, key_metadata)` | PME input (Parquet handles decryption) | -| `new_native_encrypted_output(path)` | PME output (Parquet handles encryption) | -| `encryption_manager()` | Returns the configured EncryptionManager, if any | +| Method | Returns | Purpose | +|--------|---------|---------| +| `new_encrypted_input(path, key_metadata)` | `EncryptedInputFile` | AGS1 stream decryption (manifests, manifest lists) | +| `new_encrypted_output(path)` | `EncryptedOutputFile` | AGS1 stream encryption | +| `new_native_encrypted_input(path, key_metadata)` | `NativeEncryptedInputFile` | PME input (Parquet handles decryption) | +| `new_native_encrypted_output(path)` | `NativeEncryptedOutputFile` | PME output (Parquet handles encryption) | +| `encryption_manager()` | `Option>` | Returns the configured EncryptionManager, if any | #### After Storage Trait RFC RFC 0002 removes `Extensions` from `FileIOBuilder`. The `KmsClientFactory` will be provided at the catalog level (Option A), which is consistent with the wrapper approach -for `EncryptedInputFile`/`EncryptedOutputFile` — encryption operates at the `FileIO` level -rather than wrapping storage: +for the encrypted file wrappers — encryption operates at the `FileIO` level rather than +wrapping storage: ```rust let catalog = GlueCatalogBuilder::default() @@ -551,14 +559,14 @@ by the storage trait changes. For Parquet data files, encryption is handled natively by the Parquet reader/writer using `FileEncryptionProperties` and `FileDecryptionProperties` from `parquet-rs`. -**Write path** (`ParquetWriter`): When the output file is `NativeEncrypted`, the writer extracts +**Write path** (`ParquetWriter`): When given a `NativeEncryptedOutputFile`, the writer extracts `NativeKeyMaterial` (plaintext DEK + AAD prefix) and configures `FileEncryptionProperties` on the `AsyncArrowWriter`. The Parquet crate handles column/page-level encryption. **Read path** (`ArrowReader`): When `FileScanTask.key_metadata` is present, the reader calls -`file_io.new_native_encrypted_input()` which deserializes `StandardKeyMetadata` to extract the -plaintext DEK and AAD prefix. These are used to build `FileDecryptionProperties` which are -passed to `ParquetRecordBatchStreamBuilder::new_with_options()`. +`file_io.new_native_encrypted_input()` which returns a `NativeEncryptedInputFile`. The reader +extracts `NativeKeyMaterial` to build `FileDecryptionProperties` which are passed to +`ParquetRecordBatchStreamBuilder::new_with_options()`. The `ArrowFileReader::get_metadata()` implementation forwards both `file_decryption_properties` and `metadata_options` from `ArrowReaderOptions` to `ParquetMetaDataReader`, enabling encrypted From 0de289de8c0dc31466a72aaf086ffbdd609b1cf3 Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 5 May 2026 10:49:39 +0100 Subject: [PATCH 7/9] bring up-to-date, remove em from fileIo and remove need for native io objects --- docs/rfcs/0003_table_encryption.md | 159 ++++++++++++----------------- 1 file changed, 68 insertions(+), 91 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index 76cff96710..fb070c1973 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -35,8 +35,8 @@ production-tested. It defines: - **`EncryptionManager`** -- orchestrates encrypt/decrypt of `InputFile`/`OutputFile` - **`KeyManagementClient`** -- pluggable KMS integration (wrap/unwrap keys) -- **`EncryptedInputFile` / `NativeEncryptedInputFile` / `EncryptedOutputFile` / `NativeEncryptedOutputFile`** -- - thin wrappers pairing a raw file handle with its encryption key material +- **`EncryptedInputFile` / `EncryptedOutputFile`** -- + thin AGS1 wrappers pairing a raw file handle with its encryption key material - **`StandardEncryptionManager`** -- envelope encryption with key caching, AGS1 streams, and Parquet native encryption support - **`StandardKeyMetadata`** -- Avro-serialized key metadata (wrapped DEK, AAD prefix, file length) @@ -44,10 +44,10 @@ production-tested. It defines: ### Relationship to Storage Trait RFC -[RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) proposes -converting `Storage` from an enum to a trait and removing the `Extensions` mechanism from -`FileIOBuilder`. This encryption RFC is designed to work both with the current `Extensions`-based -`FileIO` and with the future trait-based storage. Specific adaptation points are called out below. +[RFC 0002 (Making Storage a Trait)](https://github.com/apache/iceberg-rust/pull/2116) has landed, +converting `Storage` from an enum to a trait. This encryption RFC keeps `FileIO` entirely +encryption-unaware — the `EncryptionManager` lives on the `Table` and is passed explicitly +to components that need it. --- @@ -163,16 +163,16 @@ Snapshot │ 3. AES-GCM decrypt the manifest list's StandardKeyMetadata using the unwrapped KEK, with KEY_TIMESTAMP as AAD 4. Extract plaintext manifest list DEK from decrypted StandardKeyMetadata - 5. file_io.new_encrypted_input(path, key_metadata) → AGS1-decrypting InputFile + 5. em.decrypt(input_file, key_metadata) → AGS1-decrypting EncryptedInputFile │ ▼ ManifestFile └── key_metadata: Option> (plaintext StandardKeyMetadata, read from encrypted manifest list) │ - load_manifest(file_io) + load_manifest(file_io, encryption_manager) 1. If key_metadata present: a. Parse StandardKeyMetadata → extract plaintext DEK + AAD prefix - b. file_io.new_encrypted_input() → AGS1-decrypting InputFile + b. em.decrypt(input_file, key_metadata) → AGS1-decrypting EncryptedInputFile 2. If not: file_io.new_input() │ ▼ @@ -181,9 +181,9 @@ FileScanTask │ ArrowReader::create_parquet_record_batch_stream_builder_with_key_metadata() 1. If key_metadata present: - a. file_io.new_native_encrypted_input(path, key_metadata) → NativeEncryptedInputFile - b. Extract NativeKeyMaterial (plaintext DEK + AAD prefix) - c. Build FileDecryptionProperties from NativeKeyMaterial + a. Deserialize key_metadata bytes → StandardKeyMetadata + b. Extract plaintext DEK + AAD prefix from StandardKeyMetadata + c. Build FileDecryptionProperties d. Pass to ParquetRecordBatchStreamBuilder 2. If not: standard Parquet read ``` @@ -192,13 +192,12 @@ FileScanTask ``` RollingFileWriter::new_output_file() - 1. If file_io.encryption_manager() is Some: - a. file_io.new_native_encrypted_output(path) → NativeEncryptedOutputFile - b. EncryptionManager generates random plaintext DEK + AAD prefix - c. NativeEncryptedOutputFile carries NativeKeyMaterial for Parquet writer - d. Store plaintext StandardKeyMetadata as key_metadata bytes on DataFile + 1. If encryption_manager is Some: + a. em.generate_native_key_metadata() → StandardKeyMetadata (plaintext DEK + AAD prefix) + b. Pass StandardKeyMetadata to ParquetWriter + c. Store serialized StandardKeyMetadata as key_metadata bytes on DataFile (protected by being stored inside the encrypted parent manifest) - 2. ParquetWriter extracts NativeKeyMaterial, configures FileEncryptionProperties + 2. ParquetWriter extracts DEK + AAD from StandardKeyMetadata, configures FileEncryptionProperties SnapshotProducer::commit() 1. Manifest writing: @@ -233,7 +232,7 @@ crates/iceberg/src/ │ ├── encryption_manager.rs # EncryptionManager (concrete struct) │ ├── file_encryptor.rs # FileEncryptor (write-side AGS1 wrapper) │ ├── file_decryptor.rs # FileDecryptor (read-side AGS1 wrapper) -│ ├── encrypted_io.rs # Encrypted / NativeEncrypted InputFile / OutputFile structs +│ ├── encrypted_io.rs # EncryptedInputFile / EncryptedOutputFile (AGS1 wrappers) │ ├── stream.rs # AesGcmFileRead / AesGcmFileWrite (AGS1 format) │ ├── kms/ │ │ ├── mod.rs # KmsClientFactory trait @@ -391,11 +390,9 @@ impl EncryptionManager { /// Encrypt a file with AGS1 stream encryption. pub async fn encrypt(&self, raw_output: OutputFile) -> Result; - /// Encrypt for Parquet Modular Encryption (generates NativeKeyMaterial). - pub async fn encrypt_native(&self, raw_output: OutputFile) -> Result; - - /// Decrypt key metadata for a Parquet Modular Encryption (PME) file. - pub fn decrypt_native(&self, raw_input: InputFile, key_metadata: &[u8]) -> Result; + /// Generate key material for Parquet Modular Encryption (PME). + /// Returns a StandardKeyMetadata containing a fresh DEK and AAD prefix. + pub fn generate_native_key_metadata(&self) -> Result; /// Unwrap key metadata for a manifest list. /// 1. Look up the manifest list's EncryptedKey by key ID @@ -458,18 +455,12 @@ block to its position in the stream to prevent reordering attacks. random-access reads. **`AesGcmFileWrite`** implements `FileWrite` for transparent AGS1 encryption with block buffering. -### Encrypted File Wrappers +### Encrypted File Wrappers (AGS1 Only) Rather than adding encryption variants to `InputFile`/`OutputFile` (which would change the -public API of those core types), encryption uses dedicated wrapper structs. These wrap a -plain `InputFile`/`OutputFile` and add encryption/decryption capabilities. The plain -`InputFile`/`OutputFile` types remain unchanged. - -AGS1 stream encryption and Parquet Modular Encryption (PME) are separate structs rather -than enum variants because they are used in entirely different code paths — AGS1 for -manifests and manifest lists, PME for data files. Separate types give compile-time safety: -a function handling PME takes `NativeEncryptedInputFile` and cannot accidentally receive -an AGS1 file. +public API of those core types), AGS1 stream encryption uses dedicated wrapper structs. +These wrap a plain `InputFile`/`OutputFile` and add encryption/decryption capabilities. +The plain `InputFile`/`OutputFile` types remain unchanged. ```rust /// AGS1 stream-encrypted input file. Decrypted transparently on read. @@ -478,70 +469,53 @@ pub struct EncryptedInputFile { decryptor: Arc, } -/// Parquet Modular Encryption input file. -/// The Parquet reader handles decryption at the column/page level. -pub struct NativeEncryptedInputFile { - inner: InputFile, - key_material: NativeKeyMaterial, -} - /// AGS1 stream-encrypted output file. Encrypted transparently on write. pub struct EncryptedOutputFile { inner: OutputFile, key_metadata: Box<[u8]>, encryptor: Arc, } - -/// Parquet Modular Encryption output file. -/// The Parquet writer handles encryption at the column/page level. -pub struct NativeEncryptedOutputFile { - inner: OutputFile, - key_metadata: Box<[u8]>, - key_material: NativeKeyMaterial, -} ``` `EncryptedInputFile` and `EncryptedOutputFile` delegate standard operations (`location()`, `read()`, `reader()`, `writer()`) to the inner file, transparently encrypting/decrypting -via `AesGcmFileRead`/`AesGcmFileWrite`. `NativeEncryptedInputFile` and -`NativeEncryptedOutputFile` carry `NativeKeyMaterial` (plaintext DEK and AAD prefix) -for the Parquet reader/writer to configure `FileDecryptionProperties` / -`FileEncryptionProperties`. All four types provide `into_inner()` to recover the +via `AesGcmFileRead`/`AesGcmFileWrite`. Both provide `into_inner()` to recover the underlying plain file. +Parquet Modular Encryption (PME) does not use wrapper types. Instead, the Parquet +reader/writer receives `StandardKeyMetadata` directly from the `EncryptionManager` and +uses it to configure `FileDecryptionProperties`/`FileEncryptionProperties` (see +[Parquet Modular Encryption](#parquet-modular-encryption) below). + This wrapper approach means `ManifestReader` and `ManifestListWriter` accept the AGS1 wrapper types (or `Box`) where encryption is needed, while the Parquet -writer accepts `NativeEncryptedOutputFile`, rather than requiring changes to the -`InputFile`/`OutputFile` types themselves. +writer works with plain `OutputFile` plus `StandardKeyMetadata`, rather than requiring +changes to the `InputFile`/`OutputFile` types themselves. ### FileIO Integration -The `EncryptionManager` is attached to `FileIO` via a convenience method. The encryption -manager is typically created automatically by `TableBuilder::build()` using the catalog's -`KmsClientFactory`, but can also be attached directly for testing: +`FileIO` is **encryption-unaware**. It has no knowledge of encryption, no encrypted factory +methods, and no reference to the `EncryptionManager`. This keeps `FileIO` focused on +storage concerns only. -```rust -// Typically automatic via catalog + KmsClientFactory (see Catalog Integration). -// For manual/test setup: -let file_io = file_io.with_encryption_manager(Arc::new(encryption_manager)); -``` +The `EncryptionManager` lives on the `Table` and is passed explicitly to components that +need it (e.g. `SnapshotProducer`, `ArrowReader`, `ParquetWriter`). This is more explicit +than assuming FileIO has been configured correctly. -FileIO provides encryption-aware factory methods: +```rust +// EncryptionManager is accessed from the Table: +let em = table.encryption_manager(); // Option<&Arc> -| Method | Returns | Purpose | -|--------|---------|---------| -| `new_encrypted_input(path, key_metadata)` | `EncryptedInputFile` | AGS1 stream decryption (manifests, manifest lists) | -| `new_encrypted_output(path)` | `EncryptedOutputFile` | AGS1 stream encryption | -| `new_native_encrypted_input(path, key_metadata)` | `NativeEncryptedInputFile` | PME input (Parquet handles decryption) | -| `new_native_encrypted_output(path)` | `NativeEncryptedOutputFile` | PME output (Parquet handles encryption) | -| `encryption_manager()` | `Option>` | Returns the configured EncryptionManager, if any | +// AGS1 encryption for manifests/manifest lists: +let encrypted_output = em.encrypt(plain_output_file).await?; +let encrypted_input = em.decrypt(plain_input_file, key_metadata).await?; -#### After Storage Trait RFC +// PME key material for Parquet data files: +let key_metadata = em.generate_native_key_metadata()?; +``` -RFC 0002 removes `Extensions` from `FileIOBuilder`. The `KmsClientFactory` will be -provided at the catalog level (Option A), which is consistent with the wrapper approach -for the encrypted file wrappers — encryption operates at the `FileIO` level rather than -wrapping storage: +The `KmsClientFactory` is provided at the catalog level, which constructs the +`EncryptionManager` per-table during `TableBuilder::build()`: ```rust let catalog = GlueCatalogBuilder::default() @@ -557,15 +531,18 @@ by the storage trait changes. ### Parquet Modular Encryption For Parquet data files, encryption is handled natively by the Parquet reader/writer using -`FileEncryptionProperties` and `FileDecryptionProperties` from `parquet-rs`. - -**Write path** (`ParquetWriter`): When given a `NativeEncryptedOutputFile`, the writer extracts -`NativeKeyMaterial` (plaintext DEK + AAD prefix) and configures `FileEncryptionProperties` on the -`AsyncArrowWriter`. The Parquet crate handles column/page-level encryption. - -**Read path** (`ArrowReader`): When `FileScanTask.key_metadata` is present, the reader calls -`file_io.new_native_encrypted_input()` which returns a `NativeEncryptedInputFile`. The reader -extracts `NativeKeyMaterial` to build `FileDecryptionProperties` which are passed to +`FileEncryptionProperties` and `FileDecryptionProperties` from `parquet-rs`. No dedicated +wrapper types are needed — the Parquet layer works with plain `InputFile`/`OutputFile` +plus `StandardKeyMetadata` for key material. + +**Write path** (`ParquetWriter`): The writer receives a `&StandardKeyMetadata` (from +`EncryptionManager::generate_native_key_metadata()`), extracts the plaintext DEK and AAD +prefix, and configures `FileEncryptionProperties` on the `AsyncArrowWriter`. The Parquet +crate handles column/page-level encryption. The output file itself is a plain `OutputFile`. + +**Read path** (`ArrowReader`): When `FileScanTask.key_metadata` is present, the reader +deserializes the raw bytes into a `StandardKeyMetadata`, extracts the DEK and AAD prefix, +and builds `FileDecryptionProperties` which are passed to `ParquetRecordBatchStreamBuilder::new_with_options()`. The `ArrowFileReader::get_metadata()` implementation forwards both `file_decryption_properties` @@ -665,7 +642,7 @@ This is the Rust equivalent of Java's `RESTTableOperations.io()` which calls - `table_key_id` from the `encryption.key-id` table property - `encryption_keys` from `TableMetadata.encryption_keys` (the KEK map) - The `KeyManagementClient` from the factory -4. Attach the `EncryptionManager` to the table's `FileIO`. +4. Store the `EncryptionManager` on the `Table`. This runs on every `Table::builder().build()` call, so each table gets a correctly configured per-table `EncryptionManager` even when a single catalog manages tables with different key IDs. @@ -673,20 +650,20 @@ per-table `EncryptionManager` even when a single catalog manages tables with dif ```rust // User code — just provide the factory, everything else is automatic: let table = catalog.load_table(&ident).await?; -// table.file_io() now has an EncryptionManager configured with +// table.encryption_manager() returns the EncryptionManager configured with // the correct table_key_id and encryption_keys from this table's metadata. ``` -`Table::with_file_io()` replaces the table's `FileIO` and rebuilds its `ObjectCache` (which -stores its own `FileIO` for manifest/manifest list loading). +The `EncryptionManager` is passed explicitly from the `Table` to components that need it +(e.g. `ObjectCache`, `SnapshotProducer`, `ArrowReader`, `RollingFileWriter`). ### DataFusion Integration The DataFusion integration requires **no encryption-specific code**. Encryption flows transparently through the existing pipeline: -- **`IcebergTableProvider::scan()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergTableScan` → `table.scan().to_arrow()` → `ArrowReader` decrypts via `key_metadata` -- **`IcebergTableProvider::insert_into()`** calls `catalog.load_table()` → table has encrypted FileIO → `IcebergWriteExec` uses `table.file_io()` → `RollingFileWriterBuilder` detects encryption → PME-encrypted data files + AGS1-encrypted manifests +- **`IcebergTableProvider::scan()`** calls `catalog.load_table()` → table has `EncryptionManager` → `IcebergTableScan` → `table.scan().to_arrow()` → `ArrowReader` decrypts via `key_metadata` +- **`IcebergTableProvider::insert_into()`** calls `catalog.load_table()` → table has `EncryptionManager` → `IcebergWriteExec` uses `table.encryption_manager()` → `RollingFileWriterBuilder` encrypts → PME-encrypted data files + AGS1-encrypted manifests - **`IcebergCommitExec`** uses `Transaction::fast_append()` → `SnapshotProducer` writes encrypted manifests/manifest list → `AddEncryptionKey` updates persisted in table metadata ### Format Version Requirement From c2891296e974756a0ac5300d2ae3d86732de7d70 Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 5 May 2026 21:42:01 +0100 Subject: [PATCH 8/9] update --- docs/rfcs/0003_table_encryption.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/rfcs/0003_table_encryption.md b/docs/rfcs/0003_table_encryption.md index fb070c1973..3dae675aa1 100644 --- a/docs/rfcs/0003_table_encryption.md +++ b/docs/rfcs/0003_table_encryption.md @@ -104,9 +104,8 @@ Master Key (in KMS) │ - KEK cache (moka async, configurable TTL) │ │ - Automatic KEK rotation (730 days, KEY_TIMESTAMP tracking) │ │ - encrypt() / decrypt() for AGS1 stream files │ -│ - encrypt_native() for Parquet Modular Encryption │ +│ - generate_native_key_metadata() for Parquet Modular Encryption │ │ - wrap/unwrap_key_metadata() for manifest list keys (KEK + KMS) │ -│ - generate_dek() for per-file plaintext DEK generation │ │ - Constructed per-table by KmsClientFactory │ └─────────────────────────────────────────────────────────────────────────────┘ │ │ @@ -209,11 +208,14 @@ SnapshotProducer::commit() a. em.encrypt(output_file) → generates random plaintext DEK + AAD prefix b. Write manifest list to AGS1-encrypting OutputFile c. Get or create KEK: - - Find unexpired KEK (check KEY_TIMESTAMP, 730-day lifespan) + - Find newest unexpired KEK by KEY_TIMESTAMP - If none: generate new KEK, wrap via KMS: kms_client.wrap_key(kek, table_key_id) d. AES-GCM encrypt the manifest list's StandardKeyMetadata using the KEK, with KEY_TIMESTAMP as AAD - e. Store as EncryptedKey (encrypted_by_id = kek_id) in encryption manager + e. wrap_key_metadata() returns (EncryptedKey, Option) + - The EncryptedKey entry (encrypted_by_id = kek_id) + - Optionally a new KEK if one was created/rotated + - Caller persists both via AddEncryptionKey in the commit f. Store manifest list key_id on Snapshot.encryption_key_id 3. Table commit includes AddEncryptionKey for all new entries: - New KEKs (encrypted_by_id = table_key_id, properties include KEY_TIMESTAMP) @@ -407,6 +409,8 @@ impl EncryptionManager { ) -> Result>; /// Wrap key metadata for a manifest list with a KEK for storage in table metadata. + /// Returns (wrapped_entry, optional_new_kek). The caller must persist both via + /// AddEncryptionKey — the manager does not store new KEKs internally. pub async fn wrap_key_metadata( &self, key_metadata: &[u8], ) -> Result<(EncryptedKey, Option)>; From 49d108944602b29e496093507c4200c53037dc46 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 14:06:56 +0100 Subject: [PATCH 9/9] add hashicorp --- .typos.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.typos.toml b/.typos.toml index 36996a553a..df8a1da77e 100644 --- a/.typos.toml +++ b/.typos.toml @@ -21,6 +21,8 @@ extend-ignore-identifiers-re = ["^bimap$"] [default.extend-words] ags = "ags" AGS = "AGS" +# For HashiCorp (otherwise flagged as a typo of "Hash") +hashi = "hashi" [files] extend-exclude = ["**/testdata", "CHANGELOG.md"]