diff --git a/README.md b/README.md index f270f01..b59b882 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,11 @@ SDK 提供以下能力: 3. 上传本地文件并返回 `UploadResult`。 4. 订阅上传阶段和分片进度事件。 5. 列出 App 本地仍可恢复的上传任务。 -6. 在 App 重启、网络恢复或上传中断后继续上传。 -7. 取消远端上传并清理本地状态。 -8. 仅删除本地快照,便于用户放弃恢复或处理异常状态。 -9. 接入日志与指标回调,方便接入宿主 App 的可观测性系统。 +6. 使用用户 access JWT 列出当前用户可见的远端 verified 逻辑对象。 +7. 在 App 重启、网络恢复或上传中断后继续上传。 +8. 取消远端上传并清理本地状态。 +9. 仅删除本地快照,便于用户放弃恢复或处理异常状态。 +10. 接入日志与指标回调,方便接入宿主 App 的可观测性系统。 ## 2. 环境要求 @@ -559,11 +560,34 @@ SDK 会把源文件名写入保留 raw tag,value 是 `UploadRequest.fileURL.la App 侧持久化上传记录时,优先保存 `logicalUploadID`、`objectKey`、`fileSize` 和业务自己的文件记录 ID。不要把 `api_key`、临时访问凭证或完整错误详情写入可导出的用户日志。 -## 12. 恢复上传 +## 12. 列出远端对象 + +`listObjects(_:authorizationHeader:)` 使用用户 access JWT 调用 Data Gateway object service,返回当前用户可见的 verified 逻辑对象。它不使用上传 `api_key`。`authorizationHeader` 不能为空;临时网络错误会复用控制面重试策略,但 `unauthenticated` 会直接返回给 App,由 App 负责刷新用户会话后重试。 + +```swift +let page = try await client.listObjects( + ListObjectsOptions( + pageSize: 50, + pageToken: nil, + filter: "status:verified" + ), + authorizationHeader: "Bearer \(userAccessToken)" +) + +for object in page.objects { + print(object.objectID) + print(object.fileID) + print(object.status) +} +``` + +`page.nextPageToken` 非空时,可带同一 filter 继续请求下一页。 + +## 13. 恢复上传 SDK 会在 `persistRootURL` 下保存上传快照。App 重启、网络恢复、进程被系统终止后,可以使用这些快照恢复上传。 -### 12.1 启动时列出待恢复任务 +### 13.1 启动时列出待恢复任务 ```swift let pendingUploads = try await client.listPendingUploads() @@ -578,7 +602,7 @@ for item in pendingUploads { `listPendingUploads()` 只返回仍在 active 状态的本地快照。使用推荐配置时,已完成上传不会出现在结果中。 -### 12.2 恢复指定任务 +### 13.2 恢复指定任务 ```swift let result = try await client.resumeUpload(logicalUploadID: pending.logicalUploadID) @@ -594,7 +618,7 @@ print(result.objectKey) 3. 如果远端状态无法继续安全恢复,SDK 会在 `maxRestartCount` 限制内自动重建上传会话。 4. 超过重建次数上限后,SDK 抛出 `DataGatewayClientError.uploadRestartExceeded`。 -### 12.3 PendingUploadInfo 字段 +### 13.3 PendingUploadInfo 字段 | 字段 | 说明 | |---|---| @@ -617,9 +641,9 @@ print(result.objectKey) | `businessCompleting` | 正在完成最终确认。 | | `terminalFailed` | 已进入失败终态。 | -## 13. 取消和本地清理 +## 14. 取消和本地清理 -### 13.1 取消远端上传 +### 14.1 取消远端上传 ```swift try await client.abortUpload(logicalUploadID: logicalUploadID) @@ -627,7 +651,7 @@ try await client.abortUpload(logicalUploadID: logicalUploadID) `abortUpload(logicalUploadID:)` 会请求远端取消该上传,并在取消成功或远端已经找不到该上传时删除本地快照。用户在 UI 中选择“取消上传”或“放弃任务”时,优先使用该方法。 -### 13.2 仅删除本地快照 +### 14.2 仅删除本地快照 ```swift try await client.deleteLocalSnapshot(logicalUploadID: logicalUploadID) @@ -641,7 +665,7 @@ try await client.deleteLocalSnapshot(logicalUploadID: logicalUploadID) 如果你的目标是取消上传并释放远端资源,请使用 `abortUpload(logicalUploadID:)`。 -## 14. 错误处理 +## 15. 错误处理 SDK 的公开错误模型是 `DataGatewayClientError`。 @@ -728,7 +752,7 @@ do { | `uploadRestartExceeded` | 提示重新上传,必要时保留日志联系支持。 | | `persistenceFailed` | 检查磁盘空间、文件保护状态和 App 容器权限。 | -## 15. 可观测性 +## 16. 可观测性 通过 `DataGatewayClientObservability` 将 SDK 日志和指标接入宿主 App: @@ -772,9 +796,9 @@ SDK 会对包含 `credential`、`token`、`accessKey`、`secret` 等关键词的 | `upload_part` | `upload_id`、`part_number` | 分片上传事件。 | | `credentials_refresh` | `upload_id` | 上传凭证刷新事件。 | -## 16. iOS App 生命周期建议 +## 17. iOS App 生命周期建议 -### 16.1 App 启动 +### 17.1 App 启动 App 启动后建议执行: @@ -805,7 +829,7 @@ func makeClientOrRequireInitialization() async throws -> DataGatewayClient? { } ``` -### 16.2 前后台切换 +### 17.2 前后台切换 当前 SDK 使用 Swift Concurrency 执行上传,不是 iOS `URLSession` background transfer。App 进入后台后,系统可能暂停或终止进程。 @@ -816,15 +840,15 @@ func makeClientOrRequireInitialization() async throws -> DataGatewayClient? { 3. App 回到前台或重启后,调用 `listPendingUploads()` 并恢复任务。 4. UI 上应允许用户重试、恢复、取消或重新选择文件。 -### 16.3 文件选择与安全作用域 +### 17.3 文件选择与安全作用域 如果文件来自 `UIDocumentPickerViewController` 或其他安全作用域 URL,SDK 会尽量访问安全作用域资源并把文件复制到 SDK staging 目录。推荐保持 `copyExternalFileIntoManagedStaging = true`,这样用户移动或撤销原始文件访问权限后,已进入 staging 的上传仍更容易恢复。 -### 16.4 文件大小与内存 +### 17.4 文件大小与内存 当前版本会在本地读取文件并按分片上传。请根据目标设备内存和网络条件控制单个文件大小。对于非常大的文件、后台长传或高并发上传需求,建议在上线前与 Archebase 支持团队确认版本能力和压测结果。 -## 17. 完整示例 +## 18. 完整示例 下面示例展示一个 App 侧上传服务的典型封装方式: @@ -917,11 +941,11 @@ actor GatewayUploadService { 在 SwiftUI 或 UIKit 中更新 UI 时,请在 `MainActor` 上处理上传状态。不要在 `onLog`、`onMetric` 或上传事件循环中执行耗时同步操作。 -## 18. Public API 速查 +## 19. Public API 速查 以下类型分布在 `DataGatewayClient`、`DGWControlPlane` 和 `DGWStore` 模块中。App 按第 4 节同时添加这三个产品并导入对应模块后,可以直接使用这些公开类型。 -### 18.1 DataGatewayClient +### 19.1 DataGatewayClient ```swift public actor DataGatewayClient { @@ -944,13 +968,18 @@ public actor DataGatewayClient { public func listPendingUploads() async throws -> [PendingUploadInfo] + public func listObjects( + _ options: ListObjectsOptions = ListObjectsOptions(), + authorizationHeader: String + ) async throws -> ListObjectsPage + public func abortUpload(logicalUploadID: String) async throws public func deleteLocalSnapshot(logicalUploadID: String) async throws } ``` -### 18.2 Device Initialization +### 19.2 Device Initialization ```swift public struct DeviceInitClientConfig: Sendable { @@ -977,7 +1006,44 @@ public struct ArchebaseConfig: Codable, Sendable, Equatable { } ``` -### 18.3 Upload Types +### 19.3 Object Types + +```swift +public struct ListObjectsOptions: Sendable, Equatable { + public var pageSize: Int32 + public var pageToken: String? + public var filter: String? +} + +public struct ListObjectsPage: Sendable, Equatable { + public var objects: [DataObject] + public var nextPageToken: String +} + +public struct DataObject: Sendable, Equatable { + public var objectID: String + public var fileID: String + public var status: DataObjectStatus + public var sizeBytes: Int64 + public var createdAtUnix: Int64 + public var uploadedAtUnix: Int64 + public var verifiedAtUnix: Int64 + public var etag: String +} + +public enum DataObjectStatus: Sendable, Equatable { + case unspecified + case created + case uploaded + case verified + case bad + case aborted + case invalid + case unrecognized(Int) +} +``` + +### 19.4 Upload Types ```swift public struct UploadRequest: Sendable { @@ -1011,7 +1077,7 @@ public enum UploadEvent: Sendable, Equatable { } ``` -### 18.4 Pending Upload Types +### 19.5 Pending Upload Types ```swift public struct PendingUploadInfo: Sendable, Equatable { @@ -1034,7 +1100,7 @@ public enum PersistedUploadPhase: String, Codable, Sendable, Equatable { } ``` -### 18.5 Configuration Types +### 19.6 Configuration Types ```swift public struct DataGatewayClientConfig: Sendable { @@ -1077,7 +1143,7 @@ public struct LocalPersistencePolicy: Sendable, Equatable { } ``` -### 18.6 Observability Types +### 19.7 Observability Types ```swift public struct DataGatewayClientObservability: Sendable { @@ -1099,7 +1165,7 @@ public struct DataGatewayClientLogEvent: Sendable, Equatable { } ``` -### 18.7 Error Type +### 19.8 Error Type ```swift public enum DataGatewayClientError: Error, Sendable, Equatable { @@ -1123,7 +1189,7 @@ public enum DataGatewayClientError: Error, Sendable, Equatable { } ``` -## 19. 上线前检查清单 +## 20. 上线前检查清单 1. 确认 `archebase-endpoints.json` 已初始化到 App 私有目录,包含 `auth`、`gateway` 和 `deviceInit` 三组 endpoint,App 网络环境可以访问这些端点。 2. `archebase-endpoints.json` 和 `archebase-config.json` 写入 App 私有目录,不进入日志、备份导出或共享容器。 @@ -1136,7 +1202,7 @@ public enum DataGatewayClientError: Error, Sendable, Equatable { 9. `clientHints` 和 `rawTags` 不包含密码、token、个人隐私或其他敏感信息。 10. 已接入 `DataGatewayClientObservability` 或等价日志,且日志经过脱敏和采样策略控制。 -## 20. 快速问题定位 +## 21. 快速问题定位 | 现象 | 优先检查 | |---|---| diff --git a/Sources/DGWControlPlane/ControlPlaneTransport.swift b/Sources/DGWControlPlane/ControlPlaneTransport.swift index 210c6be..62d0cc3 100644 --- a/Sources/DGWControlPlane/ControlPlaneTransport.swift +++ b/Sources/DGWControlPlane/ControlPlaneTransport.swift @@ -197,10 +197,15 @@ package struct ControlPlaneClientFactory: Sendable { } } - package func makeGatewayClient() throws -> ManagedControlPlaneServiceClient { + package func makeGatewayClient() throws -> ManagedControlPlaneServiceClient> { try ManagedControlPlaneServiceClient(configuration: self.configuration) { grpcClient in Archebase_DataGateway_V1_DataGatewayService.Client(wrapping: grpcClient) - as any Archebase_DataGateway_V1_DataGatewayService.ClientProtocol + } + } + + package func makeObjectClient() throws -> ManagedControlPlaneServiceClient> { + try ManagedControlPlaneServiceClient(configuration: self.configuration) { grpcClient in + Archebase_DataGateway_V1_DataGatewayObjectService.Client(wrapping: grpcClient) } } @@ -432,6 +437,50 @@ package final class GatewayControlPlaneClient Archebase_DataGateway_V1_ListObjectsResponse +} + +package final class ObjectControlPlaneClient: ObjectControlPlaneClientProtocol, @unchecked Sendable { + private let client: Client + private let optionsBuilder: ControlPlaneRequestOptionsBuilder + + package init( + client: Client, + requestTimeout: Duration + ) { + self.client = client + self.optionsBuilder = ControlPlaneRequestOptionsBuilder(requestTimeout: requestTimeout) + } + + package func listObjects( + pageSize: Int32, + pageToken: String, + filter: String, + authorizationHeader: String + ) async throws -> Archebase_DataGateway_V1_ListObjectsResponse { + var request = Archebase_DataGateway_V1_ListObjectsRequest() + request.pageSize = pageSize + request.pageToken = pageToken + request.filter = filter + + let options = self.optionsBuilder.make(authorizationHeader: authorizationHeader) + let response: ClientResponse = try await self.client.listObjects( + request, + metadata: options.metadata, + options: options.callOptions, + onResponse: { response in response } + ) + + return try response.message + } +} + /// Public error model returned by the Swift Data Gateway client. public enum DataGatewayClientError: Error, Sendable, Equatable { case authenticationFailed(code: String?, message: String) diff --git a/Sources/DGWControlPlane/RetryExecutor.swift b/Sources/DGWControlPlane/RetryExecutor.swift index 56c2b0d..517502f 100644 --- a/Sources/DGWControlPlane/RetryExecutor.swift +++ b/Sources/DGWControlPlane/RetryExecutor.swift @@ -205,6 +205,7 @@ package struct RetryExecutor: Sendable { package func execute( policy: RetryPolicy = .controlPlane, + retryAuthorizationFailures: Bool = true, refreshAuthorization: @Sendable () async throws -> Void = {}, operation: @Sendable () async throws -> T ) async throws -> T { @@ -233,7 +234,7 @@ package struct RetryExecutor: Sendable { } try await self.sleeper.sleep(for: delay) attempt += 1 - case .refreshAuthorization where !didRefreshAuthorization && attempt < policy.maxAttempts: + case .refreshAuthorization where retryAuthorizationFailures && !didRefreshAuthorization && attempt < policy.maxAttempts: if let onEvent { await onEvent( ControlPlaneRetryEvent( @@ -356,6 +357,43 @@ package final class AuthenticatedGatewayControlPlaneClient< } } +package final class RetryingObjectControlPlaneClient< + ObjectClient: ObjectControlPlaneClientProtocol +>: ObjectControlPlaneClientProtocol, @unchecked Sendable { + private let objectClient: ObjectClient + private let retryExecutor: RetryExecutor + private let retryPolicy: RetryPolicy + + package init( + objectClient: ObjectClient, + retryExecutor: RetryExecutor = RetryExecutor(), + retryPolicy: RetryPolicy = .controlPlane + ) { + self.objectClient = objectClient + self.retryExecutor = retryExecutor + self.retryPolicy = retryPolicy + } + + package func listObjects( + pageSize: Int32, + pageToken: String, + filter: String, + authorizationHeader: String + ) async throws -> Archebase_DataGateway_V1_ListObjectsResponse { + try await self.retryExecutor.execute( + policy: self.retryPolicy, + retryAuthorizationFailures: false + ) { + try await self.objectClient.listObjects( + pageSize: pageSize, + pageToken: pageToken, + filter: filter, + authorizationHeader: authorizationHeader + ) + } + } +} + private extension Duration { static func fromTimeInterval(_ interval: TimeInterval) -> Duration { let clamped = max(0, interval) diff --git a/Sources/DGWProto/Generated/data_gateway.grpc.swift b/Sources/DGWProto/Generated/data_gateway.grpc.swift index 8047021..991041c 100644 --- a/Sources/DGWProto/Generated/data_gateway.grpc.swift +++ b/Sources/DGWProto/Generated/data_gateway.grpc.swift @@ -677,15 +677,28 @@ extension Archebase_DataGateway_V1_DataGatewayService.ClientProtocol { } } -// MARK: - archebase.data_gateway.v1.DataGatewayDownloadService +// MARK: - archebase.data_gateway.v1.DataGatewayObjectService -/// Namespace containing generated types for the "archebase.data_gateway.v1.DataGatewayDownloadService" service. +/// Namespace containing generated types for the "archebase.data_gateway.v1.DataGatewayObjectService" service. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -public enum Archebase_DataGateway_V1_DataGatewayDownloadService: Sendable { - /// Service descriptor for the "archebase.data_gateway.v1.DataGatewayDownloadService" service. - public static let descriptor = GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayDownloadService") +public enum Archebase_DataGateway_V1_DataGatewayObjectService: Sendable { + /// Service descriptor for the "archebase.data_gateway.v1.DataGatewayObjectService" service. + public static let descriptor = GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayObjectService") /// Namespace for method metadata. public enum Method: Sendable { + /// Namespace for "ListObjects" metadata. + public enum ListObjects: Sendable { + /// Request type for "ListObjects". + public typealias Input = Archebase_DataGateway_V1_ListObjectsRequest + /// Response type for "ListObjects". + public typealias Output = Archebase_DataGateway_V1_ListObjectsResponse + /// Descriptor for "ListObjects". + public static let descriptor = GRPCCore.MethodDescriptor( + service: GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayObjectService"), + method: "ListObjects", + type: .unary + ) + } /// Namespace for "RequestDownload" metadata. public enum RequestDownload: Sendable { /// Request type for "RequestDownload". @@ -694,13 +707,14 @@ public enum Archebase_DataGateway_V1_DataGatewayDownloadService: Sendable { public typealias Output = Archebase_DataGateway_V1_RequestDownloadResponse /// Descriptor for "RequestDownload". public static let descriptor = GRPCCore.MethodDescriptor( - service: GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayDownloadService"), + service: GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayObjectService"), method: "RequestDownload", type: .unary ) } - /// Descriptors for all methods in the "archebase.data_gateway.v1.DataGatewayDownloadService" service. + /// Descriptors for all methods in the "archebase.data_gateway.v1.DataGatewayObjectService" service. public static let descriptors: [GRPCCore.MethodDescriptor] = [ + ListObjects.descriptor, RequestDownload.descriptor ] } @@ -708,26 +722,51 @@ public enum Archebase_DataGateway_V1_DataGatewayDownloadService: Sendable { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension GRPCCore.ServiceDescriptor { - /// Service descriptor for the "archebase.data_gateway.v1.DataGatewayDownloadService" service. - public static let archebase_dataGateway_v1_DataGatewayDownloadService = GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayDownloadService") + /// Service descriptor for the "archebase.data_gateway.v1.DataGatewayObjectService" service. + public static let archebase_dataGateway_v1_DataGatewayObjectService = GRPCCore.ServiceDescriptor(fullyQualifiedService: "archebase.data_gateway.v1.DataGatewayObjectService") } -// MARK: archebase.data_gateway.v1.DataGatewayDownloadService (client) +// MARK: archebase.data_gateway.v1.DataGatewayObjectService (client) @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension Archebase_DataGateway_V1_DataGatewayDownloadService { - /// Generated client protocol for the "archebase.data_gateway.v1.DataGatewayDownloadService" service. +extension Archebase_DataGateway_V1_DataGatewayObjectService { + /// Generated client protocol for the "archebase.data_gateway.v1.DataGatewayObjectService" service. /// /// You don't need to implement this protocol directly, use the generated /// implementation, ``Client``. /// /// > Source IDL Documentation: /// > - /// > SDK-facing download authorization service exposed on the public gRPC port. + /// > SDK-facing object discovery and download authorization service exposed on + /// > the public gRPC port. /// > - /// > This service uses user access JWT authentication and returns short-lived + /// > This service uses user access JWT authentication. Object listing returns + /// > logical object IDs visible to the caller; download returns short-lived /// > presigned read operations rather than cloud credentials. public protocol ClientProtocol: Sendable { + /// Call the "ListObjects" method. + /// + /// > Source IDL Documentation: + /// > + /// > Lists verified logical objects visible to the authenticated user. + /// + /// - Parameters: + /// - request: A request containing a single `Archebase_DataGateway_V1_ListObjectsRequest` message. + /// - serializer: A serializer for `Archebase_DataGateway_V1_ListObjectsRequest` messages. + /// - deserializer: A deserializer for `Archebase_DataGateway_V1_ListObjectsResponse` messages. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + func listObjects( + request: GRPCCore.ClientRequest, + serializer: some GRPCCore.MessageSerializer, + deserializer: some GRPCCore.MessageDeserializer, + options: GRPCCore.CallOptions, + onResponse handleResponse: @Sendable @escaping (GRPCCore.ClientResponse) async throws -> Result + ) async throws -> Result where Result: Sendable + /// Call the "RequestDownload" method. /// /// > Source IDL Documentation: @@ -752,7 +791,7 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService { ) async throws -> Result where Result: Sendable } - /// Generated client for the "archebase.data_gateway.v1.DataGatewayDownloadService" service. + /// Generated client for the "archebase.data_gateway.v1.DataGatewayObjectService" service. /// /// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps /// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived @@ -760,9 +799,11 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService { /// /// > Source IDL Documentation: /// > - /// > SDK-facing download authorization service exposed on the public gRPC port. + /// > SDK-facing object discovery and download authorization service exposed on + /// > the public gRPC port. /// > - /// > This service uses user access JWT authentication and returns short-lived + /// > This service uses user access JWT authentication. Object listing returns + /// > logical object IDs visible to the caller; download returns short-lived /// > presigned read operations rather than cloud credentials. public struct Client: ClientProtocol where Transport: GRPCCore.ClientTransport { private let client: GRPCCore.GRPCClient @@ -775,6 +816,40 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService { self.client = client } + /// Call the "ListObjects" method. + /// + /// > Source IDL Documentation: + /// > + /// > Lists verified logical objects visible to the authenticated user. + /// + /// - Parameters: + /// - request: A request containing a single `Archebase_DataGateway_V1_ListObjectsRequest` message. + /// - serializer: A serializer for `Archebase_DataGateway_V1_ListObjectsRequest` messages. + /// - deserializer: A deserializer for `Archebase_DataGateway_V1_ListObjectsResponse` messages. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func listObjects( + request: GRPCCore.ClientRequest, + serializer: some GRPCCore.MessageSerializer, + deserializer: some GRPCCore.MessageDeserializer, + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.ClientResponse) async throws -> Result = { response in + try response.message + } + ) async throws -> Result where Result: Sendable { + try await self.client.unary( + request: request, + descriptor: Archebase_DataGateway_V1_DataGatewayObjectService.Method.ListObjects.descriptor, + serializer: serializer, + deserializer: deserializer, + options: options, + onResponse: handleResponse + ) + } + /// Call the "RequestDownload" method. /// /// > Source IDL Documentation: @@ -801,7 +876,7 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService { ) async throws -> Result where Result: Sendable { try await self.client.unary( request: request, - descriptor: Archebase_DataGateway_V1_DataGatewayDownloadService.Method.RequestDownload.descriptor, + descriptor: Archebase_DataGateway_V1_DataGatewayObjectService.Method.RequestDownload.descriptor, serializer: serializer, deserializer: deserializer, options: options, @@ -813,7 +888,36 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService { // Helpers providing default arguments to 'ClientProtocol' methods. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension Archebase_DataGateway_V1_DataGatewayDownloadService.ClientProtocol { +extension Archebase_DataGateway_V1_DataGatewayObjectService.ClientProtocol { + /// Call the "ListObjects" method. + /// + /// > Source IDL Documentation: + /// > + /// > Lists verified logical objects visible to the authenticated user. + /// + /// - Parameters: + /// - request: A request containing a single `Archebase_DataGateway_V1_ListObjectsRequest` message. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func listObjects( + request: GRPCCore.ClientRequest, + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.ClientResponse) async throws -> Result = { response in + try response.message + } + ) async throws -> Result where Result: Sendable { + try await self.listObjects( + request: request, + serializer: GRPCProtobuf.ProtobufSerializer(), + deserializer: GRPCProtobuf.ProtobufDeserializer(), + options: options, + onResponse: handleResponse + ) + } + /// Call the "RequestDownload" method. /// /// > Source IDL Documentation: @@ -846,7 +950,40 @@ extension Archebase_DataGateway_V1_DataGatewayDownloadService.ClientProtocol { // Helpers providing sugared APIs for 'ClientProtocol' methods. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension Archebase_DataGateway_V1_DataGatewayDownloadService.ClientProtocol { +extension Archebase_DataGateway_V1_DataGatewayObjectService.ClientProtocol { + /// Call the "ListObjects" method. + /// + /// > Source IDL Documentation: + /// > + /// > Lists verified logical objects visible to the authenticated user. + /// + /// - Parameters: + /// - message: request message to send. + /// - metadata: Additional metadata to send, defaults to empty. + /// - options: Options to apply to this RPC, defaults to `.defaults`. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func listObjects( + _ message: Archebase_DataGateway_V1_ListObjectsRequest, + metadata: GRPCCore.Metadata = [:], + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.ClientResponse) async throws -> Result = { response in + try response.message + } + ) async throws -> Result where Result: Sendable { + let request = GRPCCore.ClientRequest( + message: message, + metadata: metadata + ) + return try await self.listObjects( + request: request, + options: options, + onResponse: handleResponse + ) + } + /// Call the "RequestDownload" method. /// /// > Source IDL Documentation: diff --git a/Sources/DGWProto/Generated/data_gateway.pb.swift b/Sources/DGWProto/Generated/data_gateway.pb.swift index e5322fc..809e76f 100644 --- a/Sources/DGWProto/Generated/data_gateway.pb.swift +++ b/Sources/DGWProto/Generated/data_gateway.pb.swift @@ -117,6 +117,61 @@ public enum Archebase_DataGateway_V1_LogicalUploadStatus: SwiftProtobuf.Enum, Sw } +/// User-visible lifecycle state for a logical data object. +public enum Archebase_DataGateway_V1_DataObjectStatus: SwiftProtobuf.Enum, Swift.CaseIterable { + public typealias RawValue = Int + case unspecified // = 0 + case created // = 1 + case uploaded // = 2 + case verified // = 3 + case bad // = 4 + case aborted // = 5 + case invalid // = 6 + case UNRECOGNIZED(Int) + + public init() { + self = .unspecified + } + + public init?(rawValue: Int) { + switch rawValue { + case 0: self = .unspecified + case 1: self = .created + case 2: self = .uploaded + case 3: self = .verified + case 4: self = .bad + case 5: self = .aborted + case 6: self = .invalid + default: self = .UNRECOGNIZED(rawValue) + } + } + + public var rawValue: Int { + switch self { + case .unspecified: return 0 + case .created: return 1 + case .uploaded: return 2 + case .verified: return 3 + case .bad: return 4 + case .aborted: return 5 + case .invalid: return 6 + case .UNRECOGNIZED(let i): return i + } + } + + // The compiler won't synthesize support with the UNRECOGNIZED case. + public static let allCases: [Archebase_DataGateway_V1_DataObjectStatus] = [ + .unspecified, + .created, + .uploaded, + .verified, + .bad, + .aborted, + .invalid, + ] + +} + /// User-visible aggregate copy job state. public enum Archebase_DataGateway_V1_CopyJobStatus: SwiftProtobuf.Enum, Swift.CaseIterable { public typealias RawValue = Int @@ -614,6 +669,79 @@ public struct Archebase_DataGateway_V1_CompleteUploadResponse: Sendable { public init() {} } +/// Requests one page of verified logical objects visible to the authenticated user. +public struct Archebase_DataGateway_V1_ListObjectsRequest: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Maximum number of objects to return. + public var pageSize: Int32 = 0 + + /// Opaque continuation token returned by a previous page. The token is only + /// valid with the same filter used to obtain it. + public var pageToken: String = String() + + /// Server-side filter expression. Status filters are limited to verified objects. + public var filter: String = String() + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} +} + +/// One logical object visible to the authenticated user. +public struct Archebase_DataGateway_V1_DataObject: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Customer-facing object identifier. Currently equal to file_id. + public var objectID: String = String() + + /// Data Gateway file_id used by download and copy APIs. + public var fileID: String = String() + + /// Current upload lifecycle state. + public var status: Archebase_DataGateway_V1_DataObjectStatus = .unspecified + + /// Object size in bytes, if known. + public var sizeBytes: Int64 = 0 + + /// Unix seconds when the object record was created, or zero if unknown. + public var createdAtUnix: Int64 = 0 + + /// Unix seconds when upload completed, or zero if not uploaded. + public var uploadedAtUnix: Int64 = 0 + + /// Unix seconds when verification completed, or zero if not verified. + public var verifiedAtUnix: Int64 = 0 + + /// Object ETag recorded during upload, if known. + public var etag: String = String() + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} +} + +/// Returns one page of logical objects and pagination metadata. +public struct Archebase_DataGateway_V1_ListObjectsResponse: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Objects in page order. + public var objects: [Archebase_DataGateway_V1_DataObject] = [] + + /// Opaque continuation token for the next page, or empty when exhausted. + public var nextPageToken: String = String() + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} +} + /// Requests presigned read operations for file IDs. public struct Archebase_DataGateway_V1_RequestDownloadRequest: Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the @@ -1210,6 +1338,10 @@ extension Archebase_DataGateway_V1_LogicalUploadStatus: SwiftProtobuf._ProtoName public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{2}\0LOGICAL_UPLOAD_STATUS_UNSPECIFIED\0\u{1}LOGICAL_UPLOAD_STATUS_ACTIVE\0\u{1}LOGICAL_UPLOAD_STATUS_COMPLETING\0\u{1}LOGICAL_UPLOAD_STATUS_COMPLETED\0\u{1}LOGICAL_UPLOAD_STATUS_TERMINAL\0") } +extension Archebase_DataGateway_V1_DataObjectStatus: SwiftProtobuf._ProtoNameProviding { + public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{2}\0DATA_OBJECT_STATUS_UNSPECIFIED\0\u{1}DATA_OBJECT_STATUS_CREATED\0\u{1}DATA_OBJECT_STATUS_UPLOADED\0\u{1}DATA_OBJECT_STATUS_VERIFIED\0\u{1}DATA_OBJECT_STATUS_BAD\0\u{1}DATA_OBJECT_STATUS_ABORTED\0\u{1}DATA_OBJECT_STATUS_INVALID\0") +} + extension Archebase_DataGateway_V1_CopyJobStatus: SwiftProtobuf._ProtoNameProviding { public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{2}\0COPY_JOB_STATUS_UNSPECIFIED\0\u{1}COPY_JOB_STATUS_QUEUED\0\u{1}COPY_JOB_STATUS_RUNNING\0\u{1}COPY_JOB_STATUS_SUCCEEDED\0\u{1}COPY_JOB_STATUS_PARTIAL_FAILED\0\u{1}COPY_JOB_STATUS_FAILED\0\u{1}COPY_JOB_STATUS_CANCELING\0\u{1}COPY_JOB_STATUS_CANCELED\0") } @@ -1838,6 +1970,146 @@ extension Archebase_DataGateway_V1_CompleteUploadResponse: SwiftProtobuf.Message } } +extension Archebase_DataGateway_V1_ListObjectsRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = _protobuf_package + ".ListObjectsRequest" + public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}page_size\0\u{3}page_token\0\u{1}filter\0") + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularInt32Field(value: &self.pageSize) }() + case 2: try { try decoder.decodeSingularStringField(value: &self.pageToken) }() + case 3: try { try decoder.decodeSingularStringField(value: &self.filter) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if self.pageSize != 0 { + try visitor.visitSingularInt32Field(value: self.pageSize, fieldNumber: 1) + } + if !self.pageToken.isEmpty { + try visitor.visitSingularStringField(value: self.pageToken, fieldNumber: 2) + } + if !self.filter.isEmpty { + try visitor.visitSingularStringField(value: self.filter, fieldNumber: 3) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Archebase_DataGateway_V1_ListObjectsRequest, rhs: Archebase_DataGateway_V1_ListObjectsRequest) -> Bool { + if lhs.pageSize != rhs.pageSize {return false} + if lhs.pageToken != rhs.pageToken {return false} + if lhs.filter != rhs.filter {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Archebase_DataGateway_V1_DataObject: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = _protobuf_package + ".DataObject" + public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}object_id\0\u{3}file_id\0\u{1}status\0\u{3}size_bytes\0\u{4}\u{2}created_at_unix\0\u{3}uploaded_at_unix\0\u{3}verified_at_unix\0\u{1}etag\0") + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularStringField(value: &self.objectID) }() + case 2: try { try decoder.decodeSingularStringField(value: &self.fileID) }() + case 3: try { try decoder.decodeSingularEnumField(value: &self.status) }() + case 4: try { try decoder.decodeSingularInt64Field(value: &self.sizeBytes) }() + case 6: try { try decoder.decodeSingularInt64Field(value: &self.createdAtUnix) }() + case 7: try { try decoder.decodeSingularInt64Field(value: &self.uploadedAtUnix) }() + case 8: try { try decoder.decodeSingularInt64Field(value: &self.verifiedAtUnix) }() + case 9: try { try decoder.decodeSingularStringField(value: &self.etag) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if !self.objectID.isEmpty { + try visitor.visitSingularStringField(value: self.objectID, fieldNumber: 1) + } + if !self.fileID.isEmpty { + try visitor.visitSingularStringField(value: self.fileID, fieldNumber: 2) + } + if self.status != .unspecified { + try visitor.visitSingularEnumField(value: self.status, fieldNumber: 3) + } + if self.sizeBytes != 0 { + try visitor.visitSingularInt64Field(value: self.sizeBytes, fieldNumber: 4) + } + if self.createdAtUnix != 0 { + try visitor.visitSingularInt64Field(value: self.createdAtUnix, fieldNumber: 6) + } + if self.uploadedAtUnix != 0 { + try visitor.visitSingularInt64Field(value: self.uploadedAtUnix, fieldNumber: 7) + } + if self.verifiedAtUnix != 0 { + try visitor.visitSingularInt64Field(value: self.verifiedAtUnix, fieldNumber: 8) + } + if !self.etag.isEmpty { + try visitor.visitSingularStringField(value: self.etag, fieldNumber: 9) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Archebase_DataGateway_V1_DataObject, rhs: Archebase_DataGateway_V1_DataObject) -> Bool { + if lhs.objectID != rhs.objectID {return false} + if lhs.fileID != rhs.fileID {return false} + if lhs.status != rhs.status {return false} + if lhs.sizeBytes != rhs.sizeBytes {return false} + if lhs.createdAtUnix != rhs.createdAtUnix {return false} + if lhs.uploadedAtUnix != rhs.uploadedAtUnix {return false} + if lhs.verifiedAtUnix != rhs.verifiedAtUnix {return false} + if lhs.etag != rhs.etag {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Archebase_DataGateway_V1_ListObjectsResponse: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = _protobuf_package + ".ListObjectsResponse" + public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{1}objects\0\u{3}next_page_token\0") + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeRepeatedMessageField(value: &self.objects) }() + case 2: try { try decoder.decodeSingularStringField(value: &self.nextPageToken) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if !self.objects.isEmpty { + try visitor.visitRepeatedMessageField(value: self.objects, fieldNumber: 1) + } + if !self.nextPageToken.isEmpty { + try visitor.visitSingularStringField(value: self.nextPageToken, fieldNumber: 2) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Archebase_DataGateway_V1_ListObjectsResponse, rhs: Archebase_DataGateway_V1_ListObjectsResponse) -> Bool { + if lhs.objects != rhs.objects {return false} + if lhs.nextPageToken != rhs.nextPageToken {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + extension Archebase_DataGateway_V1_RequestDownloadRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = _protobuf_package + ".RequestDownloadRequest" public static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}file_ids\0") diff --git a/Sources/DataGatewayClient/FilePreparation.swift b/Sources/DataGatewayClient/FilePreparation.swift index 59c1dc5..303c2bd 100644 --- a/Sources/DataGatewayClient/FilePreparation.swift +++ b/Sources/DataGatewayClient/FilePreparation.swift @@ -758,6 +758,119 @@ public struct UploadResult: Sendable, Equatable { } } +/// Options for listing verified logical objects visible to the user bearer token. +public struct ListObjectsOptions: Sendable, Equatable { + public var pageSize: Int32 + public var pageToken: String? + public var filter: String? + + public init( + pageSize: Int32 = 0, + pageToken: String? = nil, + filter: String? = nil + ) { + self.pageSize = pageSize + self.pageToken = pageToken + self.filter = filter + } +} + +/// User-visible lifecycle state for one logical data object. +public enum DataObjectStatus: Sendable, Equatable { + case unspecified + case created + case uploaded + case verified + case bad + case aborted + case invalid + case unrecognized(Int) + + package init(proto: Archebase_DataGateway_V1_DataObjectStatus) { + switch proto { + case .unspecified: + self = .unspecified + case .created: + self = .created + case .uploaded: + self = .uploaded + case .verified: + self = .verified + case .bad: + self = .bad + case .aborted: + self = .aborted + case .invalid: + self = .invalid + case .UNRECOGNIZED(let value): + self = .unrecognized(value) + } + } +} + +/// One logical data object visible to the authenticated user. +public struct DataObject: Sendable, Equatable { + public var objectID: String + public var fileID: String + public var status: DataObjectStatus + public var sizeBytes: Int64 + public var createdAtUnix: Int64 + public var uploadedAtUnix: Int64 + public var verifiedAtUnix: Int64 + public var etag: String + + public init( + objectID: String, + fileID: String, + status: DataObjectStatus, + sizeBytes: Int64, + createdAtUnix: Int64, + uploadedAtUnix: Int64, + verifiedAtUnix: Int64, + etag: String + ) { + self.objectID = objectID + self.fileID = fileID + self.status = status + self.sizeBytes = sizeBytes + self.createdAtUnix = createdAtUnix + self.uploadedAtUnix = uploadedAtUnix + self.verifiedAtUnix = verifiedAtUnix + self.etag = etag + } + + package init(proto: Archebase_DataGateway_V1_DataObject) { + self.init( + objectID: proto.objectID, + fileID: proto.fileID, + status: DataObjectStatus(proto: proto.status), + sizeBytes: proto.sizeBytes, + createdAtUnix: proto.createdAtUnix, + uploadedAtUnix: proto.uploadedAtUnix, + verifiedAtUnix: proto.verifiedAtUnix, + etag: proto.etag + ) + } +} + +/// One page of logical data objects and the opaque next-page token. +public struct ListObjectsPage: Sendable, Equatable { + public var objects: [DataObject] + public var nextPageToken: String + + public init(objects: [DataObject], nextPageToken: String) { + self.objects = objects + self.nextPageToken = nextPageToken + } + + package init(proto: Archebase_DataGateway_V1_ListObjectsResponse) { + self.init( + objects: proto.objects.map(DataObject.init(proto:)), + nextPageToken: proto.nextPageToken + ) + } +} + /// Upload status events emitted by the coordinator or stream API. public enum UploadEvent: Sendable, Equatable { case preparing @@ -1595,6 +1708,7 @@ public actor UploadCoordinator { /// High-level client entry point for starting uploads. public actor DataGatewayClient { private let uploadCoordinator: UploadCoordinator + private let objectClient: (any ObjectControlPlaneClientProtocol)? private let runtimeResources: DataGatewayClientRuntimeResources? private let configTags: [String: String] @@ -1635,19 +1749,28 @@ public actor DataGatewayClient { transport: authTransport.serviceClient ) - let gatewayTransport = try ManagedControlPlaneServiceClient(configuration: ControlPlaneTransportConfiguration( - endpoint: config.gatewayEndpoint, - security: gatewaySecurity, - requestTimeout: config.requestTimeout - )) { grpcClient in - Archebase_DataGateway_V1_DataGatewayService.Client(wrapping: grpcClient) - } + let gatewayFactory = ControlPlaneClientFactory( + configuration: ControlPlaneTransportConfiguration( + endpoint: config.gatewayEndpoint, + security: gatewaySecurity, + requestTimeout: config.requestTimeout + ) + ) + let gatewayTransport = try gatewayFactory.makeGatewayClient() + let objectTransport = try gatewayFactory.makeObjectClient() let retryingGateway = AnyUploadCoordinatorGatewayClient( authProvider: authProvider, gatewayServiceClient: gatewayTransport.serviceClient, requestTimeout: config.requestTimeout, retryPolicy: config.retryPolicy.controlPlane.controlPlaneValue ) + let objectClient = RetryingObjectControlPlaneClient( + objectClient: ObjectControlPlaneClient( + client: objectTransport.serviceClient, + requestTimeout: config.requestTimeout + ), + retryPolicy: config.retryPolicy.controlPlane.controlPlaneValue + ) let stateStore = UploadStateStore(persistRoot: config.persistRootURL) let fileCoordinator = FileStagingCoordinator( @@ -1683,9 +1806,11 @@ public actor DataGatewayClient { executionPolicy: config.execution, dependencies: dependencies ) + self.objectClient = objectClient self.runtimeResources = DataGatewayClientRuntimeResources( authTransport: authTransport, - gatewayTransport: gatewayTransport + gatewayTransport: gatewayTransport, + objectTransport: objectTransport ) self.configTags = configTags } @@ -1741,10 +1866,12 @@ public actor DataGatewayClient { package init( uploadCoordinator: UploadCoordinator, + objectClient: (any ObjectControlPlaneClientProtocol)? = nil, runtimeResources: DataGatewayClientRuntimeResources? = nil, configTags: [String: String] = [:] ) { self.uploadCoordinator = uploadCoordinator + self.objectClient = objectClient self.runtimeResources = runtimeResources self.configTags = configTags } @@ -1785,6 +1912,34 @@ public actor DataGatewayClient { try await self.uploadCoordinator.listPendingUploads() } + /// Lists verified logical objects visible to the supplied user bearer token. + public func listObjects( + _ options: ListObjectsOptions = ListObjectsOptions(), + authorizationHeader: String + ) async throws -> ListObjectsPage { + guard let objectClient else { + throw DataGatewayClientError.invalidConfiguration("data gateway object client is unavailable") + } + let trimmedAuthorizationHeader = authorizationHeader.trimmingCharacters(in: .whitespacesAndNewlines) + guard !trimmedAuthorizationHeader.isEmpty else { + throw DataGatewayClientError.invalidConfiguration("authorization header is required") + } + + do { + let response = try await objectClient.listObjects( + pageSize: options.pageSize, + pageToken: options.pageToken ?? "", + filter: options.filter ?? "", + authorizationHeader: trimmedAuthorizationHeader + ) + return ListObjectsPage(proto: response) + } catch let error as DataGatewayClientError { + throw error + } catch { + throw ControlPlaneErrorMapper.map(error) + } + } + /// Aborts one logical upload remotely and always removes its local snapshot on success or not-found. public func abortUpload(logicalUploadID: String) async throws { try await self.uploadCoordinator.abortUpload(logicalUploadID: logicalUploadID) @@ -1813,13 +1968,16 @@ public actor DataGatewayClient { package final class DataGatewayClientRuntimeResources: @unchecked Sendable { private let authTransport: ManagedControlPlaneServiceClient private let gatewayTransport: ManagedControlPlaneServiceClient> + private let objectTransport: ManagedControlPlaneServiceClient> package init( authTransport: ManagedControlPlaneServiceClient, - gatewayTransport: ManagedControlPlaneServiceClient> + gatewayTransport: ManagedControlPlaneServiceClient>, + objectTransport: ManagedControlPlaneServiceClient> ) { self.authTransport = authTransport self.gatewayTransport = gatewayTransport + self.objectTransport = objectTransport } } diff --git a/Tests/DGWControlPlaneTests/ControlPlaneTransportTests.swift b/Tests/DGWControlPlaneTests/ControlPlaneTransportTests.swift index 6a362e4..d458160 100644 --- a/Tests/DGWControlPlaneTests/ControlPlaneTransportTests.swift +++ b/Tests/DGWControlPlaneTests/ControlPlaneTransportTests.swift @@ -94,6 +94,31 @@ import GRPCCore #expect(invocations[4].requestSummary == "upload-1:128:3:\"etag-1\":67108864") } +@Test func objectClientListsObjectsWithUserAuthorization() async throws { + let stub = ObjectServiceClientStub() + let client = ObjectControlPlaneClient(client: stub, requestTimeout: .seconds(4)) + + let response = try await client.listObjects( + pageSize: 25, + pageToken: "page-1", + filter: "status:verified", + authorizationHeader: "Bearer user-token" + ) + + #expect(response.nextPageToken == "page-2") + #expect(response.objects.map(\.fileID) == ["file-1"]) + + let invocations = await stub.invocations() + #expect(invocations == [ + InvocationRecord( + method: "ListObjects", + metadata: ["authorization": ["Bearer user-token"]], + timeout: .seconds(4), + requestSummary: "25:page-1:status:verified" + ), + ]) +} + @Test func deviceInitTransportBuildsRequestWithoutAuthorization() async throws { let stub = DeviceInitServiceClientStub() let transport = DeviceInitServiceClientTransport(client: stub, requestTimeout: .seconds(6)) @@ -319,6 +344,59 @@ private actor GatewayServiceClientStub: Archebase_DataGateway_V1_DataGatewayServ } } +private actor ObjectServiceClientStub: Archebase_DataGateway_V1_DataGatewayObjectService.ClientProtocol { + private var records: [InvocationRecord] = [] + + func listObjects( + request: ClientRequest, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions, + onResponse handleResponse: @Sendable @escaping (ClientResponse) async throws -> Result + ) async throws -> Result where Result : Sendable { + let summary = "\(request.message.pageSize):\(request.message.pageToken):\(request.message.filter)" + self.records.append( + InvocationRecord( + method: "ListObjects", + metadata: Dictionary(uniqueKeysWithValues: [ + ("authorization", Array(request.metadata[stringValues: "authorization"])), + ]), + timeout: options.timeout, + requestSummary: summary + ) + ) + + var object = Archebase_DataGateway_V1_DataObject() + object.objectID = "object-1" + object.fileID = "file-1" + object.status = .verified + object.sizeBytes = 128 + object.createdAtUnix = 1_700_000_001 + object.uploadedAtUnix = 1_700_000_002 + object.verifiedAtUnix = 1_700_000_003 + object.etag = "\"etag-1\"" + + var response = Archebase_DataGateway_V1_ListObjectsResponse() + response.objects = [object] + response.nextPageToken = "page-2" + return try await handleResponse(ClientResponse(message: response)) + } + + func requestDownload( + request: ClientRequest, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions, + onResponse handleResponse: @Sendable @escaping (ClientResponse) async throws -> Result + ) async throws -> Result where Result : Sendable { + return try await handleResponse(ClientResponse(message: Archebase_DataGateway_V1_RequestDownloadResponse())) + } + + func invocations() -> [InvocationRecord] { + self.records + } +} + private actor DeviceInitServiceClientStub: Archebase_DataGateway_V1_DeviceInitService.ClientProtocol { private var records: [InvocationRecord] = [] diff --git a/Tests/DGWControlPlaneTests/RetryExecutorTests.swift b/Tests/DGWControlPlaneTests/RetryExecutorTests.swift index 51e2da4..3b677cd 100644 --- a/Tests/DGWControlPlaneTests/RetryExecutorTests.swift +++ b/Tests/DGWControlPlaneTests/RetryExecutorTests.swift @@ -100,6 +100,59 @@ import GRPCCore #expect(await gateway.getRecoveryInvocations() == ["Bearer token-1"]) } +@Test func retryingObjectClientRetriesTransientErrorsWithSameUserBearer() async throws { + let objectClient = MockObjectClient(results: [ + .failure(RPCError(code: .unavailable, message: "gateway unavailable")), + .success(makeListObjectsResponse()), + ]) + let sleeper = RecordingSleeper() + let client = RetryingObjectControlPlaneClient( + objectClient: objectClient, + retryExecutor: RetryExecutor(sleeper: sleeper), + retryPolicy: RetryPolicy(maxAttempts: 3, initialBackoff: .seconds(1), maxBackoff: .seconds(8)) + ) + + let response = try await client.listObjects( + pageSize: 10, + pageToken: "page-1", + filter: "status:verified", + authorizationHeader: "Bearer user-token" + ) + + #expect(response.objects.map(\.fileID) == ["file-1"]) + #expect(await objectClient.invocations() == [ + "10:page-1:status:verified:Bearer user-token", + "10:page-1:status:verified:Bearer user-token", + ]) + #expect(await sleeper.durations() == [.seconds(1)]) +} + +@Test func retryingObjectClientDoesNotRetryUnauthenticatedUserBearer() async { + let objectClient = MockObjectClient(results: [ + .failure(RPCError(code: .unauthenticated, message: "invalid user token")), + .success(makeListObjectsResponse()), + ]) + let sleeper = RecordingSleeper() + let client = RetryingObjectControlPlaneClient( + objectClient: objectClient, + retryExecutor: RetryExecutor(sleeper: sleeper), + retryPolicy: RetryPolicy(maxAttempts: 3, initialBackoff: .seconds(1), maxBackoff: .seconds(8)) + ) + + let error = await #expect(throws: RPCError.self) { + try await client.listObjects( + pageSize: 10, + pageToken: "", + filter: "", + authorizationHeader: "Bearer stale-user-token" + ) + } + + #expect(error?.code == .unauthenticated) + #expect(await objectClient.invocations() == ["10:::Bearer stale-user-token"]) + #expect(await sleeper.durations().isEmpty) +} + private actor RecordingSleeper: RetrySleeper { private var recordedDurations: [Duration] = [] @@ -265,6 +318,29 @@ private actor MockGatewayClient: GatewayControlPlaneClientProtocol { } } +private actor MockObjectClient: ObjectControlPlaneClientProtocol { + private var results: [Result] + private var records: [String] = [] + + init(results: [Result]) { + self.results = results + } + + func listObjects( + pageSize: Int32, + pageToken: String, + filter: String, + authorizationHeader: String + ) async throws -> Archebase_DataGateway_V1_ListObjectsResponse { + self.records.append("\(pageSize):\(pageToken):\(filter):\(authorizationHeader)") + return try self.results.removeFirst().get() + } + + func invocations() -> [String] { + self.records + } +} + private func makeCreateResponse() -> Archebase_DataGateway_V1_CreateLogicalUploadResponse { var response = Archebase_DataGateway_V1_CreateLogicalUploadResponse() response.logicalUploadID = "logical-1" @@ -272,6 +348,16 @@ private func makeCreateResponse() -> Archebase_DataGateway_V1_CreateLogicalUploa return response } +private func makeListObjectsResponse() -> Archebase_DataGateway_V1_ListObjectsResponse { + var object = Archebase_DataGateway_V1_DataObject() + object.fileID = "file-1" + object.status = .verified + + var response = Archebase_DataGateway_V1_ListObjectsResponse() + response.objects = [object] + return response +} + private func makeDetailedRPCError( code: RPCError.Code, message: String, diff --git a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift index 943f9b7..ad2dad1 100644 --- a/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/ArchebaseConfigClientTests.swift @@ -147,6 +147,63 @@ import Testing #expect(pending.isEmpty) } +@Test func listObjectsUsesUserAuthorizationAndMapsPage() async throws { + let root = try temporaryRoot() + let objectClient = RecordingObjectClient() + let coordinator = UploadCoordinator( + executionPolicy: makeTestExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: RecordingGatewayClient(), + stateStore: UploadStateStore(persistRoot: root), + fileCoordinator: FileStagingCoordinator(stagingRoot: root.appendingPathComponent("staging", isDirectory: true)), + ossClientFactory: { _ in FakeMultipartSession() } + ) + ) + let client = DataGatewayClient(uploadCoordinator: coordinator, objectClient: objectClient) + + let page = try await client.listObjects( + ListObjectsOptions(pageSize: 50, pageToken: "page-1", filter: "status:verified"), + authorizationHeader: "Bearer user-token" + ) + + #expect(page.nextPageToken == "page-2") + #expect(page.objects == [ + DataObject( + objectID: "object-1", + fileID: "file-1", + status: .verified, + sizeBytes: 1024, + createdAtUnix: 1_700_000_001, + uploadedAtUnix: 1_700_000_002, + verifiedAtUnix: 1_700_000_003, + etag: "\"etag-1\"" + ), + ]) + #expect(await objectClient.lastCall == "50:page-1:status:verified:Bearer user-token") +} + +@Test func listObjectsRejectsBlankAuthorizationHeaderLocally() async throws { + let root = try temporaryRoot() + let objectClient = RecordingObjectClient() + let coordinator = UploadCoordinator( + executionPolicy: makeTestExecutionPolicy(), + dependencies: UploadCoordinatorDependencies( + gatewayClient: RecordingGatewayClient(), + stateStore: UploadStateStore(persistRoot: root), + fileCoordinator: FileStagingCoordinator(stagingRoot: root.appendingPathComponent("staging", isDirectory: true)), + ossClientFactory: { _ in FakeMultipartSession() } + ) + ) + let client = DataGatewayClient(uploadCoordinator: coordinator, objectClient: objectClient) + + let error = await #expect(throws: DataGatewayClientError.self) { + _ = try await client.listObjects(authorizationHeader: " \n\t ") + } + + #expect(error == .invalidConfiguration("authorization header is required")) + #expect(await objectClient.lastCall == nil) +} + private func temporaryRoot() throws -> URL { let root = FileManager.default.temporaryDirectory .appendingPathComponent("archebase-client-config-tests", isDirectory: true) @@ -182,6 +239,34 @@ private func makeTestExecutionPolicy() -> UploadExecutionPolicy { ) } +private actor RecordingObjectClient: ObjectControlPlaneClientProtocol { + private(set) var lastCall: String? + + func listObjects( + pageSize: Int32, + pageToken: String, + filter: String, + authorizationHeader: String + ) async throws -> Archebase_DataGateway_V1_ListObjectsResponse { + self.lastCall = "\(pageSize):\(pageToken):\(filter):\(authorizationHeader)" + + var object = Archebase_DataGateway_V1_DataObject() + object.objectID = "object-1" + object.fileID = "file-1" + object.status = .verified + object.sizeBytes = 1024 + object.createdAtUnix = 1_700_000_001 + object.uploadedAtUnix = 1_700_000_002 + object.verifiedAtUnix = 1_700_000_003 + object.etag = "\"etag-1\"" + + var response = Archebase_DataGateway_V1_ListObjectsResponse() + response.objects = [object] + response.nextPageToken = "page-2" + return response + } +} + private actor RecordingGatewayClient: UploadCoordinatorGatewayClient { private(set) var completedRawTags: [String: String] = [:] diff --git a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift index a9afc4e..f9e5d78 100644 --- a/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/LocalStackHarnessTests.swift @@ -12,6 +12,7 @@ private let runtimeIntegrationEnabled = ProcessInfo.processInfo.environment["DGW private let realRuntimeIntegrationEnabled = ProcessInfo.processInfo.environment["DGW_REAL_RUNTIME_INTEGRATION"] == "1" private let realDeviceInitIntegrationEnabled = ProcessInfo.processInfo.environment["DGW_REAL_DEVICE_INIT_INTEGRATION"] == "1" private let publicDNSIntegrationEnabled = ProcessInfo.processInfo.environment["DGW_PUBLIC_DNS_INTEGRATION"] == "1" +private let realObjectListingIntegrationEnabled = realRuntimeIntegrationEnabled && hasRealUserAuthorizationHeader() @Suite(.serialized) struct LocalStackHarnessTests { @@ -293,6 +294,15 @@ struct LocalStackHarnessTests { ) { grpcClient in Archebase_DataGateway_V1_DataGatewayService.Client(wrapping: grpcClient) } + let objectTransport = try ManagedControlPlaneServiceClient( + configuration: ControlPlaneTransportConfiguration( + endpoint: clientConfig.gatewayEndpoint, + security: .plaintext, + requestTimeout: clientConfig.requestTimeout + ) + ) { grpcClient in + Archebase_DataGateway_V1_DataGatewayObjectService.Client(wrapping: grpcClient) + } let gatewayClient = AnyUploadCoordinatorGatewayClient( authProvider: authProvider, gatewayServiceClient: gatewayTransport.serviceClient, @@ -317,7 +327,8 @@ struct LocalStackHarnessTests { ), runtimeResources: DataGatewayClientRuntimeResources( authTransport: authTransport, - gatewayTransport: gatewayTransport + gatewayTransport: gatewayTransport, + objectTransport: objectTransport ) ) @@ -554,6 +565,41 @@ struct LocalStackHarnessTests { #expect(!pending.contains(where: { $0.logicalUploadID == result.logicalUploadID })) } +@Test( + .enabled(if: realObjectListingIntegrationEnabled) +) func realAliyunListObjectsFlow() async throws { + let environment = AliyunOSSTestEnvironment() + try environment.validate() + let clientConfig = try uniqueRealClientConfig(from: environment.makeRemoteClientConfig(), label: "list-objects") + defer { try? FileManager.default.removeItem(at: clientConfig.persistRootURL) } + let client = try DataGatewayClient(config: clientConfig) + let runID = "swift-list-\(UUID().uuidString)" + let payload = Data("aliyun-real-list-objects-payload-\(runID)".utf8) + let fileURL = try writeRealPayload(payload, under: clientConfig.persistRootURL, name: "aliyun-real-list-objects") + + let upload = try await client.upload( + UploadRequest( + fileURL: fileURL, + clientHints: ["suite": "aliyun-real", "mode": "list-objects"], + rawTags: ["suite": "aliyun-real", "runtime": "list-objects", "object_list_run": runID], + displayName: "aliyun-real-list-objects" + ) + ) + + let page = try await client.listObjects( + ListObjectsOptions(pageSize: 10, pageToken: nil, filter: "raw_tags.object_list_run=\(runID)"), + authorizationHeader: try realUserAuthorizationHeader() + ) + + let object = try #require(page.objects.first) + #expect(page.objects.count == 1) + #expect(page.nextPageToken.isEmpty) + #expect(!object.fileID.isEmpty) + #expect(object.status == .verified) + #expect(object.sizeBytes == Int64(payload.count)) + #expect(canonicalObjectETag(object.etag) == canonicalObjectETag(upload.ossObjectETag)) +} + @Test( .enabled(if: realRuntimeIntegrationEnabled) ) func realAliyunUploadEventsFlow() async throws { @@ -848,6 +894,14 @@ private func writeRealPayload(_ payload: Data, under root: URL, name: String) th return fileURL } +private func canonicalObjectETag(_ value: String) -> String { + let trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines) + if trimmed.count >= 2, trimmed.first == "\"", trimmed.last == "\"" { + return String(trimmed.dropFirst().dropLast()) + } + return trimmed +} + private func realMultipartPayloadSizeBytes() -> Int { if let value = ProcessInfo.processInfo.environment["DGW_REAL_MULTIPART_SIZE_BYTES"], let parsed = Int(value), @@ -857,6 +911,30 @@ private func realMultipartPayloadSizeBytes() -> Int { return 67_108_864 + 1024 } +private func hasRealUserAuthorizationHeader() -> Bool { + do { + _ = try realUserAuthorizationHeader() + return true + } catch { + return false + } +} + +private func realUserAuthorizationHeader() throws -> String { + if let header = ProcessInfo.processInfo.environment["DGW_REAL_USER_AUTHORIZATION_HEADER"]?.trimmingCharacters(in: .whitespacesAndNewlines), + !header.isEmpty { + return header + } + if let token = ProcessInfo.processInfo.environment["DGW_REAL_USER_ACCESS_TOKEN"]?.trimmingCharacters(in: .whitespacesAndNewlines), + !token.isEmpty { + if token.lowercased().hasPrefix("bearer ") { + return token + } + return "Bearer \(token)" + } + throw AliyunOSSHarnessError.missingEnvironmentVariable("DGW_REAL_USER_AUTHORIZATION_HEADER") +} + private func makeRealGatewayHarness(clientConfig: DataGatewayClientConfig) throws -> RealGatewayHarness { let security: ControlPlaneTransportSecurity = switch clientConfig.tls { case .plaintext: .plaintext diff --git a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift index 10b83a9..74ce426 100644 --- a/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift +++ b/Tests/DataGatewayClientIntegrationTests/UploadCoordinatorTests.swift @@ -128,8 +128,14 @@ import Testing #expect(logs.contains(where: { $0.operation == "refresh_credentials" && $0.message == "[REDACTED]" })) let metrics = await metricRecorder.events() - #expect(metrics.contains(where: { $0.name == "upload_part" && $0.dimensions["upload_id"] == "upload-1" })) - #expect(metrics.contains(where: { $0.name == "credentials_refresh" && $0.dimensions["upload_id"] == "upload-1" })) + let recordedUploadPartMetric = metrics.contains { event in + event.name == "upload_part" && event.dimensions["upload_id"] == "upload-1" + } + let recordedCredentialRefreshMetric = metrics.contains { event in + event.name == "credentials_refresh" && event.dimensions["upload_id"] == "upload-1" + } + #expect(recordedUploadPartMetric) + #expect(recordedCredentialRefreshMetric) } @Test func contractPartSizeBytesControlsChunkSplitting() async throws { diff --git a/protos/data_gateway.proto b/protos/data_gateway.proto index b381551..5ab7f32 100644 --- a/protos/data_gateway.proto +++ b/protos/data_gateway.proto @@ -17,11 +17,15 @@ service DataGatewayService { returns (CompleteUploadResponse); } -// SDK-facing download authorization service exposed on the public gRPC port. +// SDK-facing object discovery and download authorization service exposed on +// the public gRPC port. // -// This service uses user access JWT authentication and returns short-lived +// This service uses user access JWT authentication. Object listing returns +// logical object IDs visible to the caller; download returns short-lived // presigned read operations rather than cloud credentials. -service DataGatewayDownloadService { +service DataGatewayObjectService { + // Lists verified logical objects visible to the authenticated user. + rpc ListObjects(ListObjectsRequest) returns (ListObjectsResponse); // Authorizes direct object downloads for the requested file IDs. rpc RequestDownload(RequestDownloadRequest) returns (RequestDownloadResponse); @@ -192,6 +196,56 @@ message CompleteUploadRequest { message CompleteUploadResponse {} +// User-visible lifecycle state for a logical data object. +enum DataObjectStatus { + DATA_OBJECT_STATUS_UNSPECIFIED = 0; + DATA_OBJECT_STATUS_CREATED = 1; + DATA_OBJECT_STATUS_UPLOADED = 2; + DATA_OBJECT_STATUS_VERIFIED = 3; + DATA_OBJECT_STATUS_BAD = 4; + DATA_OBJECT_STATUS_ABORTED = 5; + DATA_OBJECT_STATUS_INVALID = 6; +} + +// Requests one page of verified logical objects visible to the authenticated user. +message ListObjectsRequest { + // Maximum number of objects to return. + int32 page_size = 1; + // Opaque continuation token returned by a previous page. The token is only + // valid with the same filter used to obtain it. + string page_token = 2; + // Server-side filter expression. Status filters are limited to verified objects. + string filter = 3; +} + +// One logical object visible to the authenticated user. +message DataObject { + // Customer-facing object identifier. Currently equal to file_id. + string object_id = 1; + // Data Gateway file_id used by download and copy APIs. + string file_id = 2; + // Current upload lifecycle state. + DataObjectStatus status = 3; + // Object size in bytes, if known. + int64 size_bytes = 4; + // Unix seconds when the object record was created, or zero if unknown. + int64 created_at_unix = 6; + // Unix seconds when upload completed, or zero if not uploaded. + int64 uploaded_at_unix = 7; + // Unix seconds when verification completed, or zero if not verified. + int64 verified_at_unix = 8; + // Object ETag recorded during upload, if known. + string etag = 9; +} + +// Returns one page of logical objects and pagination metadata. +message ListObjectsResponse { + // Objects in page order. + repeated DataObject objects = 1; + // Opaque continuation token for the next page, or empty when exhausted. + string next_page_token = 2; +} + // Requests presigned read operations for file IDs. message RequestDownloadRequest { // Non-empty file IDs. Empty or duplicate IDs are rejected by data-gateway.