
internal review #6

Open: wants to merge 33 commits into base: main
Conversation

Chao1Han (Owner):

Fixes #ISSUE_NUMBER

auto currentTimepoint = std::chrono::steady_clock::now();
auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
currentTimepoint - workStartTime_);
std::chrono::milliseconds opTimeout = std::chrono::milliseconds(60000);
Collaborator:

Where do you use it?
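
For context, a minimal sketch of where such a timeout would typically be consumed, e.g. in a completion/watchdog check; everything outside the quoted snippet is an assumption, not this PR's code:

#include <chrono>

// Hypothetical helper: returns true once the elapsed time since workStartTime
// exceeds opTimeout (illustrative usage of the snippet above).
bool isTimedOut(
    std::chrono::steady_clock::time_point workStartTime,
    std::chrono::milliseconds opTimeout) {
  auto currentTimepoint = std::chrono::steady_clock::now();
  auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
      currentTimepoint - workStartTime);
  return timeElapsed >= opTimeout;
}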

@@ -67,17 +67,15 @@ ccl::reduction getXcclReduceOp(const ReduceOp& reduceOp, at::Tensor& input) {
return xcclOps.at(reduceOp);
} catch (const std::out_of_range&) {
switch (reduceOp) {
Collaborator:

No need for the switch here.
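
A minimal sketch of the lookup without the switch, assuming the xcclOps map from the diff; the error text and use of C10_THROW_ERROR are illustrative, not this PR's code:

// Hypothetical simplification: fall back to one generic error instead of
// switching over every unsupported ReduceOp case.
ccl::reduction getXcclReduceOp(const ReduceOp& reduceOp, at::Tensor& input) {
  try {
    return xcclOps.at(reduceOp);
  } catch (const std::out_of_range&) {
    C10_THROW_ERROR(ValueError, "Unsupported ReduceOp for the XCCL backend");
  }
}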

: Work(rank, opType, "profilingTitle", inputs),
device_(device),
workStartTime_(std::chrono::steady_clock::now()) {
unsigned char enable_timing = 0;
Collaborator:

If you always set it to 0, then we don't need to keep it, right?

Owner Author:

Yes. Defining this variable serves as a form of annotation: it tells reviewers and users that 0 is the state of enable_timing, which is meaningful.
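
If the intent is purely documentary, a named constant keeps that annotation while making the "never changes" property explicit; a sketch, not this PR's code:

// Hypothetical alternative: a named constant documents that event timing is
// disabled and cannot be flipped accidentally at runtime.
constexpr unsigned char kEnableTiming = 0;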

"Work ran for ",
timeElapsed.count(),
" milliseconds before timing out.");
TORCH_CHECK(false, exceptionMsg)
Collaborator:

TORCH_CHECK(false, exceptionMsg);
abort();

} // namespace

static std::mutex xcclCommDevIdxMapMutex;
static std::unordered_map<std::shared_ptr<xcclComm_t>, int> xcclCommDevIdxMap;
Collaborator:

Those static variables are not used in your code. Please check.

blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false);
init();

// Intel oneCCL requires passing CCL_LOCAL_RANK and CCL_LOCAL_SIZE for non-MPI
Collaborator:

Add more comments on why we use LOCAL_RANK and LOCAL_WORLD_SIZE.
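
For illustration, a sketch of how such values could be exported for oneCCL when there is no MPI launcher; the helper name and setenv usage are assumptions, not this PR's code:

#include <cstdlib>
#include <string>

// Hypothetical sketch: oneCCL can read CCL_LOCAL_RANK / CCL_LOCAL_SIZE from the
// environment to learn the per-node rank and size when not launched via MPI.
void exportLocalRankAndSize(int localRank, int localWorldSize) {
  setenv("CCL_LOCAL_RANK", std::to_string(localRank).c_str(), /*overwrite=*/1);
  setenv("CCL_LOCAL_SIZE", std::to_string(localWorldSize).c_str(), /*overwrite=*/1);
}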

std::shared_ptr<xcclComm_t> ProcessGroupXCCL::getXCCLComm(
const std::string& deviceKey,
at::Device& device) {
if (deviceKey.empty()) {
Collaborator:

C10_THROW_ERROR_WITH
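
A minimal sketch of the guard this comment seems to point at; the exact macro and message are assumptions, not this PR's code:

// Hypothetical guard: reject an empty device key with a structured c10 error.
if (deviceKey.empty()) {
  C10_THROW_ERROR(
      ValueError,
      "Not able to create/get an XCCL communicator without a device key.");
}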

devXCCLCommMap_.emplace(deviceKey, XCCLComm);
}

xcclStreamsMap_.emplace(deviceKey, std::move(stream));
Collaborator:

So xcclEventsMap is not needed?

Owner Author:

Will restore it.

PreProcess pre,
PostProcess post,
OpType opType) {
using traits = function_traits<Fn>;
Collaborator:

Which collective actually requires the attribute?

Owner Author:

Yes, allgather hits a build error otherwise.

for (const auto i : c10::irange(inputs.size())) {
c10::xpu::XPUCachingAllocator::recordStream(
inputs[i].storage().data_ptr(), stream);
fn(inputs[i], outputs[i], attr, *comm, stream);
Collaborator:

Add a comment for the output recordStream.
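
A sketch of the kind of comment plus recordStream call being asked for on the output side, assuming the same loop structure as the input handling above:

// Hypothetical sketch: record the collective stream on the output storage so
// the caching allocator does not free or reuse that memory while the
// collective is still running on `stream` (mirrors the input handling above).
c10::xpu::XPUCachingAllocator::recordStream(
    outputs[i].storage().data_ptr(), stream);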

false, "ProcessGroupXCCL::WorkXCCL::isSuccess not implemented");
}

void abort() override {
Collaborator:

abort here?


bool isCompleted() override;

bool isSuccess() const override {
Collaborator:

remove

case ReduceOp::BXOR:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BXOR with NCCL");
C10_THROW_ERROR(
Collaborator:

Don't change the NCCL code for now.


c10::impl::VirtualGuardImpl impl(device.type());
c10::Stream stream = impl.getStream(device);
sycl::queue& q = c10::xpu::XPUStream(stream).queue();
Collaborator:

It's a big bug to use the current stream as the communication stream.
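
A sketch of the usual fix: take a dedicated stream from the XPU stream pool instead of the caller's current stream; the exact API call is an assumption, not this PR's code:

// Hypothetical sketch: use a dedicated communication stream so collectives do
// not serialize with, or race against, compute work on the current stream.
auto commStream = c10::xpu::getStreamFromPool(/*isHighPriority=*/true, device.index());
sycl::queue& q = commStream.queue();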

int rank,
OpType opType,
const std::optional<std::vector<at::Tensor>>& inputs)
: Work(rank, opType, "profilingTitle", inputs),
Collaborator:

This needs to change.
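
Presumably the hard-coded "profilingTitle" literal should be the forwarded parameter; a sketch of the intended member-initializer form, with the parameter name assumed:

// Hypothetical fix: forward the real profilingTitle argument instead of the
// string literal "profilingTitle".
: Work(rank, opType, profilingTitle, inputs),
  device_(device),
  workStartTime_(std::chrono::steady_clock::now()) {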

@@ -126,6 +131,13 @@ class TORCH_API ProcessGroup : public torch::CustomClassHolder {
return backendType_;
};

inline bool backendSupportsSequenceNumbers(BackendType backendType) {
if (backendType == BackendType::GLOO || backendType == BackendType::NCCL ||
backendType == BackendType::XCCL || backendType == BackendType::UCC)
Collaborator:

Are you sure we need to support this sequence number?

Owner Author:

The sequence number is used by RECORD_PARAM_COMMS, so we need it.

@@ -180,7 +181,8 @@ def skip_if_lt_x_gpu(x):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if torch.cuda.is_available() and torch.cuda.device_count() >= x:
if (torch.cuda.is_available() and torch.cuda.device_count() >= x) or \
Collaborator:

Don't use an if for the accelerator-related check.
