Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,25 @@ path = "src/main.rs"

[dependencies]
# Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK
# behind a shared multi-thread tokio runtime. Pinned to the rev that adds the
# CLI-consumption surface (attribution client_id, async submit_query, streaming
# upload_stream) — merged via hotdata-dev/sdk-rust#32.
hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "8d4018fb899ba52228db44eaffa6caa0eb5b603f", features = ["arrow"] }
# behind a shared multi-thread tokio runtime. The `arrow` feature backs result
# decode; the pinned rev is the first to carry `content_length` on the streaming
# `upload_stream` (sized body, not chunked, so the server can fast-fail an
# oversized upload — see src/sdk.rs::Api::upload_stream).
hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "1687ba2ffb8f816540d00d7c1fdfe165a8fbd150", features = ["arrow"] }
# Shared multi-thread runtime for the sync wrapper; block_on is called
# concurrently from rayon worker threads (see src/indexes.rs).
tokio = { version = "1", features = ["rt-multi-thread"] }
# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the
# mpsc channel that bridges the blocking upload reader into an async byte stream.
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
# CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider.
async-trait = "0.1"
# Bridge the progress-wrapped blocking upload reader into the async
# `Stream<Item = Result<Bytes, _>>` the SDK's `upload_stream` consumes: a
# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a
# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream
# bound; `bytes` matches the SDK's chunk type.
bytes = "1"
futures-core = "0.3"
tokio-stream = "0.1"
anstyle = "1.0.13"
clap = { version = "4", features = ["derive"] }
directories = "6"
Expand Down
62 changes: 11 additions & 51 deletions src/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,59 +327,19 @@ fn finish_upload(
size: Option<u64>,
pb: &ProgressBar,
) -> String {
// The streaming `/files` upload stays on the slim raw-HTTP helper: it
// needs no request timeout (a 10 GB+ parquet far outlives the seam's
// 300s default), is one-shot (no 401-retry — the body is consumed on the
// first send), and the SDK's `uploads().upload` is `PathBuf`-only with no
// progress hook or `--url` source. We still carry the same
// `Authorization: Bearer <jwt>` (resolved through the seam's installed
// token provider) and `X-Workspace-Id` header every other call uses.
let upload_client = crate::raw_http::build_upload_client();
let url = format!("{}/files", api.api_url);
let mut req = upload_client
.post(&url)
.header("Content-Type", "application/octet-stream");
if let Some(bearer) = api.current_bearer() {
req = req.header("Authorization", format!("Bearer {bearer}"));
}
if let Some(ws) = api.workspace_id() {
req = req.header("X-Workspace-Id", ws);
}
if let Some(len) = size {
req = req.header("Content-Length", len);
}
let req = req.body(reqwest::blocking::Body::new(reader));

// Body is an opaque stream, so pass `None` for logging; headers
// (including the masked Authorization) still log.
let (status, resp_body) = match crate::util::send_debug(&upload_client, req, None) {
Ok(pair) => pair,
Err(e) => {
eprintln!("error connecting to API: {e}");
std::process::exit(1);
}
};
// Stream the body to `POST /v1/files` through the SDK seam, which drives the
// SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet
// far outlives the shared 300s request timeout) and bridges this blocking,
// progress-wrapped `reader` into the async byte stream the SDK consumes.
// `size` becomes the `Content-Length` so the server fast-fails an oversized
// upload before writing bytes; the `--url` source may not know it, hence
// `Option`. Carries the same auth + scope headers as every other SDK call.
let result = api.upload_stream(reader, size);
pb.finish_and_clear();

if !status.is_success() {
use crossterm::style::Stylize;
eprintln!("{}", crate::util::api_error(resp_body).red());
std::process::exit(1);
}

let body: serde_json::Value = match serde_json::from_str(&resp_body) {
Ok(v) => v,
Err(e) => {
eprintln!("error parsing upload response: {e}");
std::process::exit(1);
}
};
match body["id"].as_str() {
Some(id) => id.to_string(),
None => {
eprintln!("error: upload response missing id");
std::process::exit(1);
}
match result {
Ok(id) => id,
Err(e) => e.exit(),
}
}

Expand Down
35 changes: 6 additions & 29 deletions src/raw_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@
//! * the session-token mints (`/v1/auth/database`, `/v1/auth/sandbox`) — a
//! distinct grant on distinct endpoints (`database_session.rs` /
//! `sandbox_session.rs`);
//! * the streaming `/files` upload (10 GB+, `--url` source, progress bar, no
//! request timeout, no 401-retry) — the SDK's `uploads().upload` is
//! `PathBuf`-only;
//! * `skill.rs`'s arbitrary-URL markdown fetch.
//!
//! This module owns the two blocking client builders (one timeout-bounded, one
//! no-timeout for uploads) and a thin bearer/header request builder. It does
//! NOT carry the old `ApiClient`'s 401-retry loop: token freshness is now the
//! `CliTokenProvider`'s job (proactive refresh at the 30s leeway), and the
//! upload path was always one-shot anyway.
//! This module owns the timeout-bounded blocking client builder and a thin
//! bearer/header request builder. Token freshness is the `CliTokenProvider`'s
//! job (proactive refresh at the 30s leeway), so these helpers carry no
//! 401-retry loop.

// Consumers (jwt.rs token mints, session mints, the streaming upload,
// skill.rs) are migrated to this helper incrementally; the allow keeps the
// build warning-free until those call sites land.
// Not every helper here is wired to a call site yet; the allow keeps the build
// warning-free.
#![allow(dead_code)]

use std::time::Duration;
Expand Down Expand Up @@ -50,19 +45,6 @@ pub fn build_http_client() -> reqwest::blocking::Client {
.expect("reqwest blocking client should always build with these defaults")
}

/// Client used only for streaming file uploads. Deliberately has **no** request
/// timeout: an upload's duration scales with file size and uplink (a 10 GB
/// parquet takes far longer than `HTTP_REQUEST_TIMEOUT`, which is sized for
/// slow server-side work), so a wall-clock cap would abort healthy-but-slow
/// transfers. TCP keepalive is kept so a genuinely dead peer is still reaped by
/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C.
pub fn build_upload_client() -> reqwest::blocking::Client {
reqwest::blocking::Client::builder()
.tcp_keepalive(TCP_KEEPALIVE_INTERVAL)
.build()
.expect("reqwest blocking client should always build with these defaults")
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -72,11 +54,6 @@ mod tests {
let _ = build_http_client();
}

#[test]
fn upload_client_builds() {
let _ = build_upload_client();
}

#[test]
fn redact_keys_cover_token_fields() {
// Guards against silently dropping a sensitive key from debug logs.
Expand Down
Loading
Loading