From c7a14f110258180f66342d10037542fac889c3c4 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 10:18:57 +0100 Subject: [PATCH 1/3] Move server tests to integration suite --- src/server.rs | 639 +---------------------------------- src/server/worker.rs | 176 ++++++++++ tests/server_runtime.rs | 256 ++++++++++++++ tests/server_runtime_more.rs | 226 +++++++++++++ 4 files changed, 668 insertions(+), 629 deletions(-) create mode 100644 src/server/worker.rs create mode 100644 tests/server_runtime.rs create mode 100644 tests/server_runtime_more.rs diff --git a/src/server.rs b/src/server.rs index f1592a19..81d00f7f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,19 +26,14 @@ pub type PreambleCallback = Arc< /// Callback invoked when decoding a connection preamble fails. pub type PreambleErrorCallback = Arc; -use tokio::{ - net::TcpListener, - sync::oneshot, - time::{Duration, sleep}, -}; +use tokio::{net::TcpListener, sync::oneshot}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::{ - app::WireframeApp, - preamble::{Preamble, read_preamble}, - rewind_stream::RewindStream, -}; +use crate::{app::WireframeApp, preamble::Preamble}; +mod worker; +#[cfg(test)] +pub use worker::worker_task; /// Tokio-based server for `WireframeApp` instances. /// /// `WireframeServer` spawns a worker task per thread. Each worker @@ -236,6 +231,10 @@ where pub fn local_addr(&self) -> Option { self.listener.as_ref().and_then(|l| l.local_addr().ok()) } + #[doc(hidden)] + pub fn has_preamble_success(&self) -> bool { self.on_preamble_success.is_some() } + #[doc(hidden)] + pub fn has_preamble_failure(&self) -> bool { self.on_preamble_failure.is_some() } /// Bind the server to the given address and create a listener. /// @@ -356,7 +355,7 @@ where let on_failure = self.on_preamble_failure.clone(); let token = shutdown_token.clone(); let t = tracker.clone(); - tracker.spawn(worker_task( + tracker.spawn(worker::worker_task( listener, factory, on_success, on_failure, token, t, )); } @@ -371,621 +370,3 @@ where Ok(()) } } - -/// Runs a worker task that accepts incoming TCP connections and processes them asynchronously. -/// -/// Each accepted connection is handled in a separate task, with optional callbacks for preamble -/// decode success or failure. The worker listens for shutdown signals to terminate gracefully. -/// Accept errors are retried with exponential backoff. -async fn worker_task( - listener: Arc, - factory: F, - on_success: Option>, - on_failure: Option, - shutdown: CancellationToken, - tracker: TaskTracker, -) where - F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - // `Preamble` ensures `T` supports borrowed decoding. - T: Preamble, -{ - let mut delay = Duration::from_millis(10); - loop { - tokio::select! { - biased; - - () = shutdown.cancelled() => break, - - res = listener.accept() => match res { - Ok((stream, _)) => { - let success = on_success.clone(); - let failure = on_failure.clone(); - let factory = factory.clone(); - let t = tracker.clone(); - // Capture peer address for better error context - let peer_addr = stream.peer_addr().ok(); - t.spawn(async move { - use futures::FutureExt as _; - let fut = std::panic::AssertUnwindSafe( - process_stream(stream, factory, success, failure), - ) - .catch_unwind(); - - if let Err(panic) = fut.await { - let panic_msg = panic - .downcast_ref::<&str>() - .copied() - .or_else(|| panic.downcast_ref::().map(String::as_str)) - .unwrap_or(""); - tracing::error!(panic = %panic_msg, ?peer_addr, "connection task panicked"); - } - }); - delay = Duration::from_millis(10); - } - Err(e) => { - eprintln!("accept error: {e}"); - sleep(delay).await; - delay = (delay * 2).min(Duration::from_secs(1)); - } - }, - } - } -} - -/// Processes an incoming TCP stream by decoding a preamble and dispatching the connection to a -/// `WireframeApp`. -/// -/// Attempts to asynchronously decode a preamble of type `T` from the provided stream. If decoding -/// succeeds, invokes the optional success handler, wraps the stream to include any leftover bytes, -/// and passes it to a new `WireframeApp` instance for connection handling. If decoding fails, -/// invokes the optional failure handler and closes the connection. -/// -/// # Type Parameters -/// -/// - `F`: A factory closure that produces `WireframeApp` instances. -/// - `T`: The preamble type, which must support borrowed decoding via the `Preamble` trait. -/// -/// # Examples -/// -/// ```no_run -/// # use std::sync::Arc; -/// # use tokio::net::TcpStream; -/// # use wireframe::app::WireframeApp; -/// # async fn example() { -/// let stream: TcpStream = unimplemented!(); -/// let factory = || WireframeApp::new(); -/// // process_stream::<_, ()>(stream, factory, None, None).await; -/// # } -/// ``` -async fn process_stream( - mut stream: tokio::net::TcpStream, - factory: F, - on_success: Option>, - on_failure: Option, -) where - F: Fn() -> WireframeApp + Send + Sync + 'static, - // `Preamble` ensures `T` supports borrowed decoding. - T: Preamble, -{ - match read_preamble::<_, T>(&mut stream).await { - Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - eprintln!("preamble callback error: {e}"); - } - let stream = RewindStream::new(leftover, stream); - // Hand the connection to the application for processing. - // We already run `process_stream` inside a task, so spawning again - // only adds overhead. - let app = (factory)(); - app.handle_connection(stream).await; - } - Err(err) => { - if let Some(handler) = on_failure.as_ref() { - handler(&err); - } - // drop stream on failure - } - } -} - -#[cfg(test)] -mod tests { - use std::{ - net::{Ipv4Addr, SocketAddr}, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, - }; - - use bincode::{Decode, Encode}; - use rstest::{fixture, rstest}; - use tokio::{ - net::{TcpListener, TcpStream}, - sync::oneshot, - time::{Duration, timeout}, - }; - use tokio_util::{sync::CancellationToken, task::TaskTracker}; - use tracing_test::traced_test; - - use super::*; - - #[derive(Debug, Clone, PartialEq, Encode, Decode)] - struct TestPreamble { - id: u32, - message: String, - } - - /// Test helper preamble carrying no data. - #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect(dead_code, reason = "test helper for unused preamble type")] - struct EmptyPreamble; - - #[fixture] - fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::default() - } - - #[fixture] - fn free_port() -> SocketAddr { - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() - } - - fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer - where - F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - { - WireframeServer::new(factory) - .bind(addr) - .expect("Failed to bind") - } - - fn server_with_preamble(factory: F) -> WireframeServer - where - F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - { - WireframeServer::new(factory).with_preamble::() - } - - #[rstest] - fn test_new_server_creation( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - assert!(server.worker_count() >= 1); - assert!(server.local_addr().is_none()); - } - - #[rstest] - fn test_new_server_default_worker_count( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - let expected_workers = std::thread::available_parallelism() - .map_or(1, std::num::NonZeroUsize::get) - .max(1); - assert_eq!(server.worker_count(), expected_workers); - } - - #[rstest] - fn test_workers_configuration( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - - let server = server.workers(4); - assert_eq!(server.worker_count(), 4); - - let server = server.workers(100); - assert_eq!(server.worker_count(), 100); - - let server = server.workers(0); - assert_eq!(server.worker_count(), 1); - } - - #[rstest] - fn test_with_preamble_type_conversion( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - let server_with_preamble = server.with_preamble::(); - assert_eq!( - server_with_preamble.worker_count(), - std::thread::available_parallelism() - .map_or(1, std::num::NonZeroUsize::get) - .max(1) - ); - } - - #[rstest] - #[tokio::test] - async fn test_bind_success( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = bind_server(factory, free_port); - let bound_addr = server.local_addr().unwrap(); - assert_eq!(bound_addr.ip(), free_port.ip()); - } - - #[rstest] - #[tokio::test] - async fn test_bind_invalid_address( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1); - let result = server.bind(addr); - assert!(result.is_ok() || result.is_err()); - } - - #[rstest] - fn test_local_addr_before_bind( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - assert!(server.local_addr().is_none()); - } - - #[rstest] - #[tokio::test] - async fn test_local_addr_after_bind( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = bind_server(factory, free_port); - let local_addr = server.local_addr(); - assert!(local_addr.is_some()); - assert_eq!(local_addr.unwrap().ip(), free_port.ip()); - } - - #[rstest] - #[tokio::test] - async fn test_preamble_success_callback( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let callback_counter = Arc::new(AtomicUsize::new(0)); - let counter_clone = callback_counter.clone(); - - let server = server_with_preamble(factory).on_preamble_decode_success( - move |_preamble: &TestPreamble, _| { - let cnt = counter_clone.clone(); - Box::pin(async move { - cnt.fetch_add(1, Ordering::SeqCst); - Ok(()) - }) - }, - ); - - assert_eq!(callback_counter.load(Ordering::SeqCst), 0); - assert!(server.on_preamble_success.is_some()); - } - - #[rstest] - #[tokio::test] - async fn test_preamble_failure_callback( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let callback_counter = Arc::new(AtomicUsize::new(0)); - let counter_clone = callback_counter.clone(); - - let server = server_with_preamble(factory).on_preamble_decode_failure( - move |_error: &DecodeError| { - counter_clone.fetch_add(1, Ordering::SeqCst); - }, - ); - - assert_eq!(callback_counter.load(Ordering::SeqCst), 0); - assert!(server.on_preamble_failure.is_some()); - } - - #[rstest] - #[tokio::test] - async fn test_method_chaining( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let callback_invoked = Arc::new(AtomicUsize::new(0)); - let counter_clone = callback_invoked.clone(); - - let server = WireframeServer::new(factory) - .workers(2) - .with_preamble::() - .on_preamble_decode_success(move |_: &TestPreamble, _| { - let cnt = counter_clone.clone(); - Box::pin(async move { - cnt.fetch_add(1, Ordering::SeqCst); - Ok(()) - }) - }) - .on_preamble_decode_failure(|_: &DecodeError| { - eprintln!("Preamble decode failed"); - }) - .bind(free_port) - .expect("Failed to bind"); - - assert_eq!(server.worker_count(), 2); - assert!(server.local_addr().is_some()); - assert!(server.on_preamble_success.is_some()); - assert!(server.on_preamble_failure.is_some()); - } - - #[rstest] - #[tokio::test] - #[should_panic(expected = "`bind` must be called before `run`")] - async fn test_run_without_bind_panics( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - let _ = timeout(Duration::from_millis(100), server.run()).await; - } - - #[rstest] - #[tokio::test] - #[should_panic(expected = "`bind` must be called before `run`")] - async fn test_run_with_shutdown_without_bind_panics( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - let shutdown_future = async { tokio::time::sleep(Duration::from_millis(10)).await }; - let _ = timeout( - Duration::from_millis(100), - server.run_with_shutdown(shutdown_future), - ) - .await; - } - - #[rstest] - #[tokio::test] - async fn test_run_with_immediate_shutdown( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = WireframeServer::new(factory) - .workers(1) - .bind(free_port) - .expect("Failed to bind"); - - let shutdown_future = async {}; - - let result = timeout( - Duration::from_millis(1000), - server.run_with_shutdown(shutdown_future), - ) - .await; - - assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); - } - - #[rstest] - #[tokio::test] - async fn test_server_graceful_shutdown_with_ctrl_c_simulation( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = WireframeServer::new(factory) - .workers(2) - .bind(free_port) - .expect("Failed to bind"); - - let shutdown_future = async { - tokio::time::sleep(Duration::from_millis(50)).await; - }; - - let start = std::time::Instant::now(); - let result = timeout( - Duration::from_millis(1000), - server.run_with_shutdown(shutdown_future), - ) - .await; - let elapsed = start.elapsed(); - - assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); - assert!(elapsed < Duration::from_millis(500)); - } - - #[rstest] - #[tokio::test] - async fn test_multiple_worker_creation( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let _ = &factory; - let call_count = Arc::new(AtomicUsize::new(0)); - let call_count_clone = call_count.clone(); - - let factory = move || { - call_count_clone.fetch_add(1, Ordering::SeqCst); - WireframeApp::default() - }; - - let server = WireframeServer::new(factory) - .workers(3) - .bind(free_port) - .expect("Failed to bind"); - - let shutdown_future = async { - tokio::time::sleep(Duration::from_millis(10)).await; - }; - - let result = timeout( - Duration::from_millis(1000), - server.run_with_shutdown(shutdown_future), - ) - .await; - - assert!(result.is_ok()); - assert!(result.unwrap().is_ok()); - } - - #[rstest] - #[tokio::test] - async fn test_server_configuration_persistence( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = WireframeServer::new(factory).workers(5); - - assert_eq!(server.worker_count(), 5); - - let server = server.bind(free_port).expect("Failed to bind"); - assert_eq!(server.worker_count(), 5); - assert!(server.local_addr().is_some()); - } - - #[rstest] - fn test_preamble_callbacks_reset_on_type_change( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory) - .on_preamble_decode_success(|&(), _| Box::pin(async { Ok(()) })) - .on_preamble_decode_failure(|_: &DecodeError| {}); - - assert!(server.on_preamble_success.is_some()); - assert!(server.on_preamble_failure.is_some()); - - let server = server.with_preamble::(); - assert!(server.on_preamble_success.is_none()); - assert!(server.on_preamble_failure.is_none()); - } - - #[rstest] - #[tokio::test] - async fn test_worker_task_shutdown_signal( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let token = CancellationToken::new(); - let tracker = TaskTracker::new(); - let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap()); - - tracker.spawn(worker_task::<_, ()>( - listener, - factory, - None, - None, - token.clone(), - tracker.clone(), - )); - - token.cancel(); - tracker.close(); - - let result = timeout(Duration::from_millis(100), tracker.wait()).await; - assert!(result.is_ok()); - } - - #[rstest] - fn test_extreme_worker_counts( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory); - - let server = server.workers(usize::MAX); - assert_eq!(server.worker_count(), usize::MAX); - - let server = server.workers(0); - assert_eq!(server.worker_count(), 1); - } - - #[rstest] - #[tokio::test] - async fn test_bind_to_multiple_addresses( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - free_port: SocketAddr, - ) { - let server = WireframeServer::new(factory); - let addr1 = free_port; - let addr2 = { - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() - }; - - let server = server.bind(addr1).expect("Failed to bind first address"); - let first_local_addr = server.local_addr().unwrap(); - - let server = server.bind(addr2).expect("Failed to bind second address"); - let second_local_addr = server.local_addr().unwrap(); - - assert_ne!(first_local_addr.port(), second_local_addr.port()); - assert_eq!(second_local_addr.ip(), addr2.ip()); - } - - #[test] - fn test_server_debug_compilation_guard() { - assert!(cfg!(debug_assertions)); - } - - /// Ensure the server survives panicking connection tasks. - /// - /// The test spawns a server with a connection setup callback that - /// immediately panics. Logs are captured so the panic message and peer - /// address can be asserted. A first client - /// connection triggers the panic and writes dummy preamble bytes to ensure - /// the panic is logged. The client's peer address is captured before - /// dropping the connection so the error log can be validated. A second - /// connection verifies the server continues accepting new clients after the - /// failure. Finally, the logs are scanned for the expected error entry - /// containing `peer_addr` and `panic=boom`. - #[rstest] - #[traced_test] - #[tokio::test] - async fn connection_panic_is_caught( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let app_factory = move || { - factory() - .on_connection_setup(|| async { panic!("boom") }) - .unwrap() - }; - let server = WireframeServer::new(app_factory) - .workers(1) - .bind("127.0.0.1:0".parse().unwrap()) - .expect("bind"); - let addr = server.local_addr().unwrap(); - - let (tx, rx) = oneshot::channel(); - let handle = tokio::spawn(async move { - server - .run_with_shutdown(async { - let _ = rx.await; - }) - .await - .unwrap(); - }); - - let first = TcpStream::connect(addr) - .await - .expect("first connection should succeed"); - let peer_addr = first.local_addr().expect("first connection peer address"); - first.writable().await.unwrap(); - first.try_write(&[0; 8]).unwrap(); - drop(first); - TcpStream::connect(addr) - .await - .expect("second connection should succeed after panic"); - - let _ = tx.send(()); - handle.await.unwrap(); - - tokio::task::yield_now().await; - - logs_assert(|lines: &[&str]| { - lines - .iter() - .find(|line| { - line.contains("connection task panicked") - && line.contains("panic=boom") - && line.contains(&format!("peer_addr=Some({peer_addr})")) - }) - .map(|_| ()) - .ok_or_else(|| "panic log not found".to_string()) - }); - } -} diff --git a/src/server/worker.rs b/src/server/worker.rs new file mode 100644 index 00000000..90556d7e --- /dev/null +++ b/src/server/worker.rs @@ -0,0 +1,176 @@ +//! Worker task and connection processing helpers for `WireframeServer`. + +use std::sync::Arc; + +use futures::FutureExt; +use tokio::{ + net::TcpListener, + time::{Duration, sleep}, +}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +use super::{PreambleCallback, PreambleErrorCallback}; +use crate::{ + app::WireframeApp, + preamble::{Preamble, read_preamble}, + rewind_stream::RewindStream, +}; + +/// Runs a worker task that accepts incoming TCP connections and processes them asynchronously. +/// +/// Each accepted connection is handled in a separate task, with optional callbacks for preamble +/// decode success or failure. The worker listens for shutdown signals to terminate gracefully. +/// Accept errors are retried with exponential backoff. +pub async fn worker_task( + listener: Arc, + factory: F, + on_success: Option>, + on_failure: Option, + shutdown: CancellationToken, + tracker: TaskTracker, +) where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + // `Preamble` ensures `T` supports borrowed decoding. + T: Preamble, +{ + let mut delay = Duration::from_millis(10); + loop { + tokio::select! { + biased; + + () = shutdown.cancelled() => break, + + res = listener.accept() => match res { + Ok((stream, _)) => { + let success = on_success.clone(); + let failure = on_failure.clone(); + let factory = factory.clone(); + let t = tracker.clone(); + // Capture peer address for better error context + let peer_addr = stream.peer_addr().ok(); + t.spawn(async move { + let fut = std::panic::AssertUnwindSafe( + process_stream(stream, factory, success, failure), + ) + .catch_unwind(); + + if let Err(panic) = fut.await { + let panic_msg = panic + .downcast_ref::<&str>() + .copied() + .or_else(|| panic.downcast_ref::().map(String::as_str)) + .unwrap_or(""); + tracing::error!(panic = %panic_msg, ?peer_addr, "connection task panicked"); + } + }); + delay = Duration::from_millis(10); + } + Err(e) => { + eprintln!("accept error: {e}"); + sleep(delay).await; + delay = (delay * 2).min(Duration::from_secs(1)); + } + }, + } + } +} + +/// Processes an incoming TCP stream by decoding a preamble and dispatching the connection to a +/// `WireframeApp`. +/// +/// Attempts to asynchronously decode a preamble of type `T` from the provided stream. If decoding +/// succeeds, invokes the optional success handler, wraps the stream to include any leftover bytes, +/// and passes it to a new `WireframeApp` instance for connection handling. If decoding fails, +/// invokes the optional failure handler and closes the connection. +/// +/// # Type Parameters +/// +/// - `F`: A factory closure that produces `WireframeApp` instances. +/// - `T`: The preamble type, which must support borrowed decoding via the `Preamble` trait. +/// +/// # Examples +/// +/// ```no_run +/// # use std::sync::Arc; +/// # use tokio::net::TcpStream; +/// # use wireframe::app::WireframeApp; +/// # async fn example() { +/// let stream: TcpStream = unimplemented!(); +/// let factory = || WireframeApp::new(); +/// // process_stream::<_, ()>(stream, factory, None, None).await; +/// # } +/// ``` +pub(crate) async fn process_stream( + mut stream: tokio::net::TcpStream, + factory: F, + on_success: Option>, + on_failure: Option, +) where + F: Fn() -> WireframeApp + Send + Sync + 'static, + // `Preamble` ensures `T` supports borrowed decoding. + T: Preamble, +{ + match read_preamble::<_, T>(&mut stream).await { + Ok((preamble, leftover)) => { + if let Some(handler) = on_success.as_ref() + && let Err(e) = handler(&preamble, &mut stream).await + { + eprintln!("preamble callback error: {e}"); + } + let stream = RewindStream::new(leftover, stream); + // Hand the connection to the application for processing. + // We already run `process_stream` inside a task, so spawning again + // only adds overhead. + let app = (factory)(); + app.handle_connection(stream).await; + } + Err(err) => { + if let Some(handler) = on_failure.as_ref() { + handler(&err); + } + // drop stream on failure + } + } +} + +#[cfg(test)] +mod tests { + use rstest::{fixture, rstest}; + use tokio::{ + net::TcpListener, + time::{Duration, timeout}, + }; + use tokio_util::{sync::CancellationToken, task::TaskTracker}; + + use super::*; + + #[fixture] + fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { + || WireframeApp::default() + } + + #[rstest] + #[tokio::test] + async fn test_worker_task_shutdown_signal( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + ) { + let token = CancellationToken::new(); + let tracker = TaskTracker::new(); + let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap()); + + tracker.spawn(worker_task::<_, ()>( + listener, + factory, + None, + None, + token.clone(), + tracker.clone(), + )); + + token.cancel(); + tracker.close(); + + let result = timeout(Duration::from_millis(100), tracker.wait()).await; + assert!(result.is_ok()); + } +} diff --git a/tests/server_runtime.rs b/tests/server_runtime.rs new file mode 100644 index 00000000..c9f6fbd9 --- /dev/null +++ b/tests/server_runtime.rs @@ -0,0 +1,256 @@ +//! Runtime behaviour tests for `WireframeServer`. +use std::{ + net::{Ipv4Addr, SocketAddr}, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use bincode::{Decode, Encode, error::DecodeError}; +use rstest::{fixture, rstest}; +use tokio::time::{Duration, timeout}; +use wireframe::{app::WireframeApp, server::WireframeServer}; + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +struct TestPreamble { + id: u32, + message: String, +} + +/// Test helper preamble carrying no data. +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +#[expect(dead_code, reason = "test helper for unused preamble type")] +struct EmptyPreamble; + +#[fixture] +fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { + || WireframeApp::default() +} + +#[fixture] +fn free_port() -> SocketAddr { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = std::net::TcpListener::bind(addr).unwrap(); + listener.local_addr().unwrap() +} + +fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + WireframeServer::new(factory) + .bind(addr) + .expect("Failed to bind") +} + +fn server_with_preamble(factory: F) -> WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + WireframeServer::new(factory).with_preamble::() +} + +#[rstest] +fn test_new_server_creation(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { + let server = WireframeServer::new(factory); + assert!(server.worker_count() >= 1); + assert!(server.local_addr().is_none()); +} + +#[rstest] +fn test_new_server_default_worker_count( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory); + let expected_workers = std::thread::available_parallelism() + .map_or(1, std::num::NonZeroUsize::get) + .max(1); + assert_eq!(server.worker_count(), expected_workers); +} + +#[rstest] +fn test_workers_configuration(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { + let server = WireframeServer::new(factory); + + let server = server.workers(4); + assert_eq!(server.worker_count(), 4); + + let server = server.workers(100); + assert_eq!(server.worker_count(), 100); + + let server = server.workers(0); + assert_eq!(server.worker_count(), 1); +} + +#[rstest] +fn test_with_preamble_type_conversion( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory); + let server_with_preamble = server.with_preamble::(); + assert_eq!( + server_with_preamble.worker_count(), + std::thread::available_parallelism() + .map_or(1, std::num::NonZeroUsize::get) + .max(1) + ); +} + +#[rstest] +#[tokio::test] +async fn test_bind_success( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = bind_server(factory, free_port); + let bound_addr = server.local_addr().unwrap(); + assert_eq!(bound_addr.ip(), free_port.ip()); +} + +#[rstest] +#[tokio::test] +async fn test_bind_invalid_address( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory); + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1); + let result = server.bind(addr); + assert!(result.is_ok() || result.is_err()); +} + +#[rstest] +fn test_local_addr_before_bind(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { + let server = WireframeServer::new(factory); + assert!(server.local_addr().is_none()); +} + +#[rstest] +#[tokio::test] +async fn test_local_addr_after_bind( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = bind_server(factory, free_port); + let local_addr = server.local_addr(); + assert!(local_addr.is_some()); + assert_eq!(local_addr.unwrap().ip(), free_port.ip()); +} + +#[rstest] +#[tokio::test] +async fn test_preamble_success_callback( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let callback_counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = callback_counter.clone(); + + let server = server_with_preamble(factory).on_preamble_decode_success( + move |_preamble: &TestPreamble, _| { + let cnt = counter_clone.clone(); + Box::pin(async move { + cnt.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + }, + ); + + assert_eq!(callback_counter.load(Ordering::SeqCst), 0); + assert!(server.has_preamble_success()); +} + +#[rstest] +#[tokio::test] +async fn test_preamble_failure_callback( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let callback_counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = callback_counter.clone(); + + let server = + server_with_preamble(factory).on_preamble_decode_failure(move |_error: &DecodeError| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }); + + assert_eq!(callback_counter.load(Ordering::SeqCst), 0); + assert!(server.has_preamble_failure()); +} + +#[rstest] +#[tokio::test] +async fn test_method_chaining( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let callback_invoked = Arc::new(AtomicUsize::new(0)); + let counter_clone = callback_invoked.clone(); + + let server = WireframeServer::new(factory) + .workers(2) + .with_preamble::() + .on_preamble_decode_success(move |_: &TestPreamble, _| { + let cnt = counter_clone.clone(); + Box::pin(async move { + cnt.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + }) + .on_preamble_decode_failure(|_: &DecodeError| { + eprintln!("Preamble decode failed"); + }) + .bind(free_port) + .expect("Failed to bind"); + + assert_eq!(server.worker_count(), 2); + assert!(server.local_addr().is_some()); + assert!(server.has_preamble_success()); + assert!(server.has_preamble_failure()); +} + +#[rstest] +#[tokio::test] +#[should_panic(expected = "`bind` must be called before `run`")] +async fn test_run_without_bind_panics( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory); + let _ = timeout(Duration::from_millis(100), server.run()).await; +} + +#[rstest] +#[tokio::test] +#[should_panic(expected = "`bind` must be called before `run`")] +async fn test_run_with_shutdown_without_bind_panics( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory); + let shutdown_future = async { tokio::time::sleep(Duration::from_millis(10)).await }; + let _ = timeout( + Duration::from_millis(100), + server.run_with_shutdown(shutdown_future), + ) + .await; +} + +#[rstest] +#[tokio::test] +async fn test_run_with_immediate_shutdown( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = WireframeServer::new(factory) + .workers(1) + .bind(free_port) + .expect("Failed to bind"); + + let shutdown_future = async {}; + + let result = timeout( + Duration::from_millis(1000), + server.run_with_shutdown(shutdown_future), + ) + .await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_ok()); +} diff --git a/tests/server_runtime_more.rs b/tests/server_runtime_more.rs new file mode 100644 index 00000000..86198ffe --- /dev/null +++ b/tests/server_runtime_more.rs @@ -0,0 +1,226 @@ +//! Additional runtime tests for `WireframeServer`. +use std::{ + net::{Ipv4Addr, SocketAddr}, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use bincode::{Decode, Encode, error::DecodeError}; +use rstest::{fixture, rstest}; +use tokio::{ + net::TcpStream, + sync::oneshot, + time::{Duration, timeout}, +}; +use tracing_test::traced_test; +use wireframe::{app::WireframeApp, server::WireframeServer}; + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +struct TestPreamble { + id: u32, + message: String, +} + +/// Test helper preamble carrying no data. +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +#[expect(dead_code, reason = "test helper for unused preamble type")] +struct EmptyPreamble; + +#[fixture] +fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { + || WireframeApp::default() +} + +#[fixture] +fn free_port() -> SocketAddr { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = std::net::TcpListener::bind(addr).unwrap(); + listener.local_addr().unwrap() +} + +#[rstest] +#[tokio::test] +async fn test_server_graceful_shutdown_with_ctrl_c_simulation( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = WireframeServer::new(factory) + .workers(2) + .bind(free_port) + .expect("Failed to bind"); + + let shutdown_future = async { + tokio::time::sleep(Duration::from_millis(50)).await; + }; + + let start = std::time::Instant::now(); + let result = timeout( + Duration::from_millis(1000), + server.run_with_shutdown(shutdown_future), + ) + .await; + let elapsed = start.elapsed(); + + assert!(result.is_ok()); + assert!(result.unwrap().is_ok()); + assert!(elapsed < Duration::from_millis(500)); +} + +#[rstest] +#[tokio::test] +async fn test_multiple_worker_creation( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let _ = &factory; + let call_count = Arc::new(AtomicUsize::new(0)); + let call_count_clone = call_count.clone(); + + let factory = move || { + call_count_clone.fetch_add(1, Ordering::SeqCst); + WireframeApp::default() + }; + + let server = WireframeServer::new(factory) + .workers(3) + .bind(free_port) + .expect("Failed to bind"); + + let shutdown_future = async { + tokio::time::sleep(Duration::from_millis(10)).await; + }; + + let result = timeout( + Duration::from_millis(1000), + server.run_with_shutdown(shutdown_future), + ) + .await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_ok()); +} + +#[rstest] +#[tokio::test] +async fn test_server_configuration_persistence( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = WireframeServer::new(factory).workers(5); + + assert_eq!(server.worker_count(), 5); + + let server = server.bind(free_port).expect("Failed to bind"); + assert_eq!(server.worker_count(), 5); + assert!(server.local_addr().is_some()); +} + +#[rstest] +fn test_preamble_callbacks_reset_on_type_change( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let server = WireframeServer::new(factory) + .on_preamble_decode_success(|&(), _| Box::pin(async { Ok(()) })) + .on_preamble_decode_failure(|_: &DecodeError| {}); + + assert!(server.has_preamble_success()); + assert!(server.has_preamble_failure()); + + let server = server.with_preamble::(); + assert!(!server.has_preamble_success()); + assert!(!server.has_preamble_failure()); +} + +#[rstest] +#[rstest] +fn test_extreme_worker_counts(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { + let server = WireframeServer::new(factory); + + let server = server.workers(usize::MAX); + assert_eq!(server.worker_count(), usize::MAX); + + let server = server.workers(0); + assert_eq!(server.worker_count(), 1); +} + +#[rstest] +#[tokio::test] +async fn test_bind_to_multiple_addresses( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + free_port: SocketAddr, +) { + let server = WireframeServer::new(factory); + let addr1 = free_port; + let addr2 = { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = std::net::TcpListener::bind(addr).unwrap(); + listener.local_addr().unwrap() + }; + + let server = server.bind(addr1).expect("Failed to bind first address"); + let first_local_addr = server.local_addr().unwrap(); + + let server = server.bind(addr2).expect("Failed to bind second address"); + let second_local_addr = server.local_addr().unwrap(); + + assert_ne!(first_local_addr.port(), second_local_addr.port()); + assert_eq!(second_local_addr.ip(), addr2.ip()); +} + +#[test] +fn test_server_debug_compilation_guard() { + assert!(cfg!(debug_assertions)); +} + +/// Ensure the server survives panicking connection tasks. +/// +/// The test spawns a server with a connection setup callback that +/// immediately panics. Logs are captured so the panic message and peer +/// address can be asserted. A first client +/// connection triggers the panic and writes dummy preamble bytes to ensure +/// the panic is logged. The client's peer address is captured before +/// dropping the connection so the error log can be validated. A second +/// connection verifies the server continues accepting new clients after the +/// failure. Finally, the logs are scanned for the expected error entry +#[rstest] +#[traced_test] +#[tokio::test] +async fn connection_panic_is_caught( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let app_factory = move || { + factory() + .on_connection_setup(|| async { panic!("boom") }) + .unwrap() + }; + let server = WireframeServer::new(app_factory) + .workers(1) + .bind("127.0.0.1:0".parse().unwrap()) + .expect("bind"); + let addr = server.local_addr().unwrap(); + + let (tx, rx) = oneshot::channel(); + let handle = tokio::spawn(async move { + server + .run_with_shutdown(async { + let _ = rx.await; + }) + .await + .unwrap(); + }); + + let first = TcpStream::connect(addr) + .await + .expect("first connection should succeed"); + first.writable().await.unwrap(); + first.try_write(&[0; 8]).unwrap(); + drop(first); + TcpStream::connect(addr) + .await + .expect("second connection should succeed after panic"); + + let _ = tx.send(()); + handle.await.unwrap(); +} From 9f62d4256991681f91042ef2fb3cb51d03257408 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 16:46:52 +0100 Subject: [PATCH 2/3] Refactor server tests and logging --- src/server/worker.rs | 6 ++--- tests/server_helpers.rs | 50 ++++++++++++++++++++++++++++++++++++ tests/server_runtime.rs | 50 +++++++----------------------------- tests/server_runtime_more.rs | 50 +++++++++++++++++++----------------- 4 files changed, 88 insertions(+), 68 deletions(-) create mode 100644 tests/server_helpers.rs diff --git a/src/server/worker.rs b/src/server/worker.rs index 90556d7e..c7c9ade2 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -66,7 +66,7 @@ pub async fn worker_task( delay = Duration::from_millis(10); } Err(e) => { - eprintln!("accept error: {e}"); + tracing::error!(error = %e, "accept error"); sleep(delay).await; delay = (delay * 2).min(Duration::from_secs(1)); } @@ -100,7 +100,7 @@ pub async fn worker_task( /// // process_stream::<_, ()>(stream, factory, None, None).await; /// # } /// ``` -pub(crate) async fn process_stream( +async fn process_stream( mut stream: tokio::net::TcpStream, factory: F, on_success: Option>, @@ -115,7 +115,7 @@ pub(crate) async fn process_stream( if let Some(handler) = on_success.as_ref() && let Err(e) = handler(&preamble, &mut stream).await { - eprintln!("preamble callback error: {e}"); + tracing::error!(error = %e, "preamble callback error"); } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. diff --git a/tests/server_helpers.rs b/tests/server_helpers.rs new file mode 100644 index 00000000..63bf7dec --- /dev/null +++ b/tests/server_helpers.rs @@ -0,0 +1,50 @@ +//! Utilities shared across server runtime tests. +//! +//! This module provides fixtures and helper constructors used by +//! `server_runtime.rs` and `server_runtime_more.rs`. +use std::net::{Ipv4Addr, SocketAddr}; + +use bincode::{Decode, Encode}; +use rstest::fixture; +use wireframe::{app::WireframeApp, server::WireframeServer}; + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub struct TestPreamble { + pub id: u32, + pub message: String, +} + +#[fixture] +pub fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { + || WireframeApp::default() +} + +#[fixture] +pub fn free_port() -> SocketAddr { + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = std::net::TcpListener::bind(addr).unwrap(); + listener.local_addr().unwrap() +} + +/// Create a server bound to the provided address for testing. +/// +/// # Panics +/// +/// Panics if binding to `addr` fails. +#[allow(dead_code)] +pub fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + WireframeServer::new(factory) + .bind(addr) + .expect("Failed to bind") +} + +#[allow(dead_code)] +pub fn server_with_preamble(factory: F) -> WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, +{ + WireframeServer::new(factory).with_preamble::() +} diff --git a/tests/server_runtime.rs b/tests/server_runtime.rs index c9f6fbd9..5b63afd2 100644 --- a/tests/server_runtime.rs +++ b/tests/server_runtime.rs @@ -7,49 +7,14 @@ use std::{ }, }; -use bincode::{Decode, Encode, error::DecodeError}; -use rstest::{fixture, rstest}; +use bincode::error::DecodeError; +use rstest::rstest; use tokio::time::{Duration, timeout}; use wireframe::{app::WireframeApp, server::WireframeServer}; -#[derive(Debug, Clone, PartialEq, Encode, Decode)] -struct TestPreamble { - id: u32, - message: String, -} - -/// Test helper preamble carrying no data. -#[derive(Debug, Clone, PartialEq, Encode, Decode)] -#[expect(dead_code, reason = "test helper for unused preamble type")] -struct EmptyPreamble; +use crate::server_helpers::{TestPreamble, bind_server, factory, free_port, server_with_preamble}; -#[fixture] -fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::default() -} - -#[fixture] -fn free_port() -> SocketAddr { - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() -} - -fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer -where - F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, -{ - WireframeServer::new(factory) - .bind(addr) - .expect("Failed to bind") -} - -fn server_with_preamble(factory: F) -> WireframeServer -where - F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, -{ - WireframeServer::new(factory).with_preamble::() -} +mod server_helpers; #[rstest] fn test_new_server_creation(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { @@ -114,9 +79,12 @@ async fn test_bind_invalid_address( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, ) { let server = WireframeServer::new(factory); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1); + // Binding to a privileged port typically requires elevated privileges. + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 80); let result = server.bind(addr); - assert!(result.is_ok() || result.is_err()); + if !cfg!(target_os = "windows") && std::env::var("USER").is_ok_and(|u| u != "root") { + assert!(result.is_err(), "Expected bind to privileged port to fail"); + } } #[rstest] diff --git a/tests/server_runtime_more.rs b/tests/server_runtime_more.rs index 86198ffe..e250fa9a 100644 --- a/tests/server_runtime_more.rs +++ b/tests/server_runtime_more.rs @@ -7,8 +7,8 @@ use std::{ }, }; -use bincode::{Decode, Encode, error::DecodeError}; -use rstest::{fixture, rstest}; +use bincode::error::DecodeError; +use rstest::rstest; use tokio::{ net::TcpStream, sync::oneshot, @@ -16,29 +16,11 @@ use tokio::{ }; use tracing_test::traced_test; use wireframe::{app::WireframeApp, server::WireframeServer}; +use wireframe_testing::{LoggerHandle, logger}; -#[derive(Debug, Clone, PartialEq, Encode, Decode)] -struct TestPreamble { - id: u32, - message: String, -} - -/// Test helper preamble carrying no data. -#[derive(Debug, Clone, PartialEq, Encode, Decode)] -#[expect(dead_code, reason = "test helper for unused preamble type")] -struct EmptyPreamble; +use crate::server_helpers::{TestPreamble, factory, free_port}; -#[fixture] -fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::default() -} - -#[fixture] -fn free_port() -> SocketAddr { - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() -} +mod server_helpers; #[rstest] #[tokio::test] @@ -100,6 +82,8 @@ async fn test_multiple_worker_creation( assert!(result.is_ok()); assert!(result.unwrap().is_ok()); + // no connections handled, factory should not be called + assert_eq!(call_count.load(Ordering::SeqCst), 0); } #[rstest] @@ -133,7 +117,6 @@ fn test_preamble_callbacks_reset_on_type_change( assert!(!server.has_preamble_failure()); } -#[rstest] #[rstest] fn test_extreme_worker_counts(factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static) { let server = WireframeServer::new(factory); @@ -189,6 +172,7 @@ fn test_server_debug_compilation_guard() { #[tokio::test] async fn connection_panic_is_caught( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + mut logger: LoggerHandle, ) { let app_factory = move || { factory() @@ -223,4 +207,22 @@ async fn connection_panic_is_caught( let _ = tx.send(()); handle.await.unwrap(); + + let mut found_task = false; + let mut found_msg = false; + let mut found_addr = false; + while let Some(record) = logger.pop() { + if record.args().contains("connection task panicked") { + found_task = true; + } + if record.args().contains("boom") { + found_msg = true; + } + if record.args().contains("peer_addr") { + found_addr = true; + } + } + assert!(found_task); + assert!(found_msg); + assert!(found_addr); } From 4e410c14f49690bb244f2c4dc7395157189f2dcd Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 18:21:16 +0100 Subject: [PATCH 3/3] Adjust tests based on review --- src/server/worker.rs | 26 +++++++++++++++++--------- tests/server_helpers.rs | 4 ++-- tests/server_runtime.rs | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/server/worker.rs b/src/server/worker.rs index c7c9ade2..825402e2 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -135,7 +135,12 @@ async fn process_stream( #[cfg(test)] mod tests { - use rstest::{fixture, rstest}; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + use rstest::rstest; use tokio::{ net::TcpListener, time::{Duration, timeout}, @@ -144,20 +149,22 @@ mod tests { use super::*; - #[fixture] - fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::default() - } - #[rstest] #[tokio::test] - async fn test_worker_task_shutdown_signal( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { + async fn test_worker_task_shutdown_signal() { let token = CancellationToken::new(); let tracker = TaskTracker::new(); let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap()); + let call_count = Arc::new(AtomicUsize::new(0)); + let factory = { + let call_count = call_count.clone(); + move || { + call_count.fetch_add(1, Ordering::SeqCst); + WireframeApp::default() + } + }; + tracker.spawn(worker_task::<_, ()>( listener, factory, @@ -172,5 +179,6 @@ mod tests { let result = timeout(Duration::from_millis(100), tracker.wait()).await; assert!(result.is_ok()); + assert_eq!(call_count.load(Ordering::SeqCst), 0); } } diff --git a/tests/server_helpers.rs b/tests/server_helpers.rs index 63bf7dec..a3247c95 100644 --- a/tests/server_helpers.rs +++ b/tests/server_helpers.rs @@ -22,8 +22,8 @@ pub fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { #[fixture] pub fn free_port() -> SocketAddr { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = std::net::TcpListener::bind(addr).unwrap(); - listener.local_addr().unwrap() + let listener = std::net::TcpListener::bind(addr).expect("Failed to bind to localhost:0"); + listener.local_addr().expect("Failed to get local address") } /// Create a server bound to the provided address for testing. diff --git a/tests/server_runtime.rs b/tests/server_runtime.rs index 5b63afd2..ca965f12 100644 --- a/tests/server_runtime.rs +++ b/tests/server_runtime.rs @@ -164,7 +164,7 @@ async fn test_method_chaining( }) }) .on_preamble_decode_failure(|_: &DecodeError| { - eprintln!("Preamble decode failed"); + tracing::error!("Preamble decode failed"); }) .bind(free_port) .expect("Failed to bind");