From 215e1c018f4da76606a7b1704a8d8d50cc9451d9 Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Thu, 30 Apr 2026 16:07:58 -0700 Subject: [PATCH 1/4] [bfops/smoketest-flakes]: WIP --- crates/smoketests/src/lib.rs | 45 ++++++++++++++++++++- crates/smoketests/tests/smoketests/views.rs | 2 +- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index 1ef17047d31..6daea8f972c 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -62,7 +62,7 @@ use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::process::{Command, Output, Stdio}; use std::sync::OnceLock; -use std::time::Instant; +use std::time::{Duration, Instant}; use which::which; /// Returns the remote server URL if running against a remote server. @@ -1317,6 +1317,49 @@ log = "0.4" ); } + /// Asserts that a SQL query eventually produces the expected output. + /// + /// Use this only for read-only queries after operations that can briefly + /// leave the database worker unavailable, such as publishing an update. + pub fn assert_sql_eventually(&self, query: &str, expected: &str) { + let expected_normalized = normalize_whitespace(expected); + let deadline = Instant::now() + Duration::from_secs(10); + let mut last_actual = None; + let mut last_error = None; + + while Instant::now() < deadline { + match self.sql(query) { + Ok(actual) => { + let actual_normalized = normalize_whitespace(&actual); + if actual_normalized == expected_normalized { + return; + } + last_actual = Some(actual_normalized); + last_error = None; + } + Err(err) => { + last_error = Some(err.to_string()); + } + } + + std::thread::sleep(Duration::from_millis(200)); + } + + if let Some(actual_normalized) = last_actual { + assert_eq!( + actual_normalized, expected_normalized, + "SQL output mismatch for query after retry: {}\n\nExpected:\n{}\n\nActual:\n{}", + query, expected_normalized, actual_normalized + ); + } + + panic!( + "SQL query failed after retry: {}\n\nLast error:\n{}", + query, + last_error.unwrap_or_else(|| "no attempts completed".to_string()) + ); + } + /// Fetches the last N log entries from the database. pub fn logs(&self, n: usize) -> Result> { let records = self.log_records(n)?; diff --git a/crates/smoketests/tests/smoketests/views.rs b/crates/smoketests/tests/smoketests/views.rs index f44c6076159..6d15a1e2797 100644 --- a/crates/smoketests/tests/smoketests/views.rs +++ b/crates/smoketests/tests/smoketests/views.rs @@ -486,7 +486,7 @@ fn test_views_auto_migration() { test.use_precompiled_module("views-auto-migrate-updated"); test.publish_module_clear(false).unwrap(); - test.assert_sql( + test.assert_sql_eventually( "SELECT * FROM player", r#" id | level ----+------- From 923f99c10a0128a7bdd6ec1be363ab1ee2812942 Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Tue, 5 May 2026 11:05:36 -0700 Subject: [PATCH 2/4] [bfops/test-flakes]: Allow custom smoketest subscribe timeout --- crates/smoketests/src/lib.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index 6daea8f972c..b90a4358911 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -1609,21 +1609,30 @@ log = "0.4" /// This matches Python's subscribe semantics - start subscription first, /// perform actions, then call the handle to collect results. pub fn subscribe_background(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, None) + self.subscribe_background_opts(queries, n, Duration::from_secs(30), None) + } + + pub fn subscribe_background_with_timeout( + &self, + queries: &[&str], + n: usize, + timeout: Duration, + ) -> Result { + self.subscribe_background_opts(queries, n, timeout, None) } pub fn subscribe_background_on(&self, database: &str, queries: &[&str], n: usize) -> Result { - self.subscribe_background_on_opts(database, queries, n, Some(false)) + self.subscribe_background_on_opts(database, queries, n, Duration::from_secs(30), Some(false)) } /// Starts a subscription in the background with --confirmed flag. pub fn subscribe_background_confirmed(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, Some(true)) + self.subscribe_background_opts(queries, n, Duration::from_secs(30), Some(true)) } /// Starts a subscription in the background with --confirmed flag. pub fn subscribe_background_unconfirmed(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, Some(false)) + self.subscribe_background_opts(queries, n, Duration::from_secs(30), Some(false)) } pub fn subscribe_background_on_confirmed( @@ -1632,7 +1641,7 @@ log = "0.4" queries: &[&str], n: usize, ) -> Result { - self.subscribe_background_on_opts(database, queries, n, Some(true)) + self.subscribe_background_on_opts(database, queries, n, Duration::from_secs(30), Some(true)) } /// Internal helper for background subscribe with options. @@ -1640,6 +1649,7 @@ log = "0.4" &self, queries: &[&str], n: usize, + timeout: Duration, confirmed: Option, ) -> Result { let identity = self @@ -1648,7 +1658,7 @@ log = "0.4" .context("No database published")? .clone(); - self.subscribe_background_on_impl(&identity, queries, n, confirmed) + self.subscribe_background_on_impl(&identity, queries, n, timeout, confirmed) } fn subscribe_background_on_opts( @@ -1656,9 +1666,10 @@ log = "0.4" database: &str, queries: &[&str], n: usize, + timeout: Duration, confirmed: Option, ) -> Result { - self.subscribe_background_on_impl(database, queries, n, confirmed) + self.subscribe_background_on_impl(database, queries, n, timeout, confirmed) } fn subscribe_background_on_impl( @@ -1666,6 +1677,7 @@ log = "0.4" database: &str, queries: &[&str], n: usize, + timeout: Duration, confirmed: Option, ) -> Result { let cli_path = ensure_binaries_built(); @@ -1680,7 +1692,7 @@ log = "0.4" self.server_url.clone(), database.to_string(), "-t".to_string(), - "30".to_string(), + timeout.as_secs().to_string(), "-n".to_string(), n.to_string(), "--print-initial-update".to_string(), From 23515a49eb1d405c712ab71ce72966773020a295 Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Tue, 5 May 2026 11:11:45 -0700 Subject: [PATCH 3/4] Revert "[bfops/test-flakes]: Allow custom smoketest subscribe timeout" This reverts commit 923f99c10a0128a7bdd6ec1be363ab1ee2812942. --- crates/smoketests/src/lib.rs | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index b90a4358911..6daea8f972c 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -1609,30 +1609,21 @@ log = "0.4" /// This matches Python's subscribe semantics - start subscription first, /// perform actions, then call the handle to collect results. pub fn subscribe_background(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, Duration::from_secs(30), None) - } - - pub fn subscribe_background_with_timeout( - &self, - queries: &[&str], - n: usize, - timeout: Duration, - ) -> Result { - self.subscribe_background_opts(queries, n, timeout, None) + self.subscribe_background_opts(queries, n, None) } pub fn subscribe_background_on(&self, database: &str, queries: &[&str], n: usize) -> Result { - self.subscribe_background_on_opts(database, queries, n, Duration::from_secs(30), Some(false)) + self.subscribe_background_on_opts(database, queries, n, Some(false)) } /// Starts a subscription in the background with --confirmed flag. pub fn subscribe_background_confirmed(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, Duration::from_secs(30), Some(true)) + self.subscribe_background_opts(queries, n, Some(true)) } /// Starts a subscription in the background with --confirmed flag. pub fn subscribe_background_unconfirmed(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, Duration::from_secs(30), Some(false)) + self.subscribe_background_opts(queries, n, Some(false)) } pub fn subscribe_background_on_confirmed( @@ -1641,7 +1632,7 @@ log = "0.4" queries: &[&str], n: usize, ) -> Result { - self.subscribe_background_on_opts(database, queries, n, Duration::from_secs(30), Some(true)) + self.subscribe_background_on_opts(database, queries, n, Some(true)) } /// Internal helper for background subscribe with options. @@ -1649,7 +1640,6 @@ log = "0.4" &self, queries: &[&str], n: usize, - timeout: Duration, confirmed: Option, ) -> Result { let identity = self @@ -1658,7 +1648,7 @@ log = "0.4" .context("No database published")? .clone(); - self.subscribe_background_on_impl(&identity, queries, n, timeout, confirmed) + self.subscribe_background_on_impl(&identity, queries, n, confirmed) } fn subscribe_background_on_opts( @@ -1666,10 +1656,9 @@ log = "0.4" database: &str, queries: &[&str], n: usize, - timeout: Duration, confirmed: Option, ) -> Result { - self.subscribe_background_on_impl(database, queries, n, timeout, confirmed) + self.subscribe_background_on_impl(database, queries, n, confirmed) } fn subscribe_background_on_impl( @@ -1677,7 +1666,6 @@ log = "0.4" database: &str, queries: &[&str], n: usize, - timeout: Duration, confirmed: Option, ) -> Result { let cli_path = ensure_binaries_built(); @@ -1692,7 +1680,7 @@ log = "0.4" self.server_url.clone(), database.to_string(), "-t".to_string(), - timeout.as_secs().to_string(), + "30".to_string(), "-n".to_string(), n.to_string(), "--print-initial-update".to_string(), From 68ae475436fa682291ae4a792cef194b29df594b Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Fri, 8 May 2026 09:20:02 -0700 Subject: [PATCH 4/4] debug changes --- crates/client-api/src/routes/database.rs | 149 +++++++++++++++++++++-- crates/smoketests/src/lib.rs | 18 +++ 2 files changed, 158 insertions(+), 9 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index e1464814656..9bc93833886 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::num::NonZeroU8; use std::str::FromStr; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{env, io}; use crate::auth::{ @@ -24,7 +24,6 @@ use axum_extra::TypedHeader; use derive_more::From; use futures::TryStreamExt; use http::StatusCode; -use log::{info, warn}; use serde::Deserialize; use spacetimedb::auth::identity::ConnectionAuthCtx; use spacetimedb::database_logger::DatabaseLogger; @@ -49,6 +48,7 @@ use spacetimedb_schema::auto_migrate::{ use tokio::sync::oneshot; use tokio::time::error::Elapsed; use tokio::time::timeout; +use tracing::{info, warn}; use super::subscribe::{handle_websocket, HasWebSocketOptions}; @@ -838,6 +838,15 @@ pub async fn publish( }; let schema_migration_policy = schema_migration_policy(policy, token)?; + let publish_start = Instant::now(); + info!( + database = %database_identity, + op = ?publish_op, + host_type = ?host_type, + num_replicas = ?num_replicas, + confirmation_timeout = ?confirmation_timeout, + "publishing database" + ); let maybe_updated = ctx .publish_database( &auth.claims.identity, @@ -853,6 +862,13 @@ pub async fn publish( ) .await .map_err(log_and_500)?; + info!( + database = %database_identity, + op = ?publish_op, + result = update_database_result_name(&maybe_updated), + elapsed = ?publish_start.elapsed(), + "publish_database returned" + ); let success = || { axum::Json(PublishResult::Success { @@ -879,24 +895,139 @@ pub async fn publish( durable_offset, }, ) => { - timeout(confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT), async { - let tx_offset = tx_offset.await?; + let confirmation_timeout = confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT); + let confirmation_start = Instant::now(); + let initial_durable_offset = durable_offset.as_ref().and_then(|offset| offset.last_seen()); + info!( + database = %database_identity, + op = ?publish_op, + confirmation_timeout = ?confirmation_timeout, + has_durable_offset = durable_offset.is_some(), + initial_durable_offset = ?initial_durable_offset, + "waiting for database update confirmation" + ); + let confirmation_result = timeout(confirmation_timeout, async { + let tx_wait_start = Instant::now(); + let tx_offset = match tx_offset.await { + Ok(tx_offset) => { + info!( + database = %database_identity, + op = ?publish_op, + tx_offset, + elapsed = ?tx_wait_start.elapsed(), + "database update tx offset confirmed" + ); + tx_offset + } + Err(err) => { + warn!( + database = %database_identity, + op = ?publish_op, + elapsed = ?tx_wait_start.elapsed(), + error = %err, + "database update tx offset wait was cancelled" + ); + return Err(UpdateConfirmationError::Cancelled(err)); + } + }; if let Some(mut durable_offset) = durable_offset { - durable_offset.wait_for(tx_offset).await?; + let durable_wait_start = Instant::now(); + let last_seen_before_wait = durable_offset.last_seen(); + info!( + database = %database_identity, + op = ?publish_op, + tx_offset, + last_seen_durable_offset = ?last_seen_before_wait, + "waiting for database update durable offset" + ); + match durable_offset.wait_for(tx_offset).await { + Ok(confirmed_durable_offset) => { + info!( + database = %database_identity, + op = ?publish_op, + tx_offset, + confirmed_durable_offset, + elapsed = ?durable_wait_start.elapsed(), + "database update durable offset confirmed" + ); + } + Err(err) => { + warn!( + database = %database_identity, + op = ?publish_op, + tx_offset, + last_seen_durable_offset = ?durable_offset.last_seen(), + elapsed = ?durable_wait_start.elapsed(), + error = %err, + "database update durable offset wait failed" + ); + return Err(UpdateConfirmationError::Crashed(err)); + } + } + } else { + info!( + database = %database_identity, + op = ?publish_op, + tx_offset, + "database update has no durable offset to wait for" + ); } Ok::<_, UpdateConfirmationError>(()) }) - .await - .map_err(Into::into) - .flatten()?; + .await; + + match confirmation_result { + Ok(Ok(())) => { + info!( + database = %database_identity, + op = ?publish_op, + elapsed = ?confirmation_start.elapsed(), + "database update confirmation completed" + ); + } + Ok(Err(err)) => { + warn!( + database = %database_identity, + op = ?publish_op, + elapsed = ?confirmation_start.elapsed(), + error = ?err, + "database update confirmation failed" + ); + return Err(err.into()); + } + Err(err) => { + warn!( + database = %database_identity, + op = ?publish_op, + confirmation_timeout = ?confirmation_timeout, + elapsed = ?confirmation_start.elapsed(), + initial_durable_offset = ?initial_durable_offset, + "database update confirmation timed out" + ); + return Err(UpdateConfirmationError::Timeout(err).into()); + } + } Ok(success()) } } } -#[derive(From)] +fn update_database_result_name(result: &Option) -> &'static str { + match result { + None => "created", + Some(UpdateDatabaseResult::NoUpdateNeeded) => "no_update_needed", + Some(UpdateDatabaseResult::UpdatePerformed { .. }) => "update_performed", + Some(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { .. }) => { + "update_performed_with_client_disconnect" + } + Some(UpdateDatabaseResult::AutoMigrateError(_)) => "auto_migrate_error", + Some(UpdateDatabaseResult::ErrorExecutingMigration(_)) => "error_executing_migration", + } +} + +#[derive(Debug, From)] enum UpdateConfirmationError { Cancelled(oneshot::error::RecvError), Crashed(DurabilityExited), diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index 6daea8f972c..7724c6846dd 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -1166,6 +1166,12 @@ log = "0.4" // Now publish with --bin-path to skip rebuild let publish_start = Instant::now(); let mut args = vec!["publish", "--server", &self.server_url, "--bin-path", &wasm_path_str]; + eprintln!( + "[SMOKETEST] publish start: server={} module={} target={}", + self.server_url, + self.module_name, + name.unwrap_or("") + ); if opts.force { args.push("--yes"); @@ -1223,6 +1229,10 @@ log = "0.4" /// Arguments are passed directly to the CLI as strings. pub fn call(&self, name: &str, args: &[&str]) -> Result { let identity = self.database_identity.as_ref().context("No database published")?; + eprintln!( + "[SMOKETEST] call start: server={} database={} reducer={} args={:?}", + self.server_url, identity, name, args + ); let mut cmd_args = vec!["call", "--server", &self.server_url, "--", identity.as_str(), name]; cmd_args.extend(args); @@ -1282,6 +1292,10 @@ log = "0.4" /// Executes a SQL query against the database. pub fn sql(&self, query: &str) -> Result { let identity = self.database_identity.as_ref().context("No database published")?; + eprintln!( + "[SMOKETEST] sql start: server={} database={} query={}", + self.server_url, identity, query + ); self.spacetime(&["sql", "--server", &self.server_url, identity.as_str(), query]) } @@ -1289,6 +1303,10 @@ log = "0.4" /// Executes a SQL query with the --confirmed flag. pub fn sql_confirmed(&self, query: &str) -> Result { let identity = self.database_identity.as_ref().context("No database published")?; + eprintln!( + "[SMOKETEST] confirmed sql start: server={} database={} query={}", + self.server_url, identity, query + ); self.spacetime(&[ "sql",