From 19d5ac8050a8f0a85a8b3a516e7832d771029bf2 Mon Sep 17 00:00:00 2001 From: Taras Shchybovyk Date: Tue, 16 Jun 2026 15:15:00 -0700 Subject: [PATCH] ring: make cross-machine join robust to launch skew MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Darkbloom runs the MLX ring backend across independently-started Macs (separate processes, separate machines). Three problems made the join flaky/impossible in that setting: 1. ring.cpp: CONN_ATTEMPTS was 5, so the connect retry window was only ~5s. Two nodes launched by hand / over SSH with differing shard-load and attestation startup times routinely missed it and aborted with errno 60. Widened to 60 attempts. 2. utils.cpp TCPSocket::connect leaked the socket fd on every failed attempt, piling up SYN_SENT/CLOSED sockets that confused the peer's single accept(). Now closes the fd before retrying. 3. utils.cpp used unbounded exponential backoff (wait <<= 1), so after a few misses a node slept 16s/32s while its peer sat ready — they fell out of phase and never rendezvoused. Capped the wait at 2s so retries stay frequent and in-phase across the whole window. Verified on a 2-Mac cluster (32GB + 24GB) running sharded Mistral-24B-8bit end-to-end through the Darkbloom coordinator. --- mlx/distributed/ring/ring.cpp | 10 +++++++++- mlx/distributed/utils.cpp | 21 +++++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/mlx/distributed/ring/ring.cpp b/mlx/distributed/ring/ring.cpp index ea40042844..2ccae859bd 100644 --- a/mlx/distributed/ring/ring.cpp +++ b/mlx/distributed/ring/ring.cpp @@ -91,7 +91,15 @@ namespace mlx::core::distributed::ring { constexpr const size_t ALL_SUM_SIZE = 8 * 1024 * 1024; constexpr const size_t ALL_SUM_BUFFERS = 2; -constexpr const int CONN_ATTEMPTS = 5; +// Raised from the upstream default of 5. The ring join requires both ranks to +// reach group init within the connect retry window. When nodes are started +// independently on separate machines (rather than co-launched), each may spend +// seconds loading its shard before reaching accept(); with only 5 attempts we +// observed the connecting side aborting with errno 60 (ETIMEDOUT) before its +// peer was listening. A larger attempt count lets independently-started nodes +// rendezvous reliably. (Does not affect co-launched setups, which connect on +// the first attempt.) +constexpr const int CONN_ATTEMPTS = 60; constexpr const int CONN_WAIT = 1000; constexpr const char* RING_TAG = "[ring]"; diff --git a/mlx/distributed/utils.cpp b/mlx/distributed/utils.cpp index 2de994cf4d..16dd7ed637 100644 --- a/mlx/distributed/utils.cpp +++ b/mlx/distributed/utils.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -168,7 +169,18 @@ TCPSocket TCPSocket::connect( std::function cb) { int sock, success; - // Attempt to connect `num_retries` times with exponential backoff. + // Attempt to connect `num_retries` times. + // + // Two changes that mattered when bringing up a ring across separately-started + // machines (each node connecting many times before its peer is ready): + // 1. Close the socket fd on a failed attempt, before retrying. A fresh + // socket is created each iteration, so without this the failed ones are + // left open; under many retries they accumulate (SYN_SENT/CLOSED), which + // we saw interfere with the peer's accept(). + // 2. Cap the inter-attempt wait. The wait doubles each miss; left unbounded + // it grows to many seconds, so a node can be asleep when its peer becomes + // ready and the two stop lining up. Capping it keeps retries frequent so + // they stay in-phase across the `num_retries` window. for (int attempt = 0; attempt < num_retries; attempt++) { // Create the socket sock = socket(AF_INET, SOCK_STREAM, 0); @@ -183,6 +195,8 @@ TCPSocket TCPSocket::connect( if (success == 0) { break; } + // Failed attempt: close the fd so we don't leak a half-open socket. + ::close(sock); if (cb != nullptr) { cb(attempt, wait); @@ -191,7 +205,10 @@ TCPSocket TCPSocket::connect( std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } - wait <<= 1; + // Bounded growth: gentle ramp, capped at 2s so we keep retrying steadily. + if (wait < 2000) { + wait = std::min(2000, wait * 2); + } } if (success < 0) {