diff --git a/mlx/distributed/ring/ring.cpp b/mlx/distributed/ring/ring.cpp index ea40042844..2ccae859bd 100644 --- a/mlx/distributed/ring/ring.cpp +++ b/mlx/distributed/ring/ring.cpp @@ -91,7 +91,15 @@ namespace mlx::core::distributed::ring { constexpr const size_t ALL_SUM_SIZE = 8 * 1024 * 1024; constexpr const size_t ALL_SUM_BUFFERS = 2; -constexpr const int CONN_ATTEMPTS = 5; +// Raised from the upstream default of 5. The ring join requires both ranks to +// reach group init within the connect retry window. When nodes are started +// independently on separate machines (rather than co-launched), each may spend +// seconds loading its shard before reaching accept(); with only 5 attempts we +// observed the connecting side aborting with errno 60 (ETIMEDOUT) before its +// peer was listening. A larger attempt count lets independently-started nodes +// rendezvous reliably. (Does not affect co-launched setups, which connect on +// the first attempt.) +constexpr const int CONN_ATTEMPTS = 60; constexpr const int CONN_WAIT = 1000; constexpr const char* RING_TAG = "[ring]"; diff --git a/mlx/distributed/utils.cpp b/mlx/distributed/utils.cpp index 2de994cf4d..16dd7ed637 100644 --- a/mlx/distributed/utils.cpp +++ b/mlx/distributed/utils.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -168,7 +169,18 @@ TCPSocket TCPSocket::connect( std::function cb) { int sock, success; - // Attempt to connect `num_retries` times with exponential backoff. + // Attempt to connect `num_retries` times. + // + // Two changes that mattered when bringing up a ring across separately-started + // machines (each node connecting many times before its peer is ready): + // 1. Close the socket fd on a failed attempt, before retrying. A fresh + // socket is created each iteration, so without this the failed ones are + // left open; under many retries they accumulate (SYN_SENT/CLOSED), which + // we saw interfere with the peer's accept(). + // 2. Cap the inter-attempt wait. The wait doubles each miss; left unbounded + // it grows to many seconds, so a node can be asleep when its peer becomes + // ready and the two stop lining up. Capping it keeps retries frequent so + // they stay in-phase across the `num_retries` window. for (int attempt = 0; attempt < num_retries; attempt++) { // Create the socket sock = socket(AF_INET, SOCK_STREAM, 0); @@ -183,6 +195,8 @@ TCPSocket TCPSocket::connect( if (success == 0) { break; } + // Failed attempt: close the fd so we don't leak a half-open socket. + ::close(sock); if (cb != nullptr) { cb(attempt, wait); @@ -191,7 +205,10 @@ TCPSocket TCPSocket::connect( std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } - wait <<= 1; + // Bounded growth: gentle ramp, capped at 2s so we keep retrying steadily. + if (wait < 2000) { + wait = std::min(2000, wait * 2); + } } if (success < 0) {