diff --git a/src/clusterd/src/lib.rs b/src/clusterd/src/lib.rs index 8b49cbeb54358..e48c887181bf3 100644 --- a/src/clusterd/src/lib.rs +++ b/src/clusterd/src/lib.rs @@ -81,12 +81,6 @@ struct Args { default_value = "127.0.0.1:6878" )] internal_http_listen_addr: SocketAddr, - /// The FQDN of this process, for GRPC request validation. - /// - /// Not providing this value or setting it to the empty string disables host validation for - /// GRPC requests. - #[clap(long, env = "GRPC_HOST", value_name = "NAME")] - grpc_host: Option, // === Timely cluster options. === /// Configuration for the storage Timely cluster. @@ -435,7 +429,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { None, ); - let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h)); let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry); let mut storage_timely_config = args.storage_timely_config; @@ -480,7 +473,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { transport::serve( args.storage_controller_listen_addr, BUILD_INFO.semver_version(), - grpc_host.clone(), Duration::MAX, storage_client_builder, grpc_server_metrics.for_server("storage"), @@ -512,7 +504,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { transport::serve( args.compute_controller_listen_addr, BUILD_INFO.semver_version(), - grpc_host.clone(), Duration::MAX, compute_client_builder, grpc_server_metrics.for_server("compute"), diff --git a/src/service/src/transport.rs b/src/service/src/transport.rs index cad6dba6877cd..7ad4c971d715a 100644 --- a/src/service/src/transport.rs +++ b/src/service/src/transport.rs @@ -66,31 +66,14 @@ impl Client { idle_timeout: Duration, metrics: impl Metrics, ) -> anyhow::Result { - let dest_host = host_from_address(address); let stream = mz_ore::future::timeout(connect_timeout, Stream::connect(address)).await?; info!(%address, "ctp: connected to server"); - let conn = Connection::start(stream, version, dest_host, idle_timeout, metrics).await?; + let conn = Connection::start(stream, version, idle_timeout, metrics).await?; Ok(Self { conn }) } } -/// Helper function to extract the host part from an address string. -/// -/// This function assumes addresses to be of the form `:` or `::` -/// and yields `None` otherwise. -fn host_from_address(address: &str) -> Option { - let mut p = address.split(':'); - let (host, port) = match (p.next(), p.next(), p.next(), p.next()) { - (Some(host), Some(port), None, None) => (host, port), - (Some(_protocol), Some(host), Some(port), None) => (host, port), - _ => return None, - }; - - let _: u16 = port.parse().ok()?; - Some(host.into()) -} - impl Client where Out: Message, @@ -138,7 +121,6 @@ impl GenericClient for Client { pub async fn serve( address: SocketAddr, version: Version, - server_fqdn: Option, idle_timeout: Duration, handler_fn: impl Fn() -> H, metrics: impl Metrics, @@ -171,7 +153,6 @@ where let handler = handler_fn(); let version = version.clone(); - let server_fqdn = server_fqdn.clone(); let metrics = metrics.clone(); let (cancel_tx, cancel_rx) = oneshot::channel(); @@ -183,7 +164,6 @@ where stream, handler, version, - server_fqdn, idle_timeout, cancel_rx, metrics, @@ -203,7 +183,6 @@ async fn serve_connection( stream: Stream, mut handler: H, version: Version, - server_fqdn: Option, timeout: Duration, cancel_rx: oneshot::Receiver<()>, metrics: impl Metrics, @@ -213,7 +192,7 @@ where Out: Message, H: GenericClient, { - let mut conn = Connection::start(stream, version, server_fqdn, timeout, metrics).await?; + let mut conn = Connection::start(stream, version, timeout, metrics).await?; let mut cancel_rx = cancel_rx; loop { @@ -271,7 +250,6 @@ impl Connection { async fn start( stream: Stream, version: Version, - server_fqdn: Option, mut timeout: Duration, metrics: impl Metrics, ) -> anyhow::Result { @@ -292,7 +270,7 @@ impl Connection { let mut reader = metrics::Reader::new(reader, metrics.clone()); let mut writer = metrics::Writer::new(writer, metrics.clone()); - handshake(&mut reader, &mut writer, version, server_fqdn).await?; + handshake(&mut reader, &mut writer, version).await?; let (out_tx, out_rx) = mpsc::unbounded_channel(); let (in_tx, in_rx) = mpsc::unbounded_channel(); @@ -420,12 +398,7 @@ impl Connection { /// `Hello` message. The `Hello` message contains information about the originating endpoint that /// is used by the receiver to validate compatibility with its peer. Only if both endpoints /// determine that they are compatible does the handshake succeed. -async fn handshake( - mut reader: R, - mut writer: W, - version: Version, - server_fqdn: Option, -) -> anyhow::Result<()> +async fn handshake(mut reader: R, mut writer: W, version: Version) -> anyhow::Result<()> where R: AsyncRead + Unpin, W: AsyncWrite + Unpin, @@ -437,7 +410,6 @@ where let hello = Hello { version: version.clone(), - server_fqdn: server_fqdn.clone(), }; write_message(&mut writer, Some(&hello)).await?; @@ -448,17 +420,11 @@ where let Hello { version: peer_version, - server_fqdn: peer_server_fqdn, } = read_message(&mut reader).await?; if peer_version != version { bail!("version mismatch: {peer_version} != {version}"); } - if let (Some(other), Some(mine)) = (&peer_server_fqdn, &server_fqdn) { - if other != mine { - bail!("server FQDN mismatch: {other} != {mine}"); - } - } Ok(()) } @@ -468,8 +434,6 @@ where struct Hello { /// The version of the originating endpoint. version: Version, - /// The FQDN of the server endpoint. - server_fqdn: Option, } /// Write a message into the given writer. diff --git a/src/service/tests/transport.rs b/src/service/tests/transport.rs index e77879cea1e5b..f5b5b055d974a 100644 --- a/src/service/tests/transport.rs +++ b/src/service/tests/transport.rs @@ -105,7 +105,6 @@ fn test_bidirectional_communication() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), NoopMetrics, @@ -160,7 +159,6 @@ fn test_server_error() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), NoopMetrics, @@ -241,7 +239,6 @@ fn test_handshake_version_mismatch() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), SERVER_VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), NoopMetrics, @@ -268,49 +265,6 @@ fn test_handshake_version_mismatch() { sim.run().unwrap(); } -#[test] // allow(test-attribute) -#[cfg_attr(miri, ignore)] // too slow -fn test_handshake_fqdn_mismatch() { - let mut sim = setup(); - - sim.host("server", move || async { - let (in_tx, mut in_rx) = mpsc::unbounded_channel::<()>(); - let (_out_tx, out_rx) = mpsc::unbounded_channel::<()>(); - let handler = ChannelHandler::new(in_tx, out_rx); - let handler = Arc::new(Mutex::new(Some(handler))); - - mz_ore::task::spawn( - || "serve", - transport::serve( - "turmoil:0.0.0.0:7777".parse().unwrap(), - VERSION, - Some("wrong.server".into()), - TIMEOUT, - move || handler.lock().unwrap().take().unwrap(), - NoopMetrics, - ), - ); - - // Client has disconnected. - assert_none!(in_rx.recv().await); - - Ok(()) - }); - - sim.client("client", async move { - connect_ctp_error::<(), ()>( - "turmoil:server:7777", - VERSION, - "server FQDN mismatch: wrong.server != server", - ) - .await?; - - Ok(()) - }); - - sim.run().unwrap(); -} - #[test] // allow(test-attribute) #[cfg_attr(miri, ignore)] // too slow fn test_idle_timeout() { @@ -327,7 +281,6 @@ fn test_idle_timeout() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), NoopMetrics, @@ -381,7 +334,6 @@ fn test_keepalive() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), NoopMetrics, @@ -420,7 +372,6 @@ fn test_connection_cancelation() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, OneOutputHandler::new, NoopMetrics, @@ -510,7 +461,6 @@ fn test_metrics() { transport::serve( "turmoil:0.0.0.0:7777".parse().unwrap(), VERSION, - Some("server".into()), TIMEOUT, move || handler.lock().unwrap().take().unwrap(), metrics.clone(), @@ -523,8 +473,9 @@ fn test_metrics() { // Wait for message to be transmitted. tokio::time::sleep(Duration::from_secs(1)).await; - assert!(metrics.bytes_sent.load(Ordering::SeqCst) >= 63); - assert!(metrics.bytes_received.load(Ordering::SeqCst) >= 44); + // Loose lower bounds; exact counts vary by a keepalive frame. + assert!(metrics.bytes_sent.load(Ordering::SeqCst) >= 40); + assert!(metrics.bytes_received.load(Ordering::SeqCst) >= 30); assert_eq!(metrics.messages_sent.load(Ordering::SeqCst), 1); assert_eq!(metrics.messages_received.load(Ordering::SeqCst), 1); @@ -546,8 +497,9 @@ fn test_metrics() { // Wait for message to be transmitted. tokio::time::sleep(Duration::from_secs(1)).await; - assert!(metrics.bytes_sent.load(Ordering::SeqCst) >= 44); - assert!(metrics.bytes_received.load(Ordering::SeqCst) >= 63); + // Loose lower bounds; exact counts vary by a keepalive frame. + assert!(metrics.bytes_sent.load(Ordering::SeqCst) >= 30); + assert!(metrics.bytes_received.load(Ordering::SeqCst) >= 40); assert_eq!(metrics.messages_sent.load(Ordering::SeqCst), 1); assert_eq!(metrics.messages_received.load(Ordering::SeqCst), 1);