Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Training hang detection based on XPU Timer metric. #1288

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dlrover/python/common/global_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class DefaultValues(object):
SEC_TO_CHANGE_PS = 3600 # 1h
SEC_TO_WAIT_FAILED_PS = 600 # 10min
HANG_CPU_USAGE_RATE = 0.05
HANG_DETECTION = 1


class Context(Singleton):
Expand Down Expand Up @@ -92,6 +93,9 @@ def __init__(self):
self.is_tfv1_ps = False
self.master_port = None
self.relaunch_always = False
# The strategy of 'hang detection':
# 0: log only; 1: notify; 2: with fault tolerance
self.hang_detection = DefaultValues.HANG_DETECTION

def set_params_from_brain(self):
self.train_speed_record_num = self.get_param_value_from_brain(
Expand Down
4 changes: 3 additions & 1 deletion dlrover/python/diagnosis/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class DiagnosisDataType(object):
XPU_TIMER_METRIC = "XPU_TIMER_METRIC"


class DiagnosisAction(object):
class DiagnosisActionType(object):
NO_ACTION = "no_action"
RESTART_WORKER = "restart_worker"
RELAUNCH_WORKER = "relaunch_worker"
EVENT = "event"
MASTER_RELAUNCH_WORKER = "master_relaunch_worker"
22 changes: 0 additions & 22 deletions dlrover/python/diagnosis/common/diagnose_action.py

This file was deleted.

106 changes: 106 additions & 0 deletions dlrover/python/diagnosis/common/diagnosis_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2024 The DLRover Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List

from dlrover.python.diagnosis.common.constants import DiagnosisActionType


class DiagnosisAction:
"""
The action describes the expect operation after diagnostician.
The action can be consumed by the master's job manager or directly used
in training node.
"""

def __init__(
self,
diagnosis_type: DiagnosisActionType = DiagnosisActionType.NO_ACTION,
action_config={},
):
"""
Args:
diagnosis_type (DiagnosisActionType): The action type.
"""

self._diagnosis_type = diagnosis_type
self._action_config = action_config

@property
def diagnosis_type(self):
return self._diagnosis_type

@property
def action_config(self):
return self._action_config


class EventAction(DiagnosisAction):
"""Output the specified event."""

def __init__(
self,
event_type: str = "",
instance: str = "",
action: str = "",
msg: str = "",
labels: Dict[str, str] = {},
):
super().__init__(DiagnosisActionType.EVENT)
self._event_type = event_type
self._instance = instance
self._action = action
self._msg = msg
self._labels = labels

@property
def event_type(self):
return self._event_type

@property
def instance(self):
return self._instance

@property
def action(self):
return self._action

@property
def msg(self):
return self._msg

@property
def labels(self):
return self._labels


class NodeRelaunchAction(DiagnosisAction):
"""Relaunch the specified node."""

def __init__(self, node_id, node_status, reason):
super().__init__(DiagnosisActionType.MASTER_RELAUNCH_WORKER)
self._node_id = node_id
self._node_status = node_status
self._reason = reason

@property
def node_id(self):
return self._node_id

@property
def node_status(self):
return self._node_status

@property
def reason(self):
return self._reason
6 changes: 3 additions & 3 deletions dlrover/python/diagnosis/inferencechain/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

from typing import List

from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction
from dlrover.python.diagnosis.common.diagnosis_action import DiagnosisAction
from dlrover.python.diagnosis.common.inference_chain import Inference


def coordinate_inferences(observations: List[Inference]) -> DiagnoseAction:
return DiagnoseAction()
def coordinate_inferences(observations: List[Inference]) -> DiagnosisAction:
return DiagnosisAction()
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
import re
import sys
from typing import Dict, List, Tuple

from dlrover.python.common.global_context import Context
from dlrover.python.common.log import default_logger as logger
from dlrover.python.diagnosis.common.constants import DiagnosisDataType
from dlrover.python.diagnosis.common.diagnosis_data import DiagnosisData
from dlrover.python.diagnosis.common.inference_chain import (
Expand All @@ -24,6 +28,7 @@
)

HANG_METRIC_PREFIX = "XPU_TIMER_COMMON_HANG"
_dlrover_ctx = Context.singleton_instance()


class CheckTrainingHangOperator(InferenceOperator):
Expand All @@ -46,7 +51,14 @@ def is_compatible(self, inference: Inference) -> bool:
return False

def infer(self, inferences: List[Inference]) -> List[Inference]:
if not self.data_manager:
if (
not self.data_manager
or not self.data_manager.with_runtime_context()
):
logger.info(
"Skip training-hang inference for there is "
"no diagnosis data reference."
)
return [
Inference(
name=InferenceName.TRAINING,
Expand All @@ -60,6 +72,7 @@ def infer(self, inferences: List[Inference]) -> List[Inference]:
)

if diagnosis_data and self.is_hang(diagnosis_data):
logger.warning("Training might hanged.")
return [
Inference(
name=InferenceName.TRAINING,
Expand All @@ -77,17 +90,112 @@ def infer(self, inferences: List[Inference]) -> List[Inference]:
]

def is_hang(self, diagnosis_data: List[DiagnosisData]):
hang_metric = []
logger.info(
"Hang detection start using diagnosis data, "
f"data number: {len(diagnosis_data)}, "
f"data size: {sys.getsizeof(diagnosis_data)}."
)
worker_hang_metric: Dict[int, List[Tuple[int, bool]]] = {}
if not diagnosis_data:
return False

for data in diagnosis_data:
# filter hang metric
each_metric = [
line
for line in data.data_content.splitlines()
if line.startswith(HANG_METRIC_PREFIX)
]
hang_metric.append(each_metric)

# TODO: implement the judgement
# if all local rank is hanged, tag worker hang
rank_hang_size = 0
is_worker_hang = False
for each_rank_metric in each_metric:
match = re.search(r"(\d+)(?!.*\d)", each_rank_metric)
if match and match.group(0) == "1":
rank_hang_size += 1
if rank_hang_size == len(each_metric):
is_worker_hang = True

if data.node_rank not in worker_hang_metric:
worker_hang_metric[data.node_rank] = []
worker_hang_metric[data.node_rank].append(
(data.timestamp, is_worker_hang)
)

# hang detection rules:
# 1. 100% worker got hang metric
# 2. last for 5+ minutes
hang_id, hang_last = self._find_hang_intersection(worker_hang_metric)
hang_last_threshold = self._get_hang_time_last_threshold()
if hang_id != -1 and hang_last > hang_last_threshold:
logger.info(
f"Got hang worker: {hang_id}, time last: {hang_last}, "
f"threshold: {hang_last_threshold}"
)
if _dlrover_ctx.hang_detection == 1:
# TODO
pass
elif _dlrover_ctx.hang_detection == 2:
# TODO
pass
return True

return False

def _get_hang_time_last_threshold(self):
# set 5 minutes for now(second)
return 5 * 60

def _find_hang_intersection(
self, worker_hang_metric: Dict[int, List[Tuple[int, bool]]]
) -> Tuple[int, int]:
"""
Require all workers hang from latest and find the hang intersection.

Args:
worker_hang_metric (Dict[int, List[Tuple[int, bool]]]): Input
BalaBalaYi marked this conversation as resolved.
Show resolved Hide resolved
metric in format: node_id: [(timestamp, is_hang), ...]

Returns:
The hang intersection's id and time last in tuple format.
"""

worker_hang_length_min = 0
worker_hang_id = -1

# find the intersection from latest
for worker_id, tuple_list in worker_hang_metric.items():
# sorted by timestamp
tuple_list.sort(key=lambda x: x[0])
worker_hang_length = 0

for tuple_item in reversed(tuple_list):
if tuple_item[1]:
worker_hang_length += 1
else:
break

if worker_hang_length > 0:
if worker_hang_length_min == 0:
worker_hang_length_min = worker_hang_length
worker_hang_id = worker_id
elif worker_hang_length < worker_hang_length_min:
worker_hang_length_min = worker_hang_length
worker_hang_id = worker_id
else:
# there is normal worker
return -1, -1

# get the intersection's time last
if worker_hang_id != -1 and worker_hang_length_min != 0:
hang_worker_metric = worker_hang_metric[worker_hang_id]
time_last = (
hang_worker_metric[len(hang_worker_metric) - 1][0]
- hang_worker_metric[
len(hang_worker_metric) - worker_hang_length_min
][0]
)
return worker_hang_id, time_last

return -1, -1
10 changes: 5 additions & 5 deletions dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from dlrover.python.common.singleton import Singleton
from dlrover.python.common.worker import WorkerContext
from dlrover.python.diagnosis.common.constants import (
DiagnosisAction,
DiagnosisActionType,
DiagnosisConstant,
InferenceConfigKey,
)
from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction
from dlrover.python.diagnosis.common.diagnosis_action import DiagnosisAction
from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric
from dlrover.python.diagnosis.common.inference_chain import (
Inference,
Expand Down Expand Up @@ -105,7 +105,7 @@ def _observe(self) -> List[Inference]:

def _diagnose_observations(
self, observations: List[Inference]
) -> DiagnoseAction:
) -> DiagnosisAction:
conclusions: List[Inference] = []
for ob in observations:
ic = InferenceChain([ob], self._diagnosis_operators)
Expand Down Expand Up @@ -165,7 +165,7 @@ def diagnose_training_failure(self, worker_context: WorkerContext) -> str:
f"{worker_context.worker_spec.max_restarts} "
f"attempts left; will restart worker group."
)
return DiagnosisAction.RESTART_WORKER
return DiagnosisActionType.RESTART_WORKER
else:
logger.info(
f"[{worker_context.worker_spec.role}] Worker group "
Expand All @@ -174,7 +174,7 @@ def diagnose_training_failure(self, worker_context: WorkerContext) -> str:
f"no attempts({worker_context.worker_spec.max_restarts}) "
"left; will relaunch."
)
return DiagnosisAction.RELAUNCH_WORKER
return DiagnosisActionType.RELAUNCH_WORKER

def _report_failure_to_master(
self, failures: Dict[int, ProcessFailure], restart_count: int
Expand Down
Loading
Loading