diff --git a/Cargo.toml b/Cargo.toml index 5f3b066..b2dafac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.6.1" +version = "0.7.0" edition = "2024" repository = "https://github.com/boringSQL/dryrun" diff --git a/README.md b/README.md index 4e44ace..3fe6779 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,9 @@ dryrun lint All commands work offline from the schema file. Each project has its own `dryrun.toml` and `.dryrun/`, there is no global state. Add `.dryrun/` to your `.gitignore`. -Snapshots live in `~/.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take` it will switch to DB. +Snapshots live in `.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take` it will switch to DB. + +The static `schema.json` file will be deprecated in a future release. ### Multi-node: capture activity from replicas @@ -193,6 +195,35 @@ Every DB-related command (`init`, `import`, `probe`, `dump-schema`, `lint`, `dri > **Note:** the MCP server is currently single-database. Using the default profile. Or the option is to run one `dryrun mcp-serve` process per database. Native multi-database support inside one MCP process is tracked in [#7](https://github.com/boringSQL/dryrun/issues/7). +### Sharing snapshots across a team + +DryRun's value increases in a team setup. Multiple developers can pull snapshots from any POSIX-compliant directory. 
+ +To publish the snapshots you need: + +```sh +cd project_name + +# Capture from the live DB (use cwd name for project name) +dryrun init --db "$DATABASE_URL" +dryrun snapshot take --db "$DATABASE_URL" +dryrun snapshot push --to-path ./snapshots --all +``` + +Developers can then import the snapshots into their local history + +```sh +dryrun snapshot pull --from-path ./shared/snapshots --all +``` + +Snapshots are content-addressed (`{project}/{database}/{ts}-{hash}.json.zst`) and idempotent: pushing the same snapshot twice won't change it. + +The simplest deployment is a dedicated git repo. Create the snapshots repo and add `*.json.zst binary` to `.gitattributes` so git stops trying to diff bundles. + +Offline tools (`lint`, `check_migration`, `drift`) work immediately after the pull. + +No server, no credentials. Same promise as before. + ## MCP server Add `dryrun` to your AI assistant. If you installed via Homebrew, `dryrun` is already on your PATH: diff --git a/crates/dry_run_cli/src/main.rs b/crates/dry_run_cli/src/main.rs index 4a5c4c1..1686a5c 100644 --- a/crates/dry_run_cli/src/main.rs +++ b/crates/dry_run_cli/src/main.rs @@ -5,7 +5,8 @@ use std::path::PathBuf; use clap::{Parser, Subcommand}; use dry_run_core::history::{ - DatabaseId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + DatabaseId, FilesystemStore, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, + TimeRange, }; use dry_run_core::{DryRun, HistoryStore, ProjectConfig}; use rmcp::ServiceExt; @@ -140,6 +141,26 @@ enum SnapshotAction { #[arg(long)] history_db: Option, }, + Push { + #[arg(long)] + to_path: PathBuf, + #[arg(long)] + all: bool, + #[arg(long, env = "DATABASE_URL")] + db: Option, + #[arg(long)] + history_db: Option, + }, + Pull { + #[arg(long)] + from_path: PathBuf, + #[arg(long)] + all: bool, + #[arg(long, env = "DATABASE_URL")] + db: Option, + #[arg(long)] + history_db: Option, + }, } #[derive(Subcommand)] @@ -342,7 +363,7 @@ schema_file = 
".dryrun/schema.json" .unwrap_or_else(|| ProjectConfig::parse(""))?; let resolved = config.resolve_profile(Some(db_url), None, None, &cwd)?; let key = complete_key(&resolved, &snapshot.database); - store.put(&key, &snapshot).await?; + store.put_schema(&key, &snapshot).await?; let planner = ctx.introspect_planner_stats(&snapshot.content_hash).await?; store.put_planner_stats(&key, &planner).await?; @@ -478,7 +499,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let resolved = config.resolve_profile(Some(db_url), None, profile, &cwd)?; let key = complete_key(&resolved, &snapshot.database); - let schema_outcome = store.put(&key, &snapshot).await?; + let schema_outcome = store.put_schema(&key, &snapshot).await?; match schema_outcome { PutOutcome::Inserted => { println!("Snapshot saved: {}", snapshot.content_hash); @@ -603,7 +624,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> SnapshotAction::List { db, history_db } => { let store = open_history_store(history_db.as_deref())?; let key = resolve_read_key(db.as_deref(), profile).await?; - let rows = store.list(&key, TimeRange::default()).await?; + let rows = store.list_schema(&key, TimeRange::default()).await?; if rows.is_empty() { println!( @@ -642,15 +663,19 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let key = resolve_read_key(Some(db_url), profile).await?; let from_snapshot = if let Some(hash) = &from { - store.get(&key, SnapshotRef::Hash(hash.clone())).await? + store + .get_schema(&key, SnapshotRef::Hash(hash.clone())) + .await? } else if *latest { - store.get(&key, SnapshotRef::Latest).await? + store.get_schema(&key, SnapshotRef::Latest).await? } else { anyhow::bail!("specify --from or --latest"); }; let to_snapshot = if let Some(hash) = &to { - store.get(&key, SnapshotRef::Hash(hash.clone())).await? + store + .get_schema(&key, SnapshotRef::Hash(hash.clone())) + .await? 
} else { ctx.introspect_schema().await? }; @@ -664,6 +689,52 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> println!("{json}"); Ok(()) } + SnapshotAction::Push { + to_path, + all, + db, + history_db, + } => { + let store = open_history_store(history_db.as_deref())?; + let fs = FilesystemStore::new(to_path.clone()); + + let keys = if *all { + store.list_keys()? + } else { + vec![resolve_read_key(db.as_deref(), profile).await?] + }; + if keys.is_empty() { + println!("No snapshots in history.db to push."); + return Ok(()); + } + + let outcomes = sync_keys(&store, &fs, &keys).await?; + print_sync_outcomes("push", &outcomes, to_path); + Ok(()) + } + SnapshotAction::Pull { + from_path, + all, + db, + history_db, + } => { + let fs = FilesystemStore::new(from_path.clone()); + let store = open_history_store(history_db.as_deref())?; + + let keys = if *all { + fs.list_keys()? + } else { + vec![resolve_read_key(db.as_deref(), profile).await?] + }; + if keys.is_empty() { + println!("No snapshots at {} to pull.", from_path.display()); + return Ok(()); + } + + let outcomes = sync_keys(&fs, &store, &keys).await?; + print_sync_outcomes("pull", &outcomes, from_path); + Ok(()) + } SnapshotAction::Export { out, history_db } => { let store = open_history_store(history_db.as_deref())?; let out_root = out.clone().unwrap_or_else(|| { @@ -675,10 +746,10 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let keys = store.list_keys()?; let mut written = 0usize; for key in &keys { - let summaries = store.list(key, TimeRange::default()).await?; + let summaries = store.list_schema(key, TimeRange::default()).await?; for s in &summaries { let snap = store - .get(key, SnapshotRef::Hash(s.content_hash.clone())) + .get_schema(key, SnapshotRef::Hash(s.content_hash.clone())) .await?; write_snapshot_export(&out_root, key, &snap)?; written += 1; @@ -836,6 +907,108 @@ async fn cmd_drift( Ok(()) } +#[derive(Debug, Default)] +struct 
KindCount { + copied: usize, + up_to_date: usize, +} + +#[derive(Debug)] +struct SyncOutcome { + key: SnapshotKey, + schema: KindCount, + planner: KindCount, + activity: KindCount, +} + +fn kind_order(k: &SnapshotKind) -> u8 { + match k { + SnapshotKind::Schema => 0, + SnapshotKind::Planner => 1, + SnapshotKind::Activity { .. } => 2, + } +} + +async fn sync_keys( + src: &dyn SnapshotStore, + dst: &dyn SnapshotStore, + keys: &[SnapshotKey], +) -> anyhow::Result> { + let mut outcomes = Vec::with_capacity(keys.len()); + for key in keys { + let mut outcome = SyncOutcome { + key: key.clone(), + schema: KindCount::default(), + planner: KindCount::default(), + activity: KindCount::default(), + }; + + let mut kinds = src.list_kinds(key).await?; + // schema first so FilesystemStore's orphan rule is satisfied + kinds.sort_by_key(kind_order); + + for kind in &kinds { + let src_summaries = src.list(key, kind, TimeRange::default()).await?; + let dst_hashes: std::collections::HashSet = dst + .list(key, kind, TimeRange::default()) + .await? + .into_iter() + .map(|s| s.content_hash) + .collect(); + + let counter = match kind { + SnapshotKind::Schema => &mut outcome.schema, + SnapshotKind::Planner => &mut outcome.planner, + SnapshotKind::Activity { .. } => &mut outcome.activity, + }; + + for s in src_summaries { + if dst_hashes.contains(&s.content_hash) { + counter.up_to_date += 1; + continue; + } + let stored = src + .get(key, kind, SnapshotRef::Hash(s.content_hash.clone())) + .await?; + match dst.put(key, &stored).await? 
{ + PutOutcome::Inserted => counter.copied += 1, + PutOutcome::Deduped => counter.up_to_date += 1, + } + } + } + + outcomes.push(outcome); + } + Ok(outcomes) +} + +fn print_sync_outcomes(verb: &str, outcomes: &[SyncOutcome], path: &std::path::Path) { + let mut total = (0usize, 0usize, 0usize, 0usize); + for o in outcomes { + println!( + " project={} database={}: {} schema, {} planner, {} activity copied ({} up-to-date)", + o.key.project_id.0, + o.key.database_id.0, + o.schema.copied, + o.planner.copied, + o.activity.copied, + o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date, + ); + total.0 += o.schema.copied; + total.1 += o.planner.copied; + total.2 += o.activity.copied; + total.3 += o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date; + } + println!( + "{verb}: {} schema, {} planner, {} activity copied / {} up-to-date ({})", + total.0, + total.1, + total.2, + total.3, + path.display(), + ); +} + // helpers fn require_db_url(db: Option<&str>) -> anyhow::Result<&str> { @@ -934,14 +1107,8 @@ fn write_snapshot_export( key: &SnapshotKey, snap: &dry_run_core::SchemaSnapshot, ) -> anyhow::Result { - let path = out_root - .join(&key.project_id.0) - .join(&key.database_id.0) - .join(format!( - "{}-{}.json.zst", - snap.timestamp.format("%Y%m%dT%H%M%SZ"), - snap.content_hash, - )); + let path = + dry_run_core::history::snapshot_path(out_root, key, snap.timestamp, &snap.content_hash); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; } diff --git a/crates/dry_run_cli/src/mcp/server.rs b/crates/dry_run_cli/src/mcp/server.rs index 956a9d0..3c8510d 100644 --- a/crates/dry_run_cli/src/mcp/server.rs +++ b/crates/dry_run_cli/src/mcp/server.rs @@ -28,7 +28,7 @@ async fn persist_refresh( planner: Option<&dry_run_core::PlannerStatsSnapshot>, activity_by_node: &std::collections::BTreeMap, ) { - if let Err(e) = store.put(key, schema).await { + if let Err(e) = store.put_schema(key, schema).await { tracing::warn!(error = %e, "failed 
to persist schema"); } if let Some(p) = planner @@ -834,18 +834,18 @@ impl DryRunServer { let from_snapshot = match ¶ms.from { Some(hash) => store - .get(key, SnapshotRef::Hash(hash.clone())) + .get_schema(key, SnapshotRef::Hash(hash.clone())) .await .map_err(to_mcp_err)?, None => store - .get(key, SnapshotRef::Latest) + .get_schema(key, SnapshotRef::Latest) .await .map_err(to_mcp_err)?, }; let to_snapshot = match ¶ms.to { Some(hash) => store - .get(key, SnapshotRef::Hash(hash.clone())) + .get_schema(key, SnapshotRef::Hash(hash.clone())) .await .map_err(to_mcp_err)?, None => self.get_schema().await?, diff --git a/crates/dry_run_cli/src/mcp/server_tests.rs b/crates/dry_run_cli/src/mcp/server_tests.rs index 49c5062..96911f8 100644 --- a/crates/dry_run_cli/src/mcp/server_tests.rs +++ b/crates/dry_run_cli/src/mcp/server_tests.rs @@ -384,7 +384,7 @@ async fn rebuild_after_refresh_preserves_replica_activity() { let schema = test_snapshot(); let schema_hash = schema.content_hash.clone(); - SnapshotStore::put(&store, &key, &schema) + SnapshotStore::put_schema(&store, &key, &schema) .await .expect("seed schema"); let replica = make_activity_row(&schema_hash, "replica1", "replica-h1"); @@ -436,7 +436,7 @@ async fn reload_schema_prefers_history_over_json() { let schema = test_snapshot(); let schema_hash = schema.content_hash.clone(); - SnapshotStore::put(&store, &key, &schema) + SnapshotStore::put_schema(&store, &key, &schema) .await .expect("seed schema"); store diff --git a/crates/dry_run_cli/tests/init_e2e.rs b/crates/dry_run_cli/tests/init_e2e.rs index 6b3698a..159d4e2 100644 --- a/crates/dry_run_cli/tests/init_e2e.rs +++ b/crates/dry_run_cli/tests/init_e2e.rs @@ -139,7 +139,10 @@ async fn init_full_capture_writes_schema_planner_and_activity() { database_id: DatabaseId("postgres".into()), }; - let summaries = store.list(&key, TimeRange::default()).await.expect("list"); + let summaries = store + .list_schema(&key, TimeRange::default()) + .await + .expect("list"); 
assert_eq!( summaries.len(), 1, diff --git a/crates/dry_run_core/src/history/filesystem_layout.rs b/crates/dry_run_core/src/history/filesystem_layout.rs new file mode 100644 index 0000000..94aa367 --- /dev/null +++ b/crates/dry_run_core/src/history/filesystem_layout.rs @@ -0,0 +1,34 @@ +use crate::history::SnapshotKey; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use std::path::{Path, PathBuf}; + +pub const SNAPSHOT_EXTENSION: &str = "json.zst"; + +const TS_FORMAT: &str = "%Y%m%dT%H%M%SZ"; + +#[must_use] +pub fn snapshot_path( + root: &Path, + key: &SnapshotKey, + timestamp: DateTime, + content_hash: &str, +) -> PathBuf { + root.join(&key.project_id.0) + .join(&key.database_id.0) + .join(format!( + "{}-{}.{}", + timestamp.format(TS_FORMAT), + content_hash, + SNAPSHOT_EXTENSION, + )) +} + +#[must_use] +pub fn parse_snapshot_filename(name: &str) -> Option<(DateTime, String)> { + let stem = name.strip_suffix(&format!(".{SNAPSHOT_EXTENSION}"))?; + let (ts_str, hash) = stem.split_once('-')?; + let naive = NaiveDateTime::parse_from_str(ts_str, TS_FORMAT).ok()?; + let ts = Utc.from_utc_datetime(&naive); + + Some((ts, hash.to_string())) +} diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs new file mode 100644 index 0000000..21bbbe4 --- /dev/null +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -0,0 +1,1079 @@ +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use tracing::{debug, info}; + +use crate::error::{Error, Result}; +use crate::history::{ + DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, + SnapshotSummary, StoredSnapshot, TimeRange, parse_snapshot_filename, snapshot_path, +}; +use crate::schema::{ + ActivityStatsSnapshot, HashInput, PlannerStatsSnapshot, SchemaSnapshot, 
compute_content_hash, +}; + +pub struct FilesystemStore { + root: Arc, +} + +impl FilesystemStore { + pub fn new(root: impl Into) -> Self { + Self { + root: Arc::new(root.into()), + } + } + + pub fn list_keys(&self) -> Result> { + list_keys_sync(&self.root) + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct Bundle { + schema: SchemaSnapshot, + #[serde(default)] + planner: Option, + #[serde(default)] + activity: BTreeMap, +} + +#[async_trait] +impl SnapshotStore for FilesystemStore { + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result { + let root = self.root.clone(); + let key = key.clone(); + let snap = snap.clone(); + run_blocking(move || match snap { + StoredSnapshot::Schema(s) => put_schema(&root, &key, s), + StoredSnapshot::Planner(p) => put_planner(&root, &key, p), + StoredSnapshot::Activity(a) => put_activity(&root, &key, a), + }) + .await + } + + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, + ) -> Result { + let root = self.root.clone(); + let key = key.clone(); + let kind = kind.clone(); + run_blocking(move || get_kind(&root, &key, &kind, at)).await + } + + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result> { + let root = self.root.clone(); + let key = key.clone(); + let kind = kind.clone(); + run_blocking(move || list_kind(&root, &key, &kind, range)).await + } + + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result { + let root = self.root.clone(); + let key = key.clone(); + let kind = kind.clone(); + run_blocking(move || delete_before(&root, &key, &kind, cutoff)).await + } + + async fn list_kinds(&self, key: &SnapshotKey) -> Result> { + let root = self.root.clone(); + let key = key.clone(); + run_blocking(move || list_kinds_sync(&root, &key)).await + } +} + +fn put_schema(root: &Path, key: &SnapshotKey, snap: SchemaSnapshot) -> Result { + let dir = stream_dir(root, key); + if 
let Some(latest) = read_latest_hash(&dir)? + && latest == snap.content_hash + { + debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let path = snapshot_path(root, key, snap.timestamp, &snap.content_hash); + let bundle = Bundle { + schema: snap.clone(), + planner: None, + activity: BTreeMap::new(), + }; + write_bundle(&path, &bundle)?; + + info!( + hash = %snap.content_hash, + project = %key.project_id.0, + database = %key.database_id.0, + "snapshot put (fs)", + ); + Ok(PutOutcome::Inserted) +} + +fn put_planner(root: &Path, key: &SnapshotKey, snap: PlannerStatsSnapshot) -> Result { + let dir = stream_dir(root, key); + let (path, mut bundle) = + find_bundle_by_schema_hash(&dir, &snap.schema_ref_hash)?.ok_or_else(|| { + Error::History(format!( + "FilesystemStore: planner orphan — no schema bundle for ref {}", + snap.schema_ref_hash + )) + })?; + + if let Some(existing) = &bundle.planner + && existing.content_hash == snap.content_hash + { + return Ok(PutOutcome::Deduped); + } + + bundle.planner = Some(snap); + write_bundle(&path, &bundle)?; + Ok(PutOutcome::Inserted) +} + +fn put_activity(root: &Path, key: &SnapshotKey, snap: ActivityStatsSnapshot) -> Result { + let dir = stream_dir(root, key); + let (path, mut bundle) = + find_bundle_by_schema_hash(&dir, &snap.schema_ref_hash)?.ok_or_else(|| { + Error::History(format!( + "FilesystemStore: activity orphan — no schema bundle for ref {}", + snap.schema_ref_hash + )) + })?; + + let label = snap.node.label.clone(); + if let Some(existing) = bundle.activity.get(&label) + && existing.content_hash == snap.content_hash + { + return Ok(PutOutcome::Deduped); + } + + bundle.activity.insert(label, snap); + write_bundle(&path, &bundle)?; + Ok(PutOutcome::Inserted) +} + +fn get_kind( + root: &Path, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, +) -> Result { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + + match kind { + 
SnapshotKind::Schema => { + let chosen = match &at { + SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), + SnapshotRef::At(target) => entries + .into_iter() + .filter(|(ts, _, _)| *ts <= *target) + .max_by_key(|(ts, _, _)| *ts), + SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h), + }; + let (_, _, path) = chosen.ok_or_else(|| not_found_err("schema", &at))?; + let bundle = read_bundle(&path)?; + Ok(StoredSnapshot::Schema(bundle.schema)) + } + SnapshotKind::Planner => { + let mut bundles: Vec<(DateTime, Bundle)> = Vec::new(); + for (ts, _, p) in entries { + let b = read_bundle(&p)?; + if b.planner.is_some() { + bundles.push((ts, b)); + } + } + bundles.sort_by_key(|(ts, _)| std::cmp::Reverse(*ts)); + let chosen = match &at { + SnapshotRef::Latest => bundles.into_iter().next(), + SnapshotRef::At(target) => bundles.into_iter().find(|(ts, _)| *ts <= *target), + SnapshotRef::Hash(h) => bundles + .into_iter() + .find(|(_, b)| b.planner.as_ref().map(|p| &p.content_hash) == Some(h)), + }; + let (_, bundle) = chosen.ok_or_else(|| not_found_err("planner", &at))?; + Ok(StoredSnapshot::Planner(bundle.planner.expect("filtered"))) + } + SnapshotKind::Activity { node_label } => { + let mut bundles: Vec<(DateTime, Bundle)> = Vec::new(); + for (ts, _, p) in entries { + let b = read_bundle(&p)?; + if b.activity.contains_key(node_label) { + bundles.push((ts, b)); + } + } + bundles.sort_by_key(|(ts, _)| std::cmp::Reverse(*ts)); + let chosen = match &at { + SnapshotRef::Latest => bundles.into_iter().next(), + SnapshotRef::At(target) => bundles.into_iter().find(|(ts, _)| *ts <= *target), + SnapshotRef::Hash(h) => bundles + .into_iter() + .find(|(_, b)| b.activity.get(node_label).map(|a| &a.content_hash) == Some(h)), + }; + let (_, mut bundle) = chosen.ok_or_else(|| not_found_err("activity", &at))?; + let act = bundle.activity.remove(node_label).expect("filtered above"); + Ok(StoredSnapshot::Activity(act)) + } + } +} + +fn list_kind( + root: 
&Path, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, +) -> Result> { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + + let mut out: Vec = Vec::new(); + for (_schema_ts, _schema_hash, path) in entries { + let bundle = read_bundle(&path)?; + if let Some(s) = bundle_summary_for_kind(&bundle, key, kind) { + if range.from.is_none_or(|f| s.timestamp >= f) + && range.to.is_none_or(|t| s.timestamp < t) + { + out.push(s); + } + } + } + out.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); + Ok(out) +} + +fn delete_before( + root: &Path, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, +) -> Result { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + let mut affected = 0usize; + + match kind { + SnapshotKind::Schema => { + for (_ts, _h, path) in entries { + let bundle = read_bundle(&path)?; + if bundle.schema.timestamp < cutoff { + std::fs::remove_file(&path) + .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; + affected += 1; + } + } + } + SnapshotKind::Planner => { + for (_ts, _h, path) in entries { + let mut bundle = read_bundle(&path)?; + let drop = bundle + .planner + .as_ref() + .is_some_and(|p| p.timestamp < cutoff); + if drop { + bundle.planner = None; + write_bundle(&path, &bundle)?; + affected += 1; + } + } + } + SnapshotKind::Activity { node_label } => { + for (_ts, _h, path) in entries { + let mut bundle = read_bundle(&path)?; + let drop = bundle + .activity + .get(node_label) + .is_some_and(|a| a.timestamp < cutoff); + if drop { + bundle.activity.remove(node_label); + write_bundle(&path, &bundle)?; + affected += 1; + } + } + } + } + Ok(affected) +} + +fn list_kinds_sync(root: &Path, key: &SnapshotKey) -> Result> { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + if entries.is_empty() { + return Ok(Vec::new()); + } + + let mut has_schema = false; + let mut has_planner = false; + let mut activity_labels: 
std::collections::BTreeSet = Default::default(); + + for (_ts, _h, path) in entries { + let bundle = read_bundle(&path)?; + has_schema = true; + if bundle.planner.is_some() { + has_planner = true; + } + for label in bundle.activity.keys() { + activity_labels.insert(label.clone()); + } + } + + let mut out = Vec::new(); + if has_schema { + out.push(SnapshotKind::Schema); + } + if has_planner { + out.push(SnapshotKind::Planner); + } + for label in activity_labels { + out.push(SnapshotKind::Activity { node_label: label }); + } + Ok(out) +} + +fn bundle_summary_for_kind( + bundle: &Bundle, + key: &SnapshotKey, + kind: &SnapshotKind, +) -> Option { + let project = Some(key.project_id.0.clone()); + let database = Some(key.database_id.0.clone()); + let db_name = key.database_id.0.clone(); + match kind { + SnapshotKind::Schema => Some(SnapshotSummary { + id: 0, + kind: SnapshotKind::Schema, + timestamp: bundle.schema.timestamp, + content_hash: bundle.schema.content_hash.clone(), + schema_ref_hash: None, + database: db_name, + project_id: project, + database_id: database, + }), + SnapshotKind::Planner => bundle.planner.as_ref().map(|p| SnapshotSummary { + id: 0, + kind: SnapshotKind::Planner, + timestamp: p.timestamp, + content_hash: p.content_hash.clone(), + schema_ref_hash: Some(bundle.schema.content_hash.clone()), + database: db_name, + project_id: project, + database_id: database, + }), + SnapshotKind::Activity { node_label } => { + bundle.activity.get(node_label).map(|a| SnapshotSummary { + id: 0, + kind: SnapshotKind::Activity { + node_label: node_label.clone(), + }, + timestamp: a.timestamp, + content_hash: a.content_hash.clone(), + schema_ref_hash: Some(bundle.schema.content_hash.clone()), + database: db_name, + project_id: project, + database_id: database, + }) + } + } +} + +fn find_bundle_by_schema_hash(dir: &Path, schema_hash: &str) -> Result> { + for (_, _, path) in read_stream_entries(dir)? 
{ + let bundle = read_bundle(&path)?; + if bundle.schema.content_hash == schema_hash { + return Ok(Some((path, bundle))); + } + } + Ok(None) +} + +fn read_bundle(path: &Path) -> Result { + let bytes = + std::fs::read(path).map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; + let json = zstd::decode_all(bytes.as_slice()).map_err(|e| { + Error::History(format!( + "corrupt snapshot {}: zstd decode: {e}", + path.display() + )) + })?; + // v0.6.1 and earlier exported a bare SchemaSnapshot, not a Bundle. + // Accept both shapes so `dryrun snapshot pull` can read older shared + // dirs without a migration step. + let bundle = if let Ok(b) = serde_json::from_slice::(&json) { + b + } else { + let schema: SchemaSnapshot = serde_json::from_slice(&json).map_err(|e| { + Error::History(format!("corrupt snapshot {}: JSON: {e}", path.display())) + })?; + Bundle { + schema, + planner: None, + activity: BTreeMap::new(), + } + }; + + verify_bundle_hash(path, &bundle)?; + Ok(bundle) +} + +// filename hash must match recomputed schema content_hash +fn verify_bundle_hash(path: &Path, bundle: &Bundle) -> Result<()> { + let fname = path + .file_name() + .and_then(|s| s.to_str()) + .ok_or_else(|| Error::History(format!("non-utf8 filename: {}", path.display())))?; + let (_, expected) = parse_snapshot_filename(fname).ok_or_else(|| { + Error::History(format!( + "corrupt snapshot {}: filename does not match {{ts}}-{{hash}}.json.zst", + path.display() + )) + })?; + + if !is_sha256_hex(&expected) { + return Ok(()); + } + + if bundle.schema.content_hash != expected { + return Err(Error::History(format!( + "corrupt snapshot {}: filename hash {} != stored schema.content_hash {}", + path.display(), + expected, + bundle.schema.content_hash, + ))); + } + + let recomputed = compute_content_hash(&HashInput { + pg_version: &bundle.schema.pg_version, + tables: &bundle.schema.tables, + enums: &bundle.schema.enums, + domains: &bundle.schema.domains, + composites: 
&bundle.schema.composites, + views: &bundle.schema.views, + functions: &bundle.schema.functions, + extensions: &bundle.schema.extensions, + }); + if recomputed != expected { + return Err(Error::History(format!( + "corrupt snapshot {}: filename hash {} != recomputed schema hash {}", + path.display(), + expected, + recomputed, + ))); + } + + if let Some(planner) = &bundle.planner { + let mut p = planner.clone(); + p.content_hash = String::new(); + let recomputed = sha256_hex_of_serialized(&p)?; + if recomputed != planner.content_hash { + return Err(Error::History(format!( + "corrupt snapshot {}: planner content_hash {} != recomputed {}", + path.display(), + planner.content_hash, + recomputed, + ))); + } + } + + for (label, activity) in &bundle.activity { + let mut a = activity.clone(); + a.content_hash = String::new(); + let recomputed = sha256_hex_of_serialized(&a)?; + if recomputed != activity.content_hash { + return Err(Error::History(format!( + "corrupt snapshot {}: activity[{}] content_hash {} != recomputed {}", + path.display(), + label, + activity.content_hash, + recomputed, + ))); + } + } + Ok(()) +} + +fn is_sha256_hex(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +fn sha256_hex_of_serialized(value: &T) -> Result { + let bytes = serde_json::to_vec(value) + .map_err(|e| Error::History(format!("cannot serialize for hash check: {e}")))?; + Ok(format!("{:x}", Sha256::digest(&bytes))) +} + +fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| Error::History(format!("create_dir_all {}: {e}", parent.display())))?; + } + // unique tmp path so concurrent same-hash writers don't collide + let tmp = unique_tmp_path(path); + let json = serde_json::to_vec(bundle) + .map_err(|e| Error::History(format!("cannot serialize bundle: {e}")))?; + let compressed = zstd::encode_all(json.as_slice(), 3) + .map_err(|e| Error::History(format!("zstd 
encode: {e}")))?; + std::fs::write(&tmp, compressed) + .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; + if let Err(e) = std::fs::rename(&tmp, path) { + let _ = std::fs::remove_file(&tmp); + return Err(Error::History(format!("rename to {}: {e}", path.display()))); + } + Ok(()) +} + +fn unique_tmp_path(path: &Path) -> PathBuf { + use std::sync::atomic::{AtomicU64, Ordering}; + static COUNTER: AtomicU64 = AtomicU64::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + let suffix = format!("zst.{pid}.{n}.tmp"); + path.with_extension(suffix) +} + +fn not_found_err(kind: &str, at: &SnapshotRef) -> Error { + let detail = match at { + SnapshotRef::Latest => "latest".to_string(), + SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Error::History(format!("{kind} snapshot not found ({detail})")) +} + +fn stream_dir(root: &Path, key: &SnapshotKey) -> PathBuf { + root.join(&key.project_id.0).join(&key.database_id.0) +} + +fn read_stream_entries(dir: &Path) -> Result, String, PathBuf)>> { + if !dir.is_dir() { + return Ok(Vec::new()); + } + let mut entries = Vec::new(); + for entry in std::fs::read_dir(dir) + .map_err(|e| Error::History(format!("read_dir {}: {e}", dir.display())))? + { + let entry = entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if let Some((ts, hash)) = parse_snapshot_filename(name) { + entries.push((ts, hash, path)); + } + } + Ok(entries) +} + +fn read_latest_hash(dir: &Path) -> Result> { + Ok(read_stream_entries(dir)? 
+ .into_iter() + .max_by_key(|(ts, _, _)| *ts) + .map(|(_, hash, _)| hash)) +} + +fn list_keys_sync(root: &Path) -> Result> { + let mut keys = Vec::new(); + if !root.is_dir() { + return Ok(keys); + } + for proj_entry in std::fs::read_dir(root) + .map_err(|e| Error::History(format!("read_dir {}: {e}", root.display())))? + { + let proj_entry = proj_entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let proj_path = proj_entry.path(); + if !proj_path.is_dir() { + continue; + } + let Some(project_id) = proj_path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + for db_entry in std::fs::read_dir(&proj_path) + .map_err(|e| Error::History(format!("read_dir {}: {e}", proj_path.display())))? + { + let db_entry = db_entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let db_path = db_entry.path(); + if !db_path.is_dir() { + continue; + } + let Some(database_id) = db_path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + keys.push(SnapshotKey { + project_id: ProjectId(project_id.to_string()), + database_id: DatabaseId(database_id.to_string()), + }); + } + } + keys.sort_by(|a, b| { + a.project_id + .0 + .cmp(&b.project_id.0) + .then_with(|| a.database_id.0.cmp(&b.database_id.0)) + }); + Ok(keys) +} + +async fn run_blocking(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + tokio::task::spawn_blocking(f) + .await + .map_err(|e| Error::History(format!("blocking task failed: {e}")))? 
+} + +#[cfg(test)] +mod tests { + use super::*; + use crate::history::test_fixtures; + use tempfile::TempDir; + + fn make_schema(hash: &str) -> SchemaSnapshot { + test_fixtures::make_snap(hash, "auth") + } + + fn make_planner(schema_ref: &str, hash: &str) -> PlannerStatsSnapshot { + test_fixtures::make_planner(schema_ref, "auth", hash) + } + + fn make_activity(schema_ref: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { + test_fixtures::make_activity(schema_ref, "auth", label, hash) + } + + fn key() -> SnapshotKey { + test_fixtures::key("p", "auth") + } + + fn temp_store() -> (TempDir, FilesystemStore) { + let dir = TempDir::new().unwrap(); + let store = FilesystemStore::new(dir.path().to_path_buf()); + (dir, store) + } + + #[tokio::test] + async fn put_schema_round_trips_via_trait() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("h1")).await.unwrap(); + + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); + assert_eq!(got.content_hash, "h1"); + } + + #[tokio::test] + async fn put_schema_dedupes_on_same_content_hash() { + let (_dir, store) = temp_store(); + let k = key(); + let s = make_schema("h1"); + assert_eq!( + store.put_schema(&k, &s).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!(store.put_schema(&k, &s).await.unwrap(), PutOutcome::Deduped); + } + + #[tokio::test] + async fn bundle_round_trips_all_three_kinds() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "ah")) + .await + .unwrap(); + + let s = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); + assert_eq!(s.content_hash, "sh"); + + let p = store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .unwrap() + .into_planner() + .unwrap(); + assert_eq!(p.content_hash, "ph"); + + let a = store + 
.get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(a.content_hash, "ah"); + assert_eq!(a.node.label, "primary"); + } + + #[tokio::test] + async fn put_planner_without_schema_errors() { + let (_dir, store) = temp_store(); + let k = key(); + let err = store + .put_planner_stats(&k, &make_planner("missing", "ph")) + .await + .unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("orphan"), "expected orphan error, got: {msg}"); + } + + #[tokio::test] + async fn put_planner_dedupes_on_same_content_hash() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + let p = make_planner("sh", "ph"); + assert_eq!( + store.put_planner_stats(&k, &p).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_planner_stats(&k, &p).await.unwrap(), + PutOutcome::Deduped + ); + } + + #[tokio::test] + async fn put_activity_upserts_per_node_label() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "standby", "b1")) + .await + .unwrap(); + // overwrite primary + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a2")) + .await + .unwrap(); + + let primary = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(primary.content_hash, "a2"); + + let standby = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "standby".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(standby.content_hash, "b1"); + } + + #[tokio::test] + async fn list_planner_returns_only_bundles_with_planner() { 
+ let (_dir, store) = temp_store(); + let k = key(); + // bundle #1: schema + planner + store.put_schema(&k, &make_schema("sh1")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh1", "ph1")) + .await + .unwrap(); + // bundle #2: schema only + store.put_schema(&k, &make_schema("sh2")).await.unwrap(); + + let schemas = store + .list(&k, &SnapshotKind::Schema, TimeRange::default()) + .await + .unwrap(); + assert_eq!(schemas.len(), 2); + + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert_eq!(planners.len(), 1); + assert_eq!(planners[0].content_hash, "ph1"); + assert_eq!(planners[0].schema_ref_hash.as_deref(), Some("sh1")); + } + + #[tokio::test] + async fn list_kinds_reports_distinct_node_labels() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "standby", "b1")) + .await + .unwrap(); + + let kinds = store.list_kinds(&k).await.unwrap(); + assert!(kinds.contains(&SnapshotKind::Schema)); + assert!(kinds.contains(&SnapshotKind::Planner)); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "primary".into() + })); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "standby".into() + })); + assert_eq!(kinds.len(), 4); + } + + #[tokio::test] + async fn delete_before_planner_clears_field_keeps_schema() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + + let cutoff = Utc::now() + chrono::Duration::seconds(60); + let removed = store + .delete_before(&k, &SnapshotKind::Planner, cutoff) + .await + .unwrap(); + assert_eq!(removed, 1); + + // 
schema still there + let s = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); + assert_eq!(s.content_hash, "sh"); + + // planner gone + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert!(planners.is_empty()); + } + + #[tokio::test] + async fn delete_before_schema_removes_whole_bundle() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + + let cutoff = Utc::now() + chrono::Duration::seconds(60); + let removed = store + .delete_before(&k, &SnapshotKind::Schema, cutoff) + .await + .unwrap(); + assert_eq!(removed, 1); + + let schemas = store + .list(&k, &SnapshotKind::Schema, TimeRange::default()) + .await + .unwrap(); + assert!(schemas.is_empty()); + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert!(planners.is_empty()); + } + + // Produce a snapshot with a real sha256 content_hash so the + // is_sha256_hex gate engages and verify_bundle_hash actually runs. 
+ fn make_schema_with_real_hash(seed: &str) -> SchemaSnapshot { + let mut s = test_fixtures::make_snap("placeholder", "auth"); + s.pg_version = format!("PostgreSQL 17.0 ({seed})"); + s.content_hash = compute_content_hash(&HashInput { + pg_version: &s.pg_version, + tables: &s.tables, + enums: &s.enums, + domains: &s.domains, + composites: &s.composites, + views: &s.views, + functions: &s.functions, + extensions: &s.extensions, + }); + s + } + + #[tokio::test] + async fn read_bundle_rejects_byte_flipped_file() { + let (dir, store) = temp_store(); + let k = key(); + let snap = make_schema_with_real_hash("seed-a"); + store.put_schema(&k, &snap).await.unwrap(); + + let target = std::fs::read_dir(stream_dir(dir.path(), &k)) + .unwrap() + .filter_map(|e| e.ok()) + .find(|e| { + e.path() + .file_name() + .and_then(|s| s.to_str()) + .is_some_and(|n| n.ends_with(".json.zst")) + }) + .map(|e| e.path()) + .expect("pushed bundle file"); + + // Flip a byte mid-file (zstd payload). Either zstd decode fails or + // the recomputed schema hash diverges; both must surface as an error. + let mut bytes = std::fs::read(&target).unwrap(); + let mid = bytes.len() / 2; + bytes[mid] ^= 0xFF; + std::fs::write(&target, &bytes).unwrap(); + + let err = store + .get(&k, &SnapshotKind::Schema, SnapshotRef::Latest) + .await + .expect_err("corrupt file must error loudly"); + let msg = format!("{err}"); + assert!( + msg.contains("corrupt snapshot"), + "expected corruption error, got: {msg}" + ); + } + + #[tokio::test] + async fn read_bundle_rejects_filename_hash_mismatch() { + let (dir, store) = temp_store(); + let k = key(); + let snap = make_schema_with_real_hash("seed-b"); + store.put_schema(&k, &snap).await.unwrap(); + + // Rename the bundle to claim a different (but still 64-hex) hash. 
+ let stream = stream_dir(dir.path(), &k); + let original = std::fs::read_dir(&stream) + .unwrap() + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .find(|p| p.extension().is_some_and(|e| e == "zst")) + .unwrap(); + let renamed = stream.join(format!( + "{}-{}.json.zst", + snap.timestamp.format("%Y%m%dT%H%M%SZ"), + "f".repeat(64), + )); + std::fs::rename(&original, &renamed).unwrap(); + + let err = store + .get(&k, &SnapshotKind::Schema, SnapshotRef::Latest) + .await + .expect_err("filename-hash mismatch must error loudly"); + assert!(format!("{err}").contains("corrupt snapshot")); + } + + #[tokio::test] + async fn concurrent_writers_same_hash_are_idempotent() { + let (dir, store) = temp_store(); + let k = key(); + let snap = make_schema_with_real_hash("seed-concurrent"); + let store = std::sync::Arc::new(store); + + let mut tasks = Vec::new(); + for _ in 0..16 { + let s = store.clone(); + let k = k.clone(); + let snap = snap.clone(); + tasks.push(tokio::spawn(async move { s.put_schema(&k, &snap).await })); + } + for t in tasks { + t.await + .expect("join") + .expect("put_schema must not race-fail"); + } + + let stream = stream_dir(dir.path(), &k); + let entries: Vec<_> = std::fs::read_dir(&stream) + .unwrap() + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .collect(); + + let finals: Vec<_> = entries + .iter() + .filter(|p| p.extension().is_some_and(|e| e == "zst")) + .collect(); + assert_eq!( + finals.len(), + 1, + "expected exactly one bundle, got {entries:?}" + ); + + let stragglers: Vec<_> = entries + .iter() + .filter(|p| { + p.file_name() + .and_then(|s| s.to_str()) + .is_some_and(|n| n.contains(".tmp")) + }) + .collect(); + assert!(stragglers.is_empty(), "stray .tmp files: {stragglers:?}"); + } + + // Pins the v0.6.1 backward-compat path in read_bundle: a bare + // SchemaSnapshot JSON (no `schema`/`planner`/`activity` envelope) must + // still load. Removing the fallback would break `pull` against + // pre-bundling shared dirs. 
+ #[tokio::test] + async fn read_bundle_accepts_v061_bare_schema_format() { + let (dir, store) = temp_store(); + let k = key(); + let snap = make_schema_with_real_hash("seed-legacy"); + + let path = snapshot_path(dir.path(), &k, snap.timestamp, &snap.content_hash); + std::fs::create_dir_all(path.parent().unwrap()).unwrap(); + let json = serde_json::to_vec(&snap).unwrap(); + let compressed = zstd::encode_all(json.as_slice(), 3).unwrap(); + std::fs::write(&path, compressed).unwrap(); + + let got = store + .get(&k, &SnapshotKind::Schema, SnapshotRef::Latest) + .await + .expect("v0.6.1 bare SchemaSnapshot must be readable") + .into_schema() + .expect("schema variant"); + assert_eq!(got.content_hash, snap.content_hash); + } +} diff --git a/crates/dry_run_core/src/history/mod.rs b/crates/dry_run_core/src/history/mod.rs index b37282b..78156b0 100644 --- a/crates/dry_run_core/src/history/mod.rs +++ b/crates/dry_run_core/src/history/mod.rs @@ -1,7 +1,14 @@ +pub mod filesystem_layout; +mod filesystem_store; mod snapshot_store; mod store; +#[cfg(test)] +mod test_fixtures; +pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path}; +pub use filesystem_store::FilesystemStore; pub use snapshot_store::{ - DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, + StoredSnapshot, TimeRange, }; pub use store::{HistoryStore, SnapshotSummary, default_data_dir}; diff --git a/crates/dry_run_core/src/history/snapshot_store.rs b/crates/dry_run_core/src/history/snapshot_store.rs index 4da5b7c..a102ab0 100644 --- a/crates/dry_run_core/src/history/snapshot_store.rs +++ b/crates/dry_run_core/src/history/snapshot_store.rs @@ -2,8 +2,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use crate::error::Result; -use crate::schema::SchemaSnapshot; +use crate::error::{Error, Result}; +use 
crate::schema::{ActivityStatsSnapshot, PlannerStatsSnapshot, SchemaSnapshot}; pub use super::store::SnapshotSummary; @@ -38,11 +38,197 @@ pub enum PutOutcome { Deduped, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SnapshotKind { + Schema, + Planner, + Activity { node_label: String }, +} + +impl SnapshotKind { + /// String stored in the SQLite `kind` column. + #[must_use] + pub fn db_kind(&self) -> &'static str { + match self { + Self::Schema => "schema", + Self::Planner => "planner_stats", + Self::Activity { .. } => "activity_stats", + } + } + + #[must_use] + pub fn node_label(&self) -> Option<&str> { + match self { + Self::Activity { node_label } => Some(node_label.as_str()), + _ => None, + } + } +} + +#[derive(Debug, Clone)] +pub enum StoredSnapshot { + Schema(SchemaSnapshot), + Planner(PlannerStatsSnapshot), + Activity(ActivityStatsSnapshot), +} + +impl StoredSnapshot { + #[must_use] + pub fn kind(&self) -> SnapshotKind { + match self { + Self::Schema(_) => SnapshotKind::Schema, + Self::Planner(_) => SnapshotKind::Planner, + Self::Activity(a) => SnapshotKind::Activity { + node_label: a.node.label.clone(), + }, + } + } + + #[must_use] + pub fn timestamp(&self) -> DateTime { + match self { + Self::Schema(s) => s.timestamp, + Self::Planner(p) => p.timestamp, + Self::Activity(a) => a.timestamp, + } + } + + #[must_use] + pub fn content_hash(&self) -> &str { + match self { + Self::Schema(s) => &s.content_hash, + Self::Planner(p) => &p.content_hash, + Self::Activity(a) => &a.content_hash, + } + } + + #[must_use] + pub fn schema_ref_hash(&self) -> Option<&str> { + match self { + Self::Schema(_) => None, + Self::Planner(p) => Some(&p.schema_ref_hash), + Self::Activity(a) => Some(&a.schema_ref_hash), + } + } + + #[must_use] + pub fn database(&self) -> &str { + match self { + Self::Schema(s) => &s.database, + Self::Planner(p) => &p.database, + Self::Activity(a) => &a.database, + } + } + + pub fn into_schema(self) -> Result { + match self { + Self::Schema(s) => 
Ok(s), + other => Err(Error::History(format!( + "expected schema snapshot, got {}", + other.kind().db_kind() + ))), + } + } + + pub fn into_planner(self) -> Result { + match self { + Self::Planner(p) => Ok(p), + other => Err(Error::History(format!( + "expected planner snapshot, got {}", + other.kind().db_kind() + ))), + } + } + + pub fn into_activity(self) -> Result { + match self { + Self::Activity(a) => Ok(a), + other => Err(Error::History(format!( + "expected activity snapshot, got {}", + other.kind().db_kind() + ))), + } + } +} + #[async_trait] pub trait SnapshotStore: Send + Sync { - async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result; - async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result; - async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result>; - async fn latest(&self, key: &SnapshotKey) -> Result>; - async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result; + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result; + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, + ) -> Result; + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result>; + async fn latest( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + ) -> Result> { + Ok(self + .list(key, kind, TimeRange::default()) + .await? 
+ .into_iter() + .next()) + } + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result; + async fn list_kinds(&self, key: &SnapshotKey) -> Result>; + + // wrapper per schema kind + async fn put_schema(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + self.put(key, &StoredSnapshot::Schema(snap.clone())).await + } + + async fn put_planner_stats( + &self, + key: &SnapshotKey, + snap: &PlannerStatsSnapshot, + ) -> Result { + self.put(key, &StoredSnapshot::Planner(snap.clone())).await + } + + async fn put_activity_stats( + &self, + key: &SnapshotKey, + snap: &ActivityStatsSnapshot, + ) -> Result { + self.put(key, &StoredSnapshot::Activity(snap.clone())).await + } + + async fn get_schema(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + self.get(key, &SnapshotKind::Schema, at) + .await? + .into_schema() + } + + async fn list_schema( + &self, + key: &SnapshotKey, + range: TimeRange, + ) -> Result> { + self.list(key, &SnapshotKind::Schema, range).await + } + + async fn latest_schema(&self, key: &SnapshotKey) -> Result> { + self.latest(key, &SnapshotKind::Schema).await + } + + async fn delete_schema_before( + &self, + key: &SnapshotKey, + cutoff: DateTime, + ) -> Result { + self.delete_before(key, &SnapshotKind::Schema, cutoff).await + } } diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index 496b28b..0b1351e 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -9,7 +9,7 @@ use tracing::{debug, info, warn}; use crate::error::{Error, Result}; use crate::history::snapshot_store::{ - PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, StoredSnapshot, TimeRange, }; use crate::schema::{ ActivityStatsSnapshot, AnnotatedSnapshot, PlannerStatsSnapshot, SchemaSnapshot, @@ -22,8 +22,10 @@ pub struct HistoryStore { #[derive(Debug, 
Clone)] pub struct SnapshotSummary { pub id: i64, + pub kind: SnapshotKind, pub timestamp: DateTime, pub content_hash: String, + pub schema_ref_hash: Option, pub database: String, pub project_id: Option, pub database_id: Option, @@ -116,120 +118,12 @@ impl HistoryStore { .await } - pub async fn put_planner_stats( - &self, - key: &SnapshotKey, - snap: &PlannerStatsSnapshot, - ) -> Result { - let key = key.clone(); - let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - - let latest: Option = conn - .query_row( - "SELECT content_hash FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'planner_stats' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], - |row| row.get(0), - ) - .ok(); - - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, "planner stats unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize planner stats: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, - database_name, snapshot_json, project_id, database_id) - VALUES ('planner_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.schema_ref_hash, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - project = %pid, database = %did, "planner stats put"); - Ok(PutOutcome::Inserted) - }) - .await - } - - pub async fn put_activity_stats( - &self, - key: &SnapshotKey, - snap: &ActivityStatsSnapshot, - ) -> Result { - let key = key.clone(); - let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - let label = &snap.node.label; - - let latest: Option = conn - .query_row( - "SELECT content_hash FROM snapshots 
- WHERE project_id = ?1 AND database_id = ?2 - AND kind = 'activity_stats' AND node_label = ?3 - ORDER BY timestamp DESC LIMIT 1", - params![pid, did, label], - |row| row.get(0), - ) - .ok(); - - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, label = %label, - "activity stats unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize activity stats: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, - node_label, database_name, snapshot_json, - project_id, database_id) - VALUES ('activity_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.schema_ref_hash, - label, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - label = %label, project = %pid, database = %did, - "activity stats put"); - Ok(PutOutcome::Inserted) - }) - .await - } - pub async fn get_annotated( &self, key: &SnapshotKey, at: SnapshotRef, ) -> Result { - let schema = SnapshotStore::get(self, key, at.clone()).await?; + let schema = SnapshotStore::get_schema(self, key, at.clone()).await?; let schema_hash = schema.content_hash.clone(); let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); @@ -363,17 +257,33 @@ fn lock_conn(conn: &Mutex) -> Result) -> rusqlite::Result { +fn push_node_label_filter( + sql: &mut String, + bound: &mut Vec>, + kind: &SnapshotKind, +) { + if let SnapshotKind::Activity { node_label } = kind { + *sql += &format!(" AND node_label = ?{}", bound.len() + 1); + bound.push(Box::new(node_label.clone())); + } +} + +fn row_to_summary( + row: &rusqlite::Row<'_>, + kind: SnapshotKind, +) -> rusqlite::Result { let ts_str: String = row.get(1)?; Ok(SnapshotSummary { id: row.get(0)?, + kind, timestamp: DateTime::parse_from_rfc3339(&ts_str) 
.map(|dt| dt.with_timezone(&Utc)) .unwrap_or_default(), content_hash: row.get(2)?, - database: row.get(3)?, - project_id: row.get(4)?, - database_id: row.get(5)?, + schema_ref_hash: row.get(3)?, + database: row.get(4)?, + project_id: row.get(5)?, + database_id: row.get(6)?, }) } @@ -395,111 +305,52 @@ where #[async_trait] impl SnapshotStore for HistoryStore { - async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result { let key = key.clone(); let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - - let latest: Option = conn - .query_row( - "SELECT content_hash FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], - |row| row.get(0), - ) - .ok(); - - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, database_name, - snapshot_json, project_id, database_id) - VALUES ('schema', ?1, ?2, ?3, ?4, ?5, ?6)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, project = %pid, database = %did, "snapshot put"); - Ok(PutOutcome::Inserted) + run_blocking(&self.conn, move |conn| match snap { + StoredSnapshot::Schema(s) => insert_schema(conn, &key, &s), + StoredSnapshot::Planner(p) => insert_planner(conn, &key, &p), + StoredSnapshot::Activity(a) => insert_activity(conn, &key, &a), }) .await } - async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + 
at: SnapshotRef, + ) -> Result { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); + let kind = kind.clone(); run_blocking(&self.conn, move |conn| { - let row = match &at { - SnapshotRef::Latest => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], - |r| r.get::<_, String>(0), - ), - SnapshotRef::At(ts) => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND timestamp <= ?3 - ORDER BY timestamp DESC LIMIT 1", - params![pid, did, ts.to_rfc3339()], - |r| r.get::<_, String>(0), - ), - SnapshotRef::Hash(h) => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND content_hash = ?3 - LIMIT 1", - params![pid, did, h], - |r| r.get::<_, String>(0), - ), - }; - - let json = match row { - Ok(j) => j, - Err(rusqlite::Error::QueryReturnedNoRows) => { - let detail = match at { - SnapshotRef::Latest => "latest".to_string(), - SnapshotRef::At(ts) => format!("at-or-before {ts}"), - SnapshotRef::Hash(h) => format!("hash {h}"), - }; - return Err(Error::History(format!("snapshot not found ({detail})"))); - } - Err(e) => return Err(e.into()), - }; - - serde_json::from_str(&json) - .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) + let json = fetch_snapshot_json(conn, &pid, &did, &kind, &at)?; + decode_stored(&kind, &json) }) .await } - async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result> { + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result> { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); + let kind = kind.clone(); run_blocking(&self.conn, move |conn| { let mut sql = String::from( - "SELECT id, timestamp, content_hash, database_name, + "SELECT id, timestamp, content_hash, 
schema_ref_hash, database_name, project_id, database_id FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema'", + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3", ); - let mut bound: Vec> = vec![Box::new(pid), Box::new(did)]; + let mut bound: Vec> = + vec![Box::new(pid), Box::new(did), Box::new(kind.db_kind())]; + push_node_label_filter(&mut sql, &mut bound, &kind); if let Some(from) = range.from { sql += &format!(" AND timestamp >= ?{}", bound.len() + 1); bound.push(Box::new(from.to_rfc3339())); @@ -512,68 +363,332 @@ impl SnapshotStore for HistoryStore { let mut stmt = conn.prepare(&sql)?; let params: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect(); - stmt.query_map(params.as_slice(), row_to_summary)? - .map(|r| r.map_err(Error::from)) - .collect() + let kind_for_rows = kind.clone(); + stmt.query_map(params.as_slice(), |row| { + row_to_summary(row, kind_for_rows.clone()) + })? + .map(|r| r.map_err(Error::from)) + .collect() }) .await } - async fn latest(&self, key: &SnapshotKey) -> Result> { - Ok(self - .list(key, TimeRange::default()) - .await? - .into_iter() - .next()) + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result { + let pid = key.project_id.0.clone(); + let did = key.database_id.0.clone(); + let kind = kind.clone(); + run_blocking(&self.conn, move |conn| { + let mut sql = String::from( + "DELETE FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND timestamp < ?4", + ); + let mut bound: Vec> = vec![ + Box::new(pid), + Box::new(did), + Box::new(kind.db_kind()), + Box::new(cutoff.to_rfc3339()), + ]; + push_node_label_filter(&mut sql, &mut bound, &kind); + let params: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect(); + Ok(conn.execute(&sql, params.as_slice())?) 
+ }) + .await } - async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result { + async fn list_kinds(&self, key: &SnapshotKey) -> Result> { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); run_blocking(&self.conn, move |conn| { - Ok(conn.execute( - "DELETE FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND timestamp < ?3", - params![pid, did, cutoff.to_rfc3339()], - )?) + let mut stmt = conn.prepare( + "SELECT DISTINCT kind, node_label FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + ORDER BY kind, node_label", + )?; + let rows = stmt.query_map(params![pid, did], |row| { + let kind: String = row.get(0)?; + let node_label: Option = row.get(1)?; + Ok((kind, node_label)) + })?; + let mut out = Vec::new(); + for r in rows { + let (kind, node_label) = r?; + match kind.as_str() { + "schema" => out.push(SnapshotKind::Schema), + "planner_stats" => out.push(SnapshotKind::Planner), + "activity_stats" => { + if let Some(label) = node_label { + out.push(SnapshotKind::Activity { node_label: label }); + } + } + other => { + return Err(Error::History(format!("unknown snapshot kind: {other}"))); + } + } + } + Ok(out) }) .await } } +fn fetch_snapshot_json( + conn: &Connection, + pid: &str, + did: &str, + kind: &SnapshotKind, + at: &SnapshotRef, +) -> Result { + let kind_str = kind.db_kind(); + let label_filter = matches!(kind, SnapshotKind::Activity { .. 
}); + let row: rusqlite::Result = match (at, label_filter) { + (SnapshotRef::Latest, false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str], + |r| r.get(0), + ), + (SnapshotRef::Latest, true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, label], + |r| r.get(0), + ) + } + (SnapshotRef::At(ts), false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND timestamp <= ?4 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, ts.to_rfc3339()], + |r| r.get(0), + ), + (SnapshotRef::At(ts), true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 AND timestamp <= ?5 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, label, ts.to_rfc3339()], + |r| r.get(0), + ) + } + (SnapshotRef::Hash(h), false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND content_hash = ?4 + LIMIT 1", + params![pid, did, kind_str, h], + |r| r.get(0), + ), + (SnapshotRef::Hash(h), true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 AND content_hash = ?5 + LIMIT 1", + params![pid, did, kind_str, label, h], + |r| r.get(0), + ) + } + }; + + match row { + Ok(j) => Ok(j), + Err(rusqlite::Error::QueryReturnedNoRows) => { + let detail = match at { + SnapshotRef::Latest => "latest".to_string(), + 
SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Err(Error::History(format!( + "{} snapshot not found ({detail})", + kind.db_kind() + ))) + } + Err(e) => Err(e.into()), + } +} + +fn decode_stored(kind: &SnapshotKind, json: &str) -> Result { + match kind { + SnapshotKind::Schema => serde_json::from_str::(json) + .map(StoredSnapshot::Schema) + .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))), + SnapshotKind::Planner => serde_json::from_str::(json) + .map(StoredSnapshot::Planner) + .map_err(|e| Error::History(format!("corrupt planner stats JSON: {e}"))), + SnapshotKind::Activity { .. } => serde_json::from_str::(json) + .map(StoredSnapshot::Activity) + .map_err(|e| Error::History(format!("corrupt activity stats JSON: {e}"))), + } +} + +fn insert_schema( + conn: &Connection, + key: &SnapshotKey, + snap: &SchemaSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + + let latest: Option = conn + .query_row( + "SELECT content_hash FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' + ORDER BY timestamp DESC LIMIT 1", + params![pid, did], + |row| row.get(0), + ) + .ok(); + + if latest.as_deref() == Some(snap.content_hash.as_str()) { + debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, database_name, + snapshot_json, project_id, database_id) + VALUES ('schema', ?1, ?2, ?3, ?4, ?5, ?6)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, project = %pid, database = %did, "snapshot put"); + Ok(PutOutcome::Inserted) +} + +fn insert_planner( + conn: &Connection, + key: &SnapshotKey, + snap: 
&PlannerStatsSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + + let exists: Option = conn + .query_row( + "SELECT id FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + AND kind = 'planner_stats' + AND schema_ref_hash = ?3 AND content_hash = ?4 + LIMIT 1", + params![pid, did, snap.schema_ref_hash, snap.content_hash], + |r| r.get(0), + ) + .ok(); + + if exists.is_some() { + debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + "planner stats unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize planner stats: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, + database_name, snapshot_json, project_id, database_id) + VALUES ('planner_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.schema_ref_hash, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + project = %pid, database = %did, "planner stats put"); + Ok(PutOutcome::Inserted) +} + +fn insert_activity( + conn: &Connection, + key: &SnapshotKey, + snap: &ActivityStatsSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + let label = &snap.node.label; + + let exists: Option = conn + .query_row( + "SELECT id FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + AND kind = 'activity_stats' AND node_label = ?3 + AND schema_ref_hash = ?4 AND content_hash = ?5 + LIMIT 1", + params![pid, did, label, snap.schema_ref_hash, snap.content_hash], + |r| r.get(0), + ) + .ok(); + + if exists.is_some() { + debug!(hash = %snap.content_hash, label = %label, + "activity stats unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize 
activity stats: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, + node_label, database_name, snapshot_json, + project_id, database_id) + VALUES ('activity_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.schema_ref_hash, + label, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + label = %label, project = %pid, database = %did, + "activity stats put"); + Ok(PutOutcome::Inserted) +} + #[cfg(test)] mod trait_tests { use chrono::Duration; use tempfile::TempDir; use super::*; - use crate::history::snapshot_store::{DatabaseId, ProjectId}; - - fn make_snap(hash: &str, database: &str) -> SchemaSnapshot { - SchemaSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: database.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - source: None, - tables: vec![], - enums: vec![], - domains: vec![], - composites: vec![], - views: vec![], - functions: vec![], - extensions: vec![], - gucs: vec![], - } - } - - fn key(proj: &str, db: &str) -> SnapshotKey { - SnapshotKey { - project_id: ProjectId(proj.into()), - database_id: DatabaseId(db.into()), - } - } + use crate::history::test_fixtures::{key, make_activity, make_planner, make_snap}; fn temp_store() -> (TempDir, HistoryStore) { let dir = TempDir::new().unwrap(); @@ -588,8 +703,14 @@ mod trait_tests { let k = key("p", "auth"); let snap = make_snap("h1", "auth"); - assert_eq!(store.put(&k, &snap).await.unwrap(), PutOutcome::Inserted); - assert_eq!(store.put(&k, &snap).await.unwrap(), PutOutcome::Deduped); + assert_eq!( + store.put_schema(&k, &snap).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_schema(&k, &snap).await.unwrap(), + PutOutcome::Deduped + ); } #[tokio::test] @@ -600,19 +721,28 @@ mod trait_tests { // same content_hash under different database_id should not dedupe assert_eq!( - 
store.put(&auth, &make_snap("same", "auth")).await.unwrap(), + store + .put_schema(&auth, &make_snap("same", "auth")) + .await + .unwrap(), PutOutcome::Inserted ); assert_eq!( store - .put(&billing, &make_snap("same", "billing")) + .put_schema(&billing, &make_snap("same", "billing")) .await .unwrap(), PutOutcome::Inserted ); - let auth_rows = store.list(&auth, TimeRange::default()).await.unwrap(); - let billing_rows = store.list(&billing, TimeRange::default()).await.unwrap(); + let auth_rows = store + .list_schema(&auth, TimeRange::default()) + .await + .unwrap(); + let billing_rows = store + .list_schema(&billing, TimeRange::default()) + .await + .unwrap(); assert_eq!(auth_rows.len(), 1); assert_eq!(billing_rows.len(), 1); assert_eq!(auth_rows[0].database_id.as_deref(), Some("auth")); @@ -624,11 +754,11 @@ mod trait_tests { let (_dir, store) = temp_store(); let a = key("a", "x"); let b = key("b", "x"); - store.put(&a, &make_snap("h", "x")).await.unwrap(); - store.put(&b, &make_snap("h", "x")).await.unwrap(); + store.put_schema(&a, &make_snap("h", "x")).await.unwrap(); + store.put_schema(&b, &make_snap("h", "x")).await.unwrap(); - let a_rows = store.list(&a, TimeRange::default()).await.unwrap(); - let b_rows = store.list(&b, TimeRange::default()).await.unwrap(); + let a_rows = store.list_schema(&a, TimeRange::default()).await.unwrap(); + let b_rows = store.list_schema(&b, TimeRange::default()).await.unwrap(); assert_eq!(a_rows.len(), 1); assert_eq!(b_rows.len(), 1); assert_eq!(a_rows[0].project_id.as_deref(), Some("a")); @@ -643,10 +773,10 @@ mod trait_tests { s1.timestamp = Utc::now() - Duration::hours(2); let mut s2 = make_snap("h2", "x"); s2.timestamp = Utc::now() - Duration::hours(1); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let rows = store.list(&k, TimeRange::default()).await.unwrap(); + let rows = store.list_schema(&k, 
TimeRange::default()).await.unwrap(); assert_eq!(rows.len(), 2); assert_eq!(rows[0].content_hash, "h2"); assert_eq!(rows[1].content_hash, "h1"); @@ -660,12 +790,12 @@ mod trait_tests { for (i, hash) in ["h0", "h1", "h2"].iter().enumerate() { let mut s = make_snap(hash, "x"); s.timestamp = now - Duration::hours(2 - i as i64); - store.put(&k, &s).await.unwrap(); + store.put_schema(&k, &s).await.unwrap(); } // from = -90min: h0 at -2h is excluded, h1 at -1h and h2 at 0 included let rows = store - .list( + .list_schema( &k, TimeRange { from: Some(now - Duration::minutes(90)), @@ -680,7 +810,7 @@ mod trait_tests { // to = -30min (exclusive): h2 at 0 excluded, h0 and h1 included let rows = store - .list( + .list_schema( &k, TimeRange { from: None, @@ -698,15 +828,15 @@ mod trait_tests { async fn latest_returns_most_recent_or_none() { let (_dir, store) = temp_store(); let k = key("p", "x"); - assert!(store.latest(&k).await.unwrap().is_none()); + assert!(store.latest_schema(&k).await.unwrap().is_none()); let mut s1 = make_snap("old", "x"); s1.timestamp = Utc::now() - Duration::hours(1); let s2 = make_snap("new", "x"); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let latest = store.latest(&k).await.unwrap().unwrap(); + let latest = store.latest_schema(&k).await.unwrap().unwrap(); assert_eq!(latest.content_hash, "new"); } @@ -717,10 +847,10 @@ mod trait_tests { let mut s1 = make_snap("old", "x"); s1.timestamp = Utc::now() - Duration::hours(1); let s2 = make_snap("new", "x"); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let got = store.get(&k, SnapshotRef::Latest).await.unwrap(); + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(got.content_hash, "new"); } @@ -733,12 +863,12 @@ mod trait_tests { s1.timestamp = 
now - Duration::hours(2); let mut s2 = make_snap("h2", "x"); s2.timestamp = now; - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); // at -1h: h2 is in the future, only h1 qualifies let got = store - .get(&k, SnapshotRef::At(now - Duration::hours(1))) + .get_schema(&k, SnapshotRef::At(now - Duration::hours(1))) .await .unwrap(); assert_eq!(got.content_hash, "h1"); @@ -749,17 +879,22 @@ mod trait_tests { let (_dir, store) = temp_store(); let a = key("p", "auth"); let b = key("p", "billing"); - store.put(&a, &make_snap("shared", "auth")).await.unwrap(); + store + .put_schema(&a, &make_snap("shared", "auth")) + .await + .unwrap(); // direct lookup under correct key works let got = store - .get(&a, SnapshotRef::Hash("shared".into())) + .get_schema(&a, SnapshotRef::Hash("shared".into())) .await .unwrap(); assert_eq!(got.content_hash, "shared"); // same hash under different key fails — content_hash lookup is key-scoped - let result = store.get(&b, SnapshotRef::Hash("shared".into())).await; + let result = store + .get_schema(&b, SnapshotRef::Hash("shared".into())) + .await; assert!(result.is_err()); } @@ -767,14 +902,19 @@ mod trait_tests { async fn get_missing_returns_error() { let (_dir, store) = temp_store(); let k = key("p", "x"); - assert!(store.get(&k, SnapshotRef::Latest).await.is_err()); + assert!(store.get_schema(&k, SnapshotRef::Latest).await.is_err()); + assert!( + store + .get_schema(&k, SnapshotRef::Hash("nope".into())) + .await + .is_err() + ); assert!( store - .get(&k, SnapshotRef::Hash("nope".into())) + .get_schema(&k, SnapshotRef::At(Utc::now())) .await .is_err() ); - assert!(store.get(&k, SnapshotRef::At(Utc::now())).await.is_err()); } #[tokio::test] @@ -785,16 +925,16 @@ mod trait_tests { for (i, hash) in ["h0", "h1", "h2", "h3"].iter().enumerate() { let mut s = make_snap(hash, "x"); s.timestamp = now - Duration::hours(3 - i as i64); - 
store.put(&k, &s).await.unwrap(); + store.put_schema(&k, &s).await.unwrap(); } let deleted = store - .delete_before(&k, now - Duration::minutes(90)) + .delete_schema_before(&k, now - Duration::minutes(90)) .await .unwrap(); assert_eq!(deleted, 2); // h0 (-3h) and h1 (-2h) - let remaining = store.list(&k, TimeRange::default()).await.unwrap(); + let remaining = store.list_schema(&k, TimeRange::default()).await.unwrap(); assert_eq!(remaining.len(), 2); assert_eq!(remaining[0].content_hash, "h3"); assert_eq!(remaining[1].content_hash, "h2"); @@ -807,19 +947,33 @@ mod trait_tests { let b = key("p", "billing"); let mut s = make_snap("h", "auth"); s.timestamp = Utc::now() - Duration::hours(2); - store.put(&a, &s).await.unwrap(); + store.put_schema(&a, &s).await.unwrap(); let mut s = make_snap("h", "billing"); s.timestamp = Utc::now() - Duration::hours(2); - store.put(&b, &s).await.unwrap(); + store.put_schema(&b, &s).await.unwrap(); // delete in `a` should not touch `b` let deleted = store - .delete_before(&a, Utc::now() - Duration::hours(1)) + .delete_schema_before(&a, Utc::now() - Duration::hours(1)) .await .unwrap(); assert_eq!(deleted, 1); - assert_eq!(store.list(&a, TimeRange::default()).await.unwrap().len(), 0); - assert_eq!(store.list(&b, TimeRange::default()).await.unwrap().len(), 1); + assert_eq!( + store + .list_schema(&a, TimeRange::default()) + .await + .unwrap() + .len(), + 0 + ); + assert_eq!( + store + .list_schema(&b, TimeRange::default()) + .await + .unwrap() + .len(), + 1 + ); } #[tokio::test] @@ -830,19 +984,19 @@ mod trait_tests { // put under three streams, with one stream getting two snapshots store - .put(&key("p", "billing"), &make_snap("h1", "billing")) + .put_schema(&key("p", "billing"), &make_snap("h1", "billing")) .await .unwrap(); store - .put(&key("p", "auth"), &make_snap("h2", "auth")) + .put_schema(&key("p", "auth"), &make_snap("h2", "auth")) .await .unwrap(); store - .put(&key("p", "auth"), &make_snap("h3", "auth")) + .put_schema(&key("p", 
"auth"), &make_snap("h3", "auth")) .await .unwrap(); store - .put(&key("other", "auth"), &make_snap("h4", "auth")) + .put_schema(&key("other", "auth"), &make_snap("h4", "auth")) .await .unwrap(); @@ -872,66 +1026,6 @@ mod trait_tests { ); } - use crate::schema::{ - ActivityStatsSnapshot, IndexActivity, IndexActivityEntry, NodeIdentity, - PlannerStatsSnapshot, QualifiedName, TableActivity, TableActivityEntry, - }; - - fn make_planner(schema_ref: &str, db: &str, hash: &str) -> PlannerStatsSnapshot { - PlannerStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: db.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - tables: vec![], - columns: vec![], - indexes: vec![], - } - } - - fn make_activity(schema_ref: &str, db: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { - ActivityStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: db.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - node: NodeIdentity { - label: label.into(), - host: format!("host-{label}"), - is_standby: label != "primary", - replication_lag_bytes: None, - stats_reset: None, - }, - tables: vec![TableActivityEntry { - table: QualifiedName::new("public", "orders"), - activity: TableActivity { - seq_scan: 1, - idx_scan: 2, - n_live_tup: 0, - n_dead_tup: 0, - last_vacuum: None, - last_autovacuum: None, - last_analyze: None, - last_autoanalyze: None, - vacuum_count: 0, - autovacuum_count: 0, - analyze_count: 0, - autoanalyze_count: 0, - }, - }], - indexes: vec![IndexActivityEntry { - index: QualifiedName::new("public", "orders_pkey"), - activity: IndexActivity { - idx_scan: 0, - idx_tup_read: 0, - idx_tup_fetch: 0, - }, - }], - } - } - #[tokio::test] async fn snapshot_get_filters_to_kind_schema() { // Regression: planner_stats rows must not bleed into SnapshotStore::get(Latest). 
@@ -939,13 +1033,13 @@ mod trait_tests { let k = key("p", "auth"); let schema = make_snap("schema-h1", "auth"); - store.put(&k, &schema).await.unwrap(); + store.put_schema(&k, &schema).await.unwrap(); // Insert a newer planner_stats row referring to the schema. let planner = make_planner("schema-h1", "auth", "planner-h1"); store.put_planner_stats(&k, &planner).await.unwrap(); - let got = store.get(&k, SnapshotRef::Latest).await.unwrap(); + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(got.content_hash, "schema-h1"); } @@ -955,7 +1049,7 @@ mod trait_tests { let k = key("p", "auth"); let schema = make_snap("schema-h1", "auth"); - store.put(&k, &schema).await.unwrap(); + store.put_schema(&k, &schema).await.unwrap(); let planner = make_planner("schema-h1", "auth", "planner-h1"); store.put_planner_stats(&k, &planner).await.unwrap(); let primary = make_activity("schema-h1", "auth", "primary", "act-primary-1"); @@ -973,7 +1067,7 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); for label in ["primary", "replica1", "replica2"] { @@ -994,13 +1088,19 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); - store.put(&k, &make_snap("schema-A", "auth")).await.unwrap(); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); let planner = make_planner("schema-A", "auth", "planner-A"); store.put_planner_stats(&k, &planner).await.unwrap(); // small sleep to ensure later timestamp ordering tokio::time::sleep(std::time::Duration::from_millis(5)).await; - store.put(&k, &make_snap("schema-B", "auth")).await.unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); let bundle = store.get_annotated(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(bundle.schema.content_hash, "schema-B"); @@ -1015,7 +1115,7 @@ mod 
trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); @@ -1029,7 +1129,7 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); @@ -1044,4 +1144,202 @@ mod trait_tests { let primary = bundle.activity_by_node.get("primary").unwrap(); assert_eq!(primary.content_hash, "act-2"); } + + // --- dedup correctness (commit 2f9a353) --- + + #[tokio::test] + async fn put_planner_dedupes_only_within_same_schema_ref() { + // Same content_hash under a different schema_ref must NOT collapse. + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); + + let p_a = make_planner("schema-A", "auth", "shared-hash"); + let p_b = make_planner("schema-B", "auth", "shared-hash"); + + assert_eq!( + store.put_planner_stats(&k, &p_a).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_planner_stats(&k, &p_a).await.unwrap(), + PutOutcome::Deduped + ); + assert_eq!( + store.put_planner_stats(&k, &p_b).await.unwrap(), + PutOutcome::Inserted + ); + } + + #[tokio::test] + async fn put_activity_dedupes_only_within_same_schema_ref_and_node() { + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); + + let a_primary_a = make_activity("schema-A", "auth", "primary", "shared-hash"); + let a_primary_b = make_activity("schema-B", "auth", "primary", "shared-hash"); + let a_replica_a = make_activity("schema-A", "auth", "replica", "shared-hash"); + + assert_eq!( + 
store.put_activity_stats(&k, &a_primary_a).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_activity_stats(&k, &a_primary_a).await.unwrap(), + PutOutcome::Deduped + ); + // different schema_ref, same node + hash → insert + assert_eq!( + store.put_activity_stats(&k, &a_primary_b).await.unwrap(), + PutOutcome::Inserted + ); + // different node, same schema_ref + hash → insert + assert_eq!( + store.put_activity_stats(&k, &a_replica_a).await.unwrap(), + PutOutcome::Inserted + ); + } + + // --- kind-aware trait API (commit 1726fa1) --- + + #[tokio::test] + async fn list_kinds_reports_distinct_kinds_and_node_labels() { + use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + assert!(store.list_kinds(&k).await.unwrap().is_empty()); + + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", "primary", "act-p")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", "replica1", "act-r")) + .await + .unwrap(); + + let kinds = store.list_kinds(&k).await.unwrap(); + assert!(kinds.contains(&SnapshotKind::Schema)); + assert!(kinds.contains(&SnapshotKind::Planner)); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "primary".into() + })); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "replica1".into() + })); + } + + #[tokio::test] + async fn get_via_trait_returns_typed_payload_per_kind() { + use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", 
"primary", "act-1")) + .await + .unwrap(); + + let s = store + .get(&k, &SnapshotKind::Schema, SnapshotRef::Latest) + .await + .unwrap() + .into_schema() + .unwrap(); + assert_eq!(s.content_hash, "schema-h1"); + + let p = store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .unwrap() + .into_planner() + .unwrap(); + assert_eq!(p.content_hash, "planner-h1"); + + let a = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(a.content_hash, "act-1"); + } + + #[tokio::test] + async fn delete_before_scoped_to_kind_only() { + // delete_before for activity must not touch planner or schema rows. + use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + let mut a = make_activity("schema-h1", "auth", "primary", "act-1"); + a.timestamp = Utc::now() - Duration::hours(2); + store.put_activity_stats(&k, &a).await.unwrap(); + + let removed = store + .delete_before( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + Utc::now() - Duration::hours(1), + ) + .await + .unwrap(); + assert_eq!(removed, 1); + // schema and planner untouched + assert!(store.get_schema(&k, SnapshotRef::Latest).await.is_ok()); + assert!( + store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .is_ok() + ); + } } diff --git a/crates/dry_run_core/src/history/test_fixtures.rs b/crates/dry_run_core/src/history/test_fixtures.rs new file mode 100644 index 0000000..d9e8c8b --- /dev/null +++ b/crates/dry_run_core/src/history/test_fixtures.rs @@ -0,0 +1,92 @@ +use chrono::Utc; + +use crate::history::{DatabaseId, ProjectId, SnapshotKey}; +use crate::schema::{ + ActivityStatsSnapshot, IndexActivity, 
IndexActivityEntry, NodeIdentity, PlannerStatsSnapshot, + QualifiedName, SchemaSnapshot, TableActivity, TableActivityEntry, +}; + +pub(super) fn make_snap(hash: &str, database: &str) -> SchemaSnapshot { + SchemaSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: database.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + source: None, + tables: vec![], + enums: vec![], + domains: vec![], + composites: vec![], + views: vec![], + functions: vec![], + extensions: vec![], + gucs: vec![], + } +} + +pub(super) fn make_planner(schema_ref: &str, db: &str, hash: &str) -> PlannerStatsSnapshot { + PlannerStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: db.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + tables: vec![], + columns: vec![], + indexes: vec![], + } +} + +pub(super) fn make_activity( + schema_ref: &str, + db: &str, + label: &str, + hash: &str, +) -> ActivityStatsSnapshot { + ActivityStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: db.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + node: NodeIdentity { + label: label.into(), + host: format!("host-{label}"), + is_standby: label != "primary", + replication_lag_bytes: None, + stats_reset: None, + }, + tables: vec![TableActivityEntry { + table: QualifiedName::new("public", "orders"), + activity: TableActivity { + seq_scan: 1, + idx_scan: 2, + n_live_tup: 0, + n_dead_tup: 0, + last_vacuum: None, + last_autovacuum: None, + last_analyze: None, + last_autoanalyze: None, + vacuum_count: 0, + autovacuum_count: 0, + analyze_count: 0, + autoanalyze_count: 0, + }, + }], + indexes: vec![IndexActivityEntry { + index: QualifiedName::new("public", "orders_pkey"), + activity: IndexActivity { + idx_scan: 0, + idx_tup_read: 0, + idx_tup_fetch: 0, + }, + }], + } +} + +pub(super) fn key(proj: &str, db: &str) -> SnapshotKey { + SnapshotKey { + project_id: 
ProjectId(proj.into()), + database_id: DatabaseId(db.into()), + } +} diff --git a/tests/snapshot-e2e/.gitignore b/tests/snapshot-e2e/.gitignore new file mode 100644 index 0000000..c2d1b9f --- /dev/null +++ b/tests/snapshot-e2e/.gitignore @@ -0,0 +1,2 @@ +shared/ +workstations/ diff --git a/tests/snapshot-e2e/Dockerfile.dryrun b/tests/snapshot-e2e/Dockerfile.dryrun new file mode 100644 index 0000000..1b10ab7 --- /dev/null +++ b/tests/snapshot-e2e/Dockerfile.dryrun @@ -0,0 +1,30 @@ +# Two-stage build producing one image with both v0.6.1 and HEAD binaries. +# Build context = repo root. + +FROM rust:1.88-bookworm AS old +WORKDIR /build +RUN apt-get update && apt-get install -y --no-install-recommends \ + git pkg-config libssl-dev clang libclang-dev \ + && rm -rf /var/lib/apt/lists/* +RUN git clone --depth 1 --branch v0.6.1 https://github.com/boringSQL/dryrun.git src +WORKDIR /build/src +RUN cargo build --release --bin dryrun +RUN cp target/release/dryrun /dryrun-old + +FROM rust:1.88-bookworm AS new +WORKDIR /build +RUN apt-get update && apt-get install -y --no-install-recommends \ + pkg-config libssl-dev clang libclang-dev \ + && rm -rf /var/lib/apt/lists/* +COPY . . 
+RUN cargo build --release --bin dryrun +RUN cp target/release/dryrun /dryrun-new + +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates jq postgresql-client bash coreutils && rm -rf /var/lib/apt/lists/* +COPY --from=old /dryrun-old /usr/local/bin/dryrun-old +COPY --from=new /dryrun-new /usr/local/bin/dryrun-new +RUN ln -s /usr/local/bin/dryrun-new /usr/local/bin/dryrun +WORKDIR /work +CMD ["bash"] diff --git a/tests/snapshot-e2e/README.md b/tests/snapshot-e2e/README.md new file mode 100644 index 0000000..69b400c --- /dev/null +++ b/tests/snapshot-e2e/README.md @@ -0,0 +1,75 @@ +# snapshot-e2e + +End-to-end black-box tests for the shared-filesystem snapshot store +(`dryrun snapshot push --to-path` / `pull --from-path`), including +cross-version compatibility against the **v0.6.1** baseline. + +The Rust unit tests in `crates/dry_run_core/src/history/filesystem_store.rs` +cover internal correctness. This suite covers the things only a real +multi-process / multi-binary / real-Postgres setup can: cross-version +compat, concurrent writers, filesystem corruption, multi-database flows. + +## Running + +```sh +./harness.sh # all scenarios, full Docker (HEAD + v0.6.1) +./harness.sh 's04*.sh' # filter scenarios by glob +./harness.sh -- bash # drop into the runner container +./harness.sh down # stop the stack +./harness.sh rebuild # rebuild after CLI code changes + +./run-native.sh # HEAD only, host cargo build, single pg-a +./run-native.sh 's01*.sh' # filter +``` + +`harness.sh` keeps the runner container alive between invocations +(`up -d` + `exec`), so warm runs land in **~1.5–2 s**. `run-native.sh` +skips Docker for the dryrun binary entirely — best for iterating while +authoring new scenarios. + +Scenarios that need the v0.6.1 binary tag themselves +`# NATIVE: skip`; `run-native.sh` honors that. 
+ +## Layout + +``` +snapshot-e2e/ +├── docker-compose.yml # pg-a, pg-b, pg-c (tmpfs), persistent runner +├── Dockerfile.dryrun # multi-stage: dryrun-old (v0.6.1) + dryrun-new (HEAD) +├── harness.sh # full-Docker entrypoint +├── run-native.sh # native fast-feedback entrypoint +├── run.sh # invoked inside the runner; aggregates scenarios +├── lib.sh # shared helpers: scenario / reset_* / ws_run / assert_* +├── fixtures/schemas/*.sql # seed SQL +├── scenarios/sNN_*.sh # one bash script per scenario +├── shared/ # bind-mounted "team filesystem" (gitignored) +└── workstations/{devA,devB}/ # per-workstation HOMEs (gitignored) +``` + +## Adding a scenario + +Each scenario is ~30 lines of bash. Copy an existing one in `scenarios/` +and tweak. Helpers from `lib.sh`: + +| Helper | What it does | +| ------------------------------- | ------------------------------------------------------------------------ | +| `scenario "TITLE"` | Names the scenario; `ok` / `fail` print TAP-ish output. | +| `reset_shared` | Wipes the shared dir. | +| `reset_workstation devA` | Clears `workstations/devA/` and writes a fresh `dryrun.toml`. | +| `reset_db / seed_db `| Drops `public` and re-seeds. | +| `ws_run devA ` | Runs a command with `cd` + `HOME` set to the workstation dir. | +| `assert_eq`, `assert_contains`, `assert_no_tmp_files`, `assert_jq`, … | Cheap assertions; failures print but don't `exit`. | + +Naming: `sNN__.sh` where `` matches a row in +the design doc (`internal-docs/snapshot-share-tests.md`). Scenarios that +require the v0.6.1 binary should put `# NATIVE: skip` near the top. + +## Tearing down + +```sh +./harness.sh down # stop containers +docker compose down -v # also drop networks (rare) +``` + +The `shared/` and `workstations/` bind-mount roots persist between runs; +`reset_*` clears their contents at scenario start. 
diff --git a/tests/snapshot-e2e/docker-compose.yml b/tests/snapshot-e2e/docker-compose.yml new file mode 100644 index 0000000..92331c7 --- /dev/null +++ b/tests/snapshot-e2e/docker-compose.yml @@ -0,0 +1,66 @@ +services: + pg-a: + image: postgres:16-alpine + environment: + POSTGRES_PASSWORD: dryrun + POSTGRES_DB: app + # Ephemeral host port for run-native.sh — `docker compose port pg-a 5432` + # resolves it; in the runner container we still use service DNS. + ports: + - "5432" + tmpfs: + - /var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d app"] + interval: 1s + timeout: 3s + retries: 30 + + pg-b: + image: postgres:16-alpine + environment: + POSTGRES_PASSWORD: dryrun + POSTGRES_DB: app + tmpfs: + - /var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d app"] + interval: 1s + timeout: 3s + retries: 30 + + pg-c: + image: postgres:16-alpine + environment: + POSTGRES_PASSWORD: dryrun + POSTGRES_DB: app + tmpfs: + - /var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d app"] + interval: 1s + timeout: 3s + retries: 30 + + runner: + build: + context: ../.. + dockerfile: tests/snapshot-e2e/Dockerfile.dryrun + depends_on: + pg-a: { condition: service_healthy } + pg-b: { condition: service_healthy } + pg-c: { condition: service_healthy } + environment: + PG_A_URL: postgres://postgres:dryrun@pg-a:5432/app + PG_B_URL: postgres://postgres:dryrun@pg-b:5432/app + PG_C_URL: postgres://postgres:dryrun@pg-c:5432/app + volumes: + - ./shared:/shared + - ./workstations:/workstations + - ./fixtures:/fixtures:ro + - ./scenarios:/scenarios:ro + - ./lib.sh:/lib.sh:ro + - ./run.sh:/run.sh:ro + working_dir: /work + # Stay alive so `harness.sh` can `docker compose exec` repeatedly. 
+ command: ["sleep", "infinity"] diff --git a/tests/snapshot-e2e/fixtures/schemas/01_simple.sql b/tests/snapshot-e2e/fixtures/schemas/01_simple.sql new file mode 100644 index 0000000..dc0e511 --- /dev/null +++ b/tests/snapshot-e2e/fixtures/schemas/01_simple.sql @@ -0,0 +1,14 @@ +CREATE TABLE users ( + user_id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + email TEXT NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE orders ( + order_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(user_id), + total NUMERIC(12,2) NOT NULL CHECK (total >= 0), + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX orders_by_user ON orders(user_id); diff --git a/tests/snapshot-e2e/harness.sh b/tests/snapshot-e2e/harness.sh new file mode 100755 index 0000000..5ea2206 --- /dev/null +++ b/tests/snapshot-e2e/harness.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +# Persistent-runner wrapper. Brings the stack up once, then `exec`s the +# scenarios against the running container — saves ~3-5s per invocation +# vs `docker compose run --rm`. +# +# Usage: +# ./harness.sh # run all scenarios +# ./harness.sh 's03*.sh' # filter scenarios by glob +# ./harness.sh -- bash # drop into a shell in the runner +# ./harness.sh down # stop the stack +# ./harness.sh rebuild # rebuild the runner image (after code changes) + +set -uo pipefail +cd "$(dirname "$0")" +# Bind-mount roots — must exist before `docker compose up` so the +# container's /shared and /workstations land on a writable host dir. 
+mkdir -p shared workstations + +case "${1:-run}" in + down) + exec docker compose down + ;; + rebuild) + exec docker compose build runner + ;; + --) + shift + docker compose up -d --wait >/dev/null + exec docker compose exec runner "$@" + ;; + *) + docker compose up -d --wait >/dev/null + docker compose exec runner bash /run.sh "$@" + ;; +esac diff --git a/tests/snapshot-e2e/lib.sh b/tests/snapshot-e2e/lib.sh new file mode 100644 index 0000000..e8bd87c --- /dev/null +++ b/tests/snapshot-e2e/lib.sh @@ -0,0 +1,129 @@ +# Shared helpers for scenario scripts. Sourced, not executed. + +: "${SHARED_DIR:=/shared}" +: "${WORKSTATIONS_DIR:=/workstations}" +: "${FIXTURES_DIR:=/fixtures}" + +SCENARIO_NAME="" +SCENARIO_FAILED=0 + +scenario() { + SCENARIO_NAME="$1" + SCENARIO_FAILED=0 + echo "# --- $SCENARIO_NAME" +} + +ok() { + if [ "$SCENARIO_FAILED" -eq 0 ]; then + echo "ok - $SCENARIO_NAME" + else + echo "not ok - $SCENARIO_NAME" + exit 1 + fi +} + +fail() { + SCENARIO_FAILED=1 + echo " FAIL: $*" >&2 +} + +reset_shared() { + mkdir -p "$SHARED_DIR" + find "$SHARED_DIR" -mindepth 1 -delete 2>/dev/null || true +} + +reset_workstation() { + local name="$1" + mkdir -p "$WORKSTATIONS_DIR/$name" + find "$WORKSTATIONS_DIR/$name" -mindepth 1 -delete 2>/dev/null || true + mkdir -p "$WORKSTATIONS_DIR/$name/.dryrun" + # Shared project_id across workstations so pushes/pulls land in the same + # /shared/// subtree. 
+ cat > "$WORKSTATIONS_DIR/$name/dryrun.toml" </dev/null +} + +reset_db() { + local pg_url="$1" + psql "$pg_url" -v ON_ERROR_STOP=1 -c "DROP SCHEMA public CASCADE; CREATE SCHEMA public;" >/dev/null +} + +assert_eq() { + local got="$1" want="$2" msg="${3:-assert_eq}" + if [ "$got" != "$want" ]; then + fail "$msg: got=[$got] want=[$want]" + fi +} + +assert_contains() { + local haystack="$1" needle="$2" msg="${3:-assert_contains}" + case "$haystack" in + *"$needle"*) : ;; + *) fail "$msg: missing [$needle] in output" ;; + esac +} + +assert_jq() { + local json="$1" expr="$2" msg="${3:-assert_jq}" + if ! echo "$json" | jq -e "$expr" >/dev/null 2>&1; then + fail "$msg: jq expression failed: $expr" + echo " json was: $json" >&2 + fi +} + +assert_file_exists() { + [ -f "$1" ] || fail "expected file: $1" +} + +assert_no_tmp_files() { + local dir="$1" + local found + found="$(find "$dir" -name '*.tmp' 2>/dev/null | head -n 1)" + [ -z "$found" ] || fail "stray .tmp file: $found" +} + +# Verify filename hash equals recomputed content_hash for every snapshot +# in a directory. Catches C5 / C6 corruption silently slipping through. +assert_filenames_match_hash() { + local dir="$1" + while IFS= read -r f; do + local fname expected_hash recomputed + fname="$(basename "$f")" + expected_hash="${fname##*-}" + expected_hash="${expected_hash%.json.zst}" + recomputed="$(zstd -dc "$f" | sha256sum | awk '{print $1}')" + # The plan stores hash of the SchemaSnapshot JSON, not file bytes — + # so this assertion needs the dryrun binary to verify properly. + # Placeholder: just check the field is hex of correct length. + if ! 
echo "$expected_hash" | grep -Eq '^[0-9a-f]{64}$'; then + fail "bad hash format in filename: $fname" + fi + done < <(find "$dir" -name '*.json.zst' -type f) +} diff --git a/tests/snapshot-e2e/run-native.sh b/tests/snapshot-e2e/run-native.sh new file mode 100755 index 0000000..983d47f --- /dev/null +++ b/tests/snapshot-e2e/run-native.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +# Native fast-feedback loop: HEAD `dryrun` binary built locally, against a +# single pg-a container, no runner image. ~10x faster iteration than the +# full Docker harness. Skips cross-version (OLD vs NEW) scenarios — those +# require the dual-binary image and should run via ./harness.sh. +# +# Usage: +# ./run-native.sh # all native-eligible scenarios +# ./run-native.sh 's01*.sh' # filter + +set -uo pipefail +cd "$(dirname "$0")" +ROOT="$(cd ../.. && pwd)" + +# 1. Build the local binary in release mode (cached after first run). +( cd "$ROOT" && cargo build --release --bin dryrun --quiet ) || { + echo "cargo build failed" >&2 + exit 1 +} + +# 2. Bring up just pg-a. +docker compose up -d --wait pg-a >/dev/null + +# 3. Wire env so scenarios that use $PG_A_URL / dryrun-new work as on Docker. +SCRATCH="$(mktemp -d)" +trap 'rm -rf "$SCRATCH"' EXIT + +export SHARED_DIR="$SCRATCH/shared" +export WORKSTATIONS_DIR="$SCRATCH/workstations" +export FIXTURES_DIR="$PWD/fixtures" +export PG_A_URL="postgres://postgres:dryrun@127.0.0.1:$(docker compose port pg-a 5432 | cut -d: -f2)/app" +mkdir -p "$SHARED_DIR" "$WORKSTATIONS_DIR" + +# Shadow `dryrun-new` / `dryrun-old` resolution so ws_run finds them. +# Native runs only support dryrun-new (HEAD); scenarios that need dryrun-old +# bail by tagging themselves `# NATIVE: skip`. 
+BIN_DIR="$SCRATCH/bin" +mkdir -p "$BIN_DIR" +ln -sf "$ROOT/target/release/dryrun" "$BIN_DIR/dryrun-new" +cat > "$BIN_DIR/dryrun-old" <<'EOF' +#!/usr/bin/env bash +echo "dryrun-old not available in native mode — use ./harness.sh" >&2 +exit 127 +EOF +chmod +x "$BIN_DIR/dryrun-old" +export PATH="$BIN_DIR:$PATH" + +filter="${1:-*.sh}" +total=0 +passed=0 +failed_list=() + +for s in scenarios/$filter; do + [ -f "$s" ] || continue + if grep -q '^# NATIVE: skip' "$s"; then + echo "# skipped (native): $(basename "$s")" + continue + fi + total=$((total + 1)) + if bash -c ". ./lib.sh; . $s"; then + passed=$((passed + 1)) + else + failed_list+=("$(basename "$s")") + fi +done + +echo +echo "1..$total" +echo "passed: $passed / $total" +if [ "${#failed_list[@]}" -gt 0 ]; then + echo "failed:" + printf ' - %s\n' "${failed_list[@]}" + exit 1 +fi diff --git a/tests/snapshot-e2e/run.sh b/tests/snapshot-e2e/run.sh new file mode 100755 index 0000000..0eccc27 --- /dev/null +++ b/tests/snapshot-e2e/run.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -uo pipefail + +SCENARIOS_DIR="${SCENARIOS_DIR:-/scenarios}" +LIB="${LIB:-/lib.sh}" + +filter="${1:-*.sh}" +total=0 +passed=0 +failed_list=() + +for s in "$SCENARIOS_DIR"/$filter; do + [ -f "$s" ] || continue + total=$((total+1)) + name="$(basename "$s")" + if bash -c ". $LIB; . $s"; then + passed=$((passed+1)) + else + failed_list+=("$name") + fi +done + +echo +echo "1..$total" +echo "passed: $passed / $total" +if [ "${#failed_list[@]}" -gt 0 ]; then + echo "failed:" + printf ' - %s\n' "${failed_list[@]}" + exit 1 +fi diff --git a/tests/snapshot-e2e/scenarios/s01_uc1_fresh_clone.sh b/tests/snapshot-e2e/scenarios/s01_uc1_fresh_clone.sh new file mode 100644 index 0000000..8373f3c --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s01_uc1_fresh_clone.sh @@ -0,0 +1,23 @@ +# A1 — UC1 fresh clone: devB starts empty, pulls, and sees devA's snapshot. 
+scenario "A1: UC1 fresh clone pull" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# devA: take + push +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" + +# Filesystem invariants +assert_no_tmp_files "$SHARED_DIR" +assert_filenames_match_hash "$SHARED_DIR" + +# devB: pull and list — must NOT touch Postgres +out="$(ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" || fail "pull errored: $out" +list_out="$(ws_run devB dryrun-new snapshot list --db "$PG_A_URL" 2>&1)" || fail "list errored: $list_out" +[ -n "$list_out" ] || fail "list produced no output" + +ok diff --git a/tests/snapshot-e2e/scenarios/s02_b1_new_reads_old_export.sh b/tests/snapshot-e2e/scenarios/s02_b1_new_reads_old_export.sh new file mode 100644 index 0000000..c2ea286 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s02_b1_new_reads_old_export.sh @@ -0,0 +1,25 @@ +# B1 — NEW must read v0.6.1's `snapshot export` output unchanged. +# This is the single most load-bearing claim in the plan. +# NATIVE: skip — needs dryrun-old (v0.6.1) which only the Docker image carries. 
+scenario "B1: NEW reads OLD export output" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# OLD: take + export to shared/ (v0.6.1 export takes no --db, reads history.db) +ws_run devA dryrun-old snapshot take --db "$PG_A_URL" +ws_run devA dryrun-old snapshot export --out "$SHARED_DIR" 2>&1 || \ + fail "OLD snapshot export failed (verify v0.6.1 surface)" + +# NEW: pull from the same dir on a fresh workstation +out="$(ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" \ + || fail "NEW pull failed against OLD export: $out" + +# diff against a fresh take from the same DB — should be empty changeset +diff_out="$(ws_run devB dryrun-new snapshot diff --db "$PG_A_URL" --latest 2>&1)" \ + || fail "NEW diff against pulled OLD snapshot failed: $diff_out" + +ok diff --git a/tests/snapshot-e2e/scenarios/s03_c5_corruption_detection.sh b/tests/snapshot-e2e/scenarios/s03_c5_corruption_detection.sh new file mode 100644 index 0000000..cb4c832 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s03_c5_corruption_detection.sh @@ -0,0 +1,28 @@ +# C5 — flip a byte in a pushed snapshot; pull/get must surface CorruptSnapshot. +scenario "C5: corruption detection — filename hash != recomputed" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" + +target="$(find "$SHARED_DIR" -name '*.json.zst' -type f | head -n 1)" +[ -n "$target" ] || { fail "no pushed snapshot found"; ok; return; } + +# Flip one byte in the middle of the file (zstd payload). 
+size=$(stat -c%s "$target") +mid=$((size / 2)) +printf '\x42' | dd of="$target" bs=1 count=1 seek=$mid conv=notrunc status=none + +# NEW pull must fail loudly, not silently accept the corrupt file. +if out="$(ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)"; then + fail "pull SUCCEEDED on corrupt file (expected loud failure): $out" +else + assert_contains "$out" "corrupt" "error mentions corruption" +fi + +ok diff --git a/tests/snapshot-e2e/scenarios/s04_d1_concurrent_same_hash.sh b/tests/snapshot-e2e/scenarios/s04_d1_concurrent_same_hash.sh new file mode 100644 index 0000000..c489248 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s04_d1_concurrent_same_hash.sh @@ -0,0 +1,28 @@ +# D1 — two workstations push the same snapshot concurrently. +# Idempotency contract: final state is exactly one file, no .tmp debris. +scenario "D1: concurrent push of identical snapshot" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# Both devs take from the same DB at the same point → identical content_hash. +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" +# Copy devA's history to devB so they have identical local snapshots. +cp -r "$WORKSTATIONS_DIR/devA/.dryrun/." "$WORKSTATIONS_DIR/devB/.dryrun/" + +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" & +pid_a=$! +ws_run devB dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" & +pid_b=$! 
+wait $pid_a || fail "devA push failed" +wait $pid_b || fail "devB push failed" + +assert_no_tmp_files "$SHARED_DIR" + +count="$(find "$SHARED_DIR" -name '*.json.zst' -type f | wc -l | tr -d ' ')" +assert_eq "$count" "1" "exactly one snapshot file (idempotent)" + +ok diff --git a/tests/snapshot-e2e/scenarios/s05_a2_versioned_history.sh b/tests/snapshot-e2e/scenarios/s05_a2_versioned_history.sh new file mode 100644 index 0000000..55a109d --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s05_a2_versioned_history.sh @@ -0,0 +1,35 @@ +# A2 — UC2 versioned history: multi-take/push produces ordered, distinct +# snapshots; `list` returns them by descending ts; `diff --from --to` +# resolves them to a non-empty changeset. +scenario "A2: versioned history (multi-take/push)" + +reset_shared +reset_workstation devA +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 + +# Mutate schema and take again → second snapshot must have a different hash. +psql "$PG_A_URL" -v ON_ERROR_STOP=1 -c "ALTER TABLE users ADD COLUMN nickname TEXT;" >/dev/null +sleep 1 # filename ts uses second precision; force a distinct slot +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 + +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 + +count="$(find "$SHARED_DIR" -name '*.json.zst' -type f | wc -l | tr -d ' ')" +assert_eq "$count" "2" "two distinct snapshots in /shared" + +# list output should mention 2 snapshot lines. +list_out="$(ws_run devA dryrun-new snapshot list --db "$PG_A_URL" 2>&1)" +listed="$(echo "$list_out" | grep -cE '^[0-9]{4}-[0-9]{2}-[0-9]{2}')" +assert_eq "$listed" "2" "list reports both snapshots" + +# diff between the two hashes must be a non-empty changeset (added column). 
+hashes=($(find "$SHARED_DIR" -name '*.json.zst' -type f \ + | sed -E 's|.*-([0-9a-f]{64})\.json\.zst$|\1|' | sort -u)) +assert_eq "${#hashes[@]}" "2" "two distinct content hashes" +diff_out="$(ws_run devA dryrun-new snapshot diff --from "${hashes[0]}" --to "${hashes[1]}" --db "$PG_A_URL" 2>&1)" +assert_contains "$diff_out" "nickname" "diff mentions added column" + +ok diff --git a/tests/snapshot-e2e/scenarios/s06_a3_multi_db_all.sh b/tests/snapshot-e2e/scenarios/s06_a3_multi_db_all.sh new file mode 100644 index 0000000..16fc286 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s06_a3_multi_db_all.sh @@ -0,0 +1,47 @@ +# A3 — UC3 multi-database `--all`: dryrun.toml declares two database_ids +# bound to the same physical DB. `take` per profile + `push --all` writes +# two subtrees; `pull --all` on a fresh workstation rebuilds both. +scenario "A3: multi-database --all" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# Override the default single-profile toml with a two-profile config. +cat > "$WORKSTATIONS_DIR/devA/dryrun.toml" </dev/null 2>&1 +DATABASE_URL="$PG_A_URL" ws_run devA dryrun-new --profile billing snapshot take >/dev/null 2>&1 + +# Push --all should ship both streams. +DATABASE_URL="$PG_A_URL" ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --all >/dev/null 2>&1 + +assert_file_exists "$(find "$SHARED_DIR/shared/auth" -name '*.json.zst' | head -1)" +assert_file_exists "$(find "$SHARED_DIR/shared/billing" -name '*.json.zst' | head -1)" + +# Pull --all on devB rebuilds both. 
+DATABASE_URL="$PG_A_URL" ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --all >/dev/null 2>&1 +auth_list="$(DATABASE_URL="$PG_A_URL" ws_run devB dryrun-new --profile auth snapshot list 2>&1)" +billing_list="$(DATABASE_URL="$PG_A_URL" ws_run devB dryrun-new --profile billing snapshot list 2>&1)" +assert_contains "$auth_list" "snapshot(s) total" "auth list non-empty" +assert_contains "$billing_list" "snapshot(s) total" "billing list non-empty" + +ok diff --git a/tests/snapshot-e2e/scenarios/s07_a4_round_trip_identity.sh b/tests/snapshot-e2e/scenarios/s07_a4_round_trip_identity.sh new file mode 100644 index 0000000..6e6bc90 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s07_a4_round_trip_identity.sh @@ -0,0 +1,27 @@ +# A4 — round-trip hash identity: take → push → wipe local SQLite → pull +# → list shows the original snapshot, and a fresh take against the +# unchanged DB yields PutOutcome::Deduped (i.e. same content_hash). +scenario "A4: round-trip hash identity" + +reset_shared +reset_workstation devA +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 +original_hash="$(find "$SHARED_DIR" -name '*.json.zst' -type f \ + | sed -E 's|.*-([0-9a-f]{64})\.json\.zst$|\1|' | head -1)" +[ -n "$original_hash" ] || fail "no pushed file" + +# Wipe local history; pull must recover it from /shared. +rm -f "$WORKSTATIONS_DIR/devA/.dryrun/history.db"* +ws_run devA dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 +list_out="$(ws_run devA dryrun-new snapshot list --db "$PG_A_URL" 2>&1)" +assert_contains "$list_out" "${original_hash:0:16}" "list shows pulled hash" + +# A fresh take against the unchanged DB must dedupe (same content_hash). 
+take_out="$(ws_run devA dryrun-new snapshot take --db "$PG_A_URL" 2>&1)" +assert_contains "$take_out" "Schema unchanged" "second take is a no-op (deduped)" + +ok diff --git a/tests/snapshot-e2e/scenarios/s08_c2_readonly_to_path.sh b/tests/snapshot-e2e/scenarios/s08_c2_readonly_to_path.sh new file mode 100644 index 0000000..16ae81c --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s08_c2_readonly_to_path.sh @@ -0,0 +1,34 @@ +# C2 — push --to-path against a read-only directory: must exit non-zero +# with a clean filesystem error, never panic. Pull from the same dir must +# still succeed (read-only is fine for pulling). +scenario "C2: read-only --to-path" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# Seed /shared via a normal push, then drop write perms. +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 +chmod -R a-w "$SHARED_DIR" + +# A second push (after another schema change) must fail loudly. +psql "$PG_A_URL" -v ON_ERROR_STOP=1 -c "ALTER TABLE users ADD COLUMN city TEXT;" >/dev/null +sleep 1 +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 +push_out="$(ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" +push_rc=$? +if [ "$push_rc" -eq 0 ]; then + fail "push to read-only dir SUCCEEDED (exit 0); should have errored" +fi +assert_contains "$push_out" "error" "error message present" + +# Pull from the read-only dir must still work. +pull_out="$(ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" +pull_rc=$? 
+[ "$pull_rc" -eq 0 ] || fail "pull from read-only dir failed: $pull_out" + +chmod -R u+w "$SHARED_DIR" # cleanup so reset_shared can wipe next run +ok diff --git a/tests/snapshot-e2e/scenarios/s09_c4_stale_tmp_files.sh b/tests/snapshot-e2e/scenarios/s09_c4_stale_tmp_files.sh new file mode 100644 index 0000000..b44fe9f --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s09_c4_stale_tmp_files.sh @@ -0,0 +1,31 @@ +# C4 — stale `.tmp` files (left over from a crashed write) must NOT +# confuse the puller. A subsequent push completes normally; `list` and +# `pull` ignore the tmp files; existing real bundles remain readable. +scenario "C4: stale .tmp from crashed write" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# Pre-place a stale .tmp file — pretends a previous writer crashed mid-rename. +mkdir -p "$SHARED_DIR/shared/app" +echo "garbage" > "$SHARED_DIR/shared/app/20260101T000000Z-deadbeef.json.zst.999.0.tmp" + +# Real push proceeds normally. +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 +push_out="$(ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" +push_rc=$? +[ "$push_rc" -eq 0 ] || fail "push errored despite stale .tmp: $push_out" + +# Pull on devB must succeed and not return the .tmp content. +pull_out="$(ws_run devB dryrun-new snapshot pull --from-path "$SHARED_DIR" --db "$PG_A_URL" 2>&1)" +pull_rc=$? +[ "$pull_rc" -eq 0 ] || fail "pull errored despite stale .tmp: $pull_out" + +# devB's history should have the real snapshot, not garbage. 
+list_out="$(ws_run devB dryrun-new snapshot list --db "$PG_A_URL" 2>&1)" +assert_contains "$list_out" "snapshot(s) total" "devB list non-empty" + +ok diff --git a/tests/snapshot-e2e/scenarios/s10_d2_concurrent_different_hashes.sh b/tests/snapshot-e2e/scenarios/s10_d2_concurrent_different_hashes.sh new file mode 100644 index 0000000..a6ff838 --- /dev/null +++ b/tests/snapshot-e2e/scenarios/s10_d2_concurrent_different_hashes.sh @@ -0,0 +1,33 @@ +# D2 — two workstations push DIFFERENT-hash snapshots concurrently. Both +# files end up in /shared, ordered by their own filename ts. No `.tmp` +# debris, no rename failures. +scenario "D2: concurrent push of distinct snapshots" + +reset_shared +reset_workstation devA +reset_workstation devB +reset_db "$PG_A_URL" +seed_db "$PG_A_URL" "$FIXTURES_DIR/schemas/01_simple.sql" + +# devA captures the seeded schema. +ws_run devA dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 + +# Mutate the DB; devB captures the new shape → distinct content_hash. +psql "$PG_A_URL" -v ON_ERROR_STOP=1 -c "ALTER TABLE orders ADD COLUMN currency TEXT;" >/dev/null +sleep 1 +ws_run devB dryrun-new snapshot take --db "$PG_A_URL" >/dev/null 2>&1 + +# Race the two pushes. +ws_run devA dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 & +pid_a=$! +ws_run devB dryrun-new snapshot push --to-path "$SHARED_DIR" --db "$PG_A_URL" >/dev/null 2>&1 & +pid_b=$! +wait $pid_a || fail "devA push failed" +wait $pid_b || fail "devB push failed" + +assert_no_tmp_files "$SHARED_DIR" + +count="$(find "$SHARED_DIR" -name '*.json.zst' -type f | wc -l | tr -d ' ')" +assert_eq "$count" "2" "two distinct snapshot files survived" + +ok