From ac881dbd5d26adf9e3b150ff9f0b6e2ef1f5264a Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 1 May 2026 23:02:14 +0200 Subject: [PATCH 01/12] chore: filesystem layout helpers --- crates/dry_run_cli/src/main.rs | 10 ++---- .../src/history/filesystem_layout.rs | 34 +++++++++++++++++++ crates/dry_run_core/src/history/mod.rs | 2 ++ 3 files changed, 38 insertions(+), 8 deletions(-) create mode 100644 crates/dry_run_core/src/history/filesystem_layout.rs diff --git a/crates/dry_run_cli/src/main.rs b/crates/dry_run_cli/src/main.rs index 4a5c4c1..0523cf5 100644 --- a/crates/dry_run_cli/src/main.rs +++ b/crates/dry_run_cli/src/main.rs @@ -934,14 +934,8 @@ fn write_snapshot_export( key: &SnapshotKey, snap: &dry_run_core::SchemaSnapshot, ) -> anyhow::Result { - let path = out_root - .join(&key.project_id.0) - .join(&key.database_id.0) - .join(format!( - "{}-{}.json.zst", - snap.timestamp.format("%Y%m%dT%H%M%SZ"), - snap.content_hash, - )); + let path = + dry_run_core::history::snapshot_path(out_root, key, snap.timestamp, &snap.content_hash); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; } diff --git a/crates/dry_run_core/src/history/filesystem_layout.rs b/crates/dry_run_core/src/history/filesystem_layout.rs new file mode 100644 index 0000000..94aa367 --- /dev/null +++ b/crates/dry_run_core/src/history/filesystem_layout.rs @@ -0,0 +1,34 @@ +use crate::history::SnapshotKey; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use std::path::{Path, PathBuf}; + +pub const SNAPSHOT_EXTENSION: &str = "json.zst"; + +const TS_FORMAT: &str = "%Y%m%dT%H%M%SZ"; + +#[must_use] +pub fn snapshot_path( + root: &Path, + key: &SnapshotKey, + timestamp: DateTime, + content_hash: &str, +) -> PathBuf { + root.join(&key.project_id.0) + .join(&key.database_id.0) + .join(format!( + "{}-{}.{}", + timestamp.format(TS_FORMAT), + content_hash, + SNAPSHOT_EXTENSION, + )) +} + +#[must_use] +pub fn parse_snapshot_filename(name: &str) -> Option<(DateTime, String)> { + let 
stem = name.strip_suffix(&format!(".{SNAPSHOT_EXTENSION}"))?; + let (ts_str, hash) = stem.split_once('-')?; + let naive = NaiveDateTime::parse_from_str(ts_str, TS_FORMAT).ok()?; + let ts = Utc.from_utc_datetime(&naive); + + Some((ts, hash.to_string())) +} diff --git a/crates/dry_run_core/src/history/mod.rs b/crates/dry_run_core/src/history/mod.rs index b37282b..ccc1f63 100644 --- a/crates/dry_run_core/src/history/mod.rs +++ b/crates/dry_run_core/src/history/mod.rs @@ -1,6 +1,8 @@ +pub mod filesystem_layout; mod snapshot_store; mod store; +pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path}; pub use snapshot_store::{ DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, }; From 207ed1fa323c7f6fc52e32f9a38b1fa56eb985c4 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Sat, 2 May 2026 00:22:19 +0200 Subject: [PATCH 02/12] feat: FilesystemStore --- .../src/history/filesystem_store.rs | 242 ++++++++++++++++++ crates/dry_run_core/src/history/mod.rs | 2 + 2 files changed, 244 insertions(+) create mode 100644 crates/dry_run_core/src/history/filesystem_store.rs diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs new file mode 100644 index 0000000..523a9cb --- /dev/null +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -0,0 +1,242 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use tracing::{debug, info}; + +use crate::error::{Error, Result}; +use crate::history::{ + DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, SnapshotSummary, + TimeRange, parse_snapshot_filename, snapshot_path, +}; +use crate::schema::SchemaSnapshot; + +pub struct FilesystemStore { + root: Arc, +} + +impl FilesystemStore { + pub fn new(root: impl Into) -> Self { + Self { + root: Arc::new(root.into()), + } + } + + pub fn list_keys(&self) -> Result> { + 
list_keys_sync(&self.root) + } +} + +#[async_trait] +impl SnapshotStore for FilesystemStore { + async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + let root = self.root.clone(); + let key = key.clone(); + let snap = snap.clone(); + run_blocking(move || { + let stream_dir = stream_dir(&root, &key); + if let Some(latest) = read_latest_hash(&stream_dir)? + && latest == snap.content_hash + { + debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let path = snapshot_path(&root, &key, snap.timestamp, &snap.content_hash); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(|e| { + Error::History(format!("create_dir_all {}: {e}", parent.display())) + })?; + } + + // atomic write: tmp + rename + let tmp = path.with_extension("zst.tmp"); + let json = serde_json::to_vec(&snap) + .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; + let compressed = zstd::encode_all(json.as_slice(), 3) + .map_err(|e| Error::History(format!("zstd encode: {e}")))?; + std::fs::write(&tmp, compressed) + .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; + std::fs::rename(&tmp, &path) + .map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?; + + info!( + hash = %snap.content_hash, + project = %key.project_id.0, + database = %key.database_id.0, + "snapshot put (fs)", + ); + Ok(PutOutcome::Inserted) + }) + .await + } + + async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + let root = self.root.clone(); + let key = key.clone(); + run_blocking(move || { + let entries = read_stream_entries(&stream_dir(&root, &key))?; + + let chosen = match &at { + SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), + SnapshotRef::At(target) => entries + .into_iter() + .filter(|(ts, _, _)| *ts <= *target) + .max_by_key(|(ts, _, _)| *ts), + SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == 
h), + }; + + let (_, _, path) = chosen.ok_or_else(|| { + let detail = match &at { + SnapshotRef::Latest => "latest".to_string(), + SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Error::History(format!("snapshot not found ({detail})")) + })?; + + let bytes = std::fs::read(&path) + .map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; + let json = zstd::decode_all(bytes.as_slice()) + .map_err(|e| Error::History(format!("zstd decode: {e}")))?; + serde_json::from_slice(&json) + .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) + }) + .await + } + + async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result> { + let root = self.root.clone(); + let key = key.clone(); + run_blocking(move || { + let entries = read_stream_entries(&stream_dir(&root, &key))?; + let mut summaries: Vec = entries + .into_iter() + .filter(|(ts, _, _)| { + range.from.is_none_or(|from| *ts >= from) && range.to.is_none_or(|to| *ts < to) + }) + .map(|(ts, hash, _)| SnapshotSummary { + id: 0, + timestamp: ts, + content_hash: hash, + database: key.database_id.0.clone(), + project_id: Some(key.project_id.0.clone()), + database_id: Some(key.database_id.0.clone()), + }) + .collect(); + summaries.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); + Ok(summaries) + }) + .await + } + + async fn latest(&self, key: &SnapshotKey) -> Result> { + Ok(self + .list(key, TimeRange::default()) + .await? 
+ .into_iter() + .next()) + } + + async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result { + let root = self.root.clone(); + let key = key.clone(); + run_blocking(move || { + let entries = read_stream_entries(&stream_dir(&root, &key))?; + let mut deleted = 0usize; + for (ts, _, path) in entries { + if ts < cutoff { + std::fs::remove_file(&path) + .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; + deleted += 1; + } + } + Ok(deleted) + }) + .await + } +} + +fn stream_dir(root: &Path, key: &SnapshotKey) -> PathBuf { + root.join(&key.project_id.0).join(&key.database_id.0) +} + +fn read_stream_entries(dir: &Path) -> Result, String, PathBuf)>> { + if !dir.is_dir() { + return Ok(Vec::new()); + } + let mut entries = Vec::new(); + for entry in std::fs::read_dir(dir) + .map_err(|e| Error::History(format!("read_dir {}: {e}", dir.display())))? + { + let entry = entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let path = entry.path(); + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if let Some((ts, hash)) = parse_snapshot_filename(name) { + entries.push((ts, hash, path)); + } + } + Ok(entries) +} + +fn read_latest_hash(dir: &Path) -> Result> { + Ok(read_stream_entries(dir)? + .into_iter() + .max_by_key(|(ts, _, _)| *ts) + .map(|(_, hash, _)| hash)) +} + +fn list_keys_sync(root: &Path) -> Result> { + let mut keys = Vec::new(); + if !root.is_dir() { + return Ok(keys); + } + for proj_entry in std::fs::read_dir(root) + .map_err(|e| Error::History(format!("read_dir {}: {e}", root.display())))? + { + let proj_entry = proj_entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let proj_path = proj_entry.path(); + if !proj_path.is_dir() { + continue; + } + let Some(project_id) = proj_path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + for db_entry in std::fs::read_dir(&proj_path) + .map_err(|e| Error::History(format!("read_dir {}: {e}", proj_path.display())))? 
+ { + let db_entry = db_entry.map_err(|e| Error::History(format!("dirent: {e}")))?; + let db_path = db_entry.path(); + if !db_path.is_dir() { + continue; + } + let Some(database_id) = db_path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + keys.push(SnapshotKey { + project_id: ProjectId(project_id.to_string()), + database_id: DatabaseId(database_id.to_string()), + }); + } + } + keys.sort_by(|a, b| { + a.project_id + .0 + .cmp(&b.project_id.0) + .then_with(|| a.database_id.0.cmp(&b.database_id.0)) + }); + Ok(keys) +} + +async fn run_blocking(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + tokio::task::spawn_blocking(f) + .await + .map_err(|e| Error::History(format!("blocking task failed: {e}")))? +} diff --git a/crates/dry_run_core/src/history/mod.rs b/crates/dry_run_core/src/history/mod.rs index ccc1f63..202a516 100644 --- a/crates/dry_run_core/src/history/mod.rs +++ b/crates/dry_run_core/src/history/mod.rs @@ -1,8 +1,10 @@ pub mod filesystem_layout; +mod filesystem_store; mod snapshot_store; mod store; pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path}; +pub use filesystem_store::FilesystemStore; pub use snapshot_store::{ DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, }; From 2f9a35303d2857514b88d25204b1fe0924a54088 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Thu, 7 May 2026 18:16:49 +0200 Subject: [PATCH 03/12] fix: dedup planner/activity snapshots on (schema_ref, content) --- crates/dry_run_core/src/history/store.rs | 36 +++++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index 496b28b..4d220d2 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -127,18 +127,23 @@ impl HistoryStore { let pid = &key.project_id.0; let did = &key.database_id.0; - let latest: 
Option = conn + // dedup on (schema_ref, content): same content_hash under a + // different schema_ref is a new row, not a duplicate. + let exists: Option = conn .query_row( - "SELECT content_hash FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'planner_stats' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], + "SELECT id FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + AND kind = 'planner_stats' + AND schema_ref_hash = ?3 AND content_hash = ?4 + LIMIT 1", + params![pid, did, snap.schema_ref_hash, snap.content_hash], |row| row.get(0), ) .ok(); - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, "planner stats unchanged, skipping put"); + if exists.is_some() { + debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + "planner stats unchanged, skipping put"); return Ok(PutOutcome::Deduped); } @@ -179,20 +184,23 @@ impl HistoryStore { let did = &key.database_id.0; let label = &snap.node.label; - let latest: Option = conn + // dedup on (schema_ref, node, content); same content_hash for the + // same node under a different schema_ref is a new row. 
+ let exists: Option = conn .query_row( - "SELECT content_hash FROM snapshots + "SELECT id FROM snapshots WHERE project_id = ?1 AND database_id = ?2 AND kind = 'activity_stats' AND node_label = ?3 - ORDER BY timestamp DESC LIMIT 1", - params![pid, did, label], + AND schema_ref_hash = ?4 AND content_hash = ?5 + LIMIT 1", + params![pid, did, label, snap.schema_ref_hash, snap.content_hash], |row| row.get(0), ) .ok(); - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, label = %label, - "activity stats unchanged, skipping put"); + if exists.is_some() { + debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + label = %label, "activity stats unchanged, skipping put"); return Ok(PutOutcome::Deduped); } From 1726fa1dad7cc68d55d3e061217e5c54977292f2 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Thu, 7 May 2026 22:58:00 +0200 Subject: [PATCH 04/12] feat: handle all 3 schema in SnapshotStore --- crates/dry_run_cli/src/main.rs | 20 +- crates/dry_run_cli/src/mcp/server.rs | 8 +- crates/dry_run_cli/src/mcp/server_tests.rs | 4 +- crates/dry_run_cli/tests/init_e2e.rs | 5 +- .../src/history/filesystem_store.rs | 261 ++++--- crates/dry_run_core/src/history/mod.rs | 3 +- .../src/history/snapshot_store.rs | 194 ++++- crates/dry_run_core/src/history/store.rs | 724 +++++++++++------- 8 files changed, 823 insertions(+), 396 deletions(-) diff --git a/crates/dry_run_cli/src/main.rs b/crates/dry_run_cli/src/main.rs index 0523cf5..4e15343 100644 --- a/crates/dry_run_cli/src/main.rs +++ b/crates/dry_run_cli/src/main.rs @@ -342,7 +342,7 @@ schema_file = ".dryrun/schema.json" .unwrap_or_else(|| ProjectConfig::parse(""))?; let resolved = config.resolve_profile(Some(db_url), None, None, &cwd)?; let key = complete_key(&resolved, &snapshot.database); - store.put(&key, &snapshot).await?; + store.put_schema(&key, &snapshot).await?; let planner = ctx.introspect_planner_stats(&snapshot.content_hash).await?; store.put_planner_stats(&key, 
&planner).await?; @@ -478,7 +478,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let resolved = config.resolve_profile(Some(db_url), None, profile, &cwd)?; let key = complete_key(&resolved, &snapshot.database); - let schema_outcome = store.put(&key, &snapshot).await?; + let schema_outcome = store.put_schema(&key, &snapshot).await?; match schema_outcome { PutOutcome::Inserted => { println!("Snapshot saved: {}", snapshot.content_hash); @@ -603,7 +603,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> SnapshotAction::List { db, history_db } => { let store = open_history_store(history_db.as_deref())?; let key = resolve_read_key(db.as_deref(), profile).await?; - let rows = store.list(&key, TimeRange::default()).await?; + let rows = store.list_schema(&key, TimeRange::default()).await?; if rows.is_empty() { println!( @@ -642,15 +642,19 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let key = resolve_read_key(Some(db_url), profile).await?; let from_snapshot = if let Some(hash) = &from { - store.get(&key, SnapshotRef::Hash(hash.clone())).await? + store + .get_schema(&key, SnapshotRef::Hash(hash.clone())) + .await? } else if *latest { - store.get(&key, SnapshotRef::Latest).await? + store.get_schema(&key, SnapshotRef::Latest).await? } else { anyhow::bail!("specify --from or --latest"); }; let to_snapshot = if let Some(hash) = &to { - store.get(&key, SnapshotRef::Hash(hash.clone())).await? + store + .get_schema(&key, SnapshotRef::Hash(hash.clone())) + .await? } else { ctx.introspect_schema().await? 
}; @@ -675,10 +679,10 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> let keys = store.list_keys()?; let mut written = 0usize; for key in &keys { - let summaries = store.list(key, TimeRange::default()).await?; + let summaries = store.list_schema(key, TimeRange::default()).await?; for s in &summaries { let snap = store - .get(key, SnapshotRef::Hash(s.content_hash.clone())) + .get_schema(key, SnapshotRef::Hash(s.content_hash.clone())) .await?; write_snapshot_export(&out_root, key, &snap)?; written += 1; diff --git a/crates/dry_run_cli/src/mcp/server.rs b/crates/dry_run_cli/src/mcp/server.rs index 956a9d0..3c8510d 100644 --- a/crates/dry_run_cli/src/mcp/server.rs +++ b/crates/dry_run_cli/src/mcp/server.rs @@ -28,7 +28,7 @@ async fn persist_refresh( planner: Option<&dry_run_core::PlannerStatsSnapshot>, activity_by_node: &std::collections::BTreeMap, ) { - if let Err(e) = store.put(key, schema).await { + if let Err(e) = store.put_schema(key, schema).await { tracing::warn!(error = %e, "failed to persist schema"); } if let Some(p) = planner @@ -834,18 +834,18 @@ impl DryRunServer { let from_snapshot = match ¶ms.from { Some(hash) => store - .get(key, SnapshotRef::Hash(hash.clone())) + .get_schema(key, SnapshotRef::Hash(hash.clone())) .await .map_err(to_mcp_err)?, None => store - .get(key, SnapshotRef::Latest) + .get_schema(key, SnapshotRef::Latest) .await .map_err(to_mcp_err)?, }; let to_snapshot = match ¶ms.to { Some(hash) => store - .get(key, SnapshotRef::Hash(hash.clone())) + .get_schema(key, SnapshotRef::Hash(hash.clone())) .await .map_err(to_mcp_err)?, None => self.get_schema().await?, diff --git a/crates/dry_run_cli/src/mcp/server_tests.rs b/crates/dry_run_cli/src/mcp/server_tests.rs index 49c5062..96911f8 100644 --- a/crates/dry_run_cli/src/mcp/server_tests.rs +++ b/crates/dry_run_cli/src/mcp/server_tests.rs @@ -384,7 +384,7 @@ async fn rebuild_after_refresh_preserves_replica_activity() { let schema = test_snapshot(); let 
schema_hash = schema.content_hash.clone(); - SnapshotStore::put(&store, &key, &schema) + SnapshotStore::put_schema(&store, &key, &schema) .await .expect("seed schema"); let replica = make_activity_row(&schema_hash, "replica1", "replica-h1"); @@ -436,7 +436,7 @@ async fn reload_schema_prefers_history_over_json() { let schema = test_snapshot(); let schema_hash = schema.content_hash.clone(); - SnapshotStore::put(&store, &key, &schema) + SnapshotStore::put_schema(&store, &key, &schema) .await .expect("seed schema"); store diff --git a/crates/dry_run_cli/tests/init_e2e.rs b/crates/dry_run_cli/tests/init_e2e.rs index 6b3698a..159d4e2 100644 --- a/crates/dry_run_cli/tests/init_e2e.rs +++ b/crates/dry_run_cli/tests/init_e2e.rs @@ -139,7 +139,10 @@ async fn init_full_capture_writes_schema_planner_and_activity() { database_id: DatabaseId("postgres".into()), }; - let summaries = store.list(&key, TimeRange::default()).await.expect("list"); + let summaries = store + .list_schema(&key, TimeRange::default()) + .await + .expect("list"); assert_eq!( summaries.len(), 1, diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index 523a9cb..ca51415 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -7,11 +7,12 @@ use tracing::{debug, info}; use crate::error::{Error, Result}; use crate::history::{ - DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, SnapshotSummary, - TimeRange, parse_snapshot_filename, snapshot_path, + DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, + SnapshotSummary, StoredSnapshot, TimeRange, parse_snapshot_filename, snapshot_path, }; use crate::schema::SchemaSnapshot; +// schema-only for now pub struct FilesystemStore { root: Arc, } @@ -28,136 +29,190 @@ impl FilesystemStore { } } +fn unsupported(kind: &SnapshotKind) -> Error { + Error::History(format!( + 
"FilesystemStore: only schema snapshots supported (got {})", + kind.db_kind() + )) +} + #[async_trait] impl SnapshotStore for FilesystemStore { - async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result { + let schema = match snap { + StoredSnapshot::Schema(s) => s.clone(), + other => return Err(unsupported(&other.kind())), + }; let root = self.root.clone(); let key = key.clone(); - let snap = snap.clone(); - run_blocking(move || { - let stream_dir = stream_dir(&root, &key); - if let Some(latest) = read_latest_hash(&stream_dir)? - && latest == snap.content_hash - { - debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let path = snapshot_path(&root, &key, snap.timestamp, &snap.content_hash); - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).map_err(|e| { - Error::History(format!("create_dir_all {}: {e}", parent.display())) - })?; - } - - // atomic write: tmp + rename - let tmp = path.with_extension("zst.tmp"); - let json = serde_json::to_vec(&snap) - .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; - let compressed = zstd::encode_all(json.as_slice(), 3) - .map_err(|e| Error::History(format!("zstd encode: {e}")))?; - std::fs::write(&tmp, compressed) - .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; - std::fs::rename(&tmp, &path) - .map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?; - - info!( - hash = %snap.content_hash, - project = %key.project_id.0, - database = %key.database_id.0, - "snapshot put (fs)", - ); - Ok(PutOutcome::Inserted) - }) - .await + run_blocking(move || put_schema(&root, &key, schema)).await } - async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, + ) -> Result { + if !matches!(kind, 
SnapshotKind::Schema) { + return Err(unsupported(kind)); + } let root = self.root.clone(); let key = key.clone(); - run_blocking(move || { - let entries = read_stream_entries(&stream_dir(&root, &key))?; - - let chosen = match &at { - SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), - SnapshotRef::At(target) => entries - .into_iter() - .filter(|(ts, _, _)| *ts <= *target) - .max_by_key(|(ts, _, _)| *ts), - SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h), - }; - - let (_, _, path) = chosen.ok_or_else(|| { - let detail = match &at { - SnapshotRef::Latest => "latest".to_string(), - SnapshotRef::At(ts) => format!("at-or-before {ts}"), - SnapshotRef::Hash(h) => format!("hash {h}"), - }; - Error::History(format!("snapshot not found ({detail})")) - })?; - - let bytes = std::fs::read(&path) - .map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; - let json = zstd::decode_all(bytes.as_slice()) - .map_err(|e| Error::History(format!("zstd decode: {e}")))?; - serde_json::from_slice(&json) - .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) - }) - .await + run_blocking(move || get_schema(&root, &key, at).map(StoredSnapshot::Schema)).await } - async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result> { + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result> { + if !matches!(kind, SnapshotKind::Schema) { + return Ok(Vec::new()); + } let root = self.root.clone(); let key = key.clone(); - run_blocking(move || { - let entries = read_stream_entries(&stream_dir(&root, &key))?; - let mut summaries: Vec = entries - .into_iter() - .filter(|(ts, _, _)| { - range.from.is_none_or(|from| *ts >= from) && range.to.is_none_or(|to| *ts < to) - }) - .map(|(ts, hash, _)| SnapshotSummary { - id: 0, - timestamp: ts, - content_hash: hash, - database: key.database_id.0.clone(), - project_id: Some(key.project_id.0.clone()), - database_id: 
Some(key.database_id.0.clone()), - }) - .collect(); - summaries.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); - Ok(summaries) - }) - .await + run_blocking(move || list_schema(&root, &key, range)).await } - async fn latest(&self, key: &SnapshotKey) -> Result> { + async fn latest( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + ) -> Result> { Ok(self - .list(key, TimeRange::default()) + .list(key, kind, TimeRange::default()) .await? .into_iter() .next()) } - async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result { + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result { + if !matches!(kind, SnapshotKind::Schema) { + return Err(unsupported(kind)); + } + let root = self.root.clone(); + let key = key.clone(); + run_blocking(move || delete_schema_before(&root, &key, cutoff)).await + } + + async fn list_kinds(&self, key: &SnapshotKey) -> Result> { let root = self.root.clone(); let key = key.clone(); run_blocking(move || { let entries = read_stream_entries(&stream_dir(&root, &key))?; - let mut deleted = 0usize; - for (ts, _, path) in entries { - if ts < cutoff { - std::fs::remove_file(&path) - .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; - deleted += 1; - } - } - Ok(deleted) + Ok(if entries.is_empty() { + Vec::new() + } else { + vec![SnapshotKind::Schema] + }) }) .await } } +fn put_schema(root: &Path, key: &SnapshotKey, snap: SchemaSnapshot) -> Result { + let stream_dir = stream_dir(root, key); + if let Some(latest) = read_latest_hash(&stream_dir)? 
+ && latest == snap.content_hash + { + debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let path = snapshot_path(root, key, snap.timestamp, &snap.content_hash); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| Error::History(format!("create_dir_all {}: {e}", parent.display())))?; + } + + let tmp = path.with_extension("zst.tmp"); + let json = serde_json::to_vec(&snap) + .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; + let compressed = zstd::encode_all(json.as_slice(), 3) + .map_err(|e| Error::History(format!("zstd encode: {e}")))?; + std::fs::write(&tmp, compressed) + .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; + std::fs::rename(&tmp, &path) + .map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?; + + info!( + hash = %snap.content_hash, + project = %key.project_id.0, + database = %key.database_id.0, + "snapshot put (fs)", + ); + Ok(PutOutcome::Inserted) +} + +fn get_schema(root: &Path, key: &SnapshotKey, at: SnapshotRef) -> Result { + let entries = read_stream_entries(&stream_dir(root, key))?; + let chosen = match &at { + SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), + SnapshotRef::At(target) => entries + .into_iter() + .filter(|(ts, _, _)| *ts <= *target) + .max_by_key(|(ts, _, _)| *ts), + SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h), + }; + + let (_, _, path) = chosen.ok_or_else(|| { + let detail = match &at { + SnapshotRef::Latest => "latest".to_string(), + SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Error::History(format!("snapshot not found ({detail})")) + })?; + + let bytes = std::fs::read(&path) + .map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; + let json = zstd::decode_all(bytes.as_slice()) + .map_err(|e| Error::History(format!("zstd decode: 
{e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) +} + +fn list_schema(root: &Path, key: &SnapshotKey, range: TimeRange) -> Result> { + let entries = read_stream_entries(&stream_dir(root, key))?; + let mut summaries: Vec = entries + .into_iter() + .filter(|(ts, _, _)| { + range.from.is_none_or(|from| *ts >= from) && range.to.is_none_or(|to| *ts < to) + }) + .map(|(ts, hash, _)| SnapshotSummary { + id: 0, + kind: SnapshotKind::Schema, + timestamp: ts, + content_hash: hash, + schema_ref_hash: None, + database: key.database_id.0.clone(), + project_id: Some(key.project_id.0.clone()), + database_id: Some(key.database_id.0.clone()), + }) + .collect(); + summaries.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); + Ok(summaries) +} + +fn delete_schema_before(root: &Path, key: &SnapshotKey, cutoff: DateTime) -> Result { + let entries = read_stream_entries(&stream_dir(root, key))?; + let mut deleted = 0usize; + for (ts, _, path) in entries { + if ts < cutoff { + std::fs::remove_file(&path) + .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; + deleted += 1; + } + } + Ok(deleted) +} + fn stream_dir(root: &Path, key: &SnapshotKey) -> PathBuf { root.join(&key.project_id.0).join(&key.database_id.0) } diff --git a/crates/dry_run_core/src/history/mod.rs b/crates/dry_run_core/src/history/mod.rs index 202a516..2a747a6 100644 --- a/crates/dry_run_core/src/history/mod.rs +++ b/crates/dry_run_core/src/history/mod.rs @@ -6,6 +6,7 @@ mod store; pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path}; pub use filesystem_store::FilesystemStore; pub use snapshot_store::{ - DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, + StoredSnapshot, TimeRange, }; pub use store::{HistoryStore, SnapshotSummary, default_data_dir}; diff --git 
a/crates/dry_run_core/src/history/snapshot_store.rs b/crates/dry_run_core/src/history/snapshot_store.rs index 4da5b7c..7366dff 100644 --- a/crates/dry_run_core/src/history/snapshot_store.rs +++ b/crates/dry_run_core/src/history/snapshot_store.rs @@ -2,8 +2,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use crate::error::Result; -use crate::schema::SchemaSnapshot; +use crate::error::{Error, Result}; +use crate::schema::{ActivityStatsSnapshot, PlannerStatsSnapshot, SchemaSnapshot}; pub use super::store::SnapshotSummary; @@ -38,11 +38,191 @@ pub enum PutOutcome { Deduped, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SnapshotKind { + Schema, + Planner, + Activity { node_label: String }, +} + +impl SnapshotKind { + /// String stored in the SQLite `kind` column. + #[must_use] + pub fn db_kind(&self) -> &'static str { + match self { + Self::Schema => "schema", + Self::Planner => "planner_stats", + Self::Activity { .. } => "activity_stats", + } + } + + #[must_use] + pub fn node_label(&self) -> Option<&str> { + match self { + Self::Activity { node_label } => Some(node_label.as_str()), + _ => None, + } + } +} + +#[derive(Debug, Clone)] +pub enum StoredSnapshot { + Schema(SchemaSnapshot), + Planner(PlannerStatsSnapshot), + Activity(ActivityStatsSnapshot), +} + +impl StoredSnapshot { + #[must_use] + pub fn kind(&self) -> SnapshotKind { + match self { + Self::Schema(_) => SnapshotKind::Schema, + Self::Planner(_) => SnapshotKind::Planner, + Self::Activity(a) => SnapshotKind::Activity { + node_label: a.node.label.clone(), + }, + } + } + + #[must_use] + pub fn timestamp(&self) -> DateTime { + match self { + Self::Schema(s) => s.timestamp, + Self::Planner(p) => p.timestamp, + Self::Activity(a) => a.timestamp, + } + } + + #[must_use] + pub fn content_hash(&self) -> &str { + match self { + Self::Schema(s) => &s.content_hash, + Self::Planner(p) => &p.content_hash, + Self::Activity(a) => &a.content_hash, + } + } + + 
#[must_use] + pub fn schema_ref_hash(&self) -> Option<&str> { + match self { + Self::Schema(_) => None, + Self::Planner(p) => Some(&p.schema_ref_hash), + Self::Activity(a) => Some(&a.schema_ref_hash), + } + } + + #[must_use] + pub fn database(&self) -> &str { + match self { + Self::Schema(s) => &s.database, + Self::Planner(p) => &p.database, + Self::Activity(a) => &a.database, + } + } + + pub fn into_schema(self) -> Result { + match self { + Self::Schema(s) => Ok(s), + other => Err(Error::History(format!( + "expected schema snapshot, got {}", + other.kind().db_kind() + ))), + } + } + + pub fn into_planner(self) -> Result { + match self { + Self::Planner(p) => Ok(p), + other => Err(Error::History(format!( + "expected planner snapshot, got {}", + other.kind().db_kind() + ))), + } + } + + pub fn into_activity(self) -> Result { + match self { + Self::Activity(a) => Ok(a), + other => Err(Error::History(format!( + "expected activity snapshot, got {}", + other.kind().db_kind() + ))), + } + } +} + #[async_trait] pub trait SnapshotStore: Send + Sync { - async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result; - async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result; - async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result>; - async fn latest(&self, key: &SnapshotKey) -> Result>; - async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result; + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result; + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, + ) -> Result; + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result>; + async fn latest( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + ) -> Result>; + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result; + async fn list_kinds(&self, key: &SnapshotKey) -> Result>; + + // wrapper per schema kind + async fn 
put_schema(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + self.put(key, &StoredSnapshot::Schema(snap.clone())).await + } + + async fn put_planner_stats( + &self, + key: &SnapshotKey, + snap: &PlannerStatsSnapshot, + ) -> Result { + self.put(key, &StoredSnapshot::Planner(snap.clone())).await + } + + async fn put_activity_stats( + &self, + key: &SnapshotKey, + snap: &ActivityStatsSnapshot, + ) -> Result { + self.put(key, &StoredSnapshot::Activity(snap.clone())).await + } + + async fn get_schema(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + self.get(key, &SnapshotKind::Schema, at) + .await? + .into_schema() + } + + async fn list_schema( + &self, + key: &SnapshotKey, + range: TimeRange, + ) -> Result> { + self.list(key, &SnapshotKind::Schema, range).await + } + + async fn latest_schema(&self, key: &SnapshotKey) -> Result> { + self.latest(key, &SnapshotKind::Schema).await + } + + async fn delete_schema_before( + &self, + key: &SnapshotKey, + cutoff: DateTime, + ) -> Result { + self.delete_before(key, &SnapshotKind::Schema, cutoff).await + } } diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index 4d220d2..740385d 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -9,7 +9,7 @@ use tracing::{debug, info, warn}; use crate::error::{Error, Result}; use crate::history::snapshot_store::{ - PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, StoredSnapshot, TimeRange, }; use crate::schema::{ ActivityStatsSnapshot, AnnotatedSnapshot, PlannerStatsSnapshot, SchemaSnapshot, @@ -22,8 +22,10 @@ pub struct HistoryStore { #[derive(Debug, Clone)] pub struct SnapshotSummary { pub id: i64, + pub kind: SnapshotKind, pub timestamp: DateTime, pub content_hash: String, + pub schema_ref_hash: Option, pub database: String, pub project_id: Option, pub database_id: Option, @@ -116,128 
+118,12 @@ impl HistoryStore { .await } - pub async fn put_planner_stats( - &self, - key: &SnapshotKey, - snap: &PlannerStatsSnapshot, - ) -> Result { - let key = key.clone(); - let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - - // dedup on (schema_ref, content): same content_hash under a - // different schema_ref is a new row, not a duplicate. - let exists: Option = conn - .query_row( - "SELECT id FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 - AND kind = 'planner_stats' - AND schema_ref_hash = ?3 AND content_hash = ?4 - LIMIT 1", - params![pid, did, snap.schema_ref_hash, snap.content_hash], - |row| row.get(0), - ) - .ok(); - - if exists.is_some() { - debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - "planner stats unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize planner stats: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, - database_name, snapshot_json, project_id, database_id) - VALUES ('planner_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.schema_ref_hash, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - project = %pid, database = %did, "planner stats put"); - Ok(PutOutcome::Inserted) - }) - .await - } - - pub async fn put_activity_stats( - &self, - key: &SnapshotKey, - snap: &ActivityStatsSnapshot, - ) -> Result { - let key = key.clone(); - let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - let label = &snap.node.label; - - // dedup on (schema_ref, node, content); same content_hash for the - // same node under a different schema_ref is a new row. 
- let exists: Option = conn - .query_row( - "SELECT id FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 - AND kind = 'activity_stats' AND node_label = ?3 - AND schema_ref_hash = ?4 AND content_hash = ?5 - LIMIT 1", - params![pid, did, label, snap.schema_ref_hash, snap.content_hash], - |row| row.get(0), - ) - .ok(); - - if exists.is_some() { - debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - label = %label, "activity stats unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize activity stats: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, - node_label, database_name, snapshot_json, - project_id, database_id) - VALUES ('activity_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.schema_ref_hash, - label, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, - label = %label, project = %pid, database = %did, - "activity stats put"); - Ok(PutOutcome::Inserted) - }) - .await - } - pub async fn get_annotated( &self, key: &SnapshotKey, at: SnapshotRef, ) -> Result { - let schema = SnapshotStore::get(self, key, at.clone()).await?; + let schema = SnapshotStore::get_schema(self, key, at.clone()).await?; let schema_hash = schema.content_hash.clone(); let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); @@ -371,17 +257,22 @@ fn lock_conn(conn: &Mutex) -> Result) -> rusqlite::Result { +fn row_to_summary( + row: &rusqlite::Row<'_>, + kind: SnapshotKind, +) -> rusqlite::Result { let ts_str: String = row.get(1)?; Ok(SnapshotSummary { id: row.get(0)?, + kind, timestamp: DateTime::parse_from_rfc3339(&ts_str) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_default(), content_hash: row.get(2)?, - database: row.get(3)?, - project_id: row.get(4)?, 
- database_id: row.get(5)?, + schema_ref_hash: row.get(3)?, + database: row.get(4)?, + project_id: row.get(5)?, + database_id: row.get(6)?, }) } @@ -403,111 +294,55 @@ where #[async_trait] impl SnapshotStore for HistoryStore { - async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result { + async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result { let key = key.clone(); let snap = snap.clone(); - run_blocking(&self.conn, move |conn| { - let pid = &key.project_id.0; - let did = &key.database_id.0; - - let latest: Option = conn - .query_row( - "SELECT content_hash FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], - |row| row.get(0), - ) - .ok(); - - if latest.as_deref() == Some(snap.content_hash.as_str()) { - debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); - return Ok(PutOutcome::Deduped); - } - - let json = serde_json::to_string(&snap) - .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; - - conn.execute( - "INSERT INTO snapshots (kind, timestamp, content_hash, database_name, - snapshot_json, project_id, database_id) - VALUES ('schema', ?1, ?2, ?3, ?4, ?5, ?6)", - params![ - snap.timestamp.to_rfc3339(), - snap.content_hash, - snap.database, - json, - pid, - did, - ], - )?; - - info!(hash = %snap.content_hash, project = %pid, database = %did, "snapshot put"); - Ok(PutOutcome::Inserted) + run_blocking(&self.conn, move |conn| match snap { + StoredSnapshot::Schema(s) => insert_schema(conn, &key, &s), + StoredSnapshot::Planner(p) => insert_planner(conn, &key, &p), + StoredSnapshot::Activity(a) => insert_activity(conn, &key, &a), }) .await } - async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result { + async fn get( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, + ) -> Result { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); + let kind = kind.clone(); 
run_blocking(&self.conn, move |conn| { - let row = match &at { - SnapshotRef::Latest => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - ORDER BY timestamp DESC LIMIT 1", - params![pid, did], - |r| r.get::<_, String>(0), - ), - SnapshotRef::At(ts) => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND timestamp <= ?3 - ORDER BY timestamp DESC LIMIT 1", - params![pid, did, ts.to_rfc3339()], - |r| r.get::<_, String>(0), - ), - SnapshotRef::Hash(h) => conn.query_row( - "SELECT snapshot_json FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND content_hash = ?3 - LIMIT 1", - params![pid, did, h], - |r| r.get::<_, String>(0), - ), - }; - - let json = match row { - Ok(j) => j, - Err(rusqlite::Error::QueryReturnedNoRows) => { - let detail = match at { - SnapshotRef::Latest => "latest".to_string(), - SnapshotRef::At(ts) => format!("at-or-before {ts}"), - SnapshotRef::Hash(h) => format!("hash {h}"), - }; - return Err(Error::History(format!("snapshot not found ({detail})"))); - } - Err(e) => return Err(e.into()), - }; - - serde_json::from_str(&json) - .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) + let json = fetch_snapshot_json(conn, &pid, &did, &kind, &at)?; + decode_stored(&kind, &json) }) .await } - async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result> { + async fn list( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, + ) -> Result> { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); + let kind = kind.clone(); run_blocking(&self.conn, move |conn| { let mut sql = String::from( - "SELECT id, timestamp, content_hash, database_name, + "SELECT id, timestamp, content_hash, schema_ref_hash, database_name, project_id, database_id FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema'", + WHERE 
project_id = ?1 AND database_id = ?2 AND kind = ?3", ); - let mut bound: Vec> = vec![Box::new(pid), Box::new(did)]; + let mut bound: Vec> = + vec![Box::new(pid), Box::new(did), Box::new(kind.db_kind())]; + if let SnapshotKind::Activity { node_label } = &kind { + sql += &format!(" AND node_label = ?{}", bound.len() + 1); + bound.push(Box::new(node_label.clone())); + } if let Some(from) = range.from { sql += &format!(" AND timestamp >= ?{}", bound.len() + 1); bound.push(Box::new(from.to_rfc3339())); @@ -520,36 +355,340 @@ impl SnapshotStore for HistoryStore { let mut stmt = conn.prepare(&sql)?; let params: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect(); - stmt.query_map(params.as_slice(), row_to_summary)? - .map(|r| r.map_err(Error::from)) - .collect() + let kind_for_rows = kind.clone(); + stmt.query_map(params.as_slice(), |row| { + row_to_summary(row, kind_for_rows.clone()) + })? + .map(|r| r.map_err(Error::from)) + .collect() }) .await } - async fn latest(&self, key: &SnapshotKey) -> Result> { + async fn latest( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + ) -> Result> { Ok(self - .list(key, TimeRange::default()) + .list(key, kind, TimeRange::default()) .await? .into_iter() .next()) } - async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime) -> Result { + async fn delete_before( + &self, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, + ) -> Result { let pid = key.project_id.0.clone(); let did = key.database_id.0.clone(); + let kind = kind.clone(); run_blocking(&self.conn, move |conn| { - Ok(conn.execute( + let mut sql = String::from( "DELETE FROM snapshots - WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' - AND timestamp < ?3", - params![pid, did, cutoff.to_rfc3339()], - )?) 
+ WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND timestamp < ?4", + ); + let mut bound: Vec> = vec![ + Box::new(pid), + Box::new(did), + Box::new(kind.db_kind()), + Box::new(cutoff.to_rfc3339()), + ]; + if let SnapshotKind::Activity { node_label } = &kind { + sql += &format!(" AND node_label = ?{}", bound.len() + 1); + bound.push(Box::new(node_label.clone())); + } + let params: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect(); + Ok(conn.execute(&sql, params.as_slice())?) + }) + .await + } + + async fn list_kinds(&self, key: &SnapshotKey) -> Result> { + let pid = key.project_id.0.clone(); + let did = key.database_id.0.clone(); + run_blocking(&self.conn, move |conn| { + let mut stmt = conn.prepare( + "SELECT DISTINCT kind, node_label FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + ORDER BY kind, node_label", + )?; + let rows = stmt.query_map(params![pid, did], |row| { + let kind: String = row.get(0)?; + let node_label: Option = row.get(1)?; + Ok((kind, node_label)) + })?; + let mut out = Vec::new(); + for r in rows { + let (kind, node_label) = r?; + match kind.as_str() { + "schema" => out.push(SnapshotKind::Schema), + "planner_stats" => out.push(SnapshotKind::Planner), + "activity_stats" => { + if let Some(label) = node_label { + out.push(SnapshotKind::Activity { node_label: label }); + } + } + other => { + return Err(Error::History(format!("unknown snapshot kind: {other}"))); + } + } + } + Ok(out) }) .await } } +fn fetch_snapshot_json( + conn: &Connection, + pid: &str, + did: &str, + kind: &SnapshotKind, + at: &SnapshotRef, +) -> Result { + let kind_str = kind.db_kind(); + let label_filter = matches!(kind, SnapshotKind::Activity { .. 
}); + let row: rusqlite::Result = match (at, label_filter) { + (SnapshotRef::Latest, false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str], + |r| r.get(0), + ), + (SnapshotRef::Latest, true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, label], + |r| r.get(0), + ) + } + (SnapshotRef::At(ts), false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND timestamp <= ?4 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, ts.to_rfc3339()], + |r| r.get(0), + ), + (SnapshotRef::At(ts), true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 AND timestamp <= ?5 + ORDER BY timestamp DESC LIMIT 1", + params![pid, did, kind_str, label, ts.to_rfc3339()], + |r| r.get(0), + ) + } + (SnapshotRef::Hash(h), false) => conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND content_hash = ?4 + LIMIT 1", + params![pid, did, kind_str, h], + |r| r.get(0), + ), + (SnapshotRef::Hash(h), true) => { + let label = kind.node_label().unwrap_or_default(); + conn.query_row( + "SELECT snapshot_json FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = ?3 + AND node_label = ?4 AND content_hash = ?5 + LIMIT 1", + params![pid, did, kind_str, label, h], + |r| r.get(0), + ) + } + }; + + match row { + Ok(j) => Ok(j), + Err(rusqlite::Error::QueryReturnedNoRows) => { + let detail = match at { + SnapshotRef::Latest => "latest".to_string(), + 
SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Err(Error::History(format!( + "{} snapshot not found ({detail})", + kind.db_kind() + ))) + } + Err(e) => Err(e.into()), + } +} + +fn decode_stored(kind: &SnapshotKind, json: &str) -> Result { + match kind { + SnapshotKind::Schema => serde_json::from_str::(json) + .map(StoredSnapshot::Schema) + .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))), + SnapshotKind::Planner => serde_json::from_str::(json) + .map(StoredSnapshot::Planner) + .map_err(|e| Error::History(format!("corrupt planner stats JSON: {e}"))), + SnapshotKind::Activity { .. } => serde_json::from_str::(json) + .map(StoredSnapshot::Activity) + .map_err(|e| Error::History(format!("corrupt activity stats JSON: {e}"))), + } +} + +fn insert_schema( + conn: &Connection, + key: &SnapshotKey, + snap: &SchemaSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + + let latest: Option = conn + .query_row( + "SELECT content_hash FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 AND kind = 'schema' + ORDER BY timestamp DESC LIMIT 1", + params![pid, did], + |row| row.get(0), + ) + .ok(); + + if latest.as_deref() == Some(snap.content_hash.as_str()) { + debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, database_name, + snapshot_json, project_id, database_id) + VALUES ('schema', ?1, ?2, ?3, ?4, ?5, ?6)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, project = %pid, database = %did, "snapshot put"); + Ok(PutOutcome::Inserted) +} + +fn insert_planner( + conn: &Connection, + key: &SnapshotKey, + snap: 
&PlannerStatsSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + + let exists: Option = conn + .query_row( + "SELECT id FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + AND kind = 'planner_stats' + AND schema_ref_hash = ?3 AND content_hash = ?4 + LIMIT 1", + params![pid, did, snap.schema_ref_hash, snap.content_hash], + |r| r.get(0), + ) + .ok(); + + if exists.is_some() { + debug!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + "planner stats unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize planner stats: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, + database_name, snapshot_json, project_id, database_id) + VALUES ('planner_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.schema_ref_hash, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + project = %pid, database = %did, "planner stats put"); + Ok(PutOutcome::Inserted) +} + +fn insert_activity( + conn: &Connection, + key: &SnapshotKey, + snap: &ActivityStatsSnapshot, +) -> Result { + let pid = &key.project_id.0; + let did = &key.database_id.0; + let label = &snap.node.label; + + let exists: Option = conn + .query_row( + "SELECT id FROM snapshots + WHERE project_id = ?1 AND database_id = ?2 + AND kind = 'activity_stats' AND node_label = ?3 + AND schema_ref_hash = ?4 AND content_hash = ?5 + LIMIT 1", + params![pid, did, label, snap.schema_ref_hash, snap.content_hash], + |r| r.get(0), + ) + .ok(); + + if exists.is_some() { + debug!(hash = %snap.content_hash, label = %label, + "activity stats unchanged, skipping put"); + return Ok(PutOutcome::Deduped); + } + + let json = serde_json::to_string(snap) + .map_err(|e| Error::History(format!("cannot serialize 
activity stats: {e}")))?; + + conn.execute( + "INSERT INTO snapshots (kind, timestamp, content_hash, schema_ref_hash, + node_label, database_name, snapshot_json, + project_id, database_id) + VALUES ('activity_stats', ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + snap.timestamp.to_rfc3339(), + snap.content_hash, + snap.schema_ref_hash, + label, + snap.database, + json, + pid, + did, + ], + )?; + + info!(hash = %snap.content_hash, schema_ref = %snap.schema_ref_hash, + label = %label, project = %pid, database = %did, + "activity stats put"); + Ok(PutOutcome::Inserted) +} + #[cfg(test)] mod trait_tests { use chrono::Duration; @@ -596,8 +735,14 @@ mod trait_tests { let k = key("p", "auth"); let snap = make_snap("h1", "auth"); - assert_eq!(store.put(&k, &snap).await.unwrap(), PutOutcome::Inserted); - assert_eq!(store.put(&k, &snap).await.unwrap(), PutOutcome::Deduped); + assert_eq!( + store.put_schema(&k, &snap).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_schema(&k, &snap).await.unwrap(), + PutOutcome::Deduped + ); } #[tokio::test] @@ -608,19 +753,28 @@ mod trait_tests { // same content_hash under different database_id should not dedupe assert_eq!( - store.put(&auth, &make_snap("same", "auth")).await.unwrap(), + store + .put_schema(&auth, &make_snap("same", "auth")) + .await + .unwrap(), PutOutcome::Inserted ); assert_eq!( store - .put(&billing, &make_snap("same", "billing")) + .put_schema(&billing, &make_snap("same", "billing")) .await .unwrap(), PutOutcome::Inserted ); - let auth_rows = store.list(&auth, TimeRange::default()).await.unwrap(); - let billing_rows = store.list(&billing, TimeRange::default()).await.unwrap(); + let auth_rows = store + .list_schema(&auth, TimeRange::default()) + .await + .unwrap(); + let billing_rows = store + .list_schema(&billing, TimeRange::default()) + .await + .unwrap(); assert_eq!(auth_rows.len(), 1); assert_eq!(billing_rows.len(), 1); assert_eq!(auth_rows[0].database_id.as_deref(), Some("auth")); @@ -632,11 
+786,11 @@ mod trait_tests { let (_dir, store) = temp_store(); let a = key("a", "x"); let b = key("b", "x"); - store.put(&a, &make_snap("h", "x")).await.unwrap(); - store.put(&b, &make_snap("h", "x")).await.unwrap(); + store.put_schema(&a, &make_snap("h", "x")).await.unwrap(); + store.put_schema(&b, &make_snap("h", "x")).await.unwrap(); - let a_rows = store.list(&a, TimeRange::default()).await.unwrap(); - let b_rows = store.list(&b, TimeRange::default()).await.unwrap(); + let a_rows = store.list_schema(&a, TimeRange::default()).await.unwrap(); + let b_rows = store.list_schema(&b, TimeRange::default()).await.unwrap(); assert_eq!(a_rows.len(), 1); assert_eq!(b_rows.len(), 1); assert_eq!(a_rows[0].project_id.as_deref(), Some("a")); @@ -651,10 +805,10 @@ mod trait_tests { s1.timestamp = Utc::now() - Duration::hours(2); let mut s2 = make_snap("h2", "x"); s2.timestamp = Utc::now() - Duration::hours(1); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let rows = store.list(&k, TimeRange::default()).await.unwrap(); + let rows = store.list_schema(&k, TimeRange::default()).await.unwrap(); assert_eq!(rows.len(), 2); assert_eq!(rows[0].content_hash, "h2"); assert_eq!(rows[1].content_hash, "h1"); @@ -668,12 +822,12 @@ mod trait_tests { for (i, hash) in ["h0", "h1", "h2"].iter().enumerate() { let mut s = make_snap(hash, "x"); s.timestamp = now - Duration::hours(2 - i as i64); - store.put(&k, &s).await.unwrap(); + store.put_schema(&k, &s).await.unwrap(); } // from = -90min: h0 at -2h is excluded, h1 at -1h and h2 at 0 included let rows = store - .list( + .list_schema( &k, TimeRange { from: Some(now - Duration::minutes(90)), @@ -688,7 +842,7 @@ mod trait_tests { // to = -30min (exclusive): h2 at 0 excluded, h0 and h1 included let rows = store - .list( + .list_schema( &k, TimeRange { from: None, @@ -706,15 +860,15 @@ mod trait_tests { async fn 
latest_returns_most_recent_or_none() { let (_dir, store) = temp_store(); let k = key("p", "x"); - assert!(store.latest(&k).await.unwrap().is_none()); + assert!(store.latest_schema(&k).await.unwrap().is_none()); let mut s1 = make_snap("old", "x"); s1.timestamp = Utc::now() - Duration::hours(1); let s2 = make_snap("new", "x"); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let latest = store.latest(&k).await.unwrap().unwrap(); + let latest = store.latest_schema(&k).await.unwrap().unwrap(); assert_eq!(latest.content_hash, "new"); } @@ -725,10 +879,10 @@ mod trait_tests { let mut s1 = make_snap("old", "x"); s1.timestamp = Utc::now() - Duration::hours(1); let s2 = make_snap("new", "x"); - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); - let got = store.get(&k, SnapshotRef::Latest).await.unwrap(); + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(got.content_hash, "new"); } @@ -741,12 +895,12 @@ mod trait_tests { s1.timestamp = now - Duration::hours(2); let mut s2 = make_snap("h2", "x"); s2.timestamp = now; - store.put(&k, &s1).await.unwrap(); - store.put(&k, &s2).await.unwrap(); + store.put_schema(&k, &s1).await.unwrap(); + store.put_schema(&k, &s2).await.unwrap(); // at -1h: h2 is in the future, only h1 qualifies let got = store - .get(&k, SnapshotRef::At(now - Duration::hours(1))) + .get_schema(&k, SnapshotRef::At(now - Duration::hours(1))) .await .unwrap(); assert_eq!(got.content_hash, "h1"); @@ -757,17 +911,22 @@ mod trait_tests { let (_dir, store) = temp_store(); let a = key("p", "auth"); let b = key("p", "billing"); - store.put(&a, &make_snap("shared", "auth")).await.unwrap(); + store + .put_schema(&a, &make_snap("shared", "auth")) + .await + .unwrap(); // direct lookup under correct key works let got = 
store - .get(&a, SnapshotRef::Hash("shared".into())) + .get_schema(&a, SnapshotRef::Hash("shared".into())) .await .unwrap(); assert_eq!(got.content_hash, "shared"); // same hash under different key fails — content_hash lookup is key-scoped - let result = store.get(&b, SnapshotRef::Hash("shared".into())).await; + let result = store + .get_schema(&b, SnapshotRef::Hash("shared".into())) + .await; assert!(result.is_err()); } @@ -775,14 +934,19 @@ mod trait_tests { async fn get_missing_returns_error() { let (_dir, store) = temp_store(); let k = key("p", "x"); - assert!(store.get(&k, SnapshotRef::Latest).await.is_err()); + assert!(store.get_schema(&k, SnapshotRef::Latest).await.is_err()); + assert!( + store + .get_schema(&k, SnapshotRef::Hash("nope".into())) + .await + .is_err() + ); assert!( store - .get(&k, SnapshotRef::Hash("nope".into())) + .get_schema(&k, SnapshotRef::At(Utc::now())) .await .is_err() ); - assert!(store.get(&k, SnapshotRef::At(Utc::now())).await.is_err()); } #[tokio::test] @@ -793,16 +957,16 @@ mod trait_tests { for (i, hash) in ["h0", "h1", "h2", "h3"].iter().enumerate() { let mut s = make_snap(hash, "x"); s.timestamp = now - Duration::hours(3 - i as i64); - store.put(&k, &s).await.unwrap(); + store.put_schema(&k, &s).await.unwrap(); } let deleted = store - .delete_before(&k, now - Duration::minutes(90)) + .delete_schema_before(&k, now - Duration::minutes(90)) .await .unwrap(); assert_eq!(deleted, 2); // h0 (-3h) and h1 (-2h) - let remaining = store.list(&k, TimeRange::default()).await.unwrap(); + let remaining = store.list_schema(&k, TimeRange::default()).await.unwrap(); assert_eq!(remaining.len(), 2); assert_eq!(remaining[0].content_hash, "h3"); assert_eq!(remaining[1].content_hash, "h2"); @@ -815,19 +979,33 @@ mod trait_tests { let b = key("p", "billing"); let mut s = make_snap("h", "auth"); s.timestamp = Utc::now() - Duration::hours(2); - store.put(&a, &s).await.unwrap(); + store.put_schema(&a, &s).await.unwrap(); let mut s = make_snap("h", 
"billing"); s.timestamp = Utc::now() - Duration::hours(2); - store.put(&b, &s).await.unwrap(); + store.put_schema(&b, &s).await.unwrap(); // delete in `a` should not touch `b` let deleted = store - .delete_before(&a, Utc::now() - Duration::hours(1)) + .delete_schema_before(&a, Utc::now() - Duration::hours(1)) .await .unwrap(); assert_eq!(deleted, 1); - assert_eq!(store.list(&a, TimeRange::default()).await.unwrap().len(), 0); - assert_eq!(store.list(&b, TimeRange::default()).await.unwrap().len(), 1); + assert_eq!( + store + .list_schema(&a, TimeRange::default()) + .await + .unwrap() + .len(), + 0 + ); + assert_eq!( + store + .list_schema(&b, TimeRange::default()) + .await + .unwrap() + .len(), + 1 + ); } #[tokio::test] @@ -838,19 +1016,19 @@ mod trait_tests { // put under three streams, with one stream getting two snapshots store - .put(&key("p", "billing"), &make_snap("h1", "billing")) + .put_schema(&key("p", "billing"), &make_snap("h1", "billing")) .await .unwrap(); store - .put(&key("p", "auth"), &make_snap("h2", "auth")) + .put_schema(&key("p", "auth"), &make_snap("h2", "auth")) .await .unwrap(); store - .put(&key("p", "auth"), &make_snap("h3", "auth")) + .put_schema(&key("p", "auth"), &make_snap("h3", "auth")) .await .unwrap(); store - .put(&key("other", "auth"), &make_snap("h4", "auth")) + .put_schema(&key("other", "auth"), &make_snap("h4", "auth")) .await .unwrap(); @@ -947,13 +1125,13 @@ mod trait_tests { let k = key("p", "auth"); let schema = make_snap("schema-h1", "auth"); - store.put(&k, &schema).await.unwrap(); + store.put_schema(&k, &schema).await.unwrap(); // Insert a newer planner_stats row referring to the schema. 
let planner = make_planner("schema-h1", "auth", "planner-h1"); store.put_planner_stats(&k, &planner).await.unwrap(); - let got = store.get(&k, SnapshotRef::Latest).await.unwrap(); + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(got.content_hash, "schema-h1"); } @@ -963,7 +1141,7 @@ mod trait_tests { let k = key("p", "auth"); let schema = make_snap("schema-h1", "auth"); - store.put(&k, &schema).await.unwrap(); + store.put_schema(&k, &schema).await.unwrap(); let planner = make_planner("schema-h1", "auth", "planner-h1"); store.put_planner_stats(&k, &planner).await.unwrap(); let primary = make_activity("schema-h1", "auth", "primary", "act-primary-1"); @@ -981,7 +1159,7 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); for label in ["primary", "replica1", "replica2"] { @@ -1002,13 +1180,19 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); - store.put(&k, &make_snap("schema-A", "auth")).await.unwrap(); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); let planner = make_planner("schema-A", "auth", "planner-A"); store.put_planner_stats(&k, &planner).await.unwrap(); // small sleep to ensure later timestamp ordering tokio::time::sleep(std::time::Duration::from_millis(5)).await; - store.put(&k, &make_snap("schema-B", "auth")).await.unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); let bundle = store.get_annotated(&k, SnapshotRef::Latest).await.unwrap(); assert_eq!(bundle.schema.content_hash, "schema-B"); @@ -1023,7 +1207,7 @@ mod trait_tests { let (_dir, store) = temp_store(); let k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); @@ -1037,7 +1221,7 @@ mod trait_tests { let (_dir, store) = temp_store(); let 
k = key("p", "auth"); store - .put(&k, &make_snap("schema-h1", "auth")) + .put_schema(&k, &make_snap("schema-h1", "auth")) .await .unwrap(); From 0be20b4a290b8f7d472c233b08ac99a0c3caba57 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Thu, 7 May 2026 23:09:41 +0200 Subject: [PATCH 05/12] test(history): cover dedup keys + kind-aware SnapshotStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For 2f9a353: prove planner/activity dedup splits on schema_ref so the same content_hash under a different schema ref doesn't collapse rows. For 1726fa1: cover the new trait surface — list_kinds reports distinct (kind, node_label) pairs, get(kind, ref) returns typed payloads, delete_before is scoped to one kind. FilesystemStore tests pin the schema-only behavior: planner/activity put/get return the unsupported error; list of non-schema kinds returns empty. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/history/filesystem_store.rs | 179 ++++++++++++++++ crates/dry_run_core/src/history/store.rs | 198 ++++++++++++++++++ 2 files changed, 377 insertions(+) diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index ca51415..ed84f8a 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -295,3 +295,182 @@ where .await .map_err(|e| Error::History(format!("blocking task failed: {e}")))? 
} + +#[cfg(test)] +mod tests { + use super::*; + use crate::schema::{ + IndexActivity, IndexActivityEntry, NodeIdentity, QualifiedName, TableActivity, + TableActivityEntry, + }; + use tempfile::TempDir; + + fn make_schema(hash: &str) -> SchemaSnapshot { + SchemaSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: "auth".into(), + timestamp: Utc::now(), + content_hash: hash.into(), + source: None, + tables: vec![], + enums: vec![], + domains: vec![], + composites: vec![], + views: vec![], + functions: vec![], + extensions: vec![], + gucs: vec![], + } + } + + fn make_planner(schema_ref: &str, hash: &str) -> crate::schema::PlannerStatsSnapshot { + crate::schema::PlannerStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: "auth".into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + tables: vec![], + columns: vec![], + indexes: vec![], + } + } + + fn make_activity( + schema_ref: &str, + label: &str, + hash: &str, + ) -> crate::schema::ActivityStatsSnapshot { + crate::schema::ActivityStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: "auth".into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + node: NodeIdentity { + label: label.into(), + host: format!("host-{label}"), + is_standby: label != "primary", + replication_lag_bytes: None, + stats_reset: None, + }, + tables: vec![TableActivityEntry { + table: QualifiedName::new("public", "orders"), + activity: TableActivity { + seq_scan: 1, + idx_scan: 2, + n_live_tup: 0, + n_dead_tup: 0, + last_vacuum: None, + last_autovacuum: None, + last_analyze: None, + last_autoanalyze: None, + vacuum_count: 0, + autovacuum_count: 0, + analyze_count: 0, + autoanalyze_count: 0, + }, + }], + indexes: vec![IndexActivityEntry { + index: QualifiedName::new("public", "orders_pkey"), + activity: IndexActivity { + idx_scan: 0, + idx_tup_read: 0, + idx_tup_fetch: 0, + }, + }], + } + } + + fn key() -> SnapshotKey 
{ + SnapshotKey { + project_id: ProjectId("p".into()), + database_id: DatabaseId("auth".into()), + } + } + + fn temp_store() -> (TempDir, FilesystemStore) { + let dir = TempDir::new().unwrap(); + let store = FilesystemStore::new(dir.path().to_path_buf()); + (dir, store) + } + + #[tokio::test] + async fn put_schema_round_trips_via_trait() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("h1")).await.unwrap(); + + let got = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); + assert_eq!(got.content_hash, "h1"); + } + + #[tokio::test] + async fn put_schema_dedupes_on_same_content_hash() { + let (_dir, store) = temp_store(); + let k = key(); + let s = make_schema("h1"); + assert_eq!( + store.put_schema(&k, &s).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!(store.put_schema(&k, &s).await.unwrap(), PutOutcome::Deduped); + } + + #[tokio::test] + async fn put_planner_returns_unsupported_error() { + let (_dir, store) = temp_store(); + let k = key(); + let err = store + .put_planner_stats(&k, &make_planner("schema-h1", "p1")) + .await + .unwrap_err(); + assert!(format!("{err}").contains("only schema snapshots supported")); + } + + #[tokio::test] + async fn put_activity_returns_unsupported_error() { + let (_dir, store) = temp_store(); + let k = key(); + let err = store + .put_activity_stats(&k, &make_activity("schema-h1", "primary", "a1")) + .await + .unwrap_err(); + assert!(format!("{err}").contains("only schema snapshots supported")); + } + + #[tokio::test] + async fn get_planner_returns_unsupported_error() { + let (_dir, store) = temp_store(); + let k = key(); + let err = store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .unwrap_err(); + assert!(format!("{err}").contains("only schema snapshots supported")); + } + + #[tokio::test] + async fn list_returns_empty_for_non_schema_kinds() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, 
&make_schema("h1")).await.unwrap(); + + let planner = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert!(planner.is_empty()); + } + + #[tokio::test] + async fn list_kinds_returns_schema_only_when_files_present() { + let (_dir, store) = temp_store(); + let k = key(); + assert!(store.list_kinds(&k).await.unwrap().is_empty()); + + store.put_schema(&k, &make_schema("h1")).await.unwrap(); + let kinds = store.list_kinds(&k).await.unwrap(); + assert_eq!(kinds, vec![SnapshotKind::Schema]); + } +} diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index 740385d..c0f6146 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -1236,4 +1236,202 @@ mod trait_tests { let primary = bundle.activity_by_node.get("primary").unwrap(); assert_eq!(primary.content_hash, "act-2"); } + + // --- dedup correctness (commit 2f9a353) --- + + #[tokio::test] + async fn put_planner_dedupes_only_within_same_schema_ref() { + // Same content_hash under a different schema_ref must NOT collapse. 
+ let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); + + let p_a = make_planner("schema-A", "auth", "shared-hash"); + let p_b = make_planner("schema-B", "auth", "shared-hash"); + + assert_eq!( + store.put_planner_stats(&k, &p_a).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_planner_stats(&k, &p_a).await.unwrap(), + PutOutcome::Deduped + ); + assert_eq!( + store.put_planner_stats(&k, &p_b).await.unwrap(), + PutOutcome::Inserted + ); + } + + #[tokio::test] + async fn put_activity_dedupes_only_within_same_schema_ref_and_node() { + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-A", "auth")) + .await + .unwrap(); + store + .put_schema(&k, &make_snap("schema-B", "auth")) + .await + .unwrap(); + + let a_primary_a = make_activity("schema-A", "auth", "primary", "shared-hash"); + let a_primary_b = make_activity("schema-B", "auth", "primary", "shared-hash"); + let a_replica_a = make_activity("schema-A", "auth", "replica", "shared-hash"); + + assert_eq!( + store.put_activity_stats(&k, &a_primary_a).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_activity_stats(&k, &a_primary_a).await.unwrap(), + PutOutcome::Deduped + ); + // different schema_ref, same node + hash → insert + assert_eq!( + store.put_activity_stats(&k, &a_primary_b).await.unwrap(), + PutOutcome::Inserted + ); + // different node, same schema_ref + hash → insert + assert_eq!( + store.put_activity_stats(&k, &a_replica_a).await.unwrap(), + PutOutcome::Inserted + ); + } + + // --- kind-aware trait API (commit 1726fa1) --- + + #[tokio::test] + async fn list_kinds_reports_distinct_kinds_and_node_labels() { + use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + 
assert!(store.list_kinds(&k).await.unwrap().is_empty()); + + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", "primary", "act-p")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", "replica1", "act-r")) + .await + .unwrap(); + + let kinds = store.list_kinds(&k).await.unwrap(); + assert!(kinds.contains(&SnapshotKind::Schema)); + assert!(kinds.contains(&SnapshotKind::Planner)); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "primary".into() + })); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "replica1".into() + })); + } + + #[tokio::test] + async fn get_via_trait_returns_typed_payload_per_kind() { + use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("schema-h1", "auth", "primary", "act-1")) + .await + .unwrap(); + + let s = store + .get(&k, &SnapshotKind::Schema, SnapshotRef::Latest) + .await + .unwrap() + .into_schema() + .unwrap(); + assert_eq!(s.content_hash, "schema-h1"); + + let p = store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .unwrap() + .into_planner() + .unwrap(); + assert_eq!(p.content_hash, "planner-h1"); + + let a = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(a.content_hash, "act-1"); + } + + #[tokio::test] + async fn delete_before_scoped_to_kind_only() { + // delete_before for activity must not touch planner or schema rows. 
+ use crate::history::SnapshotKind; + + let (_dir, store) = temp_store(); + let k = key("p", "auth"); + store + .put_schema(&k, &make_snap("schema-h1", "auth")) + .await + .unwrap(); + store + .put_planner_stats(&k, &make_planner("schema-h1", "auth", "planner-h1")) + .await + .unwrap(); + let mut a = make_activity("schema-h1", "auth", "primary", "act-1"); + a.timestamp = Utc::now() - Duration::hours(2); + store.put_activity_stats(&k, &a).await.unwrap(); + + let removed = store + .delete_before( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + Utc::now() - Duration::hours(1), + ) + .await + .unwrap(); + assert_eq!(removed, 1); + // schema and planner untouched + assert!(store.get_schema(&k, SnapshotRef::Latest).await.is_ok()); + assert!( + store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .is_ok() + ); + } } From ec85d3c25d952cc4dfe8294ca9d40d890da5bcc1 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 8 May 2026 00:49:24 +0200 Subject: [PATCH 06/12] feat: bundle all 3 schemas together --- .../src/history/filesystem_store.rs | 505 ++++++++++++------ 1 file changed, 348 insertions(+), 157 deletions(-) diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index ed84f8a..590d5c7 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -1,8 +1,10 @@ +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use tracing::{debug, info}; use crate::error::{Error, Result}; @@ -10,9 +12,8 @@ use crate::history::{ DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore, SnapshotSummary, StoredSnapshot, TimeRange, parse_snapshot_filename, snapshot_path, }; -use crate::schema::SchemaSnapshot; +use crate::schema::{ActivityStatsSnapshot, 
PlannerStatsSnapshot, SchemaSnapshot}; -// schema-only for now pub struct FilesystemStore { root: Arc, } @@ -29,23 +30,27 @@ impl FilesystemStore { } } -fn unsupported(kind: &SnapshotKind) -> Error { - Error::History(format!( - "FilesystemStore: only schema snapshots supported (got {})", - kind.db_kind() - )) +#[derive(Debug, Serialize, Deserialize)] +struct Bundle { + schema: SchemaSnapshot, + #[serde(default)] + planner: Option, + #[serde(default)] + activity: BTreeMap, } #[async_trait] impl SnapshotStore for FilesystemStore { async fn put(&self, key: &SnapshotKey, snap: &StoredSnapshot) -> Result { - let schema = match snap { - StoredSnapshot::Schema(s) => s.clone(), - other => return Err(unsupported(&other.kind())), - }; let root = self.root.clone(); let key = key.clone(); - run_blocking(move || put_schema(&root, &key, schema)).await + let snap = snap.clone(); + run_blocking(move || match snap { + StoredSnapshot::Schema(s) => put_schema(&root, &key, s), + StoredSnapshot::Planner(p) => put_planner(&root, &key, p), + StoredSnapshot::Activity(a) => put_activity(&root, &key, a), + }) + .await } async fn get( @@ -54,12 +59,10 @@ impl SnapshotStore for FilesystemStore { kind: &SnapshotKind, at: SnapshotRef, ) -> Result { - if !matches!(kind, SnapshotKind::Schema) { - return Err(unsupported(kind)); - } let root = self.root.clone(); let key = key.clone(); - run_blocking(move || get_schema(&root, &key, at).map(StoredSnapshot::Schema)).await + let kind = kind.clone(); + run_blocking(move || get_kind(&root, &key, &kind, at)).await } async fn list( @@ -68,12 +71,10 @@ impl SnapshotStore for FilesystemStore { kind: &SnapshotKind, range: TimeRange, ) -> Result> { - if !matches!(kind, SnapshotKind::Schema) { - return Ok(Vec::new()); - } let root = self.root.clone(); let key = key.clone(); - run_blocking(move || list_schema(&root, &key, range)).await + let kind = kind.clone(); + run_blocking(move || list_kind(&root, &key, &kind, range)).await } async fn latest( @@ -94,32 
+95,22 @@ impl SnapshotStore for FilesystemStore { kind: &SnapshotKind, cutoff: DateTime, ) -> Result { - if !matches!(kind, SnapshotKind::Schema) { - return Err(unsupported(kind)); - } let root = self.root.clone(); let key = key.clone(); - run_blocking(move || delete_schema_before(&root, &key, cutoff)).await + let kind = kind.clone(); + run_blocking(move || delete_before(&root, &key, &kind, cutoff)).await } async fn list_kinds(&self, key: &SnapshotKey) -> Result> { let root = self.root.clone(); let key = key.clone(); - run_blocking(move || { - let entries = read_stream_entries(&stream_dir(&root, &key))?; - Ok(if entries.is_empty() { - Vec::new() - } else { - vec![SnapshotKind::Schema] - }) - }) - .await + run_blocking(move || list_kinds_sync(&root, &key)).await } } fn put_schema(root: &Path, key: &SnapshotKey, snap: SchemaSnapshot) -> Result { - let stream_dir = stream_dir(root, key); - if let Some(latest) = read_latest_hash(&stream_dir)? + let dir = stream_dir(root, key); + if let Some(latest) = read_latest_hash(&dir)? 
&& latest == snap.content_hash { debug!(hash = %snap.content_hash, "schema unchanged, skipping put"); @@ -132,15 +123,12 @@ fn put_schema(root: &Path, key: &SnapshotKey, snap: SchemaSnapshot) -> Result Result Result { - let entries = read_stream_entries(&stream_dir(root, key))?; - let chosen = match &at { - SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), - SnapshotRef::At(target) => entries - .into_iter() - .filter(|(ts, _, _)| *ts <= *target) - .max_by_key(|(ts, _, _)| *ts), - SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h), - }; +fn put_planner(root: &Path, key: &SnapshotKey, snap: PlannerStatsSnapshot) -> Result { + let dir = stream_dir(root, key); + let (path, mut bundle) = + find_bundle_by_schema_hash(&dir, &snap.schema_ref_hash)?.ok_or_else(|| { + Error::History(format!( + "FilesystemStore: planner orphan — no schema bundle for ref {}", + snap.schema_ref_hash + )) + })?; + + if let Some(existing) = &bundle.planner + && existing.content_hash == snap.content_hash + { + return Ok(PutOutcome::Deduped); + } + + bundle.planner = Some(snap); + write_bundle(&path, &bundle)?; + Ok(PutOutcome::Inserted) +} - let (_, _, path) = chosen.ok_or_else(|| { - let detail = match &at { - SnapshotRef::Latest => "latest".to_string(), - SnapshotRef::At(ts) => format!("at-or-before {ts}"), - SnapshotRef::Hash(h) => format!("hash {h}"), +fn put_activity(root: &Path, key: &SnapshotKey, snap: ActivityStatsSnapshot) -> Result { + let dir = stream_dir(root, key); + let (path, mut bundle) = + find_bundle_by_schema_hash(&dir, &snap.schema_ref_hash)?.ok_or_else(|| { + Error::History(format!( + "FilesystemStore: activity orphan — no schema bundle for ref {}", + snap.schema_ref_hash + )) + })?; + + let label = snap.node.label.clone(); + if let Some(existing) = bundle.activity.get(&label) + && existing.content_hash == snap.content_hash + { + return Ok(PutOutcome::Deduped); + } + + bundle.activity.insert(label, snap); + write_bundle(&path, 
&bundle)?; + Ok(PutOutcome::Inserted) +} + +fn get_kind( + root: &Path, + key: &SnapshotKey, + kind: &SnapshotKind, + at: SnapshotRef, +) -> Result { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + + match kind { + SnapshotKind::Schema => { + let chosen = match &at { + SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts), + SnapshotRef::At(target) => entries + .into_iter() + .filter(|(ts, _, _)| *ts <= *target) + .max_by_key(|(ts, _, _)| *ts), + SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h), + }; + let (_, _, path) = chosen.ok_or_else(|| not_found_err("schema", &at))?; + let bundle = read_bundle(&path)?; + Ok(StoredSnapshot::Schema(bundle.schema)) + } + SnapshotKind::Planner => { + let mut bundles: Vec<(DateTime, Bundle)> = entries + .into_iter() + .filter_map(|(ts, _, p)| read_bundle(&p).ok().map(|b| (ts, b))) + .filter(|(_, b)| b.planner.is_some()) + .collect(); + bundles.sort_by_key(|(ts, _)| std::cmp::Reverse(*ts)); + let chosen = match &at { + SnapshotRef::Latest => bundles.into_iter().next(), + SnapshotRef::At(target) => bundles.into_iter().find(|(ts, _)| *ts <= *target), + SnapshotRef::Hash(h) => bundles + .into_iter() + .find(|(_, b)| b.planner.as_ref().map(|p| &p.content_hash) == Some(h)), + }; + let (_, bundle) = chosen.ok_or_else(|| not_found_err("planner", &at))?; + Ok(StoredSnapshot::Planner(bundle.planner.expect("filtered"))) + } + SnapshotKind::Activity { node_label } => { + let mut bundles: Vec<(DateTime, Bundle)> = entries + .into_iter() + .filter_map(|(ts, _, p)| read_bundle(&p).ok().map(|b| (ts, b))) + .filter(|(_, b)| b.activity.contains_key(node_label)) + .collect(); + bundles.sort_by_key(|(ts, _)| std::cmp::Reverse(*ts)); + let chosen = match &at { + SnapshotRef::Latest => bundles.into_iter().next(), + SnapshotRef::At(target) => bundles.into_iter().find(|(ts, _)| *ts <= *target), + SnapshotRef::Hash(h) => bundles + .into_iter() + .find(|(_, b)| 
b.activity.get(node_label).map(|a| &a.content_hash) == Some(h)), + }; + let (_, mut bundle) = chosen.ok_or_else(|| not_found_err("activity", &at))?; + let act = bundle.activity.remove(node_label).expect("filtered above"); + Ok(StoredSnapshot::Activity(act)) + } + } +} + +fn list_kind( + root: &Path, + key: &SnapshotKey, + kind: &SnapshotKind, + range: TimeRange, +) -> Result> { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + + let mut out: Vec = Vec::new(); + for (_schema_ts, _schema_hash, path) in entries { + let bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, }; - Error::History(format!("snapshot not found ({detail})")) - })?; + if let Some(s) = bundle_summary_for_kind(&bundle, key, kind) { + if range.from.is_none_or(|f| s.timestamp >= f) + && range.to.is_none_or(|t| s.timestamp < t) + { + out.push(s); + } + } + } + out.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); + Ok(out) +} - let bytes = std::fs::read(&path) - .map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; - let json = zstd::decode_all(bytes.as_slice()) - .map_err(|e| Error::History(format!("zstd decode: {e}")))?; - serde_json::from_slice(&json).map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) +fn delete_before( + root: &Path, + key: &SnapshotKey, + kind: &SnapshotKind, + cutoff: DateTime, +) -> Result { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + let mut affected = 0usize; + + match kind { + SnapshotKind::Schema => { + for (_ts, _h, path) in entries { + let bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, + }; + if bundle.schema.timestamp < cutoff { + std::fs::remove_file(&path) + .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; + affected += 1; + } + } + } + SnapshotKind::Planner => { + for (_ts, _h, path) in entries { + let mut bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, + }; + let drop = 
bundle + .planner + .as_ref() + .is_some_and(|p| p.timestamp < cutoff); + if drop { + bundle.planner = None; + write_bundle(&path, &bundle)?; + affected += 1; + } + } + } + SnapshotKind::Activity { node_label } => { + for (_ts, _h, path) in entries { + let mut bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, + }; + let drop = bundle + .activity + .get(node_label) + .is_some_and(|a| a.timestamp < cutoff); + if drop { + bundle.activity.remove(node_label); + write_bundle(&path, &bundle)?; + affected += 1; + } + } + } + } + Ok(affected) } -fn list_schema(root: &Path, key: &SnapshotKey, range: TimeRange) -> Result> { - let entries = read_stream_entries(&stream_dir(root, key))?; - let mut summaries: Vec = entries - .into_iter() - .filter(|(ts, _, _)| { - range.from.is_none_or(|from| *ts >= from) && range.to.is_none_or(|to| *ts < to) - }) - .map(|(ts, hash, _)| SnapshotSummary { +fn list_kinds_sync(root: &Path, key: &SnapshotKey) -> Result> { + let dir = stream_dir(root, key); + let entries = read_stream_entries(&dir)?; + if entries.is_empty() { + return Ok(Vec::new()); + } + + let mut has_schema = false; + let mut has_planner = false; + let mut activity_labels: std::collections::BTreeSet = Default::default(); + + for (_ts, _h, path) in entries { + let bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, + }; + has_schema = true; + if bundle.planner.is_some() { + has_planner = true; + } + for label in bundle.activity.keys() { + activity_labels.insert(label.clone()); + } + } + + let mut out = Vec::new(); + if has_schema { + out.push(SnapshotKind::Schema); + } + if has_planner { + out.push(SnapshotKind::Planner); + } + for label in activity_labels { + out.push(SnapshotKind::Activity { node_label: label }); + } + Ok(out) +} + +fn bundle_summary_for_kind( + bundle: &Bundle, + key: &SnapshotKey, + kind: &SnapshotKind, +) -> Option { + let project = Some(key.project_id.0.clone()); + let database = Some(key.database_id.0.clone()); + let 
db_name = key.database_id.0.clone(); + match kind { + SnapshotKind::Schema => Some(SnapshotSummary { id: 0, kind: SnapshotKind::Schema, - timestamp: ts, - content_hash: hash, + timestamp: bundle.schema.timestamp, + content_hash: bundle.schema.content_hash.clone(), schema_ref_hash: None, - database: key.database_id.0.clone(), - project_id: Some(key.project_id.0.clone()), - database_id: Some(key.database_id.0.clone()), - }) - .collect(); - summaries.sort_by_key(|s| std::cmp::Reverse(s.timestamp)); - Ok(summaries) + database: db_name, + project_id: project, + database_id: database, + }), + SnapshotKind::Planner => bundle.planner.as_ref().map(|p| SnapshotSummary { + id: 0, + kind: SnapshotKind::Planner, + timestamp: p.timestamp, + content_hash: p.content_hash.clone(), + schema_ref_hash: Some(bundle.schema.content_hash.clone()), + database: db_name, + project_id: project, + database_id: database, + }), + SnapshotKind::Activity { node_label } => { + bundle.activity.get(node_label).map(|a| SnapshotSummary { + id: 0, + kind: SnapshotKind::Activity { + node_label: node_label.clone(), + }, + timestamp: a.timestamp, + content_hash: a.content_hash.clone(), + schema_ref_hash: Some(bundle.schema.content_hash.clone()), + database: db_name, + project_id: project, + database_id: database, + }) + } + } } -fn delete_schema_before(root: &Path, key: &SnapshotKey, cutoff: DateTime) -> Result { - let entries = read_stream_entries(&stream_dir(root, key))?; - let mut deleted = 0usize; - for (ts, _, path) in entries { - if ts < cutoff { - std::fs::remove_file(&path) - .map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?; - deleted += 1; +fn find_bundle_by_schema_hash(dir: &Path, schema_hash: &str) -> Result> { + for (_, _, path) in read_stream_entries(dir)? 
{ + let bundle = match read_bundle(&path) { + Ok(b) => b, + Err(_) => continue, + }; + if bundle.schema.content_hash == schema_hash { + return Ok(Some((path, bundle))); } } - Ok(deleted) + Ok(None) +} + +fn read_bundle(path: &Path) -> Result { + let bytes = + std::fs::read(path).map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; + let json = zstd::decode_all(bytes.as_slice()) + .map_err(|e| Error::History(format!("zstd decode: {e}")))?; + serde_json::from_slice(&json).map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) +} + +fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| Error::History(format!("create_dir_all {}: {e}", parent.display())))?; + } + let tmp = path.with_extension("zst.tmp"); + let json = serde_json::to_vec(bundle) + .map_err(|e| Error::History(format!("cannot serialize bundle: {e}")))?; + let compressed = zstd::encode_all(json.as_slice(), 3) + .map_err(|e| Error::History(format!("zstd encode: {e}")))?; + std::fs::write(&tmp, compressed) + .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; + std::fs::rename(&tmp, path) + .map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?; + Ok(()) +} + +fn not_found_err(kind: &str, at: &SnapshotRef) -> Error { + let detail = match at { + SnapshotRef::Latest => "latest".to_string(), + SnapshotRef::At(ts) => format!("at-or-before {ts}"), + SnapshotRef::Hash(h) => format!("hash {h}"), + }; + Error::History(format!("{kind} snapshot not found ({detail})")) } fn stream_dir(root: &Path, key: &SnapshotKey) -> PathBuf { @@ -323,8 +573,9 @@ mod tests { } } - fn make_planner(schema_ref: &str, hash: &str) -> crate::schema::PlannerStatsSnapshot { - crate::schema::PlannerStatsSnapshot { + #[allow(dead_code)] + fn make_planner(schema_ref: &str, hash: &str) -> PlannerStatsSnapshot { + PlannerStatsSnapshot { pg_version: "PostgreSQL 17.0".into(), database: 
"auth".into(), timestamp: Utc::now(), @@ -336,12 +587,9 @@ mod tests { } } - fn make_activity( - schema_ref: &str, - label: &str, - hash: &str, - ) -> crate::schema::ActivityStatsSnapshot { - crate::schema::ActivityStatsSnapshot { + #[allow(dead_code)] + fn make_activity(schema_ref: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { + ActivityStatsSnapshot { pg_version: "PostgreSQL 17.0".into(), database: "auth".into(), timestamp: Utc::now(), @@ -416,61 +664,4 @@ mod tests { ); assert_eq!(store.put_schema(&k, &s).await.unwrap(), PutOutcome::Deduped); } - - #[tokio::test] - async fn put_planner_returns_unsupported_error() { - let (_dir, store) = temp_store(); - let k = key(); - let err = store - .put_planner_stats(&k, &make_planner("schema-h1", "p1")) - .await - .unwrap_err(); - assert!(format!("{err}").contains("only schema snapshots supported")); - } - - #[tokio::test] - async fn put_activity_returns_unsupported_error() { - let (_dir, store) = temp_store(); - let k = key(); - let err = store - .put_activity_stats(&k, &make_activity("schema-h1", "primary", "a1")) - .await - .unwrap_err(); - assert!(format!("{err}").contains("only schema snapshots supported")); - } - - #[tokio::test] - async fn get_planner_returns_unsupported_error() { - let (_dir, store) = temp_store(); - let k = key(); - let err = store - .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) - .await - .unwrap_err(); - assert!(format!("{err}").contains("only schema snapshots supported")); - } - - #[tokio::test] - async fn list_returns_empty_for_non_schema_kinds() { - let (_dir, store) = temp_store(); - let k = key(); - store.put_schema(&k, &make_schema("h1")).await.unwrap(); - - let planner = store - .list(&k, &SnapshotKind::Planner, TimeRange::default()) - .await - .unwrap(); - assert!(planner.is_empty()); - } - - #[tokio::test] - async fn list_kinds_returns_schema_only_when_files_present() { - let (_dir, store) = temp_store(); - let k = key(); - 
assert!(store.list_kinds(&k).await.unwrap().is_empty()); - - store.put_schema(&k, &make_schema("h1")).await.unwrap(); - let kinds = store.list_kinds(&k).await.unwrap(); - assert_eq!(kinds, vec![SnapshotKind::Schema]); - } } From 41532bfa6e9479c78773ae6728b3b56af24f8c9c Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 00:53:28 +0200 Subject: [PATCH 07/12] test(fs): cover bundled snapshot store Eight tests against the bundled FilesystemStore: round-trip of all three kinds, planner orphan rejection, planner/activity dedup, per-node activity upsert, kind-filtered list, list_kinds enumerating node labels, and delete_before semantics for planner (clear field, keep schema) vs schema (remove bundle). --- .../src/history/filesystem_store.rs | 235 +++++++++++++++++- 1 file changed, 233 insertions(+), 2 deletions(-) diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index 590d5c7..f7b89ed 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -573,7 +573,6 @@ mod tests { } } - #[allow(dead_code)] fn make_planner(schema_ref: &str, hash: &str) -> PlannerStatsSnapshot { PlannerStatsSnapshot { pg_version: "PostgreSQL 17.0".into(), @@ -587,7 +586,6 @@ mod tests { } } - #[allow(dead_code)] fn make_activity(schema_ref: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { ActivityStatsSnapshot { pg_version: "PostgreSQL 17.0".into(), @@ -664,4 +662,237 @@ mod tests { ); assert_eq!(store.put_schema(&k, &s).await.unwrap(), PutOutcome::Deduped); } + + #[tokio::test] + async fn bundle_round_trips_all_three_kinds() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "ah")) + .await + .unwrap(); + + let s = store.get_schema(&k, 
SnapshotRef::Latest).await.unwrap(); + assert_eq!(s.content_hash, "sh"); + + let p = store + .get(&k, &SnapshotKind::Planner, SnapshotRef::Latest) + .await + .unwrap() + .into_planner() + .unwrap(); + assert_eq!(p.content_hash, "ph"); + + let a = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(a.content_hash, "ah"); + assert_eq!(a.node.label, "primary"); + } + + #[tokio::test] + async fn put_planner_without_schema_errors() { + let (_dir, store) = temp_store(); + let k = key(); + let err = store + .put_planner_stats(&k, &make_planner("missing", "ph")) + .await + .unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("orphan"), "expected orphan error, got: {msg}"); + } + + #[tokio::test] + async fn put_planner_dedupes_on_same_content_hash() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + let p = make_planner("sh", "ph"); + assert_eq!( + store.put_planner_stats(&k, &p).await.unwrap(), + PutOutcome::Inserted + ); + assert_eq!( + store.put_planner_stats(&k, &p).await.unwrap(), + PutOutcome::Deduped + ); + } + + #[tokio::test] + async fn put_activity_upserts_per_node_label() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "standby", "b1")) + .await + .unwrap(); + // overwrite primary + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a2")) + .await + .unwrap(); + + let primary = store + .get( + &k, + &SnapshotKind::Activity { + node_label: "primary".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(primary.content_hash, "a2"); + + let standby = store + .get( + &k, + 
&SnapshotKind::Activity { + node_label: "standby".into(), + }, + SnapshotRef::Latest, + ) + .await + .unwrap() + .into_activity() + .unwrap(); + assert_eq!(standby.content_hash, "b1"); + } + + #[tokio::test] + async fn list_planner_returns_only_bundles_with_planner() { + let (_dir, store) = temp_store(); + let k = key(); + // bundle #1: schema + planner + store.put_schema(&k, &make_schema("sh1")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh1", "ph1")) + .await + .unwrap(); + // bundle #2: schema only + store.put_schema(&k, &make_schema("sh2")).await.unwrap(); + + let schemas = store + .list(&k, &SnapshotKind::Schema, TimeRange::default()) + .await + .unwrap(); + assert_eq!(schemas.len(), 2); + + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert_eq!(planners.len(), 1); + assert_eq!(planners[0].content_hash, "ph1"); + assert_eq!(planners[0].schema_ref_hash.as_deref(), Some("sh1")); + } + + #[tokio::test] + async fn list_kinds_reports_distinct_node_labels() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "primary", "a1")) + .await + .unwrap(); + store + .put_activity_stats(&k, &make_activity("sh", "standby", "b1")) + .await + .unwrap(); + + let kinds = store.list_kinds(&k).await.unwrap(); + assert!(kinds.contains(&SnapshotKind::Schema)); + assert!(kinds.contains(&SnapshotKind::Planner)); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "primary".into() + })); + assert!(kinds.contains(&SnapshotKind::Activity { + node_label: "standby".into() + })); + assert_eq!(kinds.len(), 4); + } + + #[tokio::test] + async fn delete_before_planner_clears_field_keeps_schema() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + 
store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + + let cutoff = Utc::now() + chrono::Duration::seconds(60); + let removed = store + .delete_before(&k, &SnapshotKind::Planner, cutoff) + .await + .unwrap(); + assert_eq!(removed, 1); + + // schema still there + let s = store.get_schema(&k, SnapshotRef::Latest).await.unwrap(); + assert_eq!(s.content_hash, "sh"); + + // planner gone + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert!(planners.is_empty()); + } + + #[tokio::test] + async fn delete_before_schema_removes_whole_bundle() { + let (_dir, store) = temp_store(); + let k = key(); + store.put_schema(&k, &make_schema("sh")).await.unwrap(); + store + .put_planner_stats(&k, &make_planner("sh", "ph")) + .await + .unwrap(); + + let cutoff = Utc::now() + chrono::Duration::seconds(60); + let removed = store + .delete_before(&k, &SnapshotKind::Schema, cutoff) + .await + .unwrap(); + assert_eq!(removed, 1); + + let schemas = store + .list(&k, &SnapshotKind::Schema, TimeRange::default()) + .await + .unwrap(); + assert!(schemas.is_empty()); + let planners = store + .list(&k, &SnapshotKind::Planner, TimeRange::default()) + .await + .unwrap(); + assert!(planners.is_empty()); + } } From d36aa91f4497621041cb94f4e8eb4ebe22b5fabe Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 8 May 2026 02:32:19 +0200 Subject: [PATCH 08/12] feat: CLI snapshot push/pull --- crates/dry_run_cli/src/main.rs | 171 ++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 1 deletion(-) diff --git a/crates/dry_run_cli/src/main.rs b/crates/dry_run_cli/src/main.rs index 4e15343..1686a5c 100644 --- a/crates/dry_run_cli/src/main.rs +++ b/crates/dry_run_cli/src/main.rs @@ -5,7 +5,8 @@ use std::path::PathBuf; use clap::{Parser, Subcommand}; use dry_run_core::history::{ - DatabaseId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange, + DatabaseId, FilesystemStore, PutOutcome, SnapshotKey, 
SnapshotKind, SnapshotRef, SnapshotStore, + TimeRange, }; use dry_run_core::{DryRun, HistoryStore, ProjectConfig}; use rmcp::ServiceExt; @@ -140,6 +141,26 @@ enum SnapshotAction { #[arg(long)] history_db: Option, }, + Push { + #[arg(long)] + to_path: PathBuf, + #[arg(long)] + all: bool, + #[arg(long, env = "DATABASE_URL")] + db: Option, + #[arg(long)] + history_db: Option, + }, + Pull { + #[arg(long)] + from_path: PathBuf, + #[arg(long)] + all: bool, + #[arg(long, env = "DATABASE_URL")] + db: Option, + #[arg(long)] + history_db: Option, + }, } #[derive(Subcommand)] @@ -668,6 +689,52 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()> println!("{json}"); Ok(()) } + SnapshotAction::Push { + to_path, + all, + db, + history_db, + } => { + let store = open_history_store(history_db.as_deref())?; + let fs = FilesystemStore::new(to_path.clone()); + + let keys = if *all { + store.list_keys()? + } else { + vec![resolve_read_key(db.as_deref(), profile).await?] + }; + if keys.is_empty() { + println!("No snapshots in history.db to push."); + return Ok(()); + } + + let outcomes = sync_keys(&store, &fs, &keys).await?; + print_sync_outcomes("push", &outcomes, to_path); + Ok(()) + } + SnapshotAction::Pull { + from_path, + all, + db, + history_db, + } => { + let fs = FilesystemStore::new(from_path.clone()); + let store = open_history_store(history_db.as_deref())?; + + let keys = if *all { + fs.list_keys()? + } else { + vec![resolve_read_key(db.as_deref(), profile).await?] 
+ }; + if keys.is_empty() { + println!("No snapshots at {} to pull.", from_path.display()); + return Ok(()); + } + + let outcomes = sync_keys(&fs, &store, &keys).await?; + print_sync_outcomes("pull", &outcomes, from_path); + Ok(()) + } SnapshotAction::Export { out, history_db } => { let store = open_history_store(history_db.as_deref())?; let out_root = out.clone().unwrap_or_else(|| { @@ -840,6 +907,108 @@ async fn cmd_drift( Ok(()) } +#[derive(Debug, Default)] +struct KindCount { + copied: usize, + up_to_date: usize, +} + +#[derive(Debug)] +struct SyncOutcome { + key: SnapshotKey, + schema: KindCount, + planner: KindCount, + activity: KindCount, +} + +fn kind_order(k: &SnapshotKind) -> u8 { + match k { + SnapshotKind::Schema => 0, + SnapshotKind::Planner => 1, + SnapshotKind::Activity { .. } => 2, + } +} + +async fn sync_keys( + src: &dyn SnapshotStore, + dst: &dyn SnapshotStore, + keys: &[SnapshotKey], +) -> anyhow::Result> { + let mut outcomes = Vec::with_capacity(keys.len()); + for key in keys { + let mut outcome = SyncOutcome { + key: key.clone(), + schema: KindCount::default(), + planner: KindCount::default(), + activity: KindCount::default(), + }; + + let mut kinds = src.list_kinds(key).await?; + // schema first so FilesystemStore's orphan rule is satisfied + kinds.sort_by_key(kind_order); + + for kind in &kinds { + let src_summaries = src.list(key, kind, TimeRange::default()).await?; + let dst_hashes: std::collections::HashSet = dst + .list(key, kind, TimeRange::default()) + .await? + .into_iter() + .map(|s| s.content_hash) + .collect(); + + let counter = match kind { + SnapshotKind::Schema => &mut outcome.schema, + SnapshotKind::Planner => &mut outcome.planner, + SnapshotKind::Activity { .. 
} => &mut outcome.activity, + }; + + for s in src_summaries { + if dst_hashes.contains(&s.content_hash) { + counter.up_to_date += 1; + continue; + } + let stored = src + .get(key, kind, SnapshotRef::Hash(s.content_hash.clone())) + .await?; + match dst.put(key, &stored).await? { + PutOutcome::Inserted => counter.copied += 1, + PutOutcome::Deduped => counter.up_to_date += 1, + } + } + } + + outcomes.push(outcome); + } + Ok(outcomes) +} + +fn print_sync_outcomes(verb: &str, outcomes: &[SyncOutcome], path: &std::path::Path) { + let mut total = (0usize, 0usize, 0usize, 0usize); + for o in outcomes { + println!( + " project={} database={}: {} schema, {} planner, {} activity copied ({} up-to-date)", + o.key.project_id.0, + o.key.database_id.0, + o.schema.copied, + o.planner.copied, + o.activity.copied, + o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date, + ); + total.0 += o.schema.copied; + total.1 += o.planner.copied; + total.2 += o.activity.copied; + total.3 += o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date; + } + println!( + "{verb}: {} schema, {} planner, {} activity copied / {} up-to-date ({})", + total.0, + total.1, + total.2, + total.3, + path.display(), + ); +} + // helpers fn require_db_url(db: Option<&str>) -> anyhow::Result<&str> { From 2c1aa695c1497ec8d786379936e92aaafccdeaae Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 8 May 2026 02:45:57 +0200 Subject: [PATCH 09/12] fix: transparent handling of original snapshots --- .../dry_run_core/src/history/filesystem_store.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index f7b89ed..69ff5e4 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -434,7 +434,19 @@ fn read_bundle(path: &Path) -> Result { std::fs::read(path).map_err(|e| Error::History(format!("read 
{}: {e}", path.display())))?; let json = zstd::decode_all(bytes.as_slice()) .map_err(|e| Error::History(format!("zstd decode: {e}")))?; - serde_json::from_slice(&json).map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}"))) + if let Ok(b) = serde_json::from_slice::(&json) { + return Ok(b); + } + + // handle original base snapshot + // TODO: remove in about month time + let schema: SchemaSnapshot = serde_json::from_slice(&json) + .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}")))?; + Ok(Bundle { + schema, + planner: None, + activity: BTreeMap::new(), + }) } fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> { From 0c0638be7484493382e124c3856e52cdd597b8d1 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 8 May 2026 11:54:38 +0200 Subject: [PATCH 10/12] chore: deduplicate snapshot store logic --- .../src/history/filesystem_store.rs | 94 +----------------- crates/dry_run_core/src/history/mod.rs | 2 + .../src/history/snapshot_store.rs | 8 +- crates/dry_run_core/src/history/store.rs | 99 +------------------ .../dry_run_core/src/history/test_fixtures.rs | 92 +++++++++++++++++ 5 files changed, 107 insertions(+), 188 deletions(-) create mode 100644 crates/dry_run_core/src/history/test_fixtures.rs diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs index 69ff5e4..fa0e3f6 100644 --- a/crates/dry_run_core/src/history/filesystem_store.rs +++ b/crates/dry_run_core/src/history/filesystem_store.rs @@ -77,18 +77,6 @@ impl SnapshotStore for FilesystemStore { run_blocking(move || list_kind(&root, &key, &kind, range)).await } - async fn latest( - &self, - key: &SnapshotKey, - kind: &SnapshotKind, - ) -> Result> { - Ok(self - .list(key, kind, TimeRange::default()) - .await? 
- .into_iter() - .next()) - } - async fn delete_before( &self, key: &SnapshotKey, @@ -118,11 +106,6 @@ fn put_schema(root: &Path, key: &SnapshotKey, snap: SchemaSnapshot) -> Result SchemaSnapshot { - SchemaSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: "auth".into(), - timestamp: Utc::now(), - content_hash: hash.into(), - source: None, - tables: vec![], - enums: vec![], - domains: vec![], - composites: vec![], - views: vec![], - functions: vec![], - extensions: vec![], - gucs: vec![], - } + test_fixtures::make_snap(hash, "auth") } fn make_planner(schema_ref: &str, hash: &str) -> PlannerStatsSnapshot { - PlannerStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: "auth".into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - tables: vec![], - columns: vec![], - indexes: vec![], - } + test_fixtures::make_planner(schema_ref, "auth", hash) } fn make_activity(schema_ref: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { - ActivityStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: "auth".into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - node: NodeIdentity { - label: label.into(), - host: format!("host-{label}"), - is_standby: label != "primary", - replication_lag_bytes: None, - stats_reset: None, - }, - tables: vec![TableActivityEntry { - table: QualifiedName::new("public", "orders"), - activity: TableActivity { - seq_scan: 1, - idx_scan: 2, - n_live_tup: 0, - n_dead_tup: 0, - last_vacuum: None, - last_autovacuum: None, - last_analyze: None, - last_autoanalyze: None, - vacuum_count: 0, - autovacuum_count: 0, - analyze_count: 0, - autoanalyze_count: 0, - }, - }], - indexes: vec![IndexActivityEntry { - index: QualifiedName::new("public", "orders_pkey"), - activity: IndexActivity { - idx_scan: 0, - idx_tup_read: 0, - idx_tup_fetch: 0, - }, - }], - } + test_fixtures::make_activity(schema_ref, "auth", label, hash) } fn key() 
-> SnapshotKey { - SnapshotKey { - project_id: ProjectId("p".into()), - database_id: DatabaseId("auth".into()), - } + test_fixtures::key("p", "auth") } fn temp_store() -> (TempDir, FilesystemStore) { diff --git a/crates/dry_run_core/src/history/mod.rs b/crates/dry_run_core/src/history/mod.rs index 2a747a6..78156b0 100644 --- a/crates/dry_run_core/src/history/mod.rs +++ b/crates/dry_run_core/src/history/mod.rs @@ -2,6 +2,8 @@ pub mod filesystem_layout; mod filesystem_store; mod snapshot_store; mod store; +#[cfg(test)] +mod test_fixtures; pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path}; pub use filesystem_store::FilesystemStore; diff --git a/crates/dry_run_core/src/history/snapshot_store.rs b/crates/dry_run_core/src/history/snapshot_store.rs index 7366dff..a102ab0 100644 --- a/crates/dry_run_core/src/history/snapshot_store.rs +++ b/crates/dry_run_core/src/history/snapshot_store.rs @@ -170,7 +170,13 @@ pub trait SnapshotStore: Send + Sync { &self, key: &SnapshotKey, kind: &SnapshotKind, - ) -> Result>; + ) -> Result> { + Ok(self + .list(key, kind, TimeRange::default()) + .await? + .into_iter() + .next()) + } async fn delete_before( &self, key: &SnapshotKey, diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index c0f6146..8957ac9 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -365,18 +365,6 @@ impl SnapshotStore for HistoryStore { .await } - async fn latest( - &self, - key: &SnapshotKey, - kind: &SnapshotKind, - ) -> Result> { - Ok(self - .list(key, kind, TimeRange::default()) - .await? 
- .into_iter() - .next()) - } - async fn delete_before( &self, key: &SnapshotKey, @@ -695,32 +683,7 @@ mod trait_tests { use tempfile::TempDir; use super::*; - use crate::history::snapshot_store::{DatabaseId, ProjectId}; - - fn make_snap(hash: &str, database: &str) -> SchemaSnapshot { - SchemaSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: database.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - source: None, - tables: vec![], - enums: vec![], - domains: vec![], - composites: vec![], - views: vec![], - functions: vec![], - extensions: vec![], - gucs: vec![], - } - } - - fn key(proj: &str, db: &str) -> SnapshotKey { - SnapshotKey { - project_id: ProjectId(proj.into()), - database_id: DatabaseId(db.into()), - } - } + use crate::history::test_fixtures::{key, make_activity, make_planner, make_snap}; fn temp_store() -> (TempDir, HistoryStore) { let dir = TempDir::new().unwrap(); @@ -1058,66 +1021,6 @@ mod trait_tests { ); } - use crate::schema::{ - ActivityStatsSnapshot, IndexActivity, IndexActivityEntry, NodeIdentity, - PlannerStatsSnapshot, QualifiedName, TableActivity, TableActivityEntry, - }; - - fn make_planner(schema_ref: &str, db: &str, hash: &str) -> PlannerStatsSnapshot { - PlannerStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: db.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - tables: vec![], - columns: vec![], - indexes: vec![], - } - } - - fn make_activity(schema_ref: &str, db: &str, label: &str, hash: &str) -> ActivityStatsSnapshot { - ActivityStatsSnapshot { - pg_version: "PostgreSQL 17.0".into(), - database: db.into(), - timestamp: Utc::now(), - content_hash: hash.into(), - schema_ref_hash: schema_ref.into(), - node: NodeIdentity { - label: label.into(), - host: format!("host-{label}"), - is_standby: label != "primary", - replication_lag_bytes: None, - stats_reset: None, - }, - tables: vec![TableActivityEntry { - table: QualifiedName::new("public", 
"orders"), - activity: TableActivity { - seq_scan: 1, - idx_scan: 2, - n_live_tup: 0, - n_dead_tup: 0, - last_vacuum: None, - last_autovacuum: None, - last_analyze: None, - last_autoanalyze: None, - vacuum_count: 0, - autovacuum_count: 0, - analyze_count: 0, - autoanalyze_count: 0, - }, - }], - indexes: vec![IndexActivityEntry { - index: QualifiedName::new("public", "orders_pkey"), - activity: IndexActivity { - idx_scan: 0, - idx_tup_read: 0, - idx_tup_fetch: 0, - }, - }], - } - } - #[tokio::test] async fn snapshot_get_filters_to_kind_schema() { // Regression: planner_stats rows must not bleed into SnapshotStore::get(Latest). diff --git a/crates/dry_run_core/src/history/test_fixtures.rs b/crates/dry_run_core/src/history/test_fixtures.rs new file mode 100644 index 0000000..d9e8c8b --- /dev/null +++ b/crates/dry_run_core/src/history/test_fixtures.rs @@ -0,0 +1,92 @@ +use chrono::Utc; + +use crate::history::{DatabaseId, ProjectId, SnapshotKey}; +use crate::schema::{ + ActivityStatsSnapshot, IndexActivity, IndexActivityEntry, NodeIdentity, PlannerStatsSnapshot, + QualifiedName, SchemaSnapshot, TableActivity, TableActivityEntry, +}; + +pub(super) fn make_snap(hash: &str, database: &str) -> SchemaSnapshot { + SchemaSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: database.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + source: None, + tables: vec![], + enums: vec![], + domains: vec![], + composites: vec![], + views: vec![], + functions: vec![], + extensions: vec![], + gucs: vec![], + } +} + +pub(super) fn make_planner(schema_ref: &str, db: &str, hash: &str) -> PlannerStatsSnapshot { + PlannerStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: db.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + tables: vec![], + columns: vec![], + indexes: vec![], + } +} + +pub(super) fn make_activity( + schema_ref: &str, + db: &str, + label: &str, + hash: &str, +) -> ActivityStatsSnapshot { 
+ ActivityStatsSnapshot { + pg_version: "PostgreSQL 17.0".into(), + database: db.into(), + timestamp: Utc::now(), + content_hash: hash.into(), + schema_ref_hash: schema_ref.into(), + node: NodeIdentity { + label: label.into(), + host: format!("host-{label}"), + is_standby: label != "primary", + replication_lag_bytes: None, + stats_reset: None, + }, + tables: vec![TableActivityEntry { + table: QualifiedName::new("public", "orders"), + activity: TableActivity { + seq_scan: 1, + idx_scan: 2, + n_live_tup: 0, + n_dead_tup: 0, + last_vacuum: None, + last_autovacuum: None, + last_analyze: None, + last_autoanalyze: None, + vacuum_count: 0, + autovacuum_count: 0, + analyze_count: 0, + autoanalyze_count: 0, + }, + }], + indexes: vec![IndexActivityEntry { + index: QualifiedName::new("public", "orders_pkey"), + activity: IndexActivity { + idx_scan: 0, + idx_tup_read: 0, + idx_tup_fetch: 0, + }, + }], + } +} + +pub(super) fn key(proj: &str, db: &str) -> SnapshotKey { + SnapshotKey { + project_id: ProjectId(proj.into()), + database_id: DatabaseId(db.into()), + } +} From 54baf563bd2be2720ee36b7e3d5ea73b62882652 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Fri, 8 May 2026 12:14:47 +0200 Subject: [PATCH 11/12] chore: more deduplication --- crates/dry_run_core/src/history/store.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/dry_run_core/src/history/store.rs b/crates/dry_run_core/src/history/store.rs index 8957ac9..0b1351e 100644 --- a/crates/dry_run_core/src/history/store.rs +++ b/crates/dry_run_core/src/history/store.rs @@ -257,6 +257,17 @@ fn lock_conn(conn: &Mutex) -> Result>, + kind: &SnapshotKind, +) { + if let SnapshotKind::Activity { node_label } = kind { + *sql += &format!(" AND node_label = ?{}", bound.len() + 1); + bound.push(Box::new(node_label.clone())); + } +} + fn row_to_summary( row: &rusqlite::Row<'_>, kind: SnapshotKind, @@ -339,10 +350,7 @@ impl SnapshotStore for HistoryStore { ); let mut bound: Vec> = 
vec![Box::new(pid), Box::new(did), Box::new(kind.db_kind())];
-        if let SnapshotKind::Activity { node_label } = &kind {
-            sql += &format!(" AND node_label = ?{}", bound.len() + 1);
-            bound.push(Box::new(node_label.clone()));
-        }
+        push_node_label_filter(&mut sql, &mut bound, &kind);
         if let Some(from) = range.from {
             sql += &format!(" AND timestamp >= ?{}", bound.len() + 1);
             bound.push(Box::new(from.to_rfc3339()));
@@ -386,10 +394,7 @@ impl SnapshotStore for HistoryStore {
             Box::new(kind.db_kind()),
             Box::new(cutoff.to_rfc3339()),
         ];
-        if let SnapshotKind::Activity { node_label } = &kind {
-            sql += &format!(" AND node_label = ?{}", bound.len() + 1);
-            bound.push(Box::new(node_label.clone()));
-        }
+        push_node_label_filter(&mut sql, &mut bound, &kind);
         let params: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect();
         Ok(conn.execute(&sql, params.as_slice())?)
     })

