
Commit

rm header and refine profilehead
Chao1Han committed Oct 17, 2024
1 parent ae90994 commit ab04fc0
Showing 3 changed files with 65 additions and 39 deletions.
3 changes: 0 additions & 3 deletions caffe2/CMakeLists.txt
@@ -1376,9 +1376,6 @@ if(USE_DISTRIBUTED)
endif()
if(USE_XPU AND USE_C10D_XCCL)
target_compile_definitions(torch_xpu PUBLIC USE_C10D_XCCL)
set_source_files_properties(
${TORCH_SRC_DIR}/csrc/distributed/c10d/ProcessGroupXCCL.cpp
PROPERTIES COMPILE_DEFINITIONS "CCL_ENABLE_ZE;CCL_ENABLE_SYCL")
endif()
if(USE_MPI AND USE_C10D_MPI)
if(CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
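For context, the two compile definitions removed from CMake above are not dropped: they reappear as preprocessor defines at the top of torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp (see the header diff below). A minimal sketch of that approach, assuming oneCCL keys its Level Zero and SYCL code paths off these macros and that they are defined before the oneCCL header is included:

// Sketch: enable oneCCL's Level Zero and SYCL paths from the backend header
// itself instead of passing -DCCL_ENABLE_ZE / -DCCL_ENABLE_SYCL to the compiler.
#define CCL_ENABLE_ZE
#define CCL_ENABLE_SYCL

#include <oneapi/ccl.hpp>  // must come after the defines above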
58 changes: 41 additions & 17 deletions torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp
@@ -1,17 +1,8 @@
#ifdef USE_C10D_XCCL

#include <comm/XPUGuard.h>
#include <torch/csrc/distributed/c10d/ParamCommsUtils.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp>
#include <fstream>
#include <map>
#include <sstream>
#include <stdexcept>
#include <tuple>
#include <unordered_set>
#include <utility>

#include <c10/core/DeviceType.h>
#include <c10/util/Optional.h>

namespace c10d {

@@ -89,10 +80,13 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(
at::Device& device,
int rank,
OpType opType,
uint64_t seq,
const char* profilingTitle,
const std::optional<std::vector<at::Tensor>>& inputs)
: Work(rank, opType, "profilingTitle", inputs),
: Work(rank, opType, profilingTitle, inputs),
device_(device),
workStartTime_(std::chrono::steady_clock::now()) {
workStartTime_(std::chrono::steady_clock::now()),
seq_(seq) {
xcclEndEvent_ = std::make_shared<at::xpu::XPUEvent>();
}

@@ -101,7 +95,8 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(const WorkXCCL& w)
device_(w.device_),
xcclEndEvent_(w.xcclEndEvent_),
blockingWait_(w.blockingWait_),
workStartTime_(w.workStartTime_) {}
workStartTime_(w.workStartTime_),
seq_(w.seq_) {}

ProcessGroupXCCL::WorkXCCL::~WorkXCCL() = default;

@@ -156,10 +151,16 @@ c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> ProcessGroupXCCL::initWork(
at::Device& device,
int rank,
OpType opType,
const char* profilingTitle,
const std::vector<at::Tensor>& inputs,
const std::vector<at::Tensor>& outputs) {
auto r = c10::make_intrusive<ProcessGroupXCCL::WorkXCCL>(
device, rank, opType, std::optional<std::vector<at::Tensor>>(inputs));
device,
rank,
opType,
seqCollective_,
profilingTitle,
std::optional<std::vector<at::Tensor>>(inputs));
return r;
}

@@ -212,7 +213,10 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
Fn fn,
PreProcess pre,
PostProcess post,
OpType opType) {
OpType opType,
const char* profilingTitle) {
seqCollective_++;

auto device = inputs[0].device();
const auto key = std::to_string(device.index());
auto comm = getXCCLComm(key, device);
@@ -221,7 +225,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
syncStream(device, xcclEventsMap_[key], stream);

c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work;
work = initWork(device, rank_, opType);
work = initWork(device, rank_, opType, profilingTitle);
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);

at::xpu::OptionalXPUGuard gpuGuard(device);
@@ -253,6 +257,25 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::allreduce(
auto tensor = tensors.back();
checkXPUTensor(tensor);

RECORD_PARAM_COMMS_DATA(
// static_cast<int>(
// this->getSequenceNumberForGroup() + 1), // seq + 1 to match
// collective
1,
std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
tensors, // inputTensors
tensors, // outputTensors
rank_, // rank
"allreduce", // collective name
tensor.numel(), // inNelems
tensor.numel(), // outNelems
tensor.scalar_type(), // dType
std::vector<int64_t>(), // inSplitSizes
std::vector<int64_t>(), // outSplitSizes
0, // globalRankStart
1, // globalRankStride
this->getSize()); // worldSize

return collective(
tensor,
tensor,
@@ -273,7 +296,8 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::allreduce(
ccl_stream);
return;
},
OpType::ALLREDUCE);
OpType::ALLREDUCE,
"xccl:all_reduce");
}

} // namespace c10d
43 changes: 24 additions & 19 deletions torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp
@@ -1,33 +1,24 @@
#pragma once

#if defined(__linux__)
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif

#ifdef USE_C10D_XCCL
#include <ATen/xpu/XPUEvent.h>
// We define these flags in the XCCL backend file instead of passing them
// to the compiler.
#define CCL_ENABLE_ZE
#define CCL_ENABLE_SYCL

#include <oneapi/ccl.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <exception>
#include <memory>
#include <vector>

#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

#include <ATen/xpu/XPUEvent.h>
#include <c10/core/StreamGuard.h>
#include <c10/xpu/XPUCachingAllocator.h>
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
namespace c10d {

static std::vector<std::string> TORCH_XCCL_BLOCKING_WAIT = {
@@ -45,6 +36,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
at::Device& device,
int rank,
OpType opType,
uint64_t seq,
const char* profilingTitle = nullptr,
const std::optional<std::vector<at::Tensor>>& inputs = std::nullopt);
WorkXCCL(const WorkXCCL& w);
~WorkXCCL() override;
@@ -63,6 +56,10 @@ class TORCH_API ProcessGroupXCCL : public Backend {
return future_;
}

uint64_t getSequencenumber() const override {
return seq_;
}

std::vector<at::Tensor> result() override {
return *outputs_;
}
@@ -72,6 +69,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
std::shared_ptr<at::xpu::XPUEvent> xcclEndEvent_;
bool blockingWait_ = false;
std::chrono::time_point<std::chrono::steady_clock> workStartTime_;
uint64_t seq_;

private:
void synchronizeInternal(std::chrono::milliseconds timeout);
@@ -103,6 +101,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
at::Device& device,
int rank,
OpType opType,
const char* profilingTitle = nullptr,
const std::vector<at::Tensor>& inputs = {},
const std::vector<at::Tensor>& outputs = {});

@@ -111,7 +110,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
at::Tensor& input,
at::Tensor& output,
Fn fn,
OpType opType) {
OpType opType,
const char* profilingTitle = nullptr) {
auto inputs = std::vector<at::Tensor>{input};
auto outputs = std::vector<at::Tensor>{output};
return collective<Fn>(
@@ -132,13 +132,17 @@ class TORCH_API ProcessGroupXCCL : public Backend {
Fn fn,
PreProcess pre,
PostProcess post,
OpType opType);
OpType opType,
const char* profilingTitle = nullptr);

c10::intrusive_ptr<Work> allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts = AllreduceOptions()) override;

void setSequenceNumberForGroup() override {}
uint64_t getSequenceNumberForGroup() override {
return seqCollective_;
}

protected:
std::unordered_map<std::string, at::xpu::XPUStream> xcclStreamsMap_;
@@ -147,6 +151,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
c10::intrusive_ptr<Store> store_;
std::mutex mutex_;
bool blockingWait_ = false;
uint64_t seqCollective_{0};

private:
std::mutex kvs_mutex;

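Taken together, the .cpp and .hpp changes thread two pieces of bookkeeping through every collective: a per-group sequence counter (seqCollective_) that is bumped once per call in collective() and stamped onto the resulting work object, and a profiling title (e.g. "xccl:all_reduce") that is forwarded to the Work base class instead of the previous hard-coded string. A minimal standalone sketch of that pattern, using hypothetical class names rather than the real c10d types:

// Standalone sketch of the sequence-number + profiling-title pattern.
// Class names here are illustrative; they are not the actual c10d classes.
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

class Work {
 public:
  Work(uint64_t seq, const char* profilingTitle)
      : seq_(seq), title_(profilingTitle ? profilingTitle : "") {}
  uint64_t getSequencenumber() const { return seq_; }
  const std::string& title() const { return title_; }

 private:
  uint64_t seq_;
  std::string title_;
};

class ProcessGroup {
 public:
  std::shared_ptr<Work> collective(const char* profilingTitle) {
    ++seqCollective_;  // bump the counter before creating the work, as in the diff
    return std::make_shared<Work>(seqCollective_, profilingTitle);
  }
  uint64_t getSequenceNumberForGroup() const { return seqCollective_; }

 private:
  uint64_t seqCollective_{0};
};

int main() {
  ProcessGroup pg;
  auto w1 = pg.collective("xccl:all_reduce");
  auto w2 = pg.collective("xccl:all_reduce");
  std::cout << w1->title() << " seq=" << w1->getSequencenumber() << '\n';
  std::cout << w2->title() << " seq=" << w2->getSequencenumber() << '\n';
  // pg.getSequenceNumberForGroup() now reports 2, matching w2's sequence.
}

In the actual backend, collective() increments seqCollective_ before calling initWork(), so WorkXCCL::getSequencenumber() and ProcessGroupXCCL::getSequenceNumberForGroup() report the same counter, and the profiling title reaches the profiler through the Work base class.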