Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion mlx/distributed/ring/ring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ namespace mlx::core::distributed::ring {

constexpr const size_t ALL_SUM_SIZE = 8 * 1024 * 1024;
constexpr const size_t ALL_SUM_BUFFERS = 2;
constexpr const int CONN_ATTEMPTS = 5;
// Raised from the upstream default of 5. The ring join requires both ranks to
// reach group init within the connect retry window. When nodes are started
// independently on separate machines (rather than co-launched), each may spend
// seconds loading its shard before reaching accept(); with only 5 attempts we
// observed the connecting side aborting with errno 60 (ETIMEDOUT) before its
// peer was listening. A larger attempt count lets independently-started nodes
// rendezvous reliably. (Does not affect co-launched setups, which connect on
// the first attempt.)
constexpr const int CONN_ATTEMPTS = 60;
constexpr const int CONN_WAIT = 1000;
constexpr const char* RING_TAG = "[ring]";

Expand Down
21 changes: 19 additions & 2 deletions mlx/distributed/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <netdb.h>
#include <unistd.h>
#include <algorithm>
#include <cstring>
#include <sstream>
#include <thread>
Expand Down Expand Up @@ -168,7 +169,18 @@ TCPSocket TCPSocket::connect(
std::function<void(int, int)> cb) {
int sock, success;

// Attempt to connect `num_retries` times with exponential backoff.
// Attempt to connect `num_retries` times.
//
// Two changes that mattered when bringing up a ring across separately-started
// machines (each node connecting many times before its peer is ready):
// 1. Close the socket fd on a failed attempt, before retrying. A fresh
// socket is created each iteration, so without this the failed ones are
// left open; under many retries they accumulate (SYN_SENT/CLOSED), which
// we saw interfere with the peer's accept().
// 2. Cap the inter-attempt wait. The wait doubles each miss; left unbounded
// it grows to many seconds, so a node can be asleep when its peer becomes
// ready and the two stop lining up. Capping it keeps retries frequent so
// they stay in-phase across the `num_retries` window.
for (int attempt = 0; attempt < num_retries; attempt++) {
// Create the socket
sock = socket(AF_INET, SOCK_STREAM, 0);
Expand All @@ -183,6 +195,8 @@ TCPSocket TCPSocket::connect(
if (success == 0) {
break;
}
// Failed attempt: close the fd so we don't leak a half-open socket.
::close(sock);

if (cb != nullptr) {
cb(attempt, wait);
Expand All @@ -191,7 +205,10 @@ TCPSocket TCPSocket::connect(
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}

wait <<= 1;
// Bounded growth: gentle ramp, capped at 2s so we keep retrying steadily.
if (wait < 2000) {
wait = std::min(2000, wait * 2);
}
}

if (success < 0) {
Expand Down