Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions ark/api/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@
#include "utils/utils_net.hpp"

#if defined(ARK_CUDA)
#include <cuda/atomic>
#include <mscclpp/atomic_device.hpp>
static int atomicLoadRelaxed(int *ptr) {
return cuda::atomic_ref<int, cuda::thread_scope_system>{*ptr}.load(
cuda::memory_order_relaxed);
return mscclpp::atomicLoad(ptr, mscclpp::memoryOrderRelaxed);
}
static void atomicStoreRelaxed(int *ptr, int val) {
cuda::atomic_ref<int, cuda::thread_scope_system>{*ptr}.store(
val, cuda::memory_order_relaxed);
mscclpp::atomicStore(ptr, val, mscclpp::memoryOrderRelaxed);
}
#elif defined(ARK_ROCM)
static int atomicLoadRelaxed(int *ptr) {
Expand Down Expand Up @@ -161,7 +159,7 @@ class CommResource {
}

struct ConnectionResource {
std::shared_ptr<mscclpp::Connection> connection;
mscclpp::Connection connection;
std::vector<std::shared_ptr<mscclpp::PortChannel>> proxy_channels;
std::vector<std::shared_ptr<mscclpp::MemoryChannel>> sm_channels;
};
Expand Down Expand Up @@ -248,12 +246,11 @@ void CommResource::connect(const PlanJson &plan_json,
mscclpp::RegisteredMemory regmem =
comm_->registerMemory(buffer->ref(), buffer->bytes(), all_transports);

using ConnectionFuture =
mscclpp::NonblockingFuture<std::shared_ptr<mscclpp::Connection>>;
using ConnectionFuture = std::shared_future<mscclpp::Connection>;
std::map<int, ConnectionFuture> rank_to_ipc_connection_future;
std::map<int, ConnectionFuture> rank_to_eth_connection_future;
std::map<int, ConnectionFuture> rank_to_ib_connection_future;
std::map<int, mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>>
std::map<int, std::shared_future<mscclpp::RegisteredMemory>>
rank_to_remote_regmem_future;

for (auto remote_rank : remote_ranks) {
Expand All @@ -266,25 +263,26 @@ void CommResource::connect(const PlanJson &plan_json,
rank_to_resource_[remote_rank] = resource;
int remote_node = rank_to_node(remote_rank);
if (remote_node == this_node) {
rank_to_ipc_connection_future[remote_rank] = comm_->connectOnSetup(
remote_rank, 0, mscclpp::Transport::CudaIpc);
rank_to_ipc_connection_future[remote_rank] = comm_->connect(
mscclpp::EndpointConfig(mscclpp::Transport::CudaIpc),
remote_rank, 0);
resource->ipc = std::make_shared<ConnectionResource>();
}
if ((remote_node != this_node) && get_env().disable_ib) {
rank_to_eth_connection_future[remote_rank] = comm_->connectOnSetup(
remote_rank, 0, mscclpp::Transport::Ethernet);
rank_to_eth_connection_future[remote_rank] = comm_->connect(
mscclpp::EndpointConfig(mscclpp::Transport::Ethernet),
remote_rank, 0);
resource->eth = std::make_shared<ConnectionResource>();
}
if (!get_env().disable_ib) {
rank_to_ib_connection_future[remote_rank] =
comm_->connectOnSetup(remote_rank, 0, IBs[device_id_]);
rank_to_ib_connection_future[remote_rank] = comm_->connect(
mscclpp::EndpointConfig(IBs[device_id_]), remote_rank, 0);
resource->ib = std::make_shared<ConnectionResource>();
}
comm_->sendMemoryOnSetup(regmem, remote_rank, 0);
comm_->sendMemory(regmem, remote_rank, 0);
rank_to_remote_regmem_future[remote_rank] =
comm_->recvMemoryOnSetup(remote_rank, 0);
comm_->recvMemory(remote_rank, 0);
}
comm_->setup();

for (auto &[remote_rank, future] : rank_to_ipc_connection_future) {
rank_to_resource_[remote_rank]->ipc->connection = future.get();
Expand Down Expand Up @@ -323,26 +321,23 @@ void CommResource::connect(const PlanJson &plan_json,
add_proxy_channel(resource->eth);
add_proxy_channel(resource->ib);
}
comm_->setup();

std::map<int,
std::vector<std::shared_ptr<mscclpp::SmDevice2DeviceSemaphore>>>
std::map<
int, std::vector<std::shared_ptr<mscclpp::MemoryDevice2DeviceSemaphore>>>
sm_semaphores;
for (auto &[remote_rank, resource] : rank_to_resource_) {
// NOTE: We can create multiple semaphores here if we need in the future
sm_semaphores[remote_rank].push_back(
std::make_shared<mscclpp::SmDevice2DeviceSemaphore>(
std::make_shared<mscclpp::MemoryDevice2DeviceSemaphore>(
*comm_, resource->ipc->connection));
}
comm_->setup();

for (auto &[remote_rank, resource] : rank_to_resource_) {
// NOTE: We can create multiple sm channels here if we need in the
// future
resource->ipc->sm_channels.push_back(
std::make_shared<mscclpp::MemoryChannel>(
sm_semaphores[remote_rank][0],
rank_to_remote_regmem[remote_rank], regmem.data(), nullptr));
rank_to_remote_regmem[remote_rank], regmem, nullptr));
}
}

Expand Down Expand Up @@ -768,8 +763,9 @@ void PlanResource::init_kernel() {
proxy_secondary_handles[i] = p_hdls[1];
}
}
// Pin current device - see set_current() rationale in compile().
gpu_manager->set_current();
auto tmp_stream = gpu_manager->create_stream();
GLOG(gpuSetDevice(device_id_));
GLOG(gpuMemcpyAsync(
proxy_chan_addr, proxy_handles.data(),
proxy_handles.size() * sizeof(mscclpp::PortChannel::DeviceHandle),
Expand Down Expand Up @@ -887,10 +883,18 @@ void Executor::Impl::compile(const std::string &plan, int device_id,
}
if (prev_device_id != device_id) {
auto gpu_manager = GpuManager::get_instance(device_id);
// Pin the calling thread to this device before allocating per-device
// CUDA resources. The GpuManager singleton only calls gpuSetDevice
// when it's first constructed, so subsequent get_instance() calls
// (or any code path that touched a different device in between) can
// leave the wrong current device on the thread - at world_size ≥ 4
// this manifests as cross-device events/streams that later trigger
// cudaErrorInvalidValue on rank N-1.
gpu_manager->set_current();
timer_begin_ = gpu_manager->create_event();
timer_end_ = gpu_manager->create_event();
flag_ = gpu_manager->malloc_host(
sizeof(int), gpuHostAllocMapped | gpuHostAllocWriteCombined);
sizeof(int), gpuHostAllocMapped);
stream_ = gpu_manager->create_stream();
}
PlanResourceKey key(plan, device_id, name);
Expand Down Expand Up @@ -931,12 +935,20 @@ void Executor::Impl::launch(
loop_mode_ = loop_mode;
elapsed_msec_ = -1;

// Pin current device - see set_current() rationale in compile().
if (foreground_plan_resource_) {
GpuManager::get_instance(foreground_plan_resource_->device_id())
->set_current();
}
if (record) {
timer_begin_->record(stream_raw_);
is_recording_ = true;
}
if (comm_resource_) {
comm_resource_->proxy_service()->startProxy();
// Barrier across all ranks BEFORE launching the persistent kernel
// so that no rank races ahead while others finalize proxy setup.
comm_resource_->bootstrap()->barrier();
}

if (loop_mode_) {
Expand Down Expand Up @@ -1003,9 +1015,6 @@ void Executor::Impl::wait(int64_t max_spin_count) {
}
}
} else {
if (max_spin_count >= 0) {
LOG(WARN, "max_spin_count is ignored in non-loop mode.");
}
GLOG(gpuStreamSynchronize(stream_raw_));
}
}
Expand Down
85 changes: 85 additions & 0 deletions ark/api/executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,96 @@ ark::unittest::State test_executor_invalid() {
return ark::unittest::SUCCESS;
}

// Single-device smoke test for the compile()/launch() path using a ReLU model.
//
// Everything here targets device 0, so the device-pinning calls made inside
// compile() and launch() are exercised but are not expected to change the
// current device. The multi-GPU scenario (world_size >= 4) needs a
// multi-process setup and is NOT covered by this test.
ark::unittest::State test_executor_device_pinning() {
    constexpr int kNumElems = 256;
    constexpr int kHalf = kNumElems / 2;

    // Model: out = relu(in) over a 1-D FP32 tensor.
    ark::Model model;
    auto in_tensor = model.tensor({kNumElems}, ark::FP32);
    auto out_tensor = model.relu(in_tensor);

    // Plan for device 0, then compile and launch on the same device.
    ark::Planner planner(model, 0);
    auto plan = planner.plan();

    ark::Executor executor;
    executor.compile(plan, 0);
    executor.launch();

    // Host input covers [-128, 127] so both ReLU branches are hit.
    std::vector<float> input(kNumElems);
    int value = -kHalf;
    for (float &elem : input) {
        elem = static_cast<float>(value);
        ++value;
    }
    executor.tensor_write(in_tensor, input.data(),
                          input.size() * sizeof(float));

    // One iteration, then block until it finishes.
    executor.run(1);
    executor.wait();

    std::vector<float> output(kNumElems);
    executor.tensor_read(out_tensor, output.data(),
                         output.size() * sizeof(float));

    // ReLU reference: negative inputs clamp to zero, the rest pass through.
    for (int i = 0; i < kNumElems; ++i) {
        const float expected = (0.0f < input[i]) ? input[i] : 0.0f;
        UNITTEST_EQ(output[i], expected);
    }

    executor.stop();
    return ark::unittest::SUCCESS;
}

// Drives one Executor instance through several back-to-back
// compile -> launch -> run -> wait -> stop rounds with the same plan, checking
// that the executor can be reused after stop() without failing.
ark::unittest::State test_executor_recompile_cycle() {
    constexpr int kNumCycles = 3;

    // Minimal model: a single no-op over a small FP32 tensor.
    ark::Model model;
    auto tensor = model.tensor({64}, ark::FP32);
    model.noop(tensor);

    ark::Planner planner(model, 0);
    auto plan = planner.plan();

    ark::Executor executor;
    int remaining = kNumCycles;
    while (remaining > 0) {
        executor.compile(plan, 0);
        executor.launch();
        executor.run(1);
        executor.wait();
        executor.stop();
        --remaining;
    }
    return ark::unittest::SUCCESS;
}

// Smoke test for repeated run()/wait() pairs on a DefaultExecutor.
//
// The intent is to exercise the host-side flag polling in wait()
// (presumably the loop-mode atomicLoad/Store path - confirm that loop mode is
// DefaultExecutor's default). Flag values are not inspected; the test passes
// as long as every wait() returns instead of hanging.
ark::unittest::State test_executor_flag_polling() {
    constexpr int kNumIterations = 5;

    // Minimal model: a single no-op over a small FP32 tensor.
    ark::Model model;
    auto tensor = model.tensor({64}, ark::FP32);
    model.noop(tensor);

    ark::DefaultExecutor executor(model, 0);
    executor.launch();

    // Each round launches one iteration and polls until it completes.
    int round = 0;
    while (round != kNumIterations) {
        executor.run(1);
        executor.wait();
        ++round;
    }

    executor.stop();
    return ark::unittest::SUCCESS;
}

// Test entry point: invokes each executor unit test through the UNITTEST
// macro, in the order listed, and returns 0 once control reaches the end.
// NOTE(review): failure handling is presumably done inside UNITTEST (defined
// outside this file) - confirm against the unittest utilities.
int main() {
// Pre-existing executor tests.
UNITTEST(test_executor_loop);
UNITTEST(test_executor_no_loop);
UNITTEST(test_executor_tensor_read_write_no_stride);
UNITTEST(test_executor_tensor_read_write_stride_offset);
UNITTEST(test_executor_invalid);
// Single-device smoke tests for device pinning, executor reuse, and
// repeated run()/wait() polling.
UNITTEST(test_executor_device_pinning);
UNITTEST(test_executor_recompile_cycle);
UNITTEST(test_executor_flag_polling);
return 0;
}
6 changes: 5 additions & 1 deletion cmake/CheckNvidiaGpu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ if(NOT CUDAToolkit_FOUND)
return()
endif()

set(CMAKE_CUDA_ARCHITECTURES "60")
# Use sm_80 as minimum for the detection check.
# Must be a CACHE variable so cmake applies it before enable_language(CUDA)
# tests the compiler. Without CACHE, cmake 3.25+ may probe with a default
# architecture (e.g., compute_60) that newer CUDA toolkits have dropped.
set(CMAKE_CUDA_ARCHITECTURES "80" CACHE STRING "CUDA architectures for GPU detection" FORCE)
if(NOT CMAKE_CUDA_COMPILER)
# In case the CUDA Toolkit directory is not in the PATH
find_program(CUDA_COMPILER
Expand Down
18 changes: 15 additions & 3 deletions python/executor_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,21 @@ SharedTensor::SharedTensor(Executor &exe, const Tensor &tensor) {

DLTensor SharedTensor::dl_tensor() const {
DLTensor dl_tensor;
dl_tensor.data = data_;
size_t offset_in_elements = offsets_->empty() ? 0 : offsets_->at(0);
dl_tensor.byte_offset = offset_in_elements * dtype_.bytes();
// Compute the linear element offset from multi-dimensional offsets
// and strides: sum(offsets[i] * strides[i]) for all dimensions.
// Bake the offset directly into the data pointer since some frameworks
// (e.g., PyTorch) may not reliably use DLTensor::byte_offset.
size_t offset_in_elements = 0;
if (!offsets_->empty() && !strides_->empty()) {
for (size_t i = 0; i < offsets_->size() && i < strides_->size(); ++i) {
offset_in_elements += static_cast<size_t>(offsets_->at(i)) *
static_cast<size_t>(strides_->at(i));
}
}
size_t byte_offset = offset_in_elements * dtype_.bytes();
dl_tensor.data = reinterpret_cast<void *>(
reinterpret_cast<uintptr_t>(data_) + byte_offset);
dl_tensor.byte_offset = 0;
dl_tensor.device.device_type = get_device_type();
dl_tensor.device.device_id = device_id_;
dl_tensor.ndim = static_cast<int32_t>(shape_->size());
Expand Down
2 changes: 1 addition & 1 deletion third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ include(FetchContent)
FetchContent_Declare(
mscclpp
GIT_REPOSITORY https://github.com/microsoft/mscclpp
GIT_TAG 7f3b088744b184d595c0daeb2d721c2c8908f4bc
GIT_TAG v0.9.0
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/mscclpp
)
set(MSCCLPP_BUILD_TESTS OFF CACHE BOOL "" FORCE)
Expand Down
Loading