From 74aa8ebc64976f6aa2c0cdb83f5a50d5d3961bd1 Mon Sep 17 00:00:00 2001 From: bsang Date: Thu, 10 Oct 2024 21:43:05 +0800 Subject: [PATCH 01/11] refactor diagnosis agent --- .../diagnosis/common/diagnose_action.py | 9 ++ .../diagnosis/common/inference_chain.py | 10 +-- .../diagnosis/inferencechain/coordinator.py | 14 +++ .../collect_metrics_operator.py | 51 +++++++++++ .../inferenceoperator/operator.py | 11 +++ .../diagnosis/diagnosis_agent.py | 54 +++++++++--- dlrover/python/tests/test_diagnosis_agent.py | 65 -------------- .../tests/test_diagnosis_data_collector.py | 87 +++++++++++++++++++ dlrover/python/tests/test_inference_chain.py | 38 +++++++- 9 files changed, 256 insertions(+), 83 deletions(-) create mode 100644 dlrover/python/diagnosis/common/diagnose_action.py create mode 100644 dlrover/python/diagnosis/inferencechain/coordinator.py create mode 100644 dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py create mode 100644 dlrover/python/tests/test_diagnosis_data_collector.py diff --git a/dlrover/python/diagnosis/common/diagnose_action.py b/dlrover/python/diagnosis/common/diagnose_action.py new file mode 100644 index 000000000..56635bb9f --- /dev/null +++ b/dlrover/python/diagnosis/common/diagnose_action.py @@ -0,0 +1,9 @@ +from typing import List + + +class DiagnoseAction: + def __init__(self): + self.actions: List[str] = [] + + def add_action(self, action: str): + self.actions.append(action) diff --git a/dlrover/python/diagnosis/common/inference_chain.py b/dlrover/python/diagnosis/common/inference_chain.py index 8587b9507..1a3d1723b 100644 --- a/dlrover/python/diagnosis/common/inference_chain.py +++ b/dlrover/python/diagnosis/common/inference_chain.py @@ -20,17 +20,20 @@ class InferenceName: END = "end" TRAINING = "training" NODE = "node" + WORKER = "worker" class InferenceAttribute: ISORNOT = "is_or_not" IS = "is" NOT = "not" + COLLECT = "collect" class InferenceDescription: HANG = "hang" FAILURE = "failure" + METRICS = "metrics" @dataclass @@ -92,12 +95,7 @@ def combine_inferences( ) -> List[Inference]: inferences = [] for inference2 in inferences2: - is_duplicate = False - for inference1 in inferences1: - if is_same_inference(inference1, inference2): - is_duplicate = True - break - if not is_duplicate: + if not is_inference_included(inferences1, inference2): inferences.append(inference2) for inference1 in inferences1: diff --git a/dlrover/python/diagnosis/inferencechain/coordinator.py b/dlrover/python/diagnosis/inferencechain/coordinator.py new file mode 100644 index 000000000..776a6bdf3 --- /dev/null +++ b/dlrover/python/diagnosis/inferencechain/coordinator.py @@ -0,0 +1,14 @@ +from typing import List +from dlrover.python.diagnosis.common.inference_chain import ( + Inference, +) +from dlrover.python.diagnosis.common.constants import ( + DiagnosisAction as DiagnosisActionConstant, +) +from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction + + +def coordinate_inferences(inferences: List[Inference]) -> DiagnoseAction: + action = DiagnoseAction() + action.add_action(DiagnosisActionConstant.NO_ACTION) + return action diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py new file mode 100644 index 000000000..965ed8d8b --- /dev/null +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py @@ -0,0 +1,51 @@ +from typing import List + +from dlrover.python.diagnosis.common.constants import DiagnosisDataType +from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric +from dlrover.python.diagnosis.common.inference_chain import ( + Inference, + InferenceAttribute, + InferenceDescription, + InferenceName, + InferenceOperator, +) +from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( + XpuTimerMetricsCollector, +) +from dlrover.python.elastic_agent.master_client import MasterClient +from dlrover.python.common import env_utils + + +class CollectMetricsOperator(InferenceOperator): + """ + CollectXPUTimerMetricsOperator is the operator to collect XPU timer metrics. + """ + + def __init__(self): + super().__init__(None) + self._xpu_timer_collector = XpuTimerMetricsCollector() + self._client = MasterClient.singleton_instance() + + def is_compatible(self, inference: Inference) -> bool: + if ( + inference.name == InferenceName.WORKER + and inference.attribution == InferenceAttribute.COLLECT + and inference.description == InferenceDescription.METRICS + ): + return True + else: + return False + + def infer(self, inferences: List[Inference]) -> List[Inference]: + xpu_timer_metric = self._xpu_timer_collector.collect_data() + if xpu_timer_metric: + agent_xpu_metric = WorkerTrainingMetric( + data_type=DiagnosisDataType.XPU_TIMER_METRIC, + data_content=xpu_timer_metric, + node_id=env_utils.get_node_id(), + node_type=env_utils.get_node_type(), + node_rank=env_utils.get_node_rank(), + ) + self._client.report_diagnosis_agent_metrics(agent_xpu_metric) + + return [] \ No newline at end of file diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py index ad0933aac..5ad43765d 100644 --- a/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py @@ -14,7 +14,18 @@ from dlrover.python.diagnosis.inferencechain.inferenceoperator.check_failure_node_operator import ( # noqa: E501 CheckFailureNodeOperator, ) +from dlrover.python.diagnosis.inferencechain.inferenceoperator.collect_metrics_operator import ( # noqa: E501 + CollectMetricsOperator, +) def get_training_failure_operators(): return [CheckFailureNodeOperator()] + + +def get_worker_observe_operators(): + return [CollectMetricsOperator()] + + +def get_worker_diagnosis_operators(): + return [] diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index b2ca6dbc8..1f3d0e66a 100644 --- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -15,7 +15,7 @@ import threading import time from datetime import datetime -from typing import Dict +from typing import Dict, List from torch.distributed.elastic.multiprocessing.errors import ProcessFailure @@ -38,6 +38,7 @@ InferenceDescription, InferenceName, is_inference_included, + combine_inferences, ) from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( XpuTimerMetricsCollector, @@ -45,10 +46,14 @@ from dlrover.python.diagnosis.inferencechain.inference_chain import ( InferenceChain, ) +from dlrover.python.diagnosis.inferencechain.coordinator import coordinate_inferences from dlrover.python.diagnosis.inferencechain.inferenceoperator.operator import ( # noqa: E501 get_training_failure_operators, + get_worker_observe_operators, + get_worker_diagnosis_operators, ) from dlrover.python.elastic_agent.master_client import MasterClient +from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction class DiagnosisAgent(Singleton): @@ -58,6 +63,15 @@ def __init__(self, training_log_file: str, errors: str): self._errors = errors self._xpu_timer_metric_collector = XpuTimerMetricsCollector() self._stopped = False + self._observe_problems: List[Inference] = [ + Inference( + name=InferenceName.WORKER, + attribution=InferenceAttribute.COLLECT, + description=InferenceDescription.METRICS, + ), + ] + self._observe_operators = get_worker_observe_operators() + self._diagnosis_operators = get_worker_diagnosis_operators() self.start() @@ -81,6 +95,30 @@ def start(self): def stop(self): self._stopped = True + def _observe(self) -> List[Inference]: + observations: List[Inference] = [] + for problem in self._observe_problems: + ic = InferenceChain([problem], self._observe_operators) + try: + infs = ic.infer() + if len(infs) > 0: + observations = combine_inferences(observations, infs) + except Exception as e: + logger.error(f"fail to observe problem {problem}: {e}") + return observations + + def _diagnose_observations(self, observations: List[Inference]) -> DiagnoseAction: + conclusions: List[Inference] = [] + for ob in observations: + ic = InferenceChain([ob], self._diagnosis_operators) + try: + infs = ic.infer() + if len(infs) > 0: + conclusions = combine_inferences(conclusions, infs) + except Exception as e: + logger.error(f"fail to diagnose observation {ob}: {e}") + return coordinate_inferences(conclusions) + def _periodically_diagnosis(self): logger.info("Start periodically diagnosis...") while True: @@ -88,16 +126,10 @@ def _periodically_diagnosis(self): logger.info("Stop periodically diagnosis.") break - xpu_timer_metric = self._xpu_timer_metric_collector.collect_data() - if xpu_timer_metric: - agent_xpu_metric = WorkerTrainingMetric( - data_type=DiagnosisDataType.XPU_TIMER_METRIC, - data_content=xpu_timer_metric, - node_id=env_utils.get_node_id(), - node_type=env_utils.get_node_type(), - node_rank=env_utils.get_node_rank(), - ) - self._report_metric_to_master(agent_xpu_metric) + observations = self._observe() + if len(observations) > 0: + logger.info(f"Observed problems: {observations}") + self._diagnose_observations(observations) time.sleep( DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index 83cd5be83..26995dfe0 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -23,16 +23,8 @@ from dlrover.python.common.worker import WorkerContext from dlrover.python.diagnosis.common.constants import ( DiagnosisAction, - DiagnosisDataType, - EnvConfigKey, ) from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric -from dlrover.python.diagnosis.datacollector.training_log_collector import ( - TrainingLogCollector, -) -from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( - XpuTimerMetricsCollector, -) from dlrover.python.elastic_agent.diagnosis.diagnosis_agent import ( DiagnosisAgent, ) @@ -109,63 +101,6 @@ def test_diagnose_training(self): action = agent.diagnose_training_failure(wc) self.assertEqual(action, DiagnosisAction.RESTART_WORKER) - @patch( - "dlrover.python.diagnosis.datacollector.training_log_collector" - ".read_last_n_lines" - ) - def test_log_collect(self, mock_file_util): - mock_file_util.return_value = [ - "test0", - "DLRover agent started with:", - "test1", - ] - training_log_collector = TrainingLogCollector( - log_file="test", n_line=3 - ) - self.assertTrue(training_log_collector.is_enabled()) - result = training_log_collector.collect_data() - self.assertTrue("test0" not in result.logs) - self.assertTrue("test1" in result.logs) - - def test_xpu_timer_metric_collect(self): - collector = XpuTimerMetricsCollector() - self.assertFalse(collector.is_enabled()) - - env_utils.set_env(EnvConfigKey.XPU_TIMER_PORT, 18889) - collector = XpuTimerMetricsCollector() - self.assertTrue(collector.is_enabled()) - - self.assertEqual(collector.collect_data(), "") - - file = "data/xpu_timer_metrics" - file_path = os.path.join(os.path.dirname(__file__), file) - with open(file_path, "r", encoding="utf-8") as file: - test_metrics = file.read() - result = collector._preprocess_metrics(test_metrics) - self.assertTrue(result) - if "#" in result or "exposer" in result: - self.fail() - - env_utils.set_env(NodeEnv.NODE_ID, 1) - env_utils.set_env(NodeEnv.NODE_TYPE, NodeType.WORKER) - env_utils.set_env(NodeEnv.NODE_RANK, 1) - agent_xpu_metric = WorkerTrainingMetric( - data_type=DiagnosisDataType.XPU_TIMER_METRIC, - data_content=result, - node_id=env_utils.get_node_id(), - node_type=env_utils.get_node_type(), - node_rank=env_utils.get_node_rank(), - ) - self.assertEqual( - agent_xpu_metric.data_type, - DiagnosisDataType.XPU_TIMER_METRIC, - ) - self.assertEqual(agent_xpu_metric.data_content, result) - self.assertEqual(agent_xpu_metric.node_id, 1) - self.assertEqual(agent_xpu_metric.node_type, NodeType.WORKER) - self.assertEqual(agent_xpu_metric.node_rank, 1) - self.assertTrue(agent_xpu_metric.timestamp > 0) - def test_worker_training_metric(self): test = WorkerTrainingMetric( data_content="test123", diff --git a/dlrover/python/tests/test_diagnosis_data_collector.py b/dlrover/python/tests/test_diagnosis_data_collector.py new file mode 100644 index 000000000..29376b5b6 --- /dev/null +++ b/dlrover/python/tests/test_diagnosis_data_collector.py @@ -0,0 +1,87 @@ +import unittest +from dlrover.python.tests.test_utils import start_local_master +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) +from unittest.mock import patch +from dlrover.python.diagnosis.datacollector.training_log_collector import ( + TrainingLogCollector, +) +from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( + XpuTimerMetricsCollector, +) +import os +from dlrover.python.common import env_utils +from dlrover.python.diagnosis.common.constants import ( + DiagnosisDataType, + EnvConfigKey, +) +from dlrover.python.common.constants import NodeEnv, NodeType +from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric + + +class TestDiagnosisDataCollector(unittest.TestCase): + def setUp(self): + self.master_proc, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) + + def tearDown(self): + os.environ.clear() + + @patch( + "dlrover.python.diagnosis.datacollector.training_log_collector" + ".read_last_n_lines" + ) + def test_training_log_collector(self, mock_file_util): + mock_file_util.return_value = [ + "test0", + "DLRover agent started with:", + "test1", + ] + training_log_collector = TrainingLogCollector( + log_file="test", n_line=3 + ) + self.assertTrue(training_log_collector.is_enabled()) + result = training_log_collector.collect_data() + self.assertTrue("test0" not in result.logs) + self.assertTrue("test1" in result.logs) + + def test_xpu_timer_metric_collector(self): + collector = XpuTimerMetricsCollector() + self.assertFalse(collector.is_enabled()) + + env_utils.set_env(EnvConfigKey.XPU_TIMER_PORT, 18889) + collector = XpuTimerMetricsCollector() + self.assertTrue(collector.is_enabled()) + + self.assertEqual(collector.collect_data(), "") + + file = "data/xpu_timer_metrics" + file_path = os.path.join(os.path.dirname(__file__), file) + with open(file_path, "r", encoding="utf-8") as file: + test_metrics = file.read() + result = collector._preprocess_metrics(test_metrics) + self.assertTrue(result) + if "#" in result or "exposer" in result: + self.fail() + + env_utils.set_env(NodeEnv.NODE_ID, 1) + env_utils.set_env(NodeEnv.NODE_TYPE, NodeType.WORKER) + env_utils.set_env(NodeEnv.NODE_RANK, 1) + agent_xpu_metric = WorkerTrainingMetric( + data_type=DiagnosisDataType.XPU_TIMER_METRIC, + data_content=result, + node_id=env_utils.get_node_id(), + node_type=env_utils.get_node_type(), + node_rank=env_utils.get_node_rank(), + ) + self.assertEqual( + agent_xpu_metric.data_type, + DiagnosisDataType.XPU_TIMER_METRIC, + ) + self.assertEqual(agent_xpu_metric.data_content, result) + self.assertEqual(agent_xpu_metric.node_id, 1) + self.assertEqual(agent_xpu_metric.node_type, NodeType.WORKER) + self.assertEqual(agent_xpu_metric.node_rank, 1) + self.assertTrue(agent_xpu_metric.timestamp > 0) \ No newline at end of file diff --git a/dlrover/python/tests/test_inference_chain.py b/dlrover/python/tests/test_inference_chain.py index 5a5124997..a89209e8b 100644 --- a/dlrover/python/tests/test_inference_chain.py +++ b/dlrover/python/tests/test_inference_chain.py @@ -13,6 +13,7 @@ import os import unittest +from unittest.mock import patch from dlrover.python.diagnosis.common.constants import InferenceConfigKey from dlrover.python.diagnosis.common.inference_chain import ( @@ -31,11 +32,25 @@ from dlrover.python.diagnosis.inferencechain.inferenceoperator.check_training_hang_operator import ( # noqa: E501 CheckTrainingHangOperator, ) +from dlrover.python.diagnosis.inferencechain.inferenceoperator.collect_metrics_operator import ( # noqa: E501 + CollectMetricsOperator, +) +from dlrover.python.tests.test_utils import start_local_master +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, +) +from dlrover.python.common import env_utils +from dlrover.python.diagnosis.common.constants import ( + EnvConfigKey, +) +from dlrover.python.common.constants import NodeEnv, NodeType class InferenceChainTest(unittest.TestCase): def setUp(self): - pass + self.master_proc, self.addr = start_local_master() + MasterClient._instance = build_master_client(self.addr, 1) def tearDown(self): pass @@ -127,6 +142,27 @@ def test_inference_chain(self): ) self.assertTrue(is_same_inference(results[0], failure_inf)) + @patch( + "dlrover.python.diagnosis.datacollector.xpu_timer_metrics_collector.XpuTimerMetricsCollector" + ".collect_metrics" + ) + def test_collect_metrics_operator(self, mock_collector): + mock_collector.return_value = "data" + operator = CollectMetricsOperator() + inf = Inference( + name=InferenceName.WORKER, + attribution=InferenceAttribute.COLLECT, + description=InferenceDescription.METRICS, + ) + self.assertTrue(operator.is_compatible(inf)) + + env_utils.set_env(EnvConfigKey.XPU_TIMER_PORT, 18889) + env_utils.set_env(NodeEnv.NODE_ID, 1) + env_utils.set_env(NodeEnv.NODE_TYPE, NodeType.WORKER) + env_utils.set_env(NodeEnv.NODE_RANK, 1) + infs = operator.infer([]) + self.assertEqual(len(infs), 0) + if __name__ == "__main__": unittest.main() From 09584fca3d42ea86a4e2091385e43151f1785350 Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 10:36:03 +0800 Subject: [PATCH 02/11] update --- .../diagnosis/common/diagnose_action.py | 13 ++++++ .../diagnosis/inferencechain/coordinator.py | 18 +++++++-- .../collect_metrics_operator.py | 26 +++++++++--- .../diagnosis/diagnosis_agent.py | 18 ++++----- dlrover/python/tests/test_diagnosis_agent.py | 7 +--- .../tests/test_diagnosis_data_collector.py | 40 +++++++++++++------ dlrover/python/tests/test_inference_chain.py | 18 ++++----- 7 files changed, 94 insertions(+), 46 deletions(-) diff --git a/dlrover/python/diagnosis/common/diagnose_action.py b/dlrover/python/diagnosis/common/diagnose_action.py index 56635bb9f..1557bcb12 100644 --- a/dlrover/python/diagnosis/common/diagnose_action.py +++ b/dlrover/python/diagnosis/common/diagnose_action.py @@ -1,3 +1,16 @@ +# Copyright 2024 The DLRover Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from typing import List diff --git a/dlrover/python/diagnosis/inferencechain/coordinator.py b/dlrover/python/diagnosis/inferencechain/coordinator.py index 776a6bdf3..8bbc9bac0 100644 --- a/dlrover/python/diagnosis/inferencechain/coordinator.py +++ b/dlrover/python/diagnosis/inferencechain/coordinator.py @@ -1,11 +1,23 @@ +# Copyright 2024 The DLRover Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from typing import List -from dlrover.python.diagnosis.common.inference_chain import ( - Inference, -) + from dlrover.python.diagnosis.common.constants import ( DiagnosisAction as DiagnosisActionConstant, ) from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction +from dlrover.python.diagnosis.common.inference_chain import Inference def coordinate_inferences(inferences: List[Inference]) -> DiagnoseAction: diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py index 965ed8d8b..51583f537 100644 --- a/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py @@ -1,5 +1,19 @@ +# Copyright 2024 The DLRover Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from typing import List +from dlrover.python.common import env_utils from dlrover.python.diagnosis.common.constants import DiagnosisDataType from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric from dlrover.python.diagnosis.common.inference_chain import ( @@ -13,12 +27,12 @@ XpuTimerMetricsCollector, ) from dlrover.python.elastic_agent.master_client import MasterClient -from dlrover.python.common import env_utils class CollectMetricsOperator(InferenceOperator): """ - CollectXPUTimerMetricsOperator is the operator to collect XPU timer metrics. + CollectXPUTimerMetricsOperator is the operator to collect + XPU timer metrics. """ def __init__(self): @@ -28,9 +42,9 @@ def __init__(self): def is_compatible(self, inference: Inference) -> bool: if ( - inference.name == InferenceName.WORKER - and inference.attribution == InferenceAttribute.COLLECT - and inference.description == InferenceDescription.METRICS + inference.name == InferenceName.WORKER + and inference.attribution == InferenceAttribute.COLLECT + and inference.description == InferenceDescription.METRICS ): return True else: @@ -48,4 +62,4 @@ def infer(self, inferences: List[Inference]) -> List[Inference]: ) self._client.report_diagnosis_agent_metrics(agent_xpu_metric) - return [] \ No newline at end of file + return [] diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index 1f3d0e66a..7b1619829 100644 --- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -19,7 +19,6 @@ from torch.distributed.elastic.multiprocessing.errors import ProcessFailure -from dlrover.python.common import env_utils from dlrover.python.common.constants import TrainingExceptionLevel from dlrover.python.common.error import ProcessError from dlrover.python.common.log import default_logger as logger @@ -28,32 +27,30 @@ from dlrover.python.diagnosis.common.constants import ( DiagnosisAction, DiagnosisConstant, - DiagnosisDataType, InferenceConfigKey, ) +from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric from dlrover.python.diagnosis.common.inference_chain import ( Inference, InferenceAttribute, InferenceDescription, InferenceName, - is_inference_included, combine_inferences, + is_inference_included, ) -from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( - XpuTimerMetricsCollector, +from dlrover.python.diagnosis.inferencechain.coordinator import ( + coordinate_inferences, ) from dlrover.python.diagnosis.inferencechain.inference_chain import ( InferenceChain, ) -from dlrover.python.diagnosis.inferencechain.coordinator import coordinate_inferences from dlrover.python.diagnosis.inferencechain.inferenceoperator.operator import ( # noqa: E501 get_training_failure_operators, - get_worker_observe_operators, get_worker_diagnosis_operators, + get_worker_observe_operators, ) from dlrover.python.elastic_agent.master_client import MasterClient -from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction class DiagnosisAgent(Singleton): @@ -61,7 +58,6 @@ def __init__(self, training_log_file: str, errors: str): self._client = MasterClient.singleton_instance() self._training_log_file = training_log_file self._errors = errors - self._xpu_timer_metric_collector = XpuTimerMetricsCollector() self._stopped = False self._observe_problems: List[Inference] = [ Inference( @@ -107,7 +103,9 @@ def _observe(self) -> List[Inference]: logger.error(f"fail to observe problem {problem}: {e}") return observations - def _diagnose_observations(self, observations: List[Inference]) -> DiagnoseAction: + def _diagnose_observations( + self, observations: List[Inference] + ) -> DiagnoseAction: conclusions: List[Inference] = [] for ob in observations: ic = InferenceChain([ob], self._diagnosis_operators) diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index 26995dfe0..12aca35ba 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -13,17 +13,14 @@ import os import unittest -from unittest.mock import patch from torch.distributed.elastic.agent.server.api import RunResult, WorkerState from torch.distributed.launcher.api import LaunchConfig from dlrover.python.common import env_utils -from dlrover.python.common.constants import NodeEnv, NodeType, RendezvousName +from dlrover.python.common.constants import RendezvousName from dlrover.python.common.worker import WorkerContext -from dlrover.python.diagnosis.common.constants import ( - DiagnosisAction, -) +from dlrover.python.diagnosis.common.constants import DiagnosisAction from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric from dlrover.python.elastic_agent.diagnosis.diagnosis_agent import ( DiagnosisAgent, diff --git a/dlrover/python/tests/test_diagnosis_data_collector.py b/dlrover/python/tests/test_diagnosis_data_collector.py index 29376b5b6..7a6eb10e0 100644 --- a/dlrover/python/tests/test_diagnosis_data_collector.py +++ b/dlrover/python/tests/test_diagnosis_data_collector.py @@ -1,24 +1,38 @@ +# Copyright 2024 The DLRover Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os import unittest -from dlrover.python.tests.test_utils import start_local_master -from dlrover.python.elastic_agent.master_client import ( - MasterClient, - build_master_client, -) from unittest.mock import patch + +from dlrover.python.common import env_utils +from dlrover.python.common.constants import NodeEnv, NodeType +from dlrover.python.diagnosis.common.constants import ( + DiagnosisDataType, + EnvConfigKey, +) +from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric from dlrover.python.diagnosis.datacollector.training_log_collector import ( TrainingLogCollector, ) from dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector import ( XpuTimerMetricsCollector, ) -import os -from dlrover.python.common import env_utils -from dlrover.python.diagnosis.common.constants import ( - DiagnosisDataType, - EnvConfigKey, +from dlrover.python.elastic_agent.master_client import ( + MasterClient, + build_master_client, ) -from dlrover.python.common.constants import NodeEnv, NodeType -from dlrover.python.diagnosis.common.diagnosis_data import WorkerTrainingMetric +from dlrover.python.tests.test_utils import start_local_master class TestDiagnosisDataCollector(unittest.TestCase): @@ -84,4 +98,4 @@ def test_xpu_timer_metric_collector(self): self.assertEqual(agent_xpu_metric.node_id, 1) self.assertEqual(agent_xpu_metric.node_type, NodeType.WORKER) self.assertEqual(agent_xpu_metric.node_rank, 1) - self.assertTrue(agent_xpu_metric.timestamp > 0) \ No newline at end of file + self.assertTrue(agent_xpu_metric.timestamp > 0) diff --git a/dlrover/python/tests/test_inference_chain.py b/dlrover/python/tests/test_inference_chain.py index a89209e8b..0e7ad5371 100644 --- a/dlrover/python/tests/test_inference_chain.py +++ b/dlrover/python/tests/test_inference_chain.py @@ -15,7 +15,12 @@ import unittest from unittest.mock import patch -from dlrover.python.diagnosis.common.constants import InferenceConfigKey +from dlrover.python.common import env_utils +from dlrover.python.common.constants import NodeEnv, NodeType +from dlrover.python.diagnosis.common.constants import ( + EnvConfigKey, + InferenceConfigKey, +) from dlrover.python.diagnosis.common.inference_chain import ( Inference, InferenceAttribute, @@ -35,16 +40,11 @@ from dlrover.python.diagnosis.inferencechain.inferenceoperator.collect_metrics_operator import ( # noqa: E501 CollectMetricsOperator, ) -from dlrover.python.tests.test_utils import start_local_master from dlrover.python.elastic_agent.master_client import ( MasterClient, build_master_client, ) -from dlrover.python.common import env_utils -from dlrover.python.diagnosis.common.constants import ( - EnvConfigKey, -) -from dlrover.python.common.constants import NodeEnv, NodeType +from dlrover.python.tests.test_utils import start_local_master class InferenceChainTest(unittest.TestCase): @@ -143,8 +143,8 @@ def test_inference_chain(self): self.assertTrue(is_same_inference(results[0], failure_inf)) @patch( - "dlrover.python.diagnosis.datacollector.xpu_timer_metrics_collector.XpuTimerMetricsCollector" - ".collect_metrics" + "dlrover.python.diagnosis.datacollector.xpu_timer_metric_collector" + ".XpuTimerMetricsCollector.collect_data" ) def test_collect_metrics_operator(self, mock_collector): mock_collector.return_value = "data" From 09453800a3afaba4877886bf6a6b4f44d9cd24dd Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 10:39:27 +0800 Subject: [PATCH 03/11] update --- dlrover/python/diagnosis/inferencechain/coordinator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dlrover/python/diagnosis/inferencechain/coordinator.py b/dlrover/python/diagnosis/inferencechain/coordinator.py index 8bbc9bac0..300fbfcb4 100644 --- a/dlrover/python/diagnosis/inferencechain/coordinator.py +++ b/dlrover/python/diagnosis/inferencechain/coordinator.py @@ -20,7 +20,8 @@ from dlrover.python.diagnosis.common.inference_chain import Inference -def coordinate_inferences(inferences: List[Inference]) -> DiagnoseAction: +def coordinate_inferences(observations: List[Inference]) -> DiagnoseAction: action = DiagnoseAction() - action.add_action(DiagnosisActionConstant.NO_ACTION) + if len(observations) == 0: + action.add_action(DiagnosisActionConstant.NO_ACTION) return action From 388f8dfa3d9384e599a33f6f661c49ac2470ceaf Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 15:47:06 +0800 Subject: [PATCH 04/11] update --- dlrover/python/diagnosis/inferencechain/coordinator.py | 8 +------- dlrover/python/tests/test_diagnosis_agent.py | 2 ++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/dlrover/python/diagnosis/inferencechain/coordinator.py b/dlrover/python/diagnosis/inferencechain/coordinator.py index 300fbfcb4..07cb70326 100644 --- a/dlrover/python/diagnosis/inferencechain/coordinator.py +++ b/dlrover/python/diagnosis/inferencechain/coordinator.py @@ -13,15 +13,9 @@ from typing import List -from dlrover.python.diagnosis.common.constants import ( - DiagnosisAction as DiagnosisActionConstant, -) from dlrover.python.diagnosis.common.diagnose_action import DiagnoseAction from dlrover.python.diagnosis.common.inference_chain import Inference def coordinate_inferences(observations: List[Inference]) -> DiagnoseAction: - action = DiagnoseAction() - if len(observations) == 0: - action.add_action(DiagnosisActionConstant.NO_ACTION) - return action + return DiagnoseAction() diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index 12aca35ba..657fb641a 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -98,6 +98,8 @@ def test_diagnose_training(self): action = agent.diagnose_training_failure(wc) self.assertEqual(action, DiagnosisAction.RESTART_WORKER) + agent.stop() + def test_worker_training_metric(self): test = WorkerTrainingMetric( data_content="test123", From 4b5170191fb35ef6a1e1cf80290ae35e9bc052af Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 16:10:33 +0800 Subject: [PATCH 05/11] update --- dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py | 2 +- dlrover/python/tests/test_diagnosis_agent.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index 7b1619829..62cdd0cd3 100644 --- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -69,7 +69,7 @@ def __init__(self, training_log_file: str, errors: str): self._observe_operators = get_worker_observe_operators() self._diagnosis_operators = get_worker_diagnosis_operators() - self.start() + # self.start() logger.info( "Initializing diagnosis agent with\n" diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index 657fb641a..fdf9636de 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -98,7 +98,11 @@ def test_diagnose_training(self): action = agent.diagnose_training_failure(wc) self.assertEqual(action, DiagnosisAction.RESTART_WORKER) - agent.stop() + # agent.stop() + # time.sleep( + # DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS + # ) + def test_worker_training_metric(self): test = WorkerTrainingMetric( From a6ededdf2d537778883ccd88a826e82fa1719639 Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 16:26:26 +0800 Subject: [PATCH 06/11] update --- .../diagnosis/diagnosis_agent.py | 2 +- dlrover/python/tests/test_diagnosis_agent.py | 5 --- dlrover/python/tests/test_master_client.py | 34 +++++++++---------- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index 62cdd0cd3..7b1619829 100644 --- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -69,7 +69,7 @@ def __init__(self, training_log_file: str, errors: str): self._observe_operators = get_worker_observe_operators() self._diagnosis_operators = get_worker_diagnosis_operators() - # self.start() + self.start() logger.info( "Initializing diagnosis agent with\n" diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index fdf9636de..aa37ec037 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -98,11 +98,6 @@ def test_diagnose_training(self): action = agent.diagnose_training_failure(wc) self.assertEqual(action, DiagnosisAction.RESTART_WORKER) - # agent.stop() - # time.sleep( - # DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS - # ) - def test_worker_training_metric(self): test = WorkerTrainingMetric( diff --git a/dlrover/python/tests/test_master_client.py b/dlrover/python/tests/test_master_client.py index e24f296ff..c972e43a2 100644 --- a/dlrover/python/tests/test_master_client.py +++ b/dlrover/python/tests/test_master_client.py @@ -39,23 +39,23 @@ def test_open_channel(self): self._master_client.close_channel() self._master_client.open_channel() - def test_report_used_resource(self): - gpu_stats: list[grpc.GPUStats] = [ - grpc.GPUStats( - index=0, - total_memory_mb=24000, - used_memory_mb=4000, - gpu_utilization=55.5, - ) - ] - result = self._master_client.report_used_resource(1024, 10, gpu_stats) - self.assertTrue(result.success) - - def test_report_failures(self): - res = self._master_client.report_failures( - "test", 0, TrainingExceptionLevel.WARNING - ) - self.assertIsNone(res) + # def test_report_used_resource(self): + # gpu_stats: list[grpc.GPUStats] = [ + # grpc.GPUStats( + # index=0, + # total_memory_mb=24000, + # used_memory_mb=4000, + # gpu_utilization=55.5, + # ) + # ] + # result = self._master_client.report_used_resource(1024, 10, gpu_stats) + # self.assertTrue(result.success) + # + # def test_report_failures(self): + # res = self._master_client.report_failures( + # "test", 0, TrainingExceptionLevel.WARNING + # ) + # self.assertIsNone(res) def test_ready_for_ps_relaunch(self): res = self._master_client.ready_for_ps_relaunch() From 3c958228c6f1b5437b9745f82bb0875fa9d37756 Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 16:39:54 +0800 Subject: [PATCH 07/11] update --- dlrover/python/tests/test_diagnosis_agent.py | 4 +-- dlrover/python/tests/test_inference_chain.py | 6 ++-- dlrover/python/tests/test_master_client.py | 34 ++++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index aa37ec037..f72ee1194 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -38,7 +38,7 @@ class TestDiagnosisAgent(unittest.TestCase): def setUp(self): - self.master_proc, self.addr = start_local_master() + self._master, self.addr = start_local_master() MasterClient._instance = build_master_client(self.addr, 1) launch_config = LaunchConfig( min_nodes=1, @@ -51,6 +51,7 @@ def setUp(self): def tearDown(self): os.environ.clear() + self._master.stop() def test_diagnose_training(self): file = "data/training.log" @@ -98,7 +99,6 @@ def test_diagnose_training(self): action = agent.diagnose_training_failure(wc) self.assertEqual(action, DiagnosisAction.RESTART_WORKER) - def test_worker_training_metric(self): test = WorkerTrainingMetric( data_content="test123", diff --git a/dlrover/python/tests/test_inference_chain.py b/dlrover/python/tests/test_inference_chain.py index 0e7ad5371..2d0655734 100644 --- a/dlrover/python/tests/test_inference_chain.py +++ b/dlrover/python/tests/test_inference_chain.py @@ -49,11 +49,11 @@ class InferenceChainTest(unittest.TestCase): def setUp(self): - self.master_proc, self.addr = start_local_master() - MasterClient._instance = build_master_client(self.addr, 1) + self._master, self._addr = start_local_master() + MasterClient._instance = build_master_client(self._addr, 1) def tearDown(self): - pass + self._master.stop() def test_check_training_hang_operator(self): operator = CheckTrainingHangOperator(None) diff --git a/dlrover/python/tests/test_master_client.py b/dlrover/python/tests/test_master_client.py index c972e43a2..e24f296ff 100644 --- a/dlrover/python/tests/test_master_client.py +++ b/dlrover/python/tests/test_master_client.py @@ -39,23 +39,23 @@ def test_open_channel(self): self._master_client.close_channel() self._master_client.open_channel() - # def test_report_used_resource(self): - # gpu_stats: list[grpc.GPUStats] = [ - # grpc.GPUStats( - # index=0, - # total_memory_mb=24000, - # used_memory_mb=4000, - # gpu_utilization=55.5, - # ) - # ] - # result = self._master_client.report_used_resource(1024, 10, gpu_stats) - # self.assertTrue(result.success) - # - # def test_report_failures(self): - # res = self._master_client.report_failures( - # "test", 0, TrainingExceptionLevel.WARNING - # ) - # self.assertIsNone(res) + def test_report_used_resource(self): + gpu_stats: list[grpc.GPUStats] = [ + grpc.GPUStats( + index=0, + total_memory_mb=24000, + used_memory_mb=4000, + gpu_utilization=55.5, + ) + ] + result = self._master_client.report_used_resource(1024, 10, gpu_stats) + self.assertTrue(result.success) + + def test_report_failures(self): + res = self._master_client.report_failures( + "test", 0, TrainingExceptionLevel.WARNING + ) + self.assertIsNone(res) def test_ready_for_ps_relaunch(self): res = self._master_client.ready_for_ps_relaunch() From d594ee2a1a5dac3b686be39e1f01aa4d37d34fa9 Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 17:38:41 +0800 Subject: [PATCH 08/11] update --- dlrover/python/tests/test_diagnosis_agent.py | 1 - dlrover/python/tests/test_inference_chain.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index f72ee1194..c6770f677 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ b/dlrover/python/tests/test_diagnosis_agent.py @@ -51,7 +51,6 @@ def setUp(self): def tearDown(self): os.environ.clear() - self._master.stop() def test_diagnose_training(self): file = "data/training.log" diff --git a/dlrover/python/tests/test_inference_chain.py b/dlrover/python/tests/test_inference_chain.py index 2d0655734..49b5899bc 100644 --- a/dlrover/python/tests/test_inference_chain.py +++ b/dlrover/python/tests/test_inference_chain.py @@ -53,7 +53,7 @@ def setUp(self): MasterClient._instance = build_master_client(self._addr, 1) def tearDown(self): - self._master.stop() + os.environ.clear() def test_check_training_hang_operator(self): operator = CheckTrainingHangOperator(None) From bccb9e9db552b0527ce956223c27f4098986aa52 Mon Sep 17 00:00:00 2001 From: bsang Date: Fri, 11 Oct 2024 19:50:10 +0800 Subject: [PATCH 09/11] update --- dlrover/python/diagnosis/common/diagnose_action.py | 4 ++-- ...t_metrics_operator.py => metrics_collection_operator.py} | 2 +- .../diagnosis/inferencechain/inferenceoperator/operator.py | 6 +++--- dlrover/python/tests/test_inference_chain.py | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) rename dlrover/python/diagnosis/inferencechain/inferenceoperator/{collect_metrics_operator.py => metrics_collection_operator.py} (97%) diff --git a/dlrover/python/diagnosis/common/diagnose_action.py b/dlrover/python/diagnosis/common/diagnose_action.py index 1557bcb12..ea96de464 100644 --- a/dlrover/python/diagnosis/common/diagnose_action.py +++ b/dlrover/python/diagnosis/common/diagnose_action.py @@ -16,7 +16,7 @@ class DiagnoseAction: def __init__(self): - self.actions: List[str] = [] + self._actions: List[str] = [] def add_action(self, action: str): - self.actions.append(action) + self._actions.append(action) diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py similarity index 97% rename from dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py rename to dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py index 51583f537..3bde95610 100644 --- a/dlrover/python/diagnosis/inferencechain/inferenceoperator/collect_metrics_operator.py +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py @@ -29,7 +29,7 @@ from dlrover.python.elastic_agent.master_client import MasterClient -class CollectMetricsOperator(InferenceOperator): +class MetricsCollectionOperator(InferenceOperator): """ CollectXPUTimerMetricsOperator is the operator to collect XPU timer metrics. diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py index 5ad43765d..5f213873a 100644 --- a/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/operator.py @@ -14,8 +14,8 @@ from dlrover.python.diagnosis.inferencechain.inferenceoperator.check_failure_node_operator import ( # noqa: E501 CheckFailureNodeOperator, ) -from dlrover.python.diagnosis.inferencechain.inferenceoperator.collect_metrics_operator import ( # noqa: E501 - CollectMetricsOperator, +from dlrover.python.diagnosis.inferencechain.inferenceoperator.metrics_collection_operator import ( # noqa: E501 + MetricsCollectionOperator, ) @@ -24,7 +24,7 @@ def get_training_failure_operators(): def get_worker_observe_operators(): - return [CollectMetricsOperator()] + return [MetricsCollectionOperator()] def get_worker_diagnosis_operators(): diff --git a/dlrover/python/tests/test_inference_chain.py b/dlrover/python/tests/test_inference_chain.py index 49b5899bc..61a37c160 100644 --- a/dlrover/python/tests/test_inference_chain.py +++ b/dlrover/python/tests/test_inference_chain.py @@ -37,8 +37,8 @@ from dlrover.python.diagnosis.inferencechain.inferenceoperator.check_training_hang_operator import ( # noqa: E501 CheckTrainingHangOperator, ) -from dlrover.python.diagnosis.inferencechain.inferenceoperator.collect_metrics_operator import ( # noqa: E501 - CollectMetricsOperator, +from dlrover.python.diagnosis.inferencechain.inferenceoperator.metrics_collection_operator import ( # noqa: E501 + MetricsCollectionOperator, ) from dlrover.python.elastic_agent.master_client import ( MasterClient, @@ -148,7 +148,7 @@ def test_inference_chain(self): ) def test_collect_metrics_operator(self, mock_collector): mock_collector.return_value = "data" - operator = CollectMetricsOperator() + operator = MetricsCollectionOperator() inf = Inference( name=InferenceName.WORKER, attribution=InferenceAttribute.COLLECT, From cf40a3c514b961b4890e01d25d4359bc993a43a6 Mon Sep 17 00:00:00 2001 From: bsang Date: Sat, 12 Oct 2024 10:26:17 +0800 Subject: [PATCH 10/11] update comments --- .../inferenceoperator/metrics_collection_operator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py b/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py index 3bde95610..460195f5d 100644 --- a/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py +++ b/dlrover/python/diagnosis/inferencechain/inferenceoperator/metrics_collection_operator.py @@ -31,8 +31,8 @@ class MetricsCollectionOperator(InferenceOperator): """ - CollectXPUTimerMetricsOperator is the operator to collect - XPU timer metrics. + MetricsCollectionOperator is the operator to collect + worker diagnosis metrics. """ def __init__(self): From 2a1a3f52ac0d67ff6540302f046133f59988108e Mon Sep 17 00:00:00 2001 From: Tianyi Chen Date: Sat, 12 Oct 2024 11:11:47 +0800 Subject: [PATCH 11/11] add stale issue worker flow (#1292) * add stale issue worker flow * lint --- .github/workflows/stale.yml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .github/workflows/stale.yml diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml new file mode 100644 index 000000000..6a52d6e9a --- /dev/null +++ b/.github/workflows/stale.yml @@ -0,0 +1,20 @@ +name: Close stale issues + +on: + schedule: + - cron: '0 0 * * *' # run every day + +jobs: + stale: + runs-on: ubuntu-latest + steps: + - uses: actions/stale@v4 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + stale-issue-message: 'This issue has been automatically marked as stale because it has not had recent activity.' + close-issue-message: 'This issue is being automatically closed due to inactivity.' + days-before-stale: 90 + days-before-close: 7 + stale-pr-message: 'This pull request has been automatically marked as stale because it has not had recent activity.' + close-pr-message: 'This pull request is being automatically closed due to inactivity.' + stale-label: 'stale'