From 262248dd57f904672f0da98819837d2be5adc454 Mon Sep 17 00:00:00 2001
From: Radim Marek
Date: Fri, 8 May 2026 14:49:00 +0200
Subject: [PATCH 12/12] fix: verify snapshot hash & unique tmp destinations to avoid conflicts

---
 .../src/history/filesystem_store.rs | 103 ++++++++++++++----
 1 file changed, 84 insertions(+), 19 deletions(-)

diff --git a/crates/dry_run_core/src/history/filesystem_store.rs b/crates/dry_run_core/src/history/filesystem_store.rs
index fa0e3f6..f80ea47 100644
--- a/crates/dry_run_core/src/history/filesystem_store.rs
+++ b/crates/dry_run_core/src/history/filesystem_store.rs
@@ -12,7 +12,9 @@ use crate::history::{
     DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore,
     SnapshotSummary, StoredSnapshot, TimeRange, parse_snapshot_filename, snapshot_path,
 };
-use crate::schema::{ActivityStatsSnapshot, PlannerStatsSnapshot, SchemaSnapshot};
+use crate::schema::{
+    ActivityStatsSnapshot, HashInput, PlannerStatsSnapshot, SchemaSnapshot, compute_content_hash,
+};
 
 pub struct FilesystemStore {
     root: Arc,
@@ -415,21 +417,72 @@ fn
find_bundle_by_schema_hash(dir: &Path, schema_hash: &str) -> Result Result { let bytes = std::fs::read(path).map_err(|e| Error::History(format!("read {}: {e}", path.display())))?; - let json = zstd::decode_all(bytes.as_slice()) - .map_err(|e| Error::History(format!("zstd decode: {e}")))?; - if let Ok(b) = serde_json::from_slice::(&json) { - return Ok(b); - } - - // handle original base snapshot - // TODO: remove in about month time - let schema: SchemaSnapshot = serde_json::from_slice(&json) - .map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}")))?; - Ok(Bundle { - schema, - planner: None, - activity: BTreeMap::new(), - }) + let json = zstd::decode_all(bytes.as_slice()).map_err(|e| { + Error::History(format!( + "corrupt snapshot {}: zstd decode: {e}", + path.display() + )) + })?; + let bundle = if let Ok(b) = serde_json::from_slice::(&json) { + b + } else { + // handle original base snapshot + // TODO: remove in about month time + let schema: SchemaSnapshot = serde_json::from_slice(&json).map_err(|e| { + Error::History(format!("corrupt snapshot {}: JSON: {e}", path.display())) + })?; + Bundle { + schema, + planner: None, + activity: BTreeMap::new(), + } + }; + + verify_bundle_hash(path, &bundle)?; + Ok(bundle) +} + +// filename hash must match recomputed schema content_hash +fn verify_bundle_hash(path: &Path, bundle: &Bundle) -> Result<()> { + let fname = path + .file_name() + .and_then(|s| s.to_str()) + .ok_or_else(|| Error::History(format!("non-utf8 filename: {}", path.display())))?; + let (_, expected) = parse_snapshot_filename(fname).ok_or_else(|| { + Error::History(format!( + "corrupt snapshot {}: filename does not match {{ts}}-{{hash}}.json.zst", + path.display() + )) + })?; + + if bundle.schema.content_hash != expected { + return Err(Error::History(format!( + "corrupt snapshot {}: filename hash {} != stored schema.content_hash {}", + path.display(), + expected, + bundle.schema.content_hash, + ))); + } + + let recomputed = 
compute_content_hash(&HashInput { + pg_version: &bundle.schema.pg_version, + tables: &bundle.schema.tables, + enums: &bundle.schema.enums, + domains: &bundle.schema.domains, + composites: &bundle.schema.composites, + views: &bundle.schema.views, + functions: &bundle.schema.functions, + extensions: &bundle.schema.extensions, + }); + if recomputed != expected { + return Err(Error::History(format!( + "corrupt snapshot {}: filename hash {} != recomputed schema hash {}", + path.display(), + expected, + recomputed, + ))); + } + Ok(()) } fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> { @@ -437,18 +490,30 @@ fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> { std::fs::create_dir_all(parent) .map_err(|e| Error::History(format!("create_dir_all {}: {e}", parent.display())))?; } - let tmp = path.with_extension("zst.tmp"); + // unique tmp path so concurrent same-hash writers don't collide + let tmp = unique_tmp_path(path); let json = serde_json::to_vec(bundle) .map_err(|e| Error::History(format!("cannot serialize bundle: {e}")))?; let compressed = zstd::encode_all(json.as_slice(), 3) .map_err(|e| Error::History(format!("zstd encode: {e}")))?; std::fs::write(&tmp, compressed) .map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?; - std::fs::rename(&tmp, path) - .map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?; + if let Err(e) = std::fs::rename(&tmp, path) { + let _ = std::fs::remove_file(&tmp); + return Err(Error::History(format!("rename to {}: {e}", path.display()))); + } Ok(()) } +fn unique_tmp_path(path: &Path) -> PathBuf { + use std::sync::atomic::{AtomicU64, Ordering}; + static COUNTER: AtomicU64 = AtomicU64::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + let suffix = format!("zst.{pid}.{n}.tmp"); + path.with_extension(suffix) +} + fn not_found_err(kind: &str, at: &SnapshotRef) -> Error { let detail = match at { SnapshotRef::Latest => "latest".to_string(),