diff --git a/Cargo.lock b/Cargo.lock index 5e3016f054..a5d4161751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3783,6 +3783,7 @@ dependencies = [ "blake2", "digest", "futures", + "hickory-resolver", "hostname", "libp2p", "libp2p-stream", diff --git a/overlay/Cargo.toml b/overlay/Cargo.toml index 97e48248bf..ff709f0ef0 100644 --- a/overlay/Cargo.toml +++ b/overlay/Cargo.toml @@ -28,6 +28,7 @@ stellar-xdr = { version = "=26.0.0", default-features = true } libp2p = { version = "0.54", features = ["tokio", "macros", "identify", "quic"] } libp2p-stream = "0.2.0-alpha" futures = "0.3" +hickory-resolver = "0.24.4" # Logging tracing = "0.1" diff --git a/overlay/src/libp2p_overlay.rs b/overlay/src/libp2p_overlay.rs index ecb4aa1959..9b729c5630 100644 --- a/overlay/src/libp2p_overlay.rs +++ b/overlay/src/libp2p_overlay.rs @@ -533,7 +533,7 @@ impl StellarOverlay { } OverlayCommand::DialPeer { peer_id, addr } => { let opts = DialOpts::peer_id(peer_id) - .condition(PeerCondition::Disconnected) + .condition(PeerCondition::DisconnectedAndNotDialing) .addresses(vec![addr.clone()]) .build(); self.state.metrics.outbound_attempt.fetch_add(1, Ordering::Relaxed); @@ -4628,6 +4628,7 @@ async fn test_dial_peer_skips_when_connected() { // Record outbound_attempt before the PeerId-based dial let attempts_before = m1.outbound_attempt.load(Ordering::Relaxed); + let pending_before = m1.connection_pending.load(Ordering::Relaxed); // PeerId-based dial should be a no-op (already connected) handle1.dial_peer(peer_id2, addr2.clone()).await; @@ -4638,11 +4639,16 @@ async fn test_dial_peer_skips_when_connected() { // outbound_attempt increments (we submitted the command), but connection_pending // should NOT have changed (DialPeer was rejected by libp2p before handshake) let attempts_after = m1.outbound_attempt.load(Ordering::Relaxed); + let pending_after = m1.connection_pending.load(Ordering::Relaxed); assert_eq!( attempts_after, attempts_before + 1, "outbound_attempt should increment by 1" ); + assert_eq!( + pending_after, pending_before, + "connection_pending should not change for an already-connected peer" + ); handle1.shutdown().await; handle2.shutdown().await; diff --git a/overlay/src/main.rs b/overlay/src/main.rs index f14c4f359e..a608e5e296 100644 --- a/overlay/src/main.rs +++ b/overlay/src/main.rs @@ -16,8 +16,10 @@ pub mod libp2p_overlay; mod metrics; mod xdr; +use hickory_resolver::config::LookupIpStrategy; +use hickory_resolver::TokioAsyncResolver; use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -36,6 +38,9 @@ use libp2p_overlay::{ }; use metrics::OverlayMetrics; +const CONFIGURED_PEER_RECONNECT_INTERVAL: Duration = Duration::from_secs(1); +const DNS_LOOKUP_TIMEOUT: Duration = Duration::from_secs(1); + /// Command-line arguments struct Args { config_path: Option, @@ -142,6 +147,72 @@ fn multiaddr_to_socket_addr(addr: &Multiaddr) -> Option { } } +struct PeerDnsResolver { + resolver: TokioAsyncResolver, +} + +impl PeerDnsResolver { + fn new() -> Result { + let (config, mut opts) = hickory_resolver::system_conf::read_system_conf() + .map_err(|e| format!("failed to read DNS system configuration: {}", e))?; + + opts.cache_size = 0; + opts.timeout = DNS_LOOKUP_TIMEOUT; + opts.attempts = 1; + opts.ip_strategy = LookupIpStrategy::Ipv4thenIpv6; + + Ok(Self { + resolver: TokioAsyncResolver::tokio(config, opts), + }) + } + + async fn resolve(&self, host: &str, port: u16) -> Result { + let addrs = self.resolve_all(host, port).await?; + + addrs + .iter() + .copied() + .find(|addr| addr.is_ipv4()) + .or_else(|| addrs.into_iter().next()) + .ok_or_else(|| format!("DNS returned no addresses for '{}:{}'", host, port)) + } + + async fn resolve_all(&self, host: &str, port: u16) -> Result, String> { + self.resolver.clear_cache(); + + let addrs: Vec<_> = self + .resolver + .lookup_ip(host) + .await + .map_err(|e| format!("failed to resolve '{}:{}': {}", host, port, e))? + .iter() + .map(|addr| SocketAddr::new(addr, port)) + .collect(); + + if addrs.is_empty() { + Err(format!("DNS returned no addresses for '{}:{}'", host, port)) + } else { + Ok(addrs) + } + } +} + +fn split_host_port_or_default(addr_str: &str, default_port: u16) -> Result<(String, u16), String> { + if let Ok(ip) = addr_str.parse::() { + return Ok((ip.to_string(), default_port)); + } + + match addr_str.rsplit_once(':') { + Some((host, port_str)) if !host.contains(':') => { + let port = port_str + .parse::() + .map_err(|e| format!("invalid port in '{}': {}", addr_str, e))?; + Ok((host.to_string(), port)) + } + _ => Ok((addr_str.to_string(), default_port)), + } +} + /// Extract TX set hashes from an SCP envelope. fn extract_txset_hashes_from_scp(envelope: &[u8]) -> Vec<[u8; 32]> { xdr::extract_txset_hashes_from_scp(envelope) @@ -153,31 +224,18 @@ fn extract_txset_hashes_from_scp(envelope: &[u8]) -> Vec<[u8; 32]> { /// - `IP:port` (e.g. "10.0.0.1:11625") — parsed directly /// - DNS hostname (e.g. "pod-0.svc.cluster.local") — resolved via DNS, using `default_port` /// - DNS hostname with port (e.g. "pod-0.svc.cluster.local:11625") — resolved via DNS -async fn resolve_peer_addr(addr_str: &str, default_port: u16) -> Result { +async fn resolve_peer_addr( + dns_resolver: &PeerDnsResolver, + addr_str: &str, + default_port: u16, +) -> Result { // Try direct SocketAddr parse first (handles "IP:port") if let Ok(addr) = addr_str.parse::() { return Ok(addr); } - // It's a hostname — append default port if none present - let host_port = if addr_str.contains(':') { - addr_str.to_string() - } else { - format!("{}:{}", addr_str, default_port) - }; - - // DNS resolution via tokio (async, non-blocking) - let addrs: Vec<_> = tokio::net::lookup_host(&host_port) - .await - .map_err(|e| format!("failed to resolve '{}': {}", host_port, e))? - .collect(); - - addrs - .iter() - .copied() - .find(|addr| addr.is_ipv4()) - .or_else(|| addrs.into_iter().next()) - .ok_or_else(|| format!("DNS returned no addresses for '{}'", host_port)) + let (host, port) = split_host_port_or_default(addr_str, default_port)?; + dns_resolver.resolve(&host, port).await } /// Result of resolve_and_dial: either we dialed successfully (with the libp2p SocketAddr) @@ -196,11 +254,12 @@ enum DialResult { /// Resolve a peer address to a libp2p SocketAddr and Multiaddr, without dialing. /// Returns the libp2p SocketAddr (port+1000) on success. async fn resolve_peer_to_libp2p( + dns_resolver: &PeerDnsResolver, addr_str: &str, default_port: u16, local_addrs: &RwLock>, ) -> DialResult { - match resolve_peer_addr(addr_str, default_port).await { + match resolve_peer_addr(dns_resolver, addr_str, default_port).await { Ok(addr) => { let libp2p_port = addr.port() + 1000; let libp2p_sock = SocketAddr::new(addr.ip(), libp2p_port); @@ -224,12 +283,13 @@ async fn resolve_peer_to_libp2p( /// Resolve a peer address and dial it. async fn resolve_and_dial( + dns_resolver: &PeerDnsResolver, addr_str: &str, default_port: u16, local_addrs: &RwLock>, handle: &LibP2pOverlayHandle, ) -> DialResult { - match resolve_peer_addr(addr_str, default_port).await { + match resolve_peer_addr(dns_resolver, addr_str, default_port).await { Ok(addr) => { let libp2p_port = addr.port() + 1000; let libp2p_sock = SocketAddr::new(addr.ip(), libp2p_port); @@ -271,6 +331,7 @@ fn spawn_peer_retry_task( local_addrs: Arc>>, configured_peers: Arc>, handle: LibP2pOverlayHandle, + dns_resolver: Arc, ) { if unresolved.is_empty() { return; @@ -300,7 +361,9 @@ fn spawn_peer_retry_task( let mut still_pending = Vec::new(); for addr_str in &pending { - match resolve_and_dial(addr_str, default_port, &local_addrs, &handle).await { + match resolve_and_dial(&dns_resolver, addr_str, default_port, &local_addrs, &handle) + .await + { DialResult::Dialed(libp2p_sock) => { configured_peers .write() @@ -329,7 +392,10 @@ fn spawn_peer_retry_task( /// Collect local IP addresses for self-dial detection. /// Returns a set of SocketAddrs at the libp2p port (peer_port + 1000). /// Starts with instantly-available addresses; DNS resolution runs in background. -fn collect_local_addrs(libp2p_port: u16) -> Arc>> { +fn collect_local_addrs( + libp2p_port: u16, + dns_resolver: Arc, +) -> Arc>> { let mut addrs = HashSet::new(); // Always include loopback @@ -356,17 +422,13 @@ fn collect_local_addrs(libp2p_port: u16) -> Arc>> { tokio::spawn(async move { if let Ok(hostname) = hostname::get() { if let Ok(hostname_str) = hostname.into_string() { - let lookup = format!("{}:{}", hostname_str, libp2p_port); - match tokio::net::lookup_host(lookup).await { + match dns_resolver.resolve_all(&hostname_str, libp2p_port).await { Ok(resolved) => { - let resolved: Vec<_> = resolved.collect(); - if !resolved.is_empty() { - let mut addrs = addrs_ref.write().await; - for addr in &resolved { - addrs.insert(*addr); - } - debug!("DNS self-detection resolved hostname to {:?}", resolved); + let mut addrs = addrs_ref.write().await; + for addr in &resolved { + addrs.insert(*addr); } + debug!("DNS self-detection resolved hostname to {:?}", resolved); } Err(e) => { debug!( @@ -420,6 +482,9 @@ struct App { next_scp_request_id: Arc, /// Local addresses for self-dial detection (populated at startup + async DNS) local_addrs: Arc>>, + /// In-process DNS resolver for peer hostnames. This bypasses libc resolver + /// caching while still using nameservers from /etc/resolv.conf. + dns_resolver: Arc, /// Configured peer addresses and listen port — kept for reconnection on disconnect. /// Updated each time SetPeerConfig is received from Core. configured_peers: Arc>, @@ -477,9 +542,10 @@ impl App { // Use peer_port + 1000 for libp2p QUIC to avoid collision with legacy TCP let libp2p_port = config.peer_port + 1000; let libp2p_listen_ip = config.libp2p_listen_ip.clone(); + let dns_resolver = Arc::new(PeerDnsResolver::new()?); // Compute local addresses for self-dial detection (instant + async DNS in background) - let local_addrs = collect_local_addrs(libp2p_port); + let local_addrs = collect_local_addrs(libp2p_port, dns_resolver.clone()); // Spawn libp2p overlay task tokio::spawn(async move { @@ -502,6 +568,7 @@ impl App { pending_scp_state_requests: Arc::new(RwLock::new(HashMap::new())), next_scp_request_id: Arc::new(AtomicU64::new(1)), local_addrs, + dns_resolver, configured_peers: Arc::new(RwLock::new(ConfiguredPeers { addrs: Vec::new(), listen_port: 11625, @@ -517,11 +584,11 @@ impl App { async fn run(mut self) { info!("Overlay started, processing Core messages"); - // Safety-net reconnect timer: re-dial all configured peers every 30s. + // Configured-peer reconnect timer: actively re-dial while below the + // configured peer set. // Uses PeerId-based dials for known peers (libp2p skips if already connected). // Falls back to address-based dials for peers we haven't connected to yet. - // This is a fallback — targeted reconnection on disconnect handles the fast path. - let mut reconnect_interval = tokio::time::interval(Duration::from_secs(30)); + let mut reconnect_interval = tokio::time::interval(CONFIGURED_PEER_RECONNECT_INTERVAL); reconnect_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { @@ -551,7 +618,7 @@ impl App { self.handle_libp2p_event(event).await; } - // Safety-net reconnect: PeerId-based dials for known peers, + // Configured-peer reconnect: PeerId-based dials for known peers, // address-based ONLY for peers we've never learned a PeerId for. _ = reconnect_interval.tick() => { let cp = self.configured_peers.read().await; @@ -577,7 +644,7 @@ impl App { let connected = self.libp2p_handle.connected_peer_count().await; if connected < expected_peers { info!( - "Safety-net reconnect: {}/{} peers connected", + "Configured-peer reconnect: {}/{} peers connected", connected, expected_peers ); @@ -610,31 +677,35 @@ impl App { if !unknown_addrs.is_empty() { info!( - "Safety-net: resolving {} unknown peer(s)", + "Configured-peer reconnect: resolving {} unknown peer(s)", unknown_addrs.len() ); let handle = self.libp2p_handle.clone(); let local_addrs = self.local_addrs.clone(); let configured_peers = self.configured_peers.clone(); + let dns_resolver = self.dns_resolver.clone(); tokio::spawn(async move { for addr_str in &unknown_addrs { // Step 1: resolve DNS only (no dial) match resolve_peer_to_libp2p( - addr_str, listen_port, &local_addrs, + &dns_resolver, + addr_str, + listen_port, + &local_addrs, ).await { DialResult::Resolved(libp2p_sock) => { // Step 2: check if resolved addr is already known if known_addrs.contains(&libp2p_sock) { debug!( - "Safety-net: {} resolved to known addr {}, skipping dial", + "Configured-peer reconnect: {} resolved to known addr {}, skipping dial", addr_str, libp2p_sock ); continue; } // Step 3: truly unknown — dial let maddr = socket_addr_to_multiaddr(&libp2p_sock); - info!("Safety-net: dialing unknown peer {} at {}", addr_str, maddr); + info!("Configured-peer reconnect: dialing unknown peer {} at {}", addr_str, maddr); handle.dial(maddr).await; configured_peers .write() @@ -881,6 +952,7 @@ impl App { let local_addrs = self.local_addrs.clone(); let known_peers = self.known_peers.clone(); let configured_peers = self.configured_peers.clone(); + let dns_resolver = self.dns_resolver.clone(); tokio::spawn(async move { let mut delay = Duration::from_secs(1); let max_delay = Duration::from_secs(30); @@ -907,6 +979,7 @@ impl App { attempt, peer_id, hostname ); match resolve_and_dial( + &dns_resolver, &hostname, listen_port, &local_addrs, @@ -1340,12 +1413,19 @@ impl App { let handle = self.libp2p_handle.clone(); let local_addrs = self.local_addrs.clone(); let configured_peers = self.configured_peers.clone(); + let dns_resolver = self.dns_resolver.clone(); tokio::spawn(async move { let mut unresolved = Vec::new(); for addr_str in &all_peers { - match resolve_and_dial(addr_str, listen_port, &local_addrs, &handle) - .await + match resolve_and_dial( + &dns_resolver, + addr_str, + listen_port, + &local_addrs, + &handle, + ) + .await { DialResult::Dialed(libp2p_sock) => { // Record mapping so we can reconnect on disconnect @@ -1369,6 +1449,7 @@ impl App { local_addrs, configured_peers, handle, + dns_resolver, ); }); } @@ -1526,30 +1607,46 @@ mod tests { // --- DNS resolution tests --- + fn test_dns_resolver() -> Arc { + Arc::new(PeerDnsResolver::new().expect("test DNS resolver")) + } + #[tokio::test] async fn test_resolve_peer_addr_ip_port() { // Bare IP:port should parse directly without DNS - let addr = resolve_peer_addr("10.0.0.1:11625", 9999).await.unwrap(); + let dns_resolver = test_dns_resolver(); + let addr = resolve_peer_addr(&dns_resolver, "10.0.0.1:11625", 9999) + .await + .unwrap(); assert_eq!(addr, "10.0.0.1:11625".parse::().unwrap()); // default_port is ignored when addr already has a port } #[tokio::test] async fn test_resolve_peer_addr_ip_port_various() { + let dns_resolver = test_dns_resolver(); + // Loopback - let addr = resolve_peer_addr("127.0.0.1:8080", 0).await.unwrap(); + let addr = resolve_peer_addr(&dns_resolver, "127.0.0.1:8080", 0) + .await + .unwrap(); assert_eq!(addr.ip().to_string(), "127.0.0.1"); assert_eq!(addr.port(), 8080); // High port - let addr = resolve_peer_addr("192.168.1.1:65535", 0).await.unwrap(); + let addr = resolve_peer_addr(&dns_resolver, "192.168.1.1:65535", 0) + .await + .unwrap(); assert_eq!(addr.port(), 65535); } #[tokio::test] async fn test_resolve_peer_addr_dns_no_port() { // "localhost" is a DNS name; should resolve and use default_port - let addr = resolve_peer_addr("localhost", 11625).await.unwrap(); + let dns_resolver = test_dns_resolver(); + let addr = resolve_peer_addr(&dns_resolver, "localhost", 11625) + .await + .unwrap(); assert!( addr.ip().is_loopback(), "localhost should resolve to loopback, got {}", @@ -1565,7 +1662,10 @@ mod tests { #[tokio::test] async fn test_resolve_peer_addr_dns_with_port() { // "localhost:9999" — DNS name with explicit port - let addr = resolve_peer_addr("localhost:9999", 11625).await.unwrap(); + let dns_resolver = test_dns_resolver(); + let addr = resolve_peer_addr(&dns_resolver, "localhost:9999", 11625) + .await + .unwrap(); assert!(addr.ip().is_loopback()); assert_eq!( addr.port(), @@ -1577,7 +1677,13 @@ mod tests { #[tokio::test] async fn test_resolve_peer_addr_unresolvable() { // Bogus hostname should return an error - let result = resolve_peer_addr("this.host.definitely.does.not.exist.invalid", 11625).await; + let dns_resolver = test_dns_resolver(); + let result = resolve_peer_addr( + &dns_resolver, + "this.host.definitely.does.not.exist.invalid", + 11625, + ) + .await; assert!(result.is_err(), "Unresolvable hostname should return Err"); let err = result.unwrap_err(); assert!( @@ -1590,7 +1696,10 @@ mod tests { #[tokio::test] async fn test_resolve_peer_addr_ipv6_bracket() { // Bracketed IPv6 with port should parse directly - let addr = resolve_peer_addr("[::1]:11625", 9999).await.unwrap(); + let dns_resolver = test_dns_resolver(); + let addr = resolve_peer_addr(&dns_resolver, "[::1]:11625", 9999) + .await + .unwrap(); assert!(addr.ip().is_ipv6()); assert_eq!(addr.port(), 11625); } @@ -1599,7 +1708,7 @@ mod tests { #[tokio::test] async fn test_collect_local_addrs_includes_loopback() { - let addrs = collect_local_addrs(12625); + let addrs = collect_local_addrs(12625, test_dns_resolver()); // Loopback is inserted synchronously, should be present immediately let set = addrs.read().await; let loopback = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 12625); @@ -1612,7 +1721,7 @@ mod tests { #[tokio::test] async fn test_collect_local_addrs_has_nonloopback() { // The UDP probe should find at least our primary interface IP - let addrs = collect_local_addrs(12625); + let addrs = collect_local_addrs(12625, test_dns_resolver()); let set = addrs.read().await; assert!( set.len() >= 2, @@ -1639,7 +1748,15 @@ mod tests { let (handle, _evt_rx, _tx_rx, _overlay) = create_overlay(keypair, Arc::new(OverlayMetrics::new())).unwrap(); - let result = resolve_and_dial("127.0.0.1:11625", 11625, &local_addrs, &handle).await; + let dns_resolver = test_dns_resolver(); + let result = resolve_and_dial( + &dns_resolver, + "127.0.0.1:11625", + 11625, + &local_addrs, + &handle, + ) + .await; assert!( matches!(result, DialResult::SelfSkipped), "Self-dial should be skipped" @@ -1653,7 +1770,15 @@ mod tests { let (handle, _evt_rx, _tx_rx, _overlay) = create_overlay(keypair, Arc::new(OverlayMetrics::new())).unwrap(); - let result = resolve_and_dial("unresolvable.invalid", 11625, &local_addrs, &handle).await; + let dns_resolver = test_dns_resolver(); + let result = resolve_and_dial( + &dns_resolver, + "unresolvable.invalid", + 11625, + &local_addrs, + &handle, + ) + .await; assert!( matches!(result, DialResult::ResolutionFailed(ref s) if s == "unresolvable.invalid"), "Failed DNS should return ResolutionFailed with the address string" @@ -1668,7 +1793,15 @@ mod tests { let (handle, _evt_rx, _tx_rx, _overlay) = create_overlay(keypair, Arc::new(OverlayMetrics::new())).unwrap(); - let result = resolve_and_dial("10.255.255.1:11625", 11625, &local_addrs, &handle).await; + let dns_resolver = test_dns_resolver(); + let result = resolve_and_dial( + &dns_resolver, + "10.255.255.1:11625", + 11625, + &local_addrs, + &handle, + ) + .await; assert!( matches!(result, DialResult::Dialed(_)), "Valid IP:port should resolve and return Dialed" @@ -1683,7 +1816,9 @@ mod tests { let (handle, _evt_rx, _tx_rx, _overlay) = create_overlay(keypair, Arc::new(OverlayMetrics::new())).unwrap(); - let result = resolve_and_dial("localhost", 11625, &local_addrs, &handle).await; + let dns_resolver = test_dns_resolver(); + let result = + resolve_and_dial(&dns_resolver, "localhost", 11625, &local_addrs, &handle).await; assert!( matches!(result, DialResult::Dialed(_)), "localhost should resolve via DNS and return Dialed" @@ -1715,6 +1850,7 @@ mod tests { local_addrs, make_test_configured_peers(), handle, + test_dns_resolver(), ); // No panic, no hang — that's the test } @@ -1737,6 +1873,7 @@ mod tests { local_addrs, make_test_configured_peers(), handle, + test_dns_resolver(), ); // Advance time past the first retry delay (2s) @@ -1765,6 +1902,7 @@ mod tests { local_addrs, make_test_configured_peers(), handle, + test_dns_resolver(), ); // Advance through many retry cycles — the task should not exit or panic. @@ -1814,9 +1952,11 @@ mod tests { format!("127.0.0.1:{}", port2 - 1000), // bare IP:port for node2 format!("localhost:{}", port3 - 1000), // DNS name:port for node3 ]; + let dns_resolver = test_dns_resolver(); for addr_str in &known_peers { - let result = resolve_and_dial(addr_str, 11625, &local_addrs, &handle1).await; + let result = + resolve_and_dial(&dns_resolver, addr_str, 11625, &local_addrs, &handle1).await; assert!( matches!(result, DialResult::Dialed(_)), "Peer {} should resolve and dial on first try", @@ -1891,6 +2031,7 @@ mod tests { local_addrs, make_test_configured_peers(), handle, + test_dns_resolver(), ); // After 3s: first retry runs. "localhost" resolves, "invalid" stays pending. diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index 18036651c2..340061313d 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -90,10 +90,10 @@ AppConnector::getNetworkID() const } void -AppConnector::postOnMainThread(std::function&& f, std::string&& message, - Scheduler::ActionType type) +AppConnector::postOnMainThread(std::function&& f, + std::string&& message) { - mApp.postOnMainThread(std::move(f), std::move(message), type); + mApp.postOnMainThread(std::move(f), std::move(message)); } void diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 1708b9f430..4164d0435e 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -48,9 +48,7 @@ class AppConnector // Thread-safe methods SorobanMetrics& getSorobanMetrics() const; - void postOnMainThread( - std::function&& f, std::string&& message, - Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION); + void postOnMainThread(std::function&& f, std::string&& message); void postOnOverlayThread(std::function&& f, std::string const& message); void postOnBackgroundThread(std::function&& f, diff --git a/src/main/Application.h b/src/main/Application.h index fd20378423..c86c4ac98a 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -255,9 +255,8 @@ class Application virtual asio::io_context& getOverlayIOContext() = 0; virtual asio::io_context& getLedgerCloseIOContext() = 0; - virtual void postOnMainThread( - std::function&& f, std::string&& name, - Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION) = 0; + virtual void postOnMainThread(std::function&& f, + std::string&& name) = 0; // While both are lower priority than the main thread, eviction threads have // more priority than regular worker background threads diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index cb3f6c095a..1aaac0a98b 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -1536,8 +1536,7 @@ ApplicationImpl::getLedgerCloseIOContext() } void -ApplicationImpl::postOnMainThread(std::function&& f, std::string&& name, - Scheduler::ActionType type) +ApplicationImpl::postOnMainThread(std::function&& f, std::string&& name) { JITTER_INJECT_DELAY(); LogSlowExecution isSlow{name, LogSlowExecution::Mode::MANUAL, @@ -1554,8 +1553,7 @@ ApplicationImpl::postOnMainThread(std::function&& f, std::string&& name, std::this_thread::sleep_for(sleepFor); } f(); - }, - std::move(name), type); + }); } void diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index ee83c4942f..752c3aa71a 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -90,8 +90,8 @@ class ApplicationImpl : public Application virtual asio::io_context& getOverlayIOContext() override; virtual asio::io_context& getLedgerCloseIOContext() override; - virtual void postOnMainThread(std::function&& f, std::string&& name, - Scheduler::ActionType type) override; + virtual void postOnMainThread(std::function&& f, + std::string&& name) override; virtual void postOnBackgroundThread(std::function&& f, std::string jobName) override; virtual void postOnEvictionBackgroundThread(std::function&& f, diff --git a/src/main/CommandHandler.cpp b/src/main/CommandHandler.cpp index 6382e72b43..8bf587e613 100644 --- a/src/main/CommandHandler.cpp +++ b/src/main/CommandHandler.cpp @@ -361,6 +361,8 @@ CommandHandler::info(std::string const& params, std::string& retStr) std::map retMap; http::server::server::parseParams(params, retMap); + mApp.syncAllMetrics(); + retStr = mApp.getJsonInfo(retMap["compact"] == "false").toStyledString(); } diff --git a/src/process/ProcessManagerImpl.h b/src/process/ProcessManagerImpl.h index 675d95cd1d..ccf8636f1f 100644 --- a/src/process/ProcessManagerImpl.h +++ b/src/process/ProcessManagerImpl.h @@ -11,6 +11,7 @@ #include #include #include +#include namespace stellar { diff --git a/src/util/Scheduler.cpp b/src/util/Scheduler.cpp deleted file mode 100644 index cbaa3e8ed0..0000000000 --- a/src/util/Scheduler.cpp +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright 2020 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#include "util/Scheduler.h" -#include "lib/util/finally.h" -#include "util/GlobalChecks.h" -#include "util/Timer.h" -#include - -namespace stellar -{ -using nsecs = std::chrono::nanoseconds; - -class Scheduler::ActionQueue - : public std::enable_shared_from_this -{ - struct Element - { - Action mAction; - VirtualClock::time_point mEnqueueTime; - Element(VirtualClock& clock, Action&& action) - : mAction(std::move(action)), mEnqueueTime(clock.now()) - { - } - }; - - std::string mName; - ActionType mType; - nsecs mTotalService{0}; - std::chrono::steady_clock::time_point mLastService; - std::deque mActions; - - // mIdleList is a reference to the mIdleList member of the Scheduler that - // owns this ActionQueue. mIdlePosition is an iterator to the position in - // that list that this ActionQueue occupies, or mIdleList.end() if it's - // not in mIdleList. - std::list& mIdleList; - std::list::iterator mIdlePosition; - - public: - ActionQueue(std::string const& name, ActionType type, - std::list& idleList) - : mName(name) - , mType(type) - , mLastService(std::chrono::steady_clock::time_point::max()) - , mIdleList(idleList) - , mIdlePosition(mIdleList.end()) - { - } - - bool - isInIdleList() const - { - return mIdlePosition != mIdleList.end(); - } - - void - addToIdleList() - { - releaseAssert(!isInIdleList()); - releaseAssert(isEmpty()); - mIdleList.push_front(shared_from_this()); - mIdlePosition = mIdleList.begin(); - } - - void - removeFromIdleList() - { - releaseAssert(isInIdleList()); - releaseAssert(isEmpty()); - mIdleList.erase(mIdlePosition); - mIdlePosition = mIdleList.end(); - } - - std::string const& - name() const - { - return mName; - } - - ActionType - type() const - { - return mType; - } - - nsecs - totalService() const - { - return mTotalService; - } - - std::chrono::steady_clock::time_point - lastService() const - { - return mLastService; - } - - size_t - size() const - { - return mActions.size(); - } - - bool - isEmpty() const - { - return mActions.empty(); - } - - bool - isOverloaded(nsecs latencyWindow, VirtualClock::time_point now) const - { - if (!mActions.empty()) - { - auto timeInQueue = now - mActions.front().mEnqueueTime; - return timeInQueue > latencyWindow; - } - return false; - } - - size_t - tryTrim(nsecs latencyWindow, VirtualClock::time_point now) - { - size_t n = 0; - while (mType == ActionType::DROPPABLE_ACTION && !mActions.empty() && - isOverloaded(latencyWindow, now)) - { - mActions.pop_front(); - n++; - } - return n; - } - - void - enqueue(VirtualClock& clock, Action&& action) - { - auto elt = Element(clock, std::move(action)); - mActions.emplace_back(std::move(elt)); - } - - void - runNext(VirtualClock& clock, nsecs minTotalService) - { - ZoneScoped; - ZoneText(mName.c_str(), mName.size()); - auto before = clock.now(); - Action action = std::move(mActions.front().mAction); - mActions.pop_front(); - - auto fini = gsl::finally([&]() { - auto after = clock.now(); - nsecs duration = std::chrono::duration_cast(after - before); - mTotalService = std::max(mTotalService + duration, minTotalService); - mLastService = after; - }); - - action(); - } -}; - -Scheduler::Scheduler(VirtualClock& clock, - std::chrono::nanoseconds latencyWindow) - : mRunnableActionQueues([](Qptr a, Qptr b) -> bool { - return a->totalService() > b->totalService(); - }) - , mClock(clock) - , mLatencyWindow(latencyWindow) -{ - setOverloaded(false); -} - -void -Scheduler::trimSingleActionQueue(Qptr q, VirtualClock::time_point now) -{ - size_t trimmed = q->tryTrim(mLatencyWindow, now); - mStats.mActionsDroppedDueToOverload += trimmed; - mSize -= trimmed; -} - -void -Scheduler::trimIdleActionQueues(VirtualClock::time_point now) -{ - if (mIdleActionQueues.empty()) - { - return; - } - Qptr old = mIdleActionQueues.back(); - if (old->lastService() + mLatencyWindow < now) - { - releaseAssert(old->isEmpty()); - mAllActionQueues.erase(std::make_pair(old->name(), old->type())); - old->removeFromIdleList(); - } -} - -void -Scheduler::shutdown() -{ - if (!mIsShutdown) - { - mIsShutdown = true; - mAllActionQueues.clear(); - mRunnableActionQueues = - std::priority_queue, - std::function>(); - mIdleActionQueues.clear(); - mSize = 0; - } -} - -void -Scheduler::setOverloaded(bool overloaded) -{ - if (overloaded) - { - mOverloadedStart = mClock.now(); - } - else - { - mOverloadedStart = std::chrono::steady_clock::time_point::max(); - } -} - -void -Scheduler::enqueue(std::string&& name, Action&& action, ActionType type) -{ - if (mIsShutdown) - { - return; - } - - auto key = std::make_pair(name, type); - auto qi = mAllActionQueues.find(key); - if (qi == mAllActionQueues.end()) - { - mStats.mQueuesActivatedFromFresh++; - auto q = std::make_shared(name, type, mIdleActionQueues); - qi = mAllActionQueues.emplace(key, q).first; - mRunnableActionQueues.push(qi->second); - } - else - { - if (qi->second->isInIdleList()) - { - releaseAssert(qi->second->isEmpty()); - mStats.mQueuesActivatedFromIdle++; - qi->second->removeFromIdleList(); - mRunnableActionQueues.push(qi->second); - } - } - mStats.mActionsEnqueued++; - qi->second->enqueue(mClock, std::move(action)); - mSize += 1; -} - -size_t -Scheduler::runOne() -{ - auto start = mClock.now(); - trimIdleActionQueues(start); - if (mRunnableActionQueues.empty()) - { - releaseAssert(mSize == 0); - return 0; - } - else - { - auto q = mRunnableActionQueues.top(); - mRunnableActionQueues.pop(); - trimSingleActionQueue(q, start); - - auto putQueueBackInIdleOrActive = gsl::finally([&]() { - auto now = mClock.now(); - if (q->isOverloaded(mLatencyWindow, now)) - { - if (mOverloadedStart == - std::chrono::steady_clock::time_point::max()) - { - setOverloaded(true); - } - } - else if (mOverloadedStart < - std::chrono::steady_clock::time_point::max()) - { - // see if we're not overloaded anymore - bool overloaded = std::any_of( - mAllActionQueues.begin(), mAllActionQueues.end(), - [&](std::pair, - Qptr> const& qp) { - return qp.second->isOverloaded(mLatencyWindow, now); - }); - if (!overloaded) - { - setOverloaded(false); - } - } - if (q->isEmpty()) - { - mStats.mQueuesSuspended++; - q->addToIdleList(); - } - else - { - mRunnableActionQueues.push(q); - } - }); - - if (!q->isEmpty()) - { - // We pass along a "minimum service time" floor that the service - // time of the queue will be incremented to, at minimum. - auto minTotalService = mMaxTotalService - mLatencyWindow; - mSize -= 1; - mStats.mActionsDequeued++; - auto updateMaxTotalService = gsl::finally([&]() { - mMaxTotalService = - std::max(q->totalService(), mMaxTotalService); - mCurrentActionType = ActionType::NORMAL_ACTION; - }); - mCurrentActionType = q->type(); - q->runNext(mClock, minTotalService); - } - return 1; - } -} - -std::chrono::seconds -Scheduler::getOverloadedDuration() const -{ - auto now = mClock.now(); - std::chrono::seconds res; - if (now > mOverloadedStart) - { - // round up - res = std::chrono::duration_cast( - now - mOverloadedStart) + - std::chrono::seconds{1}; - } - else - { - res = std::chrono::seconds{0}; - } - return res; -} - -Scheduler::ActionType -Scheduler::currentActionType() const -{ - return mCurrentActionType; -} - -#ifdef BUILD_TESTS -std::shared_ptr -Scheduler::getExistingQueue(std::string const& name, ActionType type) const -{ - auto qi = mAllActionQueues.find(std::make_pair(name, type)); - if (qi == mAllActionQueues.end()) - { - return nullptr; - } - return qi->second; -} - -std::string const& -Scheduler::nextQueueToRun() const -{ - static std::string empty; - if (mRunnableActionQueues.empty()) - { - return empty; - } - return mRunnableActionQueues.top()->name(); -} -std::chrono::nanoseconds -Scheduler::totalService(std::string const& q, ActionType type) const -{ - auto eq = getExistingQueue(q, type); - releaseAssert(eq); - return eq->totalService(); -} - -size_t -Scheduler::queueLength(std::string const& q, ActionType type) const -{ - auto eq = getExistingQueue(q, type); - releaseAssert(eq); - return eq->size(); -} -#endif -} diff --git a/src/util/Scheduler.h b/src/util/Scheduler.h deleted file mode 100644 index 6f1b2a5860..0000000000 --- a/src/util/Scheduler.h +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2020 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// This class implements a multi-queue scheduler for "actions" (deferred-work -// callbacks that some subsystem wants to run "soon" on the main thread), -// attempting to satisfy a variety of constraints and goals simultaneously: -// -// 0. Non-preemption: We have no ability to preempt actions while they're -// running so this is a hard constraint, not just a goal. -// -// 1. Serial execution: within a queue, actions must run in the order they are -// enqueued (or a subsequence thereof, if there are dropped actions) so -// that clients can use queue-names to sequence logically-sequential -// actions. Scheduling happens between queues but not within them. -// -// 2. Non-starvation: Everything enqueued (and not dropped) should run -// eventually and the longer something waits, generally the more likely it -// is to run. -// -// 3. Fairness: time given to each queue should be roughly equal, over time. -// -// 4. Load-shedding and back-pressure: we want to be able to define a load -// limit (in terms of worst-case time actions are delayed in the queue) -// beyond which we consider the system to be "overloaded" and both shed -// load where we can (dropping non-essential actions) and exert -// backpressure on our called (eg. by having them throttle IO that -// ultimately drives queue growth). -// -// 5. Simplicity: clients of the scheduler shouldn't need to adjust a lot of -// knobs, and the implementation should be as simple as possible and -// exhibit as fixed a behaviour as possible. We don't want surprises in -// dynamics. -// -// 6. Non-anticipation: many scheduling algorithms require more information -// than we have, or are so-called "anticipation" algorithms that need to -// know (or guess) the size or duration of the next action. We certainly -// don't know these, and while we _could_ try to estimate them, the -// algorithms that need anticipation can go wrong if fed bad estimates; -// we'd prefer a non-anticipation (or "blind") approach that lacks this -// risk. -// -// Given these goals and constraints, our current best guess is a lightly -// customized algorithm in a family called FB ("foreground-background" or -// "feedback") or LAS ("least attained service") or SET ("shortest elapsed -// time"). -// -// For an idea with so many names, the algorithm is utterly simple: each queue -// tracks the accumulated runtime of all actions it has run, and on each step we -// run the next action in the queue with the lowest accumulated runtime (the -// queues themselves are therefore stored in an outer priority queue to enable -// quick retrieval of the next lowest queue). -// -// This has a few interesting properties: -// -// - A low-frequency action (eg. a ledger close) will usually be scheduled -// immediately, as it has built up some "credit" in its queue in the form of -// zero new runtime in the period since its last run, lowering its -// accumulation relative to other queues. -// -// - A continuously-rescheduled multipart action (eg. bucket-apply or -// catchup-replay) will quickly consume any "credit" it might have and be -// throttled back to an equal time-share with other queues: since it spent a -// long time on-CPU it will have to wait at least until everyone else has -// had a similar amount of time before going again. -// -// - If a very-short-duration action occurs it has little affect on anything -// else, either its own queue or others, in the relative scheduling order. A -// queue that's got lots of very-small actions (eg. just issuing a pile of -// async IOs or writing to in-memory buffers) may run them _all_ before -// anyone else gets to go, but that's ok precisely because they're very -// small actions. The scheduler will shift to other queues exactly when a -// queue uses up a _noticeable amount of time_ relative to others. -// -// This is an old algorithm that was not used for a long time out of fear that -// it would starve long-duration actions; but it's received renewed study in -// recent years based on the observation that such starvation only occurs in -// certain theoretically-tidy but practically-rare distributions of action -// durations, and the distributions that occur in reality behave quite well -// under it. -// -// The customizations we make are minor: -// -// - We put a floor on the cumulative durations; low cumulative durations -// represent a form of "credit" that a queue might use in a burst if it were -// to be suddenly full of ready actions, or continuously-reschedule itself, -// so we make sure no queue can have less than some (steadily rising) floor. -// -// - We record the enqueue time and "droppability" of an action, to allow us -// to measure load level and perform load shedding. - -namespace stellar -{ - -class VirtualClock; - -class Scheduler -{ - public: - using Action = std::function; - enum class ActionType - { - NORMAL_ACTION, - DROPPABLE_ACTION - }; - - struct Stats - { - size_t mActionsEnqueued{0}; - size_t mActionsDequeued{0}; - size_t mActionsDroppedDueToOverload{0}; - size_t mQueuesActivatedFromFresh{0}; - size_t mQueuesActivatedFromIdle{0}; - size_t mQueuesSuspended{0}; - }; - - private: - class ActionQueue; - using Qptr = std::shared_ptr; - - // Stores all ActionQueues by name+type, either runnable or idle. - std::map, Qptr> mAllActionQueues; - - // Stores the Runnable ActionQueues, with top() being the ActionQueue with - // the least total service time. An ActionQueue is "runnable" if it is - // nonempty; empty ActionQueues are considered "idle" and are tracked - // in the mIdleActionQueues member below. - std::priority_queue, - std::function> - mRunnableActionQueues; - - Stats mStats; - - // Clock we get time from. - VirtualClock& mClock; - - // This "latency window" serves 3 distinct purposes simultaneously. - // - // Theoretically they could be 3 separate knobs but in practice they - // all seem both related to one another and roughly the same order - // of magnitude so we use a single number for all 3 presently: - // - // 1. The time-delta subtracted from the observed maximum totalService of - // any queue in order to calculate a minimum totalSerice "floor", that - // we always advance queues' totalService values to when we run - // them. One way to think of this is as a latency cap -- the longest we - // want to let one queue that's built up a bunch of "credit" by being - // lightly-loaded monopolize scheduling if it's suddenly full of work. - // - // 2. The maximum duration between an action's enqueue and dequeue times - // beyond which we consider the queue "overloaded" and begin providing - // backpressure / load-shedding. One way to think of this is "the - // longest tolerable response-delay", which if you squint is similar - // to the way of thinking about purpose #1 above: a latency cap. - // - // 3. The maximum duration a queue can be idle before we forget about it, - // reclaiming its memory. This is even more-obviously related to purpose - // #1 above: if an idle queue were made runnable after a time greater - // than this duration, it'd have its totalService value set to the - // totalService floor, just as it would if it were forgotten entirely - // and remade anew. - - std::chrono::nanoseconds const mLatencyWindow; - - // Largest totalService seen in any queue. This number will continuously - // advance as queues are serviced; it exists to serve as the upper limit - // of the window, from which mTotalServiceWindow is subtracted to derive - // the lower limit. - std::chrono::nanoseconds mMaxTotalService{0}; - - // Sum of sizes of all the active queues. Maintained as items are enqueued - // or run. - size_t mSize{0}; - - std::atomic mIsShutdown{false}; - - void trimSingleActionQueue(Qptr q, - std::chrono::steady_clock::time_point now); - void trimIdleActionQueues(std::chrono::steady_clock::time_point now); - - // List of ActionQueues that are currently idle. Idle ActionQueues maintain - // a list::iterator pointing to their own position in this list, which - // can be used to make them runnable at any time. Idled ActionQueues are - // placed at the front of this list and expired (if they are too old) from - // the back of the list, where they've been idle the longest. - std::list mIdleActionQueues; - - // transitions overloaded state - void setOverloaded(bool overloaded); - - // records the time the scheduler transitioned to the overloaded or max if - // not - std::chrono::steady_clock::time_point mOverloadedStart; - - // Records the currently-executing action type, or NORMAL_ACTION when no - // action is running. This can be retrieved through currentActionType(). - ActionType mCurrentActionType{ActionType::NORMAL_ACTION}; - - public: - Scheduler(VirtualClock& clock, std::chrono::nanoseconds latencyWindow); - - // Adds an action to the named ActionQueue with a given type. - void enqueue(std::string&& name, Action&& action, ActionType type); - - // Runs 0 or 1 action from the next ActionQueue in the queue-of-queues. - size_t runOne(); - - // Return the ActionType of the currently-executing action; if no action - // is currently running, return NORMAL_ACTION. - ActionType currentActionType() const; - - // Returns how long ActionQueues have been overloaded (0 means not - // overloaded) - std::chrono::seconds getOverloadedDuration() const; - - size_t - size() const - { - return mSize; - } - - std::chrono::nanoseconds - maxTotalService() const - { - return mMaxTotalService; - } - - Stats const& - stats() const - { - return mStats; - } - - void shutdown(); - -#ifdef BUILD_TESTS - // Testing interface - Qptr getExistingQueue(std::string const& name, ActionType type) const; - std::string const& nextQueueToRun() const; - std::chrono::nanoseconds - totalService(std::string const& q, - ActionType type = ActionType::NORMAL_ACTION) const; - size_t queueLength(std::string const& q, - ActionType type = ActionType::NORMAL_ACTION) const; -#endif -}; -} diff --git a/src/util/Timer.cpp b/src/util/Timer.cpp index 3a90b6a74e..2e1804f471 100644 --- a/src/util/Timer.cpp +++ b/src/util/Timer.cpp @@ -6,7 +6,6 @@ #include "main/Application.h" #include "util/GlobalChecks.h" #include "util/Logging.h" -#include "util/Scheduler.h" #include #include #include @@ -19,12 +18,9 @@ using namespace std; static std::chrono::milliseconds const CRANK_TIME_SLICE(500); static size_t const CRANK_EVENT_SLICE = 100; -std::chrono::seconds const SCHEDULER_LATENCY_WINDOW(5); VirtualClock::VirtualClock(Mode mode) : mMode(mode) - , mActionScheduler( - std::make_unique(*this, SCHEDULER_LATENCY_WINDOW)) , mRealTimer(mIOContext) { } @@ -270,16 +266,11 @@ VirtualClock::shutdown() getIOContext().stop(); - // Clear pending queue for the scheduler + // Clear pending action queue. { LOCK_GUARD(mPendingActionQueueMutex, guard); - mPendingActionQueue = - std::queue, std::string, - Scheduler::ActionType>>(); + mPendingActionQueue = std::queue>(); } - - // Clear scheduler queues - mActionScheduler->shutdown(); } } @@ -382,31 +373,13 @@ VirtualClock::crank(bool block) // Dispatch some IO event completions. mLastDispatchStart = now(); - // Bias towards the execution queue exponentially based on how long the - // scheduler has been overloaded. - auto overloadedDuration = - std::min(static_cast(30), - mActionScheduler->getOverloadedDuration().count()); - std::string overloadStr = - overloadedDuration > 0 ? "overloaded" : "slack"; - size_t ioDivisor = 1ULL << overloadedDuration; { ZoneNamedN(ioPollZone, "ASIO polling", true); - ZoneText(overloadStr.c_str(), overloadStr.size()); progressCount += crankStep( - *this, [this] { return this->mIOContext.poll_one(); }, - ioDivisor); + *this, [this] { return this->mIOContext.poll_one(); }); } - // Dispatch some scheduled actions. - mLastDispatchStart = now(); - { - ZoneNamedN(schedZone, "scheduler", true); - progressCount += crankStep( - *this, [this] { return this->mActionScheduler->runOne(); }); - } - - // Subtract out any timer cancellations from the above two steps. + // Subtract out any timer cancellations from the above step. progressCount -= nRealTimerCancelEvents; if (mMode == VIRTUAL_TIME && progressCount == 0 && @@ -431,16 +404,12 @@ VirtualClock::crank(bool block) } } - // Transfer any pending actions to the scheduler, counting them as - // "progress" also. + // Transfer any pending actions to ASIO, counting them as "progress" also. { LOCK_GUARD(mPendingActionQueueMutex, guard); while (!mPendingActionQueue.empty()) { - auto& f = mPendingActionQueue.front(); - mActionScheduler->enqueue(std::move(std::get<1>(f)), - std::move(std::get<0>(f)), - std::get<2>(f)); + asio::post(mIOContext, std::move(mPendingActionQueue.front())); mPendingActionQueue.pop(); progressCount++; } @@ -474,8 +443,7 @@ VirtualClock::crank(bool block) } void -VirtualClock::postAction(std::function&& f, std::string&& name, - Scheduler::ActionType type) +VirtualClock::postAction(std::function&& f) { if (isStopped()) { @@ -486,7 +454,7 @@ VirtualClock::postAction(std::function&& f, std::string&& name, { LOCK_GUARD(mPendingActionQueueMutex, lock); queueWasEmpty = mPendingActionQueue.empty(); - mPendingActionQueue.emplace(std::move(f), std::move(name), type); + mPendingActionQueue.emplace(std::move(f)); } // The pending queue is emptied by the main thread just before the main @@ -515,24 +483,16 @@ VirtualClock::postAction(std::function&& f, std::string&& name, size_t VirtualClock::getActionQueueSize() const { - size_t pending = 0; { LOCK_GUARD(mPendingActionQueueMutex, guard); - pending = mPendingActionQueue.size(); + return mPendingActionQueue.size(); } - return pending + mActionScheduler->size(); } bool VirtualClock::actionQueueIsOverloaded() const { - return mActionScheduler->getOverloadedDuration().count() != 0; -} - -Scheduler::ActionType -VirtualClock::currentSchedulerActionType() const -{ - return mActionScheduler->currentActionType(); + return false; } asio::io_context& diff --git a/src/util/Timer.h b/src/util/Timer.h index 089b486fe7..3933bd50c8 100644 --- a/src/util/Timer.h +++ b/src/util/Timer.h @@ -9,7 +9,6 @@ // else. #include "util/asio.h" #include "util/NonCopyable.h" -#include "util/Scheduler.h" #include "util/ThreadAnnotations.h" #include @@ -47,9 +46,8 @@ using RealSteadyTimer = asio::steady_timer; * for any time-based transitions to _literally_ occur due to the passage of * wall-clock time, when there's no other work to do. * - * The VirtualClock type also contains an instance of Scheduler, which is a - * fair time-slicing and load-shedding system for balancing multiple streams - * of callbacks each competing for main thread execution. + * The VirtualClock type also owns the ASIO event loop that dispatches callbacks + * on the main thread. */ class VirtualTimer; @@ -62,8 +60,6 @@ class VirtualClockEventCompare std::shared_ptr b); }; -extern std::chrono::seconds const SCHEDULER_LATENCY_WINDOW; - class VirtualClock { public: @@ -171,28 +167,20 @@ class VirtualClock std::atomic mBackgroundWorkCount{0}; - // There are three separate queue-like things in a given VirtualClock. - // - // The first is the action Scheduler, which multiplexes (with some level of - // fair real-time-slicing) multiple streams of callbacks competing for main - // thread (real) time. Only the main thread ever accesses the Scheduler. + // There are two separate queue-like things in a given VirtualClock. // - // The second is a simple "pending actions" queue that is protected by a + // The first is a simple "pending actions" queue that is protected by a // short-duration mutex. This is a threadsafe _submission_ point for adding - // actions to the Scheduler -- any thread can enqueue, and only the main - // thread will dequeue (immediately re-enqueueing into the Scheduler for - // further time-slicing / load-shedding). + // actions that the main thread will transfer to the ASIO queue. // - // The third is a priority queue of VirtualClockEvents, which is the part of - // the VirtualClock that manages the progress of virtual time and the + // The second is a priority queue of VirtualClockEvents, which is the part + // of the VirtualClock that manages the progress of virtual time and the // dispatch of timers as virtual time advances past them. std::chrono::steady_clock::time_point mLastDispatchStart; - std::unique_ptr mActionScheduler; mutable ANNOTATED_MUTEX(mPendingActionQueueMutex); - std::queue< - std::tuple, std::string, Scheduler::ActionType>> - mPendingActionQueue GUARDED_BY(mPendingActionQueueMutex); + std::queue> mPendingActionQueue + GUARDED_BY(mPendingActionQueueMutex); using PrQueue = std::priority_queue, @@ -264,12 +252,10 @@ class VirtualClock // Returns the time of the next scheduled event. time_point next() const; - void postAction(std::function&& f, std::string&& name, - Scheduler::ActionType type); + void postAction(std::function&& f); size_t getActionQueueSize() const; bool actionQueueIsOverloaded() const; - Scheduler::ActionType currentSchedulerActionType() const; #ifdef BUILD_TESTS // Inject a wall-clock offset into system_now() to simulate clock drift. diff --git a/src/util/test/SchedulerTests.cpp b/src/util/test/SchedulerTests.cpp deleted file mode 100644 index 3ce31cf250..0000000000 --- a/src/util/test/SchedulerTests.cpp +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2020 Stellar Development Foundation and contributors. Licensed -// under the Apache License, Version 2.0. See the COPYING file at the root -// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 - -#include "util/Scheduler.h" - -#include "test/Catch2.h" -#include "util/Logging.h" -#include "util/Timer.h" -#include - -using namespace stellar; - -TEST_CASE("scheduler basic functionality", "[scheduler]") -{ - std::chrono::seconds window(10); - VirtualClock clock; - Scheduler sched(clock, window); - - std::string A("a"), B("b"), C("c"); - - size_t nEvents{0}; - auto step = std::chrono::microseconds(1); - auto microsleep = [&] { - clock.sleep_for(step); - ++nEvents; - }; - - sched.enqueue(std::string(A), microsleep, - Scheduler::ActionType::NORMAL_ACTION); - - CHECK(sched.size() == 1); - CHECK(sched.nextQueueToRun() == A); - CHECK(sched.totalService(A).count() == 0); - CHECK(sched.queueLength(A) == 1); - CHECK(sched.stats().mActionsEnqueued == 1); - CHECK(sched.stats().mActionsDequeued == 0); - CHECK(sched.stats().mActionsDroppedDueToOverload == 0); - CHECK(sched.stats().mQueuesActivatedFromFresh == 1); - CHECK(sched.stats().mQueuesActivatedFromIdle == 0); - CHECK(sched.stats().mQueuesSuspended == 0); - - CHECK(sched.runOne() == 1); // run A - CHECK(nEvents == 1); - CHECK(sched.totalService(A).count() != 0); - CHECK(sched.stats().mActionsDequeued == 1); - CHECK(sched.stats().mQueuesSuspended == 1); - - sched.enqueue(std::string(A), microsleep, - Scheduler::ActionType::NORMAL_ACTION); - sched.enqueue(std::string(B), microsleep, - Scheduler::ActionType::NORMAL_ACTION); - sched.enqueue(std::string(C), microsleep, - Scheduler::ActionType::NORMAL_ACTION); - - CHECK(sched.size() == 3); - CHECK(sched.nextQueueToRun() != A); - CHECK(sched.totalService(A).count() != 0); - CHECK(sched.totalService(B).count() == 0); - CHECK(sched.totalService(C).count() == 0); - CHECK(sched.queueLength(A) == 1); - CHECK(sched.queueLength(B) == 1); - CHECK(sched.queueLength(C) == 1); - CHECK(sched.stats().mActionsEnqueued == 4); - CHECK(sched.stats().mActionsDroppedDueToOverload == 0); - CHECK(sched.stats().mQueuesActivatedFromFresh == 3); - CHECK(sched.stats().mQueuesActivatedFromIdle == 1); - - auto aruntime = sched.totalService(A).count(); - CHECK(sched.runOne() == 1); // run B or C - CHECK(sched.runOne() == 1); // run B or C - CHECK(nEvents == 3); - CHECK(sched.totalService(A).count() == aruntime); - CHECK(sched.totalService(B).count() != 0); - CHECK(sched.totalService(C).count() != 0); - CHECK(sched.queueLength(A) == 1); - CHECK(sched.queueLength(B) == 0); - CHECK(sched.queueLength(C) == 0); - CHECK(sched.stats().mActionsDequeued == 3); - CHECK(sched.stats().mActionsDroppedDueToOverload == 0); - CHECK(sched.stats().mQueuesSuspended == 3); - - CHECK(sched.runOne() == 1); // run A - CHECK(nEvents == 4); - CHECK(sched.queueLength(A) == 0); - CHECK(sched.queueLength(B) == 0); - CHECK(sched.queueLength(C) == 0); - CHECK(sched.stats().mActionsDequeued == 4); - CHECK(sched.stats().mQueuesSuspended == 4); -} - -TEST_CASE("scheduler load shedding -- overload", "[scheduler]") -{ - std::chrono::microseconds window(100); - VirtualClock clock; - Scheduler sched(clock, window); - - std::string A("a"), B("b"), C("c"); - - size_t nEvents{0}; - auto step = std::chrono::microseconds(1); - auto microsleep = [&] { - clock.sleep_for(step); - ++nEvents; - }; - - for (size_t i = 0; i < 1000; ++i) - { - sched.enqueue(std::string(A), microsleep, - Scheduler::ActionType::DROPPABLE_ACTION); - sched.enqueue(std::string(B), microsleep, - Scheduler::ActionType::DROPPABLE_ACTION); - sched.enqueue(std::string(C), microsleep, - Scheduler::ActionType::DROPPABLE_ACTION); - sched.runOne(); - sched.runOne(); - } - while (sched.size() != 0) - { - sched.runOne(); - } - CHECK(sched.stats().mActionsDroppedDueToOverload == 901); - CHECK(sched.stats().mActionsDequeued == 2099); - auto tot = sched.stats().mActionsDequeued + - sched.stats().mActionsDroppedDueToOverload; - CHECK(sched.stats().mActionsEnqueued == tot); -}