From 9e45fa8d667c7d12974c0633a3fcf85ec88cc289 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 12:32:53 -0700 Subject: [PATCH 1/4] refactor(databases): stream /files upload via SDK upload_stream --- Cargo.lock | 15 +++- Cargo.toml | 17 +++- src/databases.rs | 62 +++----------- src/raw_http.rs | 34 ++------ src/sdk.rs | 214 +++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 252 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c0098f4..d0fe747 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1166,7 +1166,6 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hotdata" version = "0.1.0" -source = "git+https://github.com/hotdata-dev/sdk-rust?rev=8d4018fb899ba52228db44eaffa6caa0eb5b603f#8d4018fb899ba52228db44eaffa6caa0eb5b603f" dependencies = [ "arrow-array", "arrow-ipc", @@ -1193,12 +1192,14 @@ dependencies = [ "arrow", "async-trait", "base64", + "bytes", "clap", "clap_complete", "crossterm 0.28.1", "directories", "dotenvy", "flate2", + "futures-core", "hotdata", "indicatif", "inquire", @@ -1222,6 +1223,7 @@ dependencies = [ "tempfile", "tiny_http", "tokio", + "tokio-stream", "toml", ] @@ -3315,6 +3317,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" diff --git a/Cargo.toml b/Cargo.toml index 060a5ab..8e8ac29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,23 @@ path = "src/main.rs" # behind a shared multi-thread tokio runtime. Pinned to the rev that adds the # CLI-consumption surface (attribution client_id, async submit_query, streaming # upload_stream) — merged via hotdata-dev/sdk-rust#32. -hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "8d4018fb899ba52228db44eaffa6caa0eb5b603f", features = ["arrow"] } +# TEMP: local path dep while sdk-rust#35 (content_length on upload_stream) is in +# review. Flip back to a `git + rev` pin once that PR squash-merges to main. +hotdata = { path = "/tmp/sdk-rust-133", features = ["arrow"] } # Shared multi-thread runtime for the sync wrapper; block_on is called -# concurrently from rayon worker threads (see src/indexes.rs). -tokio = { version = "1", features = ["rt-multi-thread"] } +# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the +# mpsc channel that bridges the blocking upload reader into an async byte stream. +tokio = { version = "1", features = ["rt-multi-thread", "sync"] } # CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider. async-trait = "0.1" +# Bridge the progress-wrapped blocking upload reader into the async +# `Stream>` the SDK's `upload_stream` consumes: a +# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a +# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream +# bound; `bytes` matches the SDK's chunk type. +bytes = "1" +futures-core = "0.3" +tokio-stream = "0.1" anstyle = "1.0.13" clap = { version = "4", features = ["derive"] } directories = "6" diff --git a/src/databases.rs b/src/databases.rs index 4c259d0..97c3083 100644 --- a/src/databases.rs +++ b/src/databases.rs @@ -327,59 +327,19 @@ fn finish_upload( size: Option, pb: &ProgressBar, ) -> String { - // The streaming `/files` upload stays on the slim raw-HTTP helper: it - // needs no request timeout (a 10 GB+ parquet far outlives the seam's - // 300s default), is one-shot (no 401-retry — the body is consumed on the - // first send), and the SDK's `uploads().upload` is `PathBuf`-only with no - // progress hook or `--url` source. We still carry the same - // `Authorization: Bearer ` (resolved through the seam's installed - // token provider) and `X-Workspace-Id` header every other call uses. - let upload_client = crate::raw_http::build_upload_client(); - let url = format!("{}/files", api.api_url); - let mut req = upload_client - .post(&url) - .header("Content-Type", "application/octet-stream"); - if let Some(bearer) = api.current_bearer() { - req = req.header("Authorization", format!("Bearer {bearer}")); - } - if let Some(ws) = api.workspace_id() { - req = req.header("X-Workspace-Id", ws); - } - if let Some(len) = size { - req = req.header("Content-Length", len); - } - let req = req.body(reqwest::blocking::Body::new(reader)); - - // Body is an opaque stream, so pass `None` for logging; headers - // (including the masked Authorization) still log. - let (status, resp_body) = match crate::util::send_debug(&upload_client, req, None) { - Ok(pair) => pair, - Err(e) => { - eprintln!("error connecting to API: {e}"); - std::process::exit(1); - } - }; + // Stream the body to `POST /v1/files` through the SDK seam, which drives the + // SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet + // far outlives the shared 300s request timeout) and bridges this blocking, + // progress-wrapped `reader` into the async byte stream the SDK consumes. + // `size` becomes the `Content-Length` so the server fast-fails an oversized + // upload before writing bytes; the `--url` source may not know it, hence + // `Option`. Carries the same auth + scope headers as every other SDK call. + let result = api.upload_stream(reader, size, "application/octet-stream"); pb.finish_and_clear(); - if !status.is_success() { - use crossterm::style::Stylize; - eprintln!("{}", crate::util::api_error(resp_body).red()); - std::process::exit(1); - } - - let body: serde_json::Value = match serde_json::from_str(&resp_body) { - Ok(v) => v, - Err(e) => { - eprintln!("error parsing upload response: {e}"); - std::process::exit(1); - } - }; - match body["id"].as_str() { - Some(id) => id.to_string(), - None => { - eprintln!("error: upload response missing id"); - std::process::exit(1); - } + match result { + Ok(id) => id, + Err(e) => e.exit(), } } diff --git a/src/raw_http.rs b/src/raw_http.rs index 1dc29e9..cfb5bec 100644 --- a/src/raw_http.rs +++ b/src/raw_http.rs @@ -8,16 +8,16 @@ //! * the session-token mints (`/v1/auth/database`, `/v1/auth/sandbox`) — a //! distinct grant on distinct endpoints (`database_session.rs` / //! `sandbox_session.rs`); -//! * the streaming `/files` upload (10 GB+, `--url` source, progress bar, no -//! request timeout, no 401-retry) — the SDK's `uploads().upload` is -//! `PathBuf`-only; //! * `skill.rs`'s arbitrary-URL markdown fetch. //! -//! This module owns the two blocking client builders (one timeout-bounded, one -//! no-timeout for uploads) and a thin bearer/header request builder. It does -//! NOT carry the old `ApiClient`'s 401-retry loop: token freshness is now the -//! `CliTokenProvider`'s job (proactive refresh at the 30s leeway), and the -//! upload path was always one-shot anyway. +//! (The streaming `/files` upload moved onto the SDK seam's +//! [`Api::upload_stream`](crate::sdk::Api::upload_stream), which owns its own +//! no-timeout client.) +//! +//! This module owns the timeout-bounded blocking client builder and a thin +//! bearer/header request builder. It does NOT carry the old `ApiClient`'s +//! 401-retry loop: token freshness is now the `CliTokenProvider`'s job +//! (proactive refresh at the 30s leeway). // Consumers (jwt.rs token mints, session mints, the streaming upload, // skill.rs) are migrated to this helper incrementally; the allow keeps the @@ -50,19 +50,6 @@ pub fn build_http_client() -> reqwest::blocking::Client { .expect("reqwest blocking client should always build with these defaults") } -/// Client used only for streaming file uploads. Deliberately has **no** request -/// timeout: an upload's duration scales with file size and uplink (a 10 GB -/// parquet takes far longer than `HTTP_REQUEST_TIMEOUT`, which is sized for -/// slow server-side work), so a wall-clock cap would abort healthy-but-slow -/// transfers. TCP keepalive is kept so a genuinely dead peer is still reaped by -/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C. -pub fn build_upload_client() -> reqwest::blocking::Client { - reqwest::blocking::Client::builder() - .tcp_keepalive(TCP_KEEPALIVE_INTERVAL) - .build() - .expect("reqwest blocking client should always build with these defaults") -} - #[cfg(test)] mod tests { use super::*; @@ -72,11 +59,6 @@ mod tests { let _ = build_http_client(); } - #[test] - fn upload_client_builds() { - let _ = build_upload_client(); - } - #[test] fn redact_keys_cover_token_fields() { // Guards against silently dropping a sensitive key from debug logs. diff --git a/src/sdk.rs b/src/sdk.rs index fba56a9..800de61 100644 --- a/src/sdk.rs +++ b/src/sdk.rs @@ -87,6 +87,63 @@ fn sdk_http_client() -> reqwest::Client { .expect("reqwest client with timeout should build") } +/// The `reqwest::Client` backing the streaming `/files` upload. Deliberately has +/// **no** request timeout: an upload's duration scales with file size and uplink +/// (a 10 GB parquet far outlives [`HTTP_REQUEST_TIMEOUT`], which is sized for +/// slow server-side work), so a wall-clock cap would abort a healthy-but-slow +/// transfer. TCP keepalive is kept so a genuinely dead peer is still reaped by +/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C. +fn upload_reqwest_client() -> reqwest::Client { + reqwest::Client::builder() + .tcp_keepalive(TCP_KEEPALIVE_INTERVAL) + .build() + .expect("reqwest client should build without a timeout") +} + +/// Size of each chunk pulled from the blocking reader (1 MiB). Large enough to +/// keep per-chunk overhead negligible on a multi-GB upload, small enough that an +/// in-flight chunk is a trivial allocation. +const UPLOAD_CHUNK_SIZE: usize = 1 << 20; +/// Bound on chunks buffered between the blocking reader and the async sender. +/// Caps in-flight memory so a fast local disk can't outrun a slow uplink; the +/// read task blocks on a full channel (back-pressure). +const UPLOAD_CHANNEL_DEPTH: usize = 4; + +/// Bridge a blocking [`Read`](std::io::Read) source into the async +/// `Stream>` the SDK's `upload_stream` consumes. +/// +/// A `spawn_blocking` task reads fixed-size chunks and forwards them through a +/// bounded tokio mpsc channel; the returned [`ReceiverStream`] yields them to +/// the request body. The blocking task lives on the runtime's blocking pool, so +/// it does not stall an async worker, and a full channel back-pressures the +/// reader (which keeps the caller's progress bar — wrapped around `reader` — +/// honest). If the receiver is dropped (request aborted/failed) the send errors +/// and the task exits; a read error is forwarded as the stream's terminal item. +fn reader_into_stream( + mut reader: impl std::io::Read + Send + 'static, +) -> impl futures_core::Stream> + Send + 'static { + let (tx, rx) = tokio::sync::mpsc::channel(UPLOAD_CHANNEL_DEPTH); + rt().spawn_blocking(move || { + let mut buf = vec![0u8; UPLOAD_CHUNK_SIZE]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + let chunk = bytes::Bytes::copy_from_slice(&buf[..n]); + if tx.blocking_send(Ok(chunk)).is_err() { + break; // receiver gone — request aborted + } + } + Err(e) => { + let _ = tx.blocking_send(Err(e)); + break; + } + } + } + }); + tokio_stream::wrappers::ReceiverStream::new(rx) +} + // Compile-time guarantee that the rayon bound can never silently regress. const _: fn() = || { fn assert_send_sync_clone() {} @@ -505,16 +562,37 @@ impl Api { &self.client } - /// Resolve the current bearer token synchronously by driving the installed - /// `token_provider` on the shared runtime. + /// Stream a file/URL body to `POST /v1/files` through the SDK's + /// [`Client::upload_stream`], returning the upload id. /// - /// Used by raw-HTTP paths that the SDK can't serve (the streaming `/files` - /// upload) but that still need the same `Authorization: Bearer ` the - /// SDK installs on every call. Returns `None` if no provider/static token - /// is configured. - pub fn current_bearer(&self) -> Option { - let cfg = self.client.configuration(); - rt().block_on(cfg.resolve_bearer_token()) + /// Drives the async SDK from the CLI's synchronous call site, like every + /// other seam method, but on a **dedicated no-timeout client**: a 10 GB+ + /// parquet far outlives the shared client's 300s request timeout, so a + /// wall-clock cap would abort a healthy-but-slow transfer. We clone the + /// configured `Configuration` (same base_path, token_provider, scope + /// api_keys, user-agent) and swap only the reqwest client, so the upload + /// carries the identical auth + `X-Workspace-Id`/`X-Session-Id` headers. + /// + /// `reader` is the progress-wrapped blocking source (file or URL response); + /// it is bridged into the async byte stream the SDK consumes by + /// [`reader_into_stream`]. `content_length`, when known, is sent as + /// `Content-Length` so the server can reject an oversized upload up front + /// (the `--url` path may not know the length, hence `Option`). + pub fn upload_stream( + &self, + reader: impl std::io::Read + Send + 'static, + content_length: Option, + content_type: &str, + ) -> Result { + let mut cfg = self.client.configuration().clone(); + cfg.client = upload_reqwest_client(); + let upload_client = Client::from_configuration(cfg); + + let stream = reader_into_stream(reader); + let resp = rt() + .block_on(upload_client.upload_stream(stream, Some(content_type), content_length)) + .map_err(ApiError::from_sdk)?; + Ok(resp.id) } /// Issue an authenticated `GET {base}/v1{path}` through the SDK @@ -1230,4 +1308,122 @@ mod tests { assert_eq!(resp.count, 0); m.assert(); } + + // --- streaming /files upload -------------------------------------------- + + /// A deterministic ASCII payload of `len` bytes, so a body can be matched + /// exactly to prove the bridged stream delivered every byte in order. + fn upload_payload(len: usize) -> Vec { + (0..len).map(|i| b'a' + (i % 26) as u8).collect() + } + + fn upload_response_body(id: &str, size: usize) -> String { + format!( + r#"{{"id":"{id}","status":"ready","size_bytes":{size},"content_type":"application/octet-stream","created_at":"2026-06-05T00:00:00Z"}}"# + ) + } + + /// A sized upload (`content_length = Some`) streams the blocking reader + /// through the bridge to `POST /v1/files`, sends a matching `Content-Length` + /// (so the server can fast-fail oversize before reading), and delivers every + /// byte intact. The 5 MiB payload spans several `UPLOAD_CHUNK_SIZE` chunks + /// and overruns `UPLOAD_CHANNEL_DEPTH`, exercising the channel back-pressure. + #[test] + fn upload_stream_sends_sized_body_with_content_length() { + let payload = upload_payload(5 * 1024 * 1024); + let len = payload.len(); + + let mut server = mockito::Server::new(); + let m = server + .mock("POST", "/v1/files") + .match_header("Authorization", "Bearer test-jwt") + .match_header("X-Workspace-Id", "ws-1") + .match_header("Content-Type", "application/octet-stream") + .match_header("Content-Length", len.to_string().as_str()) + .match_body(mockito::Matcher::Exact( + String::from_utf8(payload.clone()).unwrap(), + )) + .with_status(201) + .with_header("content-type", "application/json") + .with_body(upload_response_body("upload_sized", len)) + .create(); + + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + let id = api + .upload_stream( + std::io::Cursor::new(payload), + Some(len as u64), + "application/octet-stream", + ) + .expect("sized upload should succeed"); + + assert_eq!(id, "upload_sized"); + m.assert(); + } + + /// With an unknown length (`content_length = None`, the `--url` source) the + /// body streams chunked and still arrives intact. Multiple chunks, so the + /// bridge is genuinely streaming rather than buffering a single read. + #[test] + fn upload_stream_streams_chunked_when_length_unknown() { + let payload = upload_payload(3 * 1024 * 1024); + let len = payload.len(); + + let mut server = mockito::Server::new(); + let m = server + .mock("POST", "/v1/files") + .match_body(mockito::Matcher::Exact( + String::from_utf8(payload.clone()).unwrap(), + )) + .with_status(201) + .with_header("content-type", "application/json") + .with_body(upload_response_body("upload_chunked", len)) + .create(); + + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + let id = api + .upload_stream( + std::io::Cursor::new(payload), + None, + "application/octet-stream", + ) + .expect("chunked upload should succeed"); + + assert_eq!(id, "upload_chunked"); + m.assert(); + } + + /// A non-success status surfaces as an `ApiError::Status` carrying the body, + /// the same mapping every other seam call uses (so the CLI prints the server + /// message and the 4xx re-auth hint still fires). + #[test] + fn upload_stream_maps_error_status() { + let payload = upload_payload(64); + let len = payload.len(); + + let mut server = mockito::Server::new(); + let _m = server + .mock("POST", "/v1/files") + .with_status(400) + .with_header("content-type", "application/json") + .with_body(r#"{"error":"Upload exceeds maximum size"}"#) + .create(); + + let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); + let err = api + .upload_stream( + std::io::Cursor::new(payload), + Some(len as u64), + "application/octet-stream", + ) + .expect_err("a 400 should map to an error"); + + match err { + ApiError::Status { status, body } => { + assert_eq!(status, reqwest::StatusCode::BAD_REQUEST); + assert!(body.contains("Upload exceeds maximum size")); + } + other => panic!("expected Status error, got {other:?}"), + } + } } From 73d53b858a83f2f4e247204377286ac0ef36ea4f Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 13:12:49 -0700 Subject: [PATCH 2/4] chore(deps): pin sdk-rust to upload_stream content_length rev --- Cargo.lock | 1 + Cargo.toml | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0fe747..2de11de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1166,6 +1166,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hotdata" version = "0.1.0" +source = "git+https://github.com/hotdata-dev/sdk-rust?rev=1687ba2ffb8f816540d00d7c1fdfe165a8fbd150#1687ba2ffb8f816540d00d7c1fdfe165a8fbd150" dependencies = [ "arrow-array", "arrow-ipc", diff --git a/Cargo.toml b/Cargo.toml index 8e8ac29..fa6be45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,12 +13,12 @@ path = "src/main.rs" [dependencies] # Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK -# behind a shared multi-thread tokio runtime. Pinned to the rev that adds the -# CLI-consumption surface (attribution client_id, async submit_query, streaming -# upload_stream) — merged via hotdata-dev/sdk-rust#32. -# TEMP: local path dep while sdk-rust#35 (content_length on upload_stream) is in -# review. Flip back to a `git + rev` pin once that PR squash-merges to main. -hotdata = { path = "/tmp/sdk-rust-133", features = ["arrow"] } +# behind a shared multi-thread tokio runtime. Pinned to the rev that adds +# `content_length` to the streaming `upload_stream` (sized body, not chunked, so +# the server can fast-fail an oversized upload) — merged via hotdata-dev/sdk-rust#35 +# (which subsumes the earlier #32 CLI-consumption surface: attribution +# client_id, async submit_query, streaming upload_stream). +hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "1687ba2ffb8f816540d00d7c1fdfe165a8fbd150", features = ["arrow"] } # Shared multi-thread runtime for the sync wrapper; block_on is called # concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the # mpsc channel that bridges the blocking upload reader into an async byte stream. From b12c79218ead56ee6f8cb1a02e3761dc264b9d1a Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 13:22:43 -0700 Subject: [PATCH 3/4] refactor(databases): drop redundant upload content-type param --- src/databases.rs | 2 +- src/sdk.rs | 25 ++++++++----------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/databases.rs b/src/databases.rs index 97c3083..b4f7de1 100644 --- a/src/databases.rs +++ b/src/databases.rs @@ -334,7 +334,7 @@ fn finish_upload( // `size` becomes the `Content-Length` so the server fast-fails an oversized // upload before writing bytes; the `--url` source may not know it, hence // `Option`. Carries the same auth + scope headers as every other SDK call. - let result = api.upload_stream(reader, size, "application/octet-stream"); + let result = api.upload_stream(reader, size); pb.finish_and_clear(); match result { diff --git a/src/sdk.rs b/src/sdk.rs index 800de61..0001946 100644 --- a/src/sdk.rs +++ b/src/sdk.rs @@ -578,11 +578,14 @@ impl Api { /// [`reader_into_stream`]. `content_length`, when known, is sent as /// `Content-Length` so the server can reject an oversized upload up front /// (the `--url` path may not know the length, hence `Option`). + /// + /// The `Content-Type` is left to the SDK default (`application/octet-stream`): + /// the managed-table load keys off the parquet file extension, not the + /// upload's recorded content type. pub fn upload_stream( &self, reader: impl std::io::Read + Send + 'static, content_length: Option, - content_type: &str, ) -> Result { let mut cfg = self.client.configuration().clone(); cfg.client = upload_reqwest_client(); @@ -590,7 +593,7 @@ impl Api { let stream = reader_into_stream(reader); let resp = rt() - .block_on(upload_client.upload_stream(stream, Some(content_type), content_length)) + .block_on(upload_client.upload_stream(stream, None, content_length)) .map_err(ApiError::from_sdk)?; Ok(resp.id) } @@ -1350,11 +1353,7 @@ mod tests { let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); let id = api - .upload_stream( - std::io::Cursor::new(payload), - Some(len as u64), - "application/octet-stream", - ) + .upload_stream(std::io::Cursor::new(payload), Some(len as u64)) .expect("sized upload should succeed"); assert_eq!(id, "upload_sized"); @@ -1382,11 +1381,7 @@ mod tests { let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); let id = api - .upload_stream( - std::io::Cursor::new(payload), - None, - "application/octet-stream", - ) + .upload_stream(std::io::Cursor::new(payload), None) .expect("chunked upload should succeed"); assert_eq!(id, "upload_chunked"); @@ -1411,11 +1406,7 @@ mod tests { let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); let err = api - .upload_stream( - std::io::Cursor::new(payload), - Some(len as u64), - "application/octet-stream", - ) + .upload_stream(std::io::Cursor::new(payload), Some(len as u64)) .expect_err("a 400 should map to an error"); match err { From a2e4ad3245d34245f38799e2777775a654302606 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 13:22:44 -0700 Subject: [PATCH 4/4] docs: describe current behavior, not change history, in comments --- Cargo.toml | 9 ++++----- src/raw_http.rs | 15 +++++---------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fa6be45..a7e6d35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,10 @@ path = "src/main.rs" [dependencies] # Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK -# behind a shared multi-thread tokio runtime. Pinned to the rev that adds -# `content_length` to the streaming `upload_stream` (sized body, not chunked, so -# the server can fast-fail an oversized upload) — merged via hotdata-dev/sdk-rust#35 -# (which subsumes the earlier #32 CLI-consumption surface: attribution -# client_id, async submit_query, streaming upload_stream). +# behind a shared multi-thread tokio runtime. The `arrow` feature backs result +# decode; the pinned rev is the first to carry `content_length` on the streaming +# `upload_stream` (sized body, not chunked, so the server can fast-fail an +# oversized upload — see src/sdk.rs::Api::upload_stream). hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "1687ba2ffb8f816540d00d7c1fdfe165a8fbd150", features = ["arrow"] } # Shared multi-thread runtime for the sync wrapper; block_on is called # concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the diff --git a/src/raw_http.rs b/src/raw_http.rs index cfb5bec..d899672 100644 --- a/src/raw_http.rs +++ b/src/raw_http.rs @@ -10,18 +10,13 @@ //! `sandbox_session.rs`); //! * `skill.rs`'s arbitrary-URL markdown fetch. //! -//! (The streaming `/files` upload moved onto the SDK seam's -//! [`Api::upload_stream`](crate::sdk::Api::upload_stream), which owns its own -//! no-timeout client.) -//! //! This module owns the timeout-bounded blocking client builder and a thin -//! bearer/header request builder. It does NOT carry the old `ApiClient`'s -//! 401-retry loop: token freshness is now the `CliTokenProvider`'s job -//! (proactive refresh at the 30s leeway). +//! bearer/header request builder. Token freshness is the `CliTokenProvider`'s +//! job (proactive refresh at the 30s leeway), so these helpers carry no +//! 401-retry loop. -// Consumers (jwt.rs token mints, session mints, the streaming upload, -// skill.rs) are migrated to this helper incrementally; the allow keeps the -// build warning-free until those call sites land. +// Not every helper here is wired to a call site yet; the allow keeps the build +// warning-free. #![allow(dead_code)] use std::time::Duration;