From 42e55dc416266c199a0390d21e779609bd68f5c0 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 May 2026 16:29:22 +0530 Subject: [PATCH 01/10] snapshot abstraction --- crates/commitlog/src/lib.rs | 60 ++++++--- crates/commitlog/src/repo/mod.rs | 11 ++ crates/core/src/db/persistence.rs | 8 +- crates/core/src/db/relational_db.rs | 66 ++++++--- crates/core/src/db/snapshot.rs | 18 +-- crates/core/src/db/update.rs | 2 +- .../src/locking_tx_datastore/datastore.rs | 40 ++++-- crates/durability/src/imp/local.rs | 83 ++++++++++-- crates/snapshot/src/lib.rs | 126 +++++++++++++++++- 9 files changed, 340 insertions(+), 74 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 3922f002a84..d80c1fb00b7 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -151,15 +151,22 @@ impl Options { } } -/// The canonical commitlog, backed by on-disk log files. +/// The canonical commitlog API over a repository backend `R`. +/// +/// The default backend is the on-disk filesystem repository +/// [`repo::Fs`], but tests may supply another [`Repo`] +/// implementation. /// /// Records in the log are of type `T`, which canonically is instantiated to /// [`payload::Txdata`]. -pub struct Commitlog { - inner: RwLock>, +pub struct Commitlog +where + R: Repo, +{ + inner: RwLock>, } -impl Commitlog { +impl Commitlog { /// Open the log at root directory `root` with [`Options`]. /// /// The root directory must already exist. @@ -178,7 +185,26 @@ impl Commitlog { root.display() ); } - let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?; + Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts) + } + + /// Determine the size on disk of this commitlog. + pub fn size_on_disk(&self) -> io::Result { + let inner = self.inner.read().unwrap(); + inner.repo.size_on_disk() + } +} + +impl Commitlog +where + R: Repo, +{ + /// Open the log in `repo` with [`Options`]. + /// + /// This is useful for tests which provide a repository + /// implementation other than [`repo::Fs`]. + pub fn open_with_repo(repo: R, opts: Options) -> io::Result { + let inner = commitlog::Generic::open(repo, opts)?; Ok(Self { inner: RwLock::new(inner), @@ -307,7 +333,7 @@ impl Commitlog { /// This means that, when this iterator yields an `Err` value, the consumer /// may want to check if the iterator is exhausted (by calling `next()`) /// before treating the `Err` value as an application error. - pub fn commits(&self) -> impl Iterator> + use { + pub fn commits(&self) -> impl Iterator> + use { self.commits_from(0) } @@ -320,7 +346,10 @@ impl Commitlog { /// Note that the first [`StoredCommit`] yielded is the first commit /// containing the given transaction offset, i.e. its `min_tx_offset` may be /// smaller than `offset`. - pub fn commits_from(&self, offset: u64) -> impl Iterator> + use { + pub fn commits_from( + &self, + offset: u64, + ) -> impl Iterator> + use { self.inner.read().unwrap().commits_from(offset) } @@ -374,15 +403,12 @@ impl Commitlog { inner: RwLock::new(inner), }) } - - /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { - let inner = self.inner.read().unwrap(); - inner.repo.size_on_disk() - } } -impl Commitlog { +impl Commitlog +where + R: Repo, +{ /// Write `transactions` to the log. /// /// This will store all `transactions` as a single [Commit] @@ -452,10 +478,11 @@ impl Commitlog { pub fn transactions<'a, D>( &self, de: &'a D, - ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T> + ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T, R> where D: Decoder, D::Error: From, + R: 'a, T: 'a, { self.transactions_from(0, de) @@ -471,10 +498,11 @@ impl Commitlog { &self, offset: u64, de: &'a D, - ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T> + ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T, R> where D: Decoder, D::Error: From, + R: 'a, T: 'a, { self.inner.read().unwrap().transactions_from(offset, de) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 2b54216bad3..51df7accb81 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -144,6 +144,17 @@ pub trait Repo: Clone + fmt::Display { } } +/// Marker for repos that do not require an external lock file. +/// +/// Durability implementations can use this to expose repo-backed opening +/// only for storage backends where skipping the filesystem `db.lock` cannot +/// violate single-writer safety. +pub trait RepoWithoutLockFile: Repo {} + +impl RepoWithoutLockFile for &T {} + +impl RepoWithoutLockFile for Memory {} + impl Repo for &T { type SegmentWriter = T::SegmentWriter; type SegmentReader = T::SegmentReader; diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index e837506da38..5b0daa5145c 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_durability::{DurabilityExited, TxOffset}; use spacetimedb_paths::server::ServerDataDir; -use spacetimedb_snapshot::SnapshotRepository; +use spacetimedb_snapshot::DynSnapshotRepo; use crate::{messages::control_db::Database, util::asyncify}; @@ -61,9 +61,9 @@ impl Persistence { } } - /// If snapshots are enabled, get the [SnapshotRepository] they are stored in. - pub fn snapshot_repo(&self) -> Option<&SnapshotRepository> { - self.snapshots.as_ref().map(|worker| worker.repo()) + /// If snapshots are enabled, get the [SnapshotRepo] they are stored in. + pub fn snapshot_repo(&self) -> Option> { + self.snapshots.as_ref().map(|worker| worker.snapshot_repo()) } /// Get the [TxOffset] reported as durable by the [Durability] impl. diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index d8cd4884bcc..6df38806dca 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -51,7 +51,7 @@ use spacetimedb_schema::schema::{ ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, }; use spacetimedb_schema::table_name::TableName; -use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotRepository}; +use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository}; use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::page_pool::PagePool; use spacetimedb_table::table::{RowRef, TableScanIter}; @@ -235,7 +235,7 @@ impl RelationalDB { /// /// - `snapshot_repo` /// - /// The [`SnapshotRepository`] which stores snapshots of this database. + /// The [`SnapshotRepo`] which stores snapshots of this database. /// This is only meaningful if `history` and `durability` are also supplied. /// If restoring from an existing database, the `snapshot_repo` must /// store views of the same sequence of TXes as the `history`. @@ -278,9 +278,10 @@ impl RelationalDB { let start_time = std::time::Instant::now(); + let snapshot_repo = persistence.as_ref().and_then(|p| p.snapshot_repo()); let inner = Self::restore_from_snapshot_or_bootstrap( database_identity, - persistence.as_ref().and_then(|p| p.snapshot_repo()), + snapshot_repo.as_deref(), durable_tx_offset, min_commitlog_offset, page_pool, @@ -292,7 +293,7 @@ impl RelationalDB { .snapshot_repo() .map(|repo| repo.database_identity() == database_identity) .unwrap_or(true), - "snapshot repository does not match database identity", + "snapshot repo does not match database identity", ); persistence.set_snapshot_state(inner.committed_state.clone()); } @@ -471,7 +472,7 @@ impl RelationalDB { fn restore_from_snapshot_or_bootstrap( database_identity: Identity, - snapshot_repo: Option<&SnapshotRepository>, + snapshot_repo: Option<&DynSnapshotRepo>, durable_tx_offset: Option, min_commitlog_offset: TxOffset, page_pool: PagePool, @@ -479,7 +480,7 @@ impl RelationalDB { // Try to load the `ReconstructedSnapshot` at `snapshot_offset`. fn try_load_snapshot( database_identity: &Identity, - snapshot_repo: &SnapshotRepository, + snapshot_repo: &DynSnapshotRepo, snapshot_offset: TxOffset, page_pool: &PagePool, ) -> Result> { @@ -592,11 +593,12 @@ impl RelationalDB { // Invalidate the snapshot if the error is permanent. // Newly created snapshots should not depend on it. if !is_transient_error(&e) { - let path = snapshot_repo.snapshot_dir_path(snapshot_offset); - log::info!("invalidating bad snapshot at {}", path.display()); - path.rename_invalid().map_err(|e| RestoreSnapshotError::Invalidate { - offset: snapshot_offset, - source: Box::new(e.into()), + log::info!("invalidating bad snapshot at {snapshot_offset}"); + snapshot_repo.invalidate_snapshot(snapshot_offset).map_err(|e| { + RestoreSnapshotError::Invalidate { + offset: snapshot_offset, + source: Box::new(e), + } })?; } // Try the next older one if the error was transient. @@ -612,7 +614,7 @@ impl RelationalDB { } } } - log::info!("[{database_identity}] DATABASE: no usable snapshot on disk"); + log::info!("[{database_identity}] DATABASE: no usable snapshot in snapshot repo"); // If we didn't find a snapshot and the commitlog doesn't start at the // zero-th commit (e.g. due to archiving), there is no way to restore @@ -769,6 +771,19 @@ impl RelationalDB { r } + #[cfg(any(feature = "test", test))] + #[tracing::instrument(level = "trace", skip_all)] + pub fn try_begin_mut_tx(&self, isolation_level: IsolationLevel, workload: Workload) -> Option { + log::trace!("TRY BEGIN MUT TX"); + let r = self.inner.try_begin_mut_tx(isolation_level, workload); + if r.is_some() { + log::trace!("ACQUIRED MUT TX"); + } else { + log::trace!("MUT TX CONTENDED"); + } + r + } + #[tracing::instrument(level = "trace", skip_all)] pub fn begin_tx(&self, workload: Workload) -> Tx { log::trace!("BEGIN TX"); @@ -1007,7 +1022,7 @@ impl RelationalDB { Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?) } - pub(crate) fn add_columns_to_table( + pub(crate) fn add_columns_to_table_mut_tx( &self, tx: &mut MutTx, table_id: TableId, @@ -1019,6 +1034,17 @@ impl RelationalDB { .add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?) } + #[cfg(any(feature = "test", test))] + pub fn add_columns_to_table( + &self, + tx: &mut MutTx, + table_id: TableId, + column_schemas: Vec, + default_values: Vec, + ) -> Result { + self.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values) + } + /// Reports the `TxMetrics`s passed. /// /// Should only be called after the tx lock has been fully released. @@ -1777,7 +1803,6 @@ pub mod tests_utils { use spacetimedb_fs_utils::compression::CompressType; use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; use spacetimedb_paths::server::ReplicaDir; - use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_paths::FromPathUnchecked; use tempfile::TempDir; @@ -2091,7 +2116,7 @@ pub mod tests_utils { Arc::new(|_, _| i64::MAX) } - pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result, DBError> { + pub fn take_snapshot(&self, repo: &DynSnapshotRepo) -> Result, DBError> { Ok(self.inner.take_snapshot(repo)?) } } @@ -3661,7 +3686,7 @@ mod tests { let repo = open_snapshot_repo(dir, Identity::ZERO, 0)?; RelationalDB::restore_from_snapshot_or_bootstrap( Identity::ZERO, - Some(&repo), + Some(repo.as_ref()), Some(last_compress), 0, PagePool::new_for_test(), @@ -3689,8 +3714,13 @@ mod tests { ); let last = repo.latest_snapshot()?; - let stdb = - RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, 0, PagePool::new_for_test())?; + let stdb = RelationalDB::restore_from_snapshot_or_bootstrap( + identity, + Some(repo.as_ref()), + last, + 0, + PagePool::new_for_test(), + )?; let out = TempDir::with_prefix("snapshot_test")?; let dir = SnapshotsPath::from_path_unchecked(out.path()); diff --git a/crates/core/src/db/snapshot.rs b/crates/core/src/db/snapshot.rs index c47e1d33d2d..26e3d8373cf 100644 --- a/crates/core/src/db/snapshot.rs +++ b/crates/core/src/db/snapshot.rs @@ -14,7 +14,7 @@ use prometheus::{Histogram, IntGauge}; use spacetimedb_datastore::locking_tx_datastore::{committed_state::CommittedState, datastore::Locking}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::Identity; -use spacetimedb_snapshot::{CompressionStats, SnapshotRepository}; +use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo}; use tokio::sync::watch; use crate::{util::asyncify, worker_metrics::WORKER_METRICS}; @@ -60,7 +60,7 @@ impl Compression { pub struct SnapshotWorker { snapshot_created: watch::Sender, request_snapshot: mpsc::UnboundedSender, - snapshot_repository: Arc, + snapshot_repository: Arc, } impl SnapshotWorker { @@ -69,7 +69,7 @@ impl SnapshotWorker { /// The handle is only partially initialized, as it is lacking the /// [SnapshotDatabaseState]. This allows control code to [Self::subscribe] /// to future snapshots before handing off the worker to the database. - pub fn new(snapshot_repository: Arc, compression: Compression) -> Self { + pub fn new(snapshot_repository: Arc, compression: Compression) -> Self { let database = snapshot_repository.database_identity(); let latest_snapshot = snapshot_repository.latest_snapshot().ok().flatten().unwrap_or(0); let (snapshot_created, _) = watch::channel(latest_snapshot); @@ -105,9 +105,9 @@ impl SnapshotWorker { .expect("snapshot worker panicked"); } - /// Get the [SnapshotRepository] this worker is operating on. - pub fn repo(&self) -> &SnapshotRepository { - &self.snapshot_repository + /// Get the snapshot repo this worker is operating on. + pub fn snapshot_repo(&self) -> Arc { + self.snapshot_repository.clone() } /// Request a snapshot to be taken. @@ -166,7 +166,7 @@ enum Request { struct SnapshotWorkerActor { snapshot_requests: mpsc::UnboundedReceiver, - snapshot_repo: Arc, + snapshot_repo: Arc, snapshot_created: watch::Sender, metrics: SnapshotMetrics, compression: Option, @@ -225,7 +225,7 @@ impl SnapshotWorkerActor { let maybe_snapshot = asyncify(move || { let _timer = inner_timer.start_timer(); - Locking::take_snapshot_internal(&state, &snapshot_repo) + Locking::take_snapshot_internal(&state, snapshot_repo.as_ref()) }) .await .with_context(|| format!("error capturing snapshot of database {}", database_identity))?; @@ -307,7 +307,7 @@ impl CompressionMetrics { } struct Compressor { - snapshot_repo: Arc, + snapshot_repo: Arc, metrics: CompressionMetrics, stats: Option, } diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 6c7c3bd9fc8..f9ca4c110d9 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -317,7 +317,7 @@ fn auto_migrate_database( .iter() .filter_map(|col_def| col_def.default_value.clone()) .collect(); - stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?; + stdb.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?; } spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { log!(logger, "Disconnecting all users"); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index edcce91ce5e..e9d67103b16 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -30,7 +30,6 @@ use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; use spacetimedb_lib::{ConnectionId, Identity}; -use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicValue, ProductValue}; @@ -39,7 +38,7 @@ use spacetimedb_schema::{ reducer_name::ReducerName, schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}, }; -use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot}; +use spacetimedb_snapshot::{BoxedPendingSnapshot, DynSnapshotRepo, ReconstructedSnapshot}; use spacetimedb_table::{ indexes::RowPointer, page_pool::PagePool, @@ -223,11 +222,11 @@ impl Locking { /// (i.e. no transactions have been committed yet) /// and therefore no snapshot was created /// - /// - or `Some` path to the newly created snapshot directory + /// - or `Some` transaction offset for the newly created snapshot /// - /// Returns an error if [`SnapshotRepository::create_snapshot`] returns an + /// Returns an error if [`DynSnapshotRepo::create_snapshot`] returns an /// error. - pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result> { + pub fn take_snapshot(&self, repo: &DynSnapshotRepo) -> Result> { Self::take_snapshot_internal(&self.committed_state, repo)? .map(|(_offset, snap)| snap.sync_all()) .transpose() @@ -241,8 +240,8 @@ impl Locking { pub fn take_snapshot_internal( committed_state: &RwLock, - repo: &SnapshotRepository, - ) -> Result> { + repo: &DynSnapshotRepo, + ) -> Result> { let mut committed_state = committed_state.write(); let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else { return Ok(None); @@ -254,8 +253,8 @@ impl Locking { tx_offset, ); - let (tables, blob_store) = committed_state.persistent_tables_and_blob_store(); - let unflushed_snapshot = repo.create_snapshot(tables, blob_store, tx_offset)?; + let (mut tables, blob_store) = committed_state.persistent_tables_and_blob_store(); + let unflushed_snapshot = repo.create_snapshot(&mut tables, blob_store, tx_offset)?; Ok(Some((tx_offset, unflushed_snapshot))) } @@ -924,6 +923,29 @@ impl MutTx for Locking { } impl Locking { + #[cfg(any(feature = "test", test))] + pub fn try_begin_mut_tx(&self, _isolation_level: IsolationLevel, workload: Workload) -> Option { + let metrics = ExecutionMetrics::default(); + let ctx = ExecutionContext::with_workload(self.database_identity, workload); + + let timer = Instant::now(); + let committed_state_write_lock = self.committed_state.try_write_arc()?; + let sequence_state_lock = self.sequence_state.try_lock_arc()?; + let lock_wait_time = timer.elapsed(); + + Some(MutTxId { + committed_state_write_lock, + sequence_state_lock, + tx_state: TxState::default(), + lock_wait_time, + read_sets: <_>::default(), + timer, + ctx, + metrics, + _not_send: std::marker::PhantomData, + }) + } + pub fn rollback_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxMetrics, TxId) { tx.rollback_downgrade(workload) } diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 3bf1921e8a8..5cc03099ab6 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -11,7 +11,12 @@ use futures::FutureExt as _; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; -use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; +use spacetimedb_commitlog::{ + error, + payload::Txdata, + repo::{Fs, Repo, RepoWithoutLockFile}, + Commit, Commitlog, Decoder, Encode, Transaction, +}; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; use thiserror::Error; @@ -83,9 +88,12 @@ pub enum OpenError { /// /// Note, however, that instantiating `T` to a different type may require to /// change the log format version! -pub struct Local { +pub struct Local +where + R: Repo, +{ /// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps. - clog: Arc>>, + clog: Arc, R>>, /// The durable transaction offset, as reported by the background /// [`FlushAndSyncTask`]. durable_offset: watch::Receiver>, @@ -106,7 +114,7 @@ pub struct Local { actor: Mutex>>, } -impl Local { +impl Local { /// Create a [`Local`] instance at the `replica_dir`. /// /// `replica_dir` must already exist. @@ -132,6 +140,21 @@ impl Local { opts.commitlog, on_new_segment, )?); + Self::open_inner(clog, rt, opts, Some(lock)) + } +} + +impl Local +where + T: Encode + Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ + fn open_inner( + clog: Arc, R>>, + rt: tokio::runtime::Handle, + opts: Options, + lock: Option, + ) -> Result { let queue_capacity = opts.queue_capacity(); let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); @@ -161,12 +184,29 @@ impl Local { } /// Obtain a read-only copy of the durable state that implements [History]. - pub fn as_history(&self) -> impl History> + use { + pub fn as_history(&self) -> impl History> + use { self.clog.clone() } } -impl Local { +impl Local +where + T: Encode + Send + Sync + 'static, + R: RepoWithoutLockFile + Send + Sync + 'static, +{ + /// Create a [`Local`] instance backed by the provided commitlog repo. + pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { + info!("open local durability"); + let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); + Self::open_inner(clog, rt, opts, None) + } +} + +impl Local +where + T: Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ /// Inspect how many transactions added via [`Self::append_tx`] are pending /// to be applied to the underlying [`Commitlog`]. pub fn queue_depth(&self) -> u64 { @@ -174,7 +214,7 @@ impl Local { } /// Obtain an iterator over the [`Commit`]s in the underlying log. - pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> + use { + pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> + use { self.clog.commits_from(offset).map_ok(Commit::from) } @@ -187,15 +227,20 @@ impl Local { pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> { self.clog.compress_segments(offsets) } +} +impl Local { /// Get the size on disk of the underlying [`Commitlog`]. pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() } } -struct Actor { - clog: Arc>>, +struct Actor +where + R: Repo, +{ + clog: Arc, R>>, durable_offset: watch::Sender>, queue_depth: Arc, @@ -203,10 +248,14 @@ struct Actor { batch_capacity: NonZeroUsize, #[allow(unused)] - lock: LockedFile, + lock: Option, } -impl Actor { +impl Actor +where + T: Encode + Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ #[instrument(name = "durability::local::actor", skip_all)] async fn run(self, transactions_rx: async_channel::Receiver>>) { info!("starting durability actor"); @@ -287,7 +336,11 @@ impl Actor { } } -impl Durability for Local { +impl Durability for Local +where + T: Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ type TxData = Txdata; fn append_tx(&self, tx: PreparedTx) { @@ -332,7 +385,11 @@ impl Durability for Local { } } -impl History for Commitlog> { +impl History for Commitlog, R> +where + T: Encode + 'static, + R: Repo + Send + Sync + 'static, +{ type TxData = Txdata; fn fold_transactions_from(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error> diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 66c25ed824a..66bc3815af8 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -46,7 +46,7 @@ use spacetimedb_table::{ }; use std::fs::{self, File}; use std::io; -use std::ops::RangeBounds; +use std::ops::{Range, RangeBounds}; use std::path::Path; use std::time::{Duration, Instant}; use std::{ @@ -206,6 +206,11 @@ pub struct UnflushedSnapshot { } impl UnflushedSnapshot { + /// Return the transaction offset this pending snapshot will finalize at. + pub fn tx_offset(&self) -> TxOffset { + self.inner.as_ref().unwrap().snapshot.tx_offset + } + /// Sync all objects in the snapshot and write out the snapshot file. /// /// Returns the [SnapshotDirPath] on success. @@ -261,6 +266,28 @@ impl UnflushedSnapshotInner { } } +pub trait PendingSnapshot: Send { + /// Sync all snapshot state and return the finalized transaction offset. + fn sync_all(self: Box) -> Result; +} + +pub type BoxedPendingSnapshot = Box; +pub type DynSnapshotRepo = dyn SnapshotRepo; + +impl PendingSnapshot for BoxedPendingSnapshot { + fn sync_all(self: Box) -> Result { + (*self).sync_all() + } +} + +impl PendingSnapshot for UnflushedSnapshot { + fn sync_all(self: Box) -> Result { + let tx_offset = self.tx_offset(); + UnflushedSnapshot::sync_all(*self)?; + Ok(tx_offset) + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct Snapshot { /// A magic number: must be equal to [`MAGIC`]. @@ -1139,13 +1166,19 @@ impl SnapshotRepository { .collect::>(); for newer_snapshot in newer_snapshots { - let path = self.snapshot_dir_path(newer_snapshot); - log::info!("Renaming snapshot newer than {upper_bound} from {path:?} to {path:?}"); - path.rename_invalid()?; + self.invalidate_snapshot(newer_snapshot)?; } Ok(()) } + /// Mark a single snapshot invalid so it will not be considered for future + /// restores. + pub fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError> { + let path = self.snapshot_dir_path(tx_offset); + log::info!("Renaming snapshot {tx_offset} from {path:?} to invalid"); + path.rename_invalid().map_err(Into::into) + } + /// Compress the `current` snapshot, unless it is already compressed. /// /// If a `parent` snapshot is given, its object repo will be used to @@ -1329,6 +1362,91 @@ impl SnapshotRepository { } } +/// Snapshot storage backend. +pub trait SnapshotRepo: Send + Sync { + type Pending: PendingSnapshot; + + /// Return the database identity associated with this snapshot backend. + fn database_identity(&self) -> Identity; + + /// Start creating a snapshot at `tx_offset` from the provided tables and blob store. + fn create_snapshot<'db>( + &self, + tables: &mut dyn Iterator, + blobs: &'db dyn BlobStore, + tx_offset: TxOffset, + ) -> Result; + + /// Reconstruct the snapshot at `tx_offset` using the supplied page pool. + fn read_snapshot(&self, tx_offset: TxOffset, page_pool: &PagePool) -> Result; + + /// Return the latest snapshot at or before `upper_bound`, if one exists. + fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result, SnapshotError>; + + /// Return the latest snapshot in this backend, if one exists. + fn latest_snapshot(&self) -> Result, SnapshotError> { + self.latest_snapshot_older_than(TxOffset::MAX) + } + + /// Attempt to compress all snapshots that fall into `range`, and record + /// the outcome in `stats`. + /// + /// The snapshots in `range` are traversed in ascending order. + /// If an error occurs, processing stops and the error is returned. + /// + /// See [CompressionStats] for how to interpret the results. + fn compress_snapshots(&self, stats: &mut CompressionStats, range: Range) -> Result<(), SnapshotError>; + + /// Invalidate every snapshot newer than `upper_bound`. + fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError>; + + /// Invalidate the snapshot at `tx_offset`. + fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError>; +} + +impl SnapshotRepo for SnapshotRepository { + type Pending = BoxedPendingSnapshot; + + fn database_identity(&self) -> Identity { + SnapshotRepository::database_identity(self) + } + + fn create_snapshot<'db>( + &self, + tables: &mut dyn Iterator, + blobs: &'db dyn BlobStore, + tx_offset: TxOffset, + ) -> Result { + Ok(Box::new(SnapshotRepository::create_snapshot( + self, tables, blobs, tx_offset, + )?)) + } + + fn read_snapshot(&self, tx_offset: TxOffset, page_pool: &PagePool) -> Result { + SnapshotRepository::read_snapshot(self, tx_offset, page_pool) + } + + fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result, SnapshotError> { + SnapshotRepository::latest_snapshot_older_than(self, upper_bound) + } + + fn latest_snapshot(&self) -> Result, SnapshotError> { + SnapshotRepository::latest_snapshot(self) + } + + fn compress_snapshots(&self, stats: &mut CompressionStats, range: Range) -> Result<(), SnapshotError> { + SnapshotRepository::compress_snapshots(self, stats, range) + } + + fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError> { + SnapshotRepository::invalidate_newer_snapshots(self, upper_bound) + } + + fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError> { + SnapshotRepository::invalidate_snapshot(self, tx_offset) + } +} + pub struct ReconstructedSnapshot { /// The identity of the snapshotted database. pub database_identity: Identity, From f508a0462f806452daa7cf913ca34aefc684a332 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 May 2026 17:05:14 +0530 Subject: [PATCH 02/10] lint --- crates/commitlog/src/repo/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 51df7accb81..358936c3c2a 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -153,6 +153,7 @@ pub trait RepoWithoutLockFile: Repo {} impl RepoWithoutLockFile for &T {} +#[cfg(any(test, feature = "test"))] impl RepoWithoutLockFile for Memory {} impl Repo for &T { From c83ed2e99035e8d5eed1459d422cda047b18de59 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 May 2026 20:41:16 +0530 Subject: [PATCH 03/10] LockedFsRepo --- crates/commitlog/src/lib.rs | 30 +++++++-- crates/commitlog/src/repo/mod.rs | 10 +++ crates/durability/src/imp/local.rs | 105 +++++++++++++++++++++++------ 3 files changed, 117 insertions(+), 28 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index d80c1fb00b7..4b5727bc64c 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -6,7 +6,7 @@ use std::{ }; use log::trace; -use repo::{fs::OnNewSegmentFn, Repo}; +use repo::{fs::OnNewSegmentFn, Repo, RepoWithSizeOnDisk}; use spacetimedb_paths::server::CommitLogDir; pub mod commit; @@ -188,11 +188,6 @@ impl Commitlog { Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts) } - /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { - let inner = self.inner.read().unwrap(); - inner.repo.size_on_disk() - } } impl Commitlog @@ -210,6 +205,29 @@ where inner: RwLock::new(inner), }) } +} + +impl Commitlog +where + R: RepoWithSizeOnDisk, +{ + /// Determine the size on disk of this commitlog. + pub fn size_on_disk(&self) -> io::Result { + let inner = self.inner.read().unwrap(); + inner.repo.size_on_disk() + } +} + +impl RepoWithSizeOnDisk for repo::Fs { + fn size_on_disk(&self) -> io::Result { + Self::size_on_disk(self) + } +} + +impl Commitlog +where + R: Repo, +{ /// Determine the maximum transaction offset considered durable. /// diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 358936c3c2a..0efa173f8f6 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -144,6 +144,11 @@ pub trait Repo: Clone + fmt::Display { } } +/// Capability trait for repos that can report storage usage. +pub trait RepoWithSizeOnDisk: Repo { + fn size_on_disk(&self) -> io::Result; +} + /// Marker for repos that do not require an external lock file. /// /// Durability implementations can use this to expose repo-backed opening @@ -152,6 +157,11 @@ pub trait Repo: Clone + fmt::Display { pub trait RepoWithoutLockFile: Repo {} impl RepoWithoutLockFile for &T {} +impl RepoWithSizeOnDisk for &T { + fn size_on_disk(&self) -> io::Result { + T::size_on_disk(self) + } +} #[cfg(any(test, feature = "test"))] impl RepoWithoutLockFile for Memory {} diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 5cc03099ab6..ab5f44217b8 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -14,7 +14,7 @@ use scopeguard::ScopeGuard; use spacetimedb_commitlog::{ error, payload::Txdata, - repo::{Fs, Repo, RepoWithoutLockFile}, + repo::{Fs, Repo, RepoWithSizeOnDisk, RepoWithoutLockFile}, Commit, Commitlog, Decoder, Encode, Transaction, }; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; @@ -88,7 +88,7 @@ pub enum OpenError { /// /// Note, however, that instantiating `T` to a different type may require to /// change the log format version! -pub struct Local +pub struct Local where R: Repo, { @@ -114,7 +114,82 @@ where actor: Mutex>>, } -impl Local { +/// Commitlog repo backed by [`Fs`] and protected by a [`LockedFile`]. +#[derive(Clone, Debug)] +pub struct LockedFsRepo { + repo: Fs, + #[allow(unused)] + lock: Arc, +} + +impl LockedFsRepo { + pub fn open(replica_dir: ReplicaDir, on_new_segment: Option>) -> Result { + // We use the `db.lock` file for historical reasons and to keep + // compatibility with existing standalone layouts. + let lock = LockedFile::lock(replica_dir.0.join("db.lock")).map(Arc::new)?; + let repo = Fs::new(replica_dir.commit_log(), on_new_segment)?; + Ok(Self { repo, lock }) + } +} + +impl std::fmt::Display for LockedFsRepo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.repo.fmt(f) + } +} + +impl Repo for LockedFsRepo { + type SegmentWriter = ::SegmentWriter; + type SegmentReader = ::SegmentReader; + + fn create_segment(&self, offset: u64, header: spacetimedb_commitlog::segment::Header) -> io::Result { + self.repo.create_segment(offset, header) + } + + fn open_segment_reader(&self, offset: u64) -> io::Result { + self.repo.open_segment_reader(offset) + } + + fn open_segment_writer(&self, offset: u64) -> io::Result { + self.repo.open_segment_writer(offset) + } + + fn segment_file_path(&self, offset: u64) -> Option { + self.repo.segment_file_path(offset) + } + + fn remove_segment(&self, offset: u64) -> io::Result<()> { + self.repo.remove_segment(offset) + } + + fn compress_segment(&self, offset: u64) -> io::Result<()> { + self.repo.compress_segment(offset) + } + + fn existing_offsets(&self) -> io::Result> { + self.repo.existing_offsets() + } + + fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result { + self.repo.create_offset_index(offset, cap) + } + + fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> { + self.repo.remove_offset_index(offset) + } + + fn get_offset_index(&self, offset: TxOffset) -> io::Result { + self.repo.get_offset_index(offset) + } +} + +impl RepoWithSizeOnDisk for LockedFsRepo { + fn size_on_disk(&self) -> io::Result { + self.repo.size_on_disk() + } +} + +impl Local { /// Create a [`Local`] instance at the `replica_dir`. /// /// `replica_dir` must already exist. @@ -130,17 +205,9 @@ impl Local { on_new_segment: Option>, ) -> Result { info!("open local durability"); - - // We could just place a lock on the commitlog directory, - // yet for backwards-compatibility, we keep using the `db.lock` file. - let lock = LockedFile::lock(replica_dir.0.join("db.lock"))?; - - let clog = Arc::new(Commitlog::open( - replica_dir.commit_log(), - opts.commitlog, - on_new_segment, - )?); - Self::open_inner(clog, rt, opts, Some(lock)) + let repo = LockedFsRepo::open(replica_dir, on_new_segment)?; + let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); + Self::open_inner(clog, rt, opts) } } @@ -153,7 +220,6 @@ where clog: Arc, R>>, rt: tokio::runtime::Handle, opts: Options, - lock: Option, ) -> Result { let queue_capacity = opts.queue_capacity(); let (queue, txdata_rx) = async_channel::bounded(queue_capacity); @@ -168,8 +234,6 @@ where queue_depth: queue_depth.clone(), batch_capacity: opts.batch_capacity, - - lock, } .run(txdata_rx), ); @@ -198,7 +262,7 @@ where pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { info!("open local durability"); let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); - Self::open_inner(clog, rt, opts, None) + Self::open_inner(clog, rt, opts) } } @@ -229,7 +293,7 @@ where } } -impl Local { +impl Local { /// Get the size on disk of the underlying [`Commitlog`]. pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() @@ -246,9 +310,6 @@ where queue_depth: Arc, batch_capacity: NonZeroUsize, - - #[allow(unused)] - lock: Option, } impl Actor From 813e418bc3929a48522cc4db18a0c65cdad0e86d Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 May 2026 21:13:36 +0530 Subject: [PATCH 04/10] comments --- crates/commitlog/src/lib.rs | 5 ++--- crates/commitlog/src/repo/mod.rs | 8 ++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 4b5727bc64c..9e640733613 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -187,7 +187,6 @@ impl Commitlog { } Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts) } - } impl Commitlog @@ -228,7 +227,6 @@ impl Commitlog where R: Repo, { - /// Determine the maximum transaction offset considered durable. /// /// The offset is `None` if the log hasn't been flushed to disk yet. @@ -423,8 +421,9 @@ where } } -impl Commitlog +impl Commitlog where + T: Encode, R: Repo, { /// Write `transactions` to the log. diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 0efa173f8f6..4bbf72a97f8 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -156,8 +156,12 @@ pub trait RepoWithSizeOnDisk: Repo { /// violate single-writer safety. pub trait RepoWithoutLockFile: Repo {} -impl RepoWithoutLockFile for &T {} -impl RepoWithSizeOnDisk for &T { +impl RepoWithoutLockFile for &T where T: RepoWithoutLockFile {} + +impl RepoWithSizeOnDisk for &T +where + T: RepoWithSizeOnDisk, +{ fn size_on_disk(&self) -> io::Result { T::size_on_disk(self) } From 5946261a2617e7a95494d291f2d04333c2bb995e Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 May 2026 21:25:17 +0530 Subject: [PATCH 05/10] cleanup --- crates/commitlog/src/repo/mod.rs | 13 ------------ crates/durability/src/imp/local.rs | 34 ++++++++++++++++-------------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 4bbf72a97f8..5e1b313e766 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -149,15 +149,6 @@ pub trait RepoWithSizeOnDisk: Repo { fn size_on_disk(&self) -> io::Result; } -/// Marker for repos that do not require an external lock file. -/// -/// Durability implementations can use this to expose repo-backed opening -/// only for storage backends where skipping the filesystem `db.lock` cannot -/// violate single-writer safety. -pub trait RepoWithoutLockFile: Repo {} - -impl RepoWithoutLockFile for &T where T: RepoWithoutLockFile {} - impl RepoWithSizeOnDisk for &T where T: RepoWithSizeOnDisk, @@ -166,10 +157,6 @@ where T::size_on_disk(self) } } - -#[cfg(any(test, feature = "test"))] -impl RepoWithoutLockFile for Memory {} - impl Repo for &T { type SegmentWriter = T::SegmentWriter; type SegmentReader = T::SegmentReader; diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index ab5f44217b8..90a103ae91d 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -11,10 +11,12 @@ use futures::FutureExt as _; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; +#[cfg(any(test, feature = "test"))] +use spacetimedb_commitlog::repo::Memory; use spacetimedb_commitlog::{ error, payload::Txdata, - repo::{Fs, Repo, RepoWithSizeOnDisk, RepoWithoutLockFile}, + repo::{Fs, Repo, RepoWithSizeOnDisk}, Commit, Commitlog, Decoder, Encode, Transaction, }; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; @@ -142,7 +144,11 @@ impl Repo for LockedFsRepo { type SegmentWriter = ::SegmentWriter; type SegmentReader = ::SegmentReader; - fn create_segment(&self, offset: u64, header: spacetimedb_commitlog::segment::Header) -> io::Result { + fn create_segment( + &self, + offset: u64, + header: spacetimedb_commitlog::segment::Header, + ) -> io::Result { self.repo.create_segment(offset, header) } @@ -170,7 +176,11 @@ impl Repo for LockedFsRepo { self.repo.existing_offsets() } - fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result { + fn create_offset_index( + &self, + offset: TxOffset, + cap: u64, + ) -> io::Result { self.repo.create_offset_index(offset, cap) } @@ -216,6 +226,11 @@ where T: Encode + Send + Sync + 'static, R: Repo + Send + Sync + 'static, { + pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { + info!("open local durability"); + let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); + Self::open_inner(clog, rt, opts) + } fn open_inner( clog: Arc, R>>, rt: tokio::runtime::Handle, @@ -253,19 +268,6 @@ where } } -impl Local -where - T: Encode + Send + Sync + 'static, - R: RepoWithoutLockFile + Send + Sync + 'static, -{ - /// Create a [`Local`] instance backed by the provided commitlog repo. - pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { - info!("open local durability"); - let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); - Self::open_inner(clog, rt, opts) - } -} - impl Local where T: Send + Sync + 'static, From 2104ced1fa78f72dfe7e660ad2cd6ac485a70186 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 11 May 2026 15:38:28 +0530 Subject: [PATCH 06/10] lint Signed-off-by: Shubham Mishra --- crates/durability/src/imp/local.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 65f7499b79f..5b3124068f1 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -11,8 +11,6 @@ use futures::FutureExt as _; use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; -#[cfg(any(test, feature = "test"))] -use spacetimedb_commitlog::repo::Memory; use spacetimedb_commitlog::{ error, payload::Txdata, From e4de2bdea1556b76bf53953bbb1dad5bf591aff7 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 11 May 2026 18:21:51 +0530 Subject: [PATCH 07/10] drop durability in reopen test helper --- crates/core/src/db/relational_db.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index ca0d1d3ccdb..4d87f3df918 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2232,11 +2232,12 @@ pub mod tests_utils { drop(self.db); if let Some(DurableState { - durability: _, + durability, rt, replica_dir, }) = self.durable { + drop(durability); // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); let (db, handle) = Self::durable_internal(&replica_dir, rt.handle().clone(), self.want_snapshot_repo)?; From 795a7049d398562ae1d6cdc4014f61f7dc309bb8 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 11 May 2026 19:51:53 +0530 Subject: [PATCH 08/10] drop durability in test --- crates/core/src/db/relational_db.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4d87f3df918..e6ebf098d22 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2024,7 +2024,7 @@ pub mod tests_utils { use super::*; use core::ops::Deref; - use durability::EmptyHistory; + use durability::{Durability, EmptyHistory}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_fs_utils::compression::CompressType; @@ -2237,6 +2237,7 @@ pub mod tests_utils { replica_dir, }) = self.durable { + rt.block_on(durability.close()); drop(durability); // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); From 281f3b7c7f5b6a3550d177065934f9c3cd4d0d85 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 11 May 2026 23:32:54 +0530 Subject: [PATCH 09/10] Update crates/snapshot/src/lib.rs Co-authored-by: Phoebe Goldman Signed-off-by: Shubham Mishra --- crates/snapshot/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 32e7149a82b..8931a3b6f58 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -271,6 +271,8 @@ pub trait PendingSnapshot: Send { fn sync_all(self: Box) -> Result; } +// We pass snapshot repos as `dyn` rather than compile-time generics for convenience, +// as threading generics through a deep callstack would be a hassle. pub type BoxedPendingSnapshot = Box; pub type DynSnapshotRepo = dyn SnapshotRepo; From 554d93231109c0fdefed3a084b0d39aec9925c20 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 12 May 2026 13:55:47 +0530 Subject: [PATCH 10/10] Revert "LockedFsRepo" --- crates/commitlog/src/lib.rs | 30 ++----- crates/commitlog/src/repo/mod.rs | 23 +++--- crates/durability/src/imp/local.rs | 127 +++++++---------------------- 3 files changed, 49 insertions(+), 131 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index ff56499ef7f..abc8729c978 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -6,7 +6,7 @@ use std::{ }; use log::trace; -use repo::{fs::OnNewSegmentFn, Repo, RepoWithSizeOnDisk}; +use repo::{fs::OnNewSegmentFn, Repo}; use spacetimedb_paths::server::CommitLogDir; pub use spacetimedb_fs_utils::compression::CompressionStats; @@ -189,6 +189,12 @@ impl Commitlog { } Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts) } + + /// Determine the size on disk of this commitlog. + pub fn size_on_disk(&self) -> io::Result { + let inner = self.inner.read().unwrap(); + inner.repo.size_on_disk() + } } impl Commitlog @@ -206,29 +212,7 @@ where inner: RwLock::new(inner), }) } -} - -impl Commitlog -where - R: RepoWithSizeOnDisk, -{ - /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { - let inner = self.inner.read().unwrap(); - inner.repo.size_on_disk() - } -} - -impl RepoWithSizeOnDisk for repo::Fs { - fn size_on_disk(&self) -> io::Result { - Self::size_on_disk(self) - } -} -impl Commitlog -where - R: Repo, -{ /// Determine the maximum transaction offset considered durable. /// /// The offset is `None` if the log hasn't been flushed to disk yet. diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index df93b021416..3d79f7f1e28 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -152,19 +152,18 @@ pub trait Repo: Clone + fmt::Display { } } -/// Capability trait for repos that can report storage usage. -pub trait RepoWithSizeOnDisk: Repo { - fn size_on_disk(&self) -> io::Result; -} +/// Marker for repos that do not require an external lock file. +/// +/// Durability implementations can use this to expose repo-backed opening +/// only for storage backends where skipping the filesystem `db.lock` cannot +/// violate single-writer safety. +pub trait RepoWithoutLockFile: Repo {} + +impl RepoWithoutLockFile for &T {} + +#[cfg(any(test, feature = "test"))] +impl RepoWithoutLockFile for Memory {} -impl RepoWithSizeOnDisk for &T -where - T: RepoWithSizeOnDisk, -{ - fn size_on_disk(&self) -> io::Result { - T::size_on_disk(self) - } -} impl Repo for &T { type SegmentWriter = T::SegmentWriter; type SegmentReader = T::SegmentReader; diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 5b3124068f1..04d46d8f634 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -14,7 +14,7 @@ use scopeguard::ScopeGuard; use spacetimedb_commitlog::{ error, payload::Txdata, - repo::{Fs, Repo, RepoWithSizeOnDisk}, + repo::{Fs, Repo, RepoWithoutLockFile}, Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction, }; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; @@ -88,7 +88,7 @@ pub enum OpenError { /// /// Note, however, that instantiating `T` to a different type may require to /// change the log format version! -pub struct Local +pub struct Local where R: Repo, { @@ -114,94 +114,7 @@ where actor: Mutex>>, } -/// Commitlog repo backed by [`Fs`] and protected by a [`LockedFile`]. -#[derive(Clone, Debug)] -pub struct LockedFsRepo { - repo: Fs, - #[allow(unused)] - lock: Arc, -} - -impl LockedFsRepo { - pub fn open(replica_dir: ReplicaDir, on_new_segment: Option>) -> Result { - // We use the `db.lock` file for historical reasons and to keep - // compatibility with existing standalone layouts. - let lock = LockedFile::lock(replica_dir.0.join("db.lock")).map(Arc::new)?; - let repo = Fs::new(replica_dir.commit_log(), on_new_segment)?; - Ok(Self { repo, lock }) - } -} - -impl std::fmt::Display for LockedFsRepo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.repo.fmt(f) - } -} - -impl Repo for LockedFsRepo { - type SegmentWriter = ::SegmentWriter; - type SegmentReader = ::SegmentReader; - - fn create_segment( - &self, - offset: u64, - header: spacetimedb_commitlog::segment::Header, - ) -> io::Result { - self.repo.create_segment(offset, header) - } - - fn open_segment_reader(&self, offset: u64) -> io::Result { - self.repo.open_segment_reader(offset) - } - - fn open_segment_writer(&self, offset: u64) -> io::Result { - self.repo.open_segment_writer(offset) - } - - fn segment_file_path(&self, offset: u64) -> Option { - self.repo.segment_file_path(offset) - } - - fn remove_segment(&self, offset: u64) -> io::Result<()> { - self.repo.remove_segment(offset) - } - - fn existing_offsets(&self) -> io::Result> { - self.repo.existing_offsets() - } - - fn create_offset_index( - &self, - offset: TxOffset, - cap: u64, - ) -> io::Result { - self.repo.create_offset_index(offset, cap) - } - - fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> { - self.repo.remove_offset_index(offset) - } - - fn get_offset_index(&self, offset: TxOffset) -> io::Result { - self.repo.get_offset_index(offset) - } - - fn compress_segment_with( - &self, - offset: u64, - f: impl spacetimedb_commitlog::repo::CompressOnce, - ) -> io::Result { - self.repo.compress_segment_with(offset, f) - } -} - -impl RepoWithSizeOnDisk for LockedFsRepo { - fn size_on_disk(&self) -> io::Result { - self.repo.size_on_disk() - } -} - -impl Local { +impl Local { /// Create a [`Local`] instance at the `replica_dir`. /// /// `replica_dir` must already exist. @@ -217,26 +130,43 @@ impl Local { on_new_segment: Option>, ) -> Result { info!("open local durability"); - let repo = LockedFsRepo::open(replica_dir, on_new_segment)?; - let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); - Self::open_inner(clog, rt, opts) + + // We could just place a lock on the commitlog directory, + // yet for backwards-compatibility, we keep using the `db.lock` file. + let lock = LockedFile::lock(replica_dir.0.join("db.lock"))?; + + let clog = Arc::new(Commitlog::open( + replica_dir.commit_log(), + opts.commitlog, + on_new_segment, + )?); + Self::open_inner(clog, rt, opts, Some(lock)) } } impl Local where T: Encode + Send + Sync + 'static, - R: Repo + Send + Sync + 'static, + R: RepoWithoutLockFile + Send + Sync + 'static, { + /// Create a [`Local`] instance backed by the provided commitlog repo. pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { info!("open local durability"); let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); - Self::open_inner(clog, rt, opts) + Self::open_inner(clog, rt, opts, None) } +} + +impl Local +where + T: Encode + Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ fn open_inner( clog: Arc, R>>, rt: tokio::runtime::Handle, opts: Options, + lock: Option, ) -> Result { let queue_capacity = opts.queue_capacity(); let (queue, txdata_rx) = async_channel::bounded(queue_capacity); @@ -251,6 +181,8 @@ where queue_depth: queue_depth.clone(), batch_capacity: opts.batch_capacity, + + lock, } .run(txdata_rx), ); @@ -297,7 +229,7 @@ where } } -impl Local { +impl Local { /// Get the size on disk of the underlying [`Commitlog`]. pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() @@ -314,6 +246,9 @@ where queue_depth: Arc, batch_capacity: NonZeroUsize, + + #[allow(unused)] + lock: Option, } impl Actor