diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 82be228e4a2..abc8729c978 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -153,15 +153,22 @@ impl Options { } } -/// The canonical commitlog, backed by on-disk log files. +/// The canonical commitlog API over a repository backend `R`. +/// +/// The default backend is the on-disk filesystem repository +/// [`repo::Fs`], but tests may supply another [`Repo`] +/// implementation. /// /// Records in the log are of type `T`, which canonically is instantiated to /// [`payload::Txdata`]. -pub struct Commitlog { - inner: RwLock>, +pub struct Commitlog +where + R: Repo, +{ + inner: RwLock>, } -impl Commitlog { +impl Commitlog { /// Open the log at root directory `root` with [`Options`]. /// /// The root directory must already exist. @@ -180,7 +187,26 @@ impl Commitlog { root.display() ); } - let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?; + Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts) + } + + /// Determine the size on disk of this commitlog. + pub fn size_on_disk(&self) -> io::Result { + let inner = self.inner.read().unwrap(); + inner.repo.size_on_disk() + } +} + +impl Commitlog +where + R: Repo, +{ + /// Open the log in `repo` with [`Options`]. + /// + /// This is useful for tests which provide a repository + /// implementation other than [`repo::Fs`]. + pub fn open_with_repo(repo: R, opts: Options) -> io::Result { + let inner = commitlog::Generic::open(repo, opts)?; Ok(Self { inner: RwLock::new(inner), @@ -309,7 +335,7 @@ impl Commitlog { /// This means that, when this iterator yields an `Err` value, the consumer /// may want to check if the iterator is exhausted (by calling `next()`) /// before treating the `Err` value as an application error. - pub fn commits(&self) -> impl Iterator> + use { + pub fn commits(&self) -> impl Iterator> + use { self.commits_from(0) } @@ -322,7 +348,10 @@ impl Commitlog { /// Note that the first [`StoredCommit`] yielded is the first commit /// containing the given transaction offset, i.e. its `min_tx_offset` may be /// smaller than `offset`. - pub fn commits_from(&self, offset: u64) -> impl Iterator> + use { + pub fn commits_from( + &self, + offset: u64, + ) -> impl Iterator> + use { self.inner.read().unwrap().commits_from(offset) } @@ -401,15 +430,13 @@ impl Commitlog { inner: RwLock::new(inner), }) } - - /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { - let inner = self.inner.read().unwrap(); - inner.repo.size_on_disk() - } } -impl Commitlog { +impl Commitlog +where + T: Encode, + R: Repo, +{ /// Write `transactions` to the log. /// /// This will store all `transactions` as a single [Commit] @@ -479,10 +506,11 @@ impl Commitlog { pub fn transactions<'a, D>( &self, de: &'a D, - ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T> + ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T, R> where D: Decoder, D::Error: From, + R: 'a, T: 'a, { self.transactions_from(0, de) @@ -498,10 +526,11 @@ impl Commitlog { &self, offset: u64, de: &'a D, - ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T> + ) -> impl Iterator, D::Error>> + 'a + use<'a, D, T, R> where D: Decoder, D::Error: From, + R: 'a, T: 'a, { self.inner.read().unwrap().transactions_from(offset, de) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index c6c85b93f4f..3d79f7f1e28 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -152,6 +152,18 @@ pub trait Repo: Clone + fmt::Display { } } +/// Marker for repos that do not require an external lock file. +/// +/// Durability implementations can use this to expose repo-backed opening +/// only for storage backends where skipping the filesystem `db.lock` cannot +/// violate single-writer safety. +pub trait RepoWithoutLockFile: Repo {} + +impl RepoWithoutLockFile for &T {} + +#[cfg(any(test, feature = "test"))] +impl RepoWithoutLockFile for Memory {} + impl Repo for &T { type SegmentWriter = T::SegmentWriter; type SegmentReader = T::SegmentReader; diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index e837506da38..5b0daa5145c 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_durability::{DurabilityExited, TxOffset}; use spacetimedb_paths::server::ServerDataDir; -use spacetimedb_snapshot::SnapshotRepository; +use spacetimedb_snapshot::DynSnapshotRepo; use crate::{messages::control_db::Database, util::asyncify}; @@ -61,9 +61,9 @@ impl Persistence { } } - /// If snapshots are enabled, get the [SnapshotRepository] they are stored in. - pub fn snapshot_repo(&self) -> Option<&SnapshotRepository> { - self.snapshots.as_ref().map(|worker| worker.repo()) + /// If snapshots are enabled, get the [SnapshotRepo] they are stored in. + pub fn snapshot_repo(&self) -> Option> { + self.snapshots.as_ref().map(|worker| worker.snapshot_repo()) } /// Get the [TxOffset] reported as durable by the [Durability] impl. diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index d8cd4884bcc..9f041c92ccb 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -51,7 +51,7 @@ use spacetimedb_schema::schema::{ ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, }; use spacetimedb_schema::table_name::TableName; -use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotRepository}; +use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository}; use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::page_pool::PagePool; use spacetimedb_table::table::{RowRef, TableScanIter}; @@ -235,7 +235,7 @@ impl RelationalDB { /// /// - `snapshot_repo` /// - /// The [`SnapshotRepository`] which stores snapshots of this database. + /// The [`SnapshotRepo`] which stores snapshots of this database. /// This is only meaningful if `history` and `durability` are also supplied. /// If restoring from an existing database, the `snapshot_repo` must /// store views of the same sequence of TXes as the `history`. @@ -278,9 +278,10 @@ impl RelationalDB { let start_time = std::time::Instant::now(); + let snapshot_repo = persistence.as_ref().and_then(|p| p.snapshot_repo()); let inner = Self::restore_from_snapshot_or_bootstrap( database_identity, - persistence.as_ref().and_then(|p| p.snapshot_repo()), + snapshot_repo.as_deref(), durable_tx_offset, min_commitlog_offset, page_pool, @@ -292,7 +293,7 @@ impl RelationalDB { .snapshot_repo() .map(|repo| repo.database_identity() == database_identity) .unwrap_or(true), - "snapshot repository does not match database identity", + "snapshot repo does not match database identity", ); persistence.set_snapshot_state(inner.committed_state.clone()); } @@ -471,7 +472,7 @@ impl RelationalDB { fn restore_from_snapshot_or_bootstrap( database_identity: Identity, - snapshot_repo: Option<&SnapshotRepository>, + snapshot_repo: Option<&DynSnapshotRepo>, durable_tx_offset: Option, min_commitlog_offset: TxOffset, page_pool: PagePool, @@ -479,7 +480,7 @@ impl RelationalDB { // Try to load the `ReconstructedSnapshot` at `snapshot_offset`. fn try_load_snapshot( database_identity: &Identity, - snapshot_repo: &SnapshotRepository, + snapshot_repo: &DynSnapshotRepo, snapshot_offset: TxOffset, page_pool: &PagePool, ) -> Result> { @@ -592,11 +593,12 @@ impl RelationalDB { // Invalidate the snapshot if the error is permanent. // Newly created snapshots should not depend on it. if !is_transient_error(&e) { - let path = snapshot_repo.snapshot_dir_path(snapshot_offset); - log::info!("invalidating bad snapshot at {}", path.display()); - path.rename_invalid().map_err(|e| RestoreSnapshotError::Invalidate { - offset: snapshot_offset, - source: Box::new(e.into()), + log::info!("invalidating bad snapshot at {snapshot_offset}"); + snapshot_repo.invalidate_snapshot(snapshot_offset).map_err(|e| { + RestoreSnapshotError::Invalidate { + offset: snapshot_offset, + source: Box::new(e), + } })?; } // Try the next older one if the error was transient. @@ -612,7 +614,7 @@ impl RelationalDB { } } } - log::info!("[{database_identity}] DATABASE: no usable snapshot on disk"); + log::info!("[{database_identity}] DATABASE: no usable snapshot in snapshot repo"); // If we didn't find a snapshot and the commitlog doesn't start at the // zero-th commit (e.g. due to archiving), there is no way to restore @@ -769,6 +771,19 @@ impl RelationalDB { r } + #[cfg(any(feature = "test", test))] + #[tracing::instrument(level = "trace", skip_all)] + pub fn try_begin_mut_tx(&self, isolation_level: IsolationLevel, workload: Workload) -> Option { + log::trace!("TRY BEGIN MUT TX"); + let r = self.inner.try_begin_mut_tx(isolation_level, workload); + if r.is_some() { + log::trace!("ACQUIRED MUT TX"); + } else { + log::trace!("MUT TX CONTENDED"); + } + r + } + #[tracing::instrument(level = "trace", skip_all)] pub fn begin_tx(&self, workload: Workload) -> Tx { log::trace!("BEGIN TX"); @@ -1007,7 +1022,7 @@ impl RelationalDB { Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?) } - pub(crate) fn add_columns_to_table( + pub(crate) fn add_columns_to_table_mut_tx( &self, tx: &mut MutTx, table_id: TableId, @@ -1019,6 +1034,17 @@ impl RelationalDB { .add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?) } + #[cfg(any(feature = "test", test))] + pub fn add_columns_to_table( + &self, + tx: &mut MutTx, + table_id: TableId, + column_schemas: Vec, + default_values: Vec, + ) -> Result { + self.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values) + } + /// Reports the `TxMetrics`s passed. /// /// Should only be called after the tx lock has been fully released. @@ -1771,13 +1797,12 @@ pub mod tests_utils { use super::*; use core::ops::Deref; - use durability::EmptyHistory; + use durability::{Durability, EmptyHistory}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_fs_utils::compression::CompressType; use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; use spacetimedb_paths::server::ReplicaDir; - use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_paths::FromPathUnchecked; use tempfile::TempDir; @@ -1980,11 +2005,13 @@ pub mod tests_utils { drop(self.db); if let Some(DurableState { - durability: _, + durability, rt, replica_dir, }) = self.durable { + rt.block_on(durability.close()); + drop(durability); // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); let (db, handle) = Self::durable_internal(&replica_dir, rt.handle().clone(), self.want_snapshot_repo)?; @@ -2091,7 +2118,7 @@ pub mod tests_utils { Arc::new(|_, _| i64::MAX) } - pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result, DBError> { + pub fn take_snapshot(&self, repo: &DynSnapshotRepo) -> Result, DBError> { Ok(self.inner.take_snapshot(repo)?) } } @@ -3661,7 +3688,7 @@ mod tests { let repo = open_snapshot_repo(dir, Identity::ZERO, 0)?; RelationalDB::restore_from_snapshot_or_bootstrap( Identity::ZERO, - Some(&repo), + Some(repo.as_ref()), Some(last_compress), 0, PagePool::new_for_test(), @@ -3689,8 +3716,13 @@ mod tests { ); let last = repo.latest_snapshot()?; - let stdb = - RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, 0, PagePool::new_for_test())?; + let stdb = RelationalDB::restore_from_snapshot_or_bootstrap( + identity, + Some(repo.as_ref()), + last, + 0, + PagePool::new_for_test(), + )?; let out = TempDir::with_prefix("snapshot_test")?; let dir = SnapshotsPath::from_path_unchecked(out.path()); diff --git a/crates/core/src/db/snapshot.rs b/crates/core/src/db/snapshot.rs index c47e1d33d2d..26e3d8373cf 100644 --- a/crates/core/src/db/snapshot.rs +++ b/crates/core/src/db/snapshot.rs @@ -14,7 +14,7 @@ use prometheus::{Histogram, IntGauge}; use spacetimedb_datastore::locking_tx_datastore::{committed_state::CommittedState, datastore::Locking}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::Identity; -use spacetimedb_snapshot::{CompressionStats, SnapshotRepository}; +use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo}; use tokio::sync::watch; use crate::{util::asyncify, worker_metrics::WORKER_METRICS}; @@ -60,7 +60,7 @@ impl Compression { pub struct SnapshotWorker { snapshot_created: watch::Sender, request_snapshot: mpsc::UnboundedSender, - snapshot_repository: Arc, + snapshot_repository: Arc, } impl SnapshotWorker { @@ -69,7 +69,7 @@ impl SnapshotWorker { /// The handle is only partially initialized, as it is lacking the /// [SnapshotDatabaseState]. This allows control code to [Self::subscribe] /// to future snapshots before handing off the worker to the database. - pub fn new(snapshot_repository: Arc, compression: Compression) -> Self { + pub fn new(snapshot_repository: Arc, compression: Compression) -> Self { let database = snapshot_repository.database_identity(); let latest_snapshot = snapshot_repository.latest_snapshot().ok().flatten().unwrap_or(0); let (snapshot_created, _) = watch::channel(latest_snapshot); @@ -105,9 +105,9 @@ impl SnapshotWorker { .expect("snapshot worker panicked"); } - /// Get the [SnapshotRepository] this worker is operating on. - pub fn repo(&self) -> &SnapshotRepository { - &self.snapshot_repository + /// Get the snapshot repo this worker is operating on. + pub fn snapshot_repo(&self) -> Arc { + self.snapshot_repository.clone() } /// Request a snapshot to be taken. @@ -166,7 +166,7 @@ enum Request { struct SnapshotWorkerActor { snapshot_requests: mpsc::UnboundedReceiver, - snapshot_repo: Arc, + snapshot_repo: Arc, snapshot_created: watch::Sender, metrics: SnapshotMetrics, compression: Option, @@ -225,7 +225,7 @@ impl SnapshotWorkerActor { let maybe_snapshot = asyncify(move || { let _timer = inner_timer.start_timer(); - Locking::take_snapshot_internal(&state, &snapshot_repo) + Locking::take_snapshot_internal(&state, snapshot_repo.as_ref()) }) .await .with_context(|| format!("error capturing snapshot of database {}", database_identity))?; @@ -307,7 +307,7 @@ impl CompressionMetrics { } struct Compressor { - snapshot_repo: Arc, + snapshot_repo: Arc, metrics: CompressionMetrics, stats: Option, } diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 6c7c3bd9fc8..f9ca4c110d9 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -317,7 +317,7 @@ fn auto_migrate_database( .iter() .filter_map(|col_def| col_def.default_value.clone()) .collect(); - stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?; + stdb.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?; } spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { log!(logger, "Disconnecting all users"); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index edcce91ce5e..e9d67103b16 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -30,7 +30,6 @@ use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; use spacetimedb_lib::{ConnectionId, Identity}; -use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicValue, ProductValue}; @@ -39,7 +38,7 @@ use spacetimedb_schema::{ reducer_name::ReducerName, schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}, }; -use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot}; +use spacetimedb_snapshot::{BoxedPendingSnapshot, DynSnapshotRepo, ReconstructedSnapshot}; use spacetimedb_table::{ indexes::RowPointer, page_pool::PagePool, @@ -223,11 +222,11 @@ impl Locking { /// (i.e. no transactions have been committed yet) /// and therefore no snapshot was created /// - /// - or `Some` path to the newly created snapshot directory + /// - or `Some` transaction offset for the newly created snapshot /// - /// Returns an error if [`SnapshotRepository::create_snapshot`] returns an + /// Returns an error if [`DynSnapshotRepo::create_snapshot`] returns an /// error. - pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result> { + pub fn take_snapshot(&self, repo: &DynSnapshotRepo) -> Result> { Self::take_snapshot_internal(&self.committed_state, repo)? .map(|(_offset, snap)| snap.sync_all()) .transpose() @@ -241,8 +240,8 @@ impl Locking { pub fn take_snapshot_internal( committed_state: &RwLock, - repo: &SnapshotRepository, - ) -> Result> { + repo: &DynSnapshotRepo, + ) -> Result> { let mut committed_state = committed_state.write(); let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else { return Ok(None); @@ -254,8 +253,8 @@ impl Locking { tx_offset, ); - let (tables, blob_store) = committed_state.persistent_tables_and_blob_store(); - let unflushed_snapshot = repo.create_snapshot(tables, blob_store, tx_offset)?; + let (mut tables, blob_store) = committed_state.persistent_tables_and_blob_store(); + let unflushed_snapshot = repo.create_snapshot(&mut tables, blob_store, tx_offset)?; Ok(Some((tx_offset, unflushed_snapshot))) } @@ -924,6 +923,29 @@ impl MutTx for Locking { } impl Locking { + #[cfg(any(feature = "test", test))] + pub fn try_begin_mut_tx(&self, _isolation_level: IsolationLevel, workload: Workload) -> Option { + let metrics = ExecutionMetrics::default(); + let ctx = ExecutionContext::with_workload(self.database_identity, workload); + + let timer = Instant::now(); + let committed_state_write_lock = self.committed_state.try_write_arc()?; + let sequence_state_lock = self.sequence_state.try_lock_arc()?; + let lock_wait_time = timer.elapsed(); + + Some(MutTxId { + committed_state_write_lock, + sequence_state_lock, + tx_state: TxState::default(), + lock_wait_time, + read_sets: <_>::default(), + timer, + ctx, + metrics, + _not_send: std::marker::PhantomData, + }) + } + pub fn rollback_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxMetrics, TxId) { tx.rollback_downgrade(workload) } diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 04a98d5de75..04d46d8f634 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -12,7 +12,10 @@ use itertools::Itertools as _; use log::{info, trace, warn}; use scopeguard::ScopeGuard; use spacetimedb_commitlog::{ - error, payload::Txdata, Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction, + error, + payload::Txdata, + repo::{Fs, Repo, RepoWithoutLockFile}, + Commit, Commitlog, CompressionStats, Decoder, Encode, Transaction, }; use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; use spacetimedb_paths::server::ReplicaDir; @@ -85,9 +88,12 @@ pub enum OpenError { /// /// Note, however, that instantiating `T` to a different type may require to /// change the log format version! -pub struct Local { +pub struct Local +where + R: Repo, +{ /// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps. - clog: Arc>>, + clog: Arc, R>>, /// The durable transaction offset, as reported by the background /// [`FlushAndSyncTask`]. durable_offset: watch::Receiver>, @@ -108,7 +114,7 @@ pub struct Local { actor: Mutex>>, } -impl Local { +impl Local { /// Create a [`Local`] instance at the `replica_dir`. /// /// `replica_dir` must already exist. @@ -134,6 +140,34 @@ impl Local { opts.commitlog, on_new_segment, )?); + Self::open_inner(clog, rt, opts, Some(lock)) + } +} + +impl Local +where + T: Encode + Send + Sync + 'static, + R: RepoWithoutLockFile + Send + Sync + 'static, +{ + /// Create a [`Local`] instance backed by the provided commitlog repo. + pub fn open_with_repo(repo: R, rt: tokio::runtime::Handle, opts: Options) -> Result { + info!("open local durability"); + let clog = Arc::new(Commitlog::open_with_repo(repo, opts.commitlog)?); + Self::open_inner(clog, rt, opts, None) + } +} + +impl Local +where + T: Encode + Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ + fn open_inner( + clog: Arc, R>>, + rt: tokio::runtime::Handle, + opts: Options, + lock: Option, + ) -> Result { let queue_capacity = opts.queue_capacity(); let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); @@ -163,12 +197,16 @@ impl Local { } /// Obtain a read-only copy of the durable state that implements [History]. - pub fn as_history(&self) -> impl History> + use { + pub fn as_history(&self) -> impl History> + use { self.clog.clone() } } -impl Local { +impl Local +where + T: Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ /// Inspect how many transactions added via [`Self::append_tx`] are pending /// to be applied to the underlying [`Commitlog`]. pub fn queue_depth(&self) -> u64 { @@ -176,7 +214,7 @@ impl Local { } /// Obtain an iterator over the [`Commit`]s in the underlying log. - pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> + use { + pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> + use { self.clog.commits_from(offset).map_ok(Commit::from) } @@ -189,15 +227,20 @@ impl Local { pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result { self.clog.compress_segments(offsets) } +} +impl Local { /// Get the size on disk of the underlying [`Commitlog`]. pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() } } -struct Actor { - clog: Arc>>, +struct Actor +where + R: Repo, +{ + clog: Arc, R>>, durable_offset: watch::Sender>, queue_depth: Arc, @@ -205,10 +248,14 @@ struct Actor { batch_capacity: NonZeroUsize, #[allow(unused)] - lock: LockedFile, + lock: Option, } -impl Actor { +impl Actor +where + T: Encode + Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ #[instrument(name = "durability::local::actor", skip_all)] async fn run(self, transactions_rx: async_channel::Receiver>>) { info!("starting durability actor"); @@ -289,7 +336,11 @@ impl Actor { } } -impl Durability for Local { +impl Durability for Local +where + T: Send + Sync + 'static, + R: Repo + Send + Sync + 'static, +{ type TxData = Txdata; fn append_tx(&self, tx: PreparedTx) { @@ -334,7 +385,11 @@ impl Durability for Local { } } -impl History for Commitlog> { +impl History for Commitlog, R> +where + T: Encode + 'static, + R: Repo + Send + Sync + 'static, +{ type TxData = Txdata; fn fold_transactions_from(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error> diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 46e24684078..6af30dc0f26 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -46,7 +46,7 @@ use spacetimedb_table::{ }; use std::fs::{self, File}; use std::io; -use std::ops::RangeBounds; +use std::ops::{Range, RangeBounds}; use std::path::Path; use std::time::{Duration, Instant}; use std::{ @@ -206,6 +206,11 @@ pub struct UnflushedSnapshot { } impl UnflushedSnapshot { + /// Return the transaction offset this pending snapshot will finalize at. + pub fn tx_offset(&self) -> TxOffset { + self.inner.as_ref().unwrap().snapshot.tx_offset + } + /// Sync all objects in the snapshot and write out the snapshot file. /// /// Returns the [SnapshotDirPath] on success. @@ -255,6 +260,30 @@ impl UnflushedSnapshotInner { } } +pub trait PendingSnapshot: Send { + /// Sync all snapshot state and return the finalized transaction offset. + fn sync_all(self: Box) -> Result; +} + +// We pass snapshot repos as `dyn` rather than compile-time generics for convenience, +// as threading generics through a deep callstack would be a hassle. +pub type BoxedPendingSnapshot = Box; +pub type DynSnapshotRepo = dyn SnapshotRepo; + +impl PendingSnapshot for BoxedPendingSnapshot { + fn sync_all(self: Box) -> Result { + (*self).sync_all() + } +} + +impl PendingSnapshot for UnflushedSnapshot { + fn sync_all(self: Box) -> Result { + let tx_offset = self.tx_offset(); + UnflushedSnapshot::sync_all(*self)?; + Ok(tx_offset) + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct Snapshot { /// A magic number: must be equal to [`MAGIC`]. @@ -1143,13 +1172,19 @@ impl SnapshotRepository { .collect::>(); for newer_snapshot in newer_snapshots { - let path = self.snapshot_dir_path(newer_snapshot); - log::info!("Renaming snapshot newer than {upper_bound} from {path:?} to {path:?}"); - path.rename_invalid()?; + self.invalidate_snapshot(newer_snapshot)?; } Ok(()) } + /// Mark a single snapshot invalid so it will not be considered for future + /// restores. + pub fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError> { + let path = self.snapshot_dir_path(tx_offset); + log::info!("Renaming snapshot {tx_offset} from {path:?} to invalid"); + path.rename_invalid().map_err(Into::into) + } + /// Compress the `current` snapshot, unless it is already compressed. /// /// If a `parent` snapshot is given, its object repo will be used to @@ -1334,6 +1369,91 @@ impl SnapshotRepository { } } +/// Snapshot storage backend. +pub trait SnapshotRepo: Send + Sync { + type Pending: PendingSnapshot; + + /// Return the database identity associated with this snapshot backend. + fn database_identity(&self) -> Identity; + + /// Start creating a snapshot at `tx_offset` from the provided tables and blob store. + fn create_snapshot<'db>( + &self, + tables: &mut dyn Iterator, + blobs: &'db dyn BlobStore, + tx_offset: TxOffset, + ) -> Result; + + /// Reconstruct the snapshot at `tx_offset` using the supplied page pool. + fn read_snapshot(&self, tx_offset: TxOffset, page_pool: &PagePool) -> Result; + + /// Return the latest snapshot at or before `upper_bound`, if one exists. + fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result, SnapshotError>; + + /// Return the latest snapshot in this backend, if one exists. + fn latest_snapshot(&self) -> Result, SnapshotError> { + self.latest_snapshot_older_than(TxOffset::MAX) + } + + /// Attempt to compress all snapshots that fall into `range`, and record + /// the outcome in `stats`. + /// + /// The snapshots in `range` are traversed in ascending order. + /// If an error occurs, processing stops and the error is returned. + /// + /// See [CompressionStats] for how to interpret the results. + fn compress_snapshots(&self, stats: &mut CompressionStats, range: Range) -> Result<(), SnapshotError>; + + /// Invalidate every snapshot newer than `upper_bound`. + fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError>; + + /// Invalidate the snapshot at `tx_offset`. + fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError>; +} + +impl SnapshotRepo for SnapshotRepository { + type Pending = BoxedPendingSnapshot; + + fn database_identity(&self) -> Identity { + SnapshotRepository::database_identity(self) + } + + fn create_snapshot<'db>( + &self, + tables: &mut dyn Iterator, + blobs: &'db dyn BlobStore, + tx_offset: TxOffset, + ) -> Result { + Ok(Box::new(SnapshotRepository::create_snapshot( + self, tables, blobs, tx_offset, + )?)) + } + + fn read_snapshot(&self, tx_offset: TxOffset, page_pool: &PagePool) -> Result { + SnapshotRepository::read_snapshot(self, tx_offset, page_pool) + } + + fn latest_snapshot_older_than(&self, upper_bound: TxOffset) -> Result, SnapshotError> { + SnapshotRepository::latest_snapshot_older_than(self, upper_bound) + } + + fn latest_snapshot(&self) -> Result, SnapshotError> { + SnapshotRepository::latest_snapshot(self) + } + + fn compress_snapshots(&self, stats: &mut CompressionStats, range: Range) -> Result<(), SnapshotError> { + SnapshotRepository::compress_snapshots(self, stats, range) + } + + fn invalidate_newer_snapshots(&self, upper_bound: TxOffset) -> Result<(), SnapshotError> { + SnapshotRepository::invalidate_newer_snapshots(self, upper_bound) + } + + fn invalidate_snapshot(&self, tx_offset: TxOffset) -> Result<(), SnapshotError> { + SnapshotRepository::invalidate_snapshot(self, tx_offset) + } +} + pub struct ReconstructedSnapshot { /// The identity of the snapshotted database. pub database_identity: Identity,