diff --git a/gcloud/core/apis/drf/serilaziers/template.py b/gcloud/core/apis/drf/serilaziers/template.py index fa42fa36b4..e743b3ed61 100644 --- a/gcloud/core/apis/drf/serilaziers/template.py +++ b/gcloud/core/apis/drf/serilaziers/template.py @@ -32,7 +32,7 @@ def get_notify_type(self, obj): return ( notify_type if isinstance(notify_type, dict) - else {"success": notify_type, "fail": notify_type, "pending_processing": []} + else {"success": notify_type, "fail": notify_type, "pending_processing": notify_type} ) except Exception as e: logger.exception(f"[get_notify_type] error: {e}") diff --git a/gcloud/taskflow3/celery/tasks.py b/gcloud/taskflow3/celery/tasks.py index 70689205a0..3f6c9de5b9 100644 --- a/gcloud/taskflow3/celery/tasks.py +++ b/gcloud/taskflow3/celery/tasks.py @@ -26,7 +26,6 @@ from gcloud.constants import CallbackStatus from gcloud.shortcuts.message import send_task_flow_message from gcloud.taskflow3.domains.callback import TaskCallBacker -from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher from gcloud.taskflow3.domains.dispatchers.node import NodeCommandDispatcher from gcloud.taskflow3.domains.node_timeout_strategy import node_timeout_handler from gcloud.taskflow3.models import ( @@ -44,7 +43,7 @@ @task -def send_taskflow_message(task_id, msg_type, node_name="", skip_if_not_status="", use_root=False): +def send_taskflow_message(task_id, msg_type, node_name="", use_root=False): try: taskflow = TaskFlowInstance.objects.get(id=task_id) if use_root and taskflow.is_child_taskflow: @@ -55,28 +54,6 @@ def send_taskflow_message(task_id, msg_type, node_name="", skip_if_not_status="" "send_task_flow_message[taskflow_id=%s] use root taskflow[id=%s] to send message", task_id, root_task_id ) - if skip_if_not_status: - # 满足某个具体状态才发通知 - dispatcher = TaskCommandDispatcher( - engine_ver=taskflow.engine_ver, - taskflow_id=taskflow.id, - pipeline_instance=taskflow.pipeline_instance, - project_id=taskflow.project_id, - ) - get_task_status_result = dispatcher.get_task_status(with_ex_data=False) - if get_task_status_result.get("result") and get_task_status_result["data"]["state"] == skip_if_not_status: - logger.info( - "send_task_flow_message[taskflow_id=%s] taskflow[id=%s] check status -> %s success.", - task_id, - taskflow.id, - skip_if_not_status, - ) - else: - raise ValueError( - f"taskflow[id={taskflow.id}] status not match: actual -> {get_task_status_result}, " - f"expect -> {skip_if_not_status}", - ) - send_task_flow_message(taskflow, msg_type, node_name) except Exception as e: logger.exception("send_task_flow_message[taskflow_id=%s] send message error: %s" % (task_id, e)) diff --git a/gcloud/taskflow3/signals/handlers.py b/gcloud/taskflow3/signals/handlers.py index 346b4f53fa..38ffe3e127 100644 --- a/gcloud/taskflow3/signals/handlers.py +++ b/gcloud/taskflow3/signals/handlers.py @@ -191,13 +191,6 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version, _finish_taskflow_and_send_signal(root_id, taskflow_finished, True) - elif to_state == bamboo_engine_states.SUSPENDED and node_id == root_id: - # TODO 发送通知,向上找到根流程,发送通知 - # 问题1:独立子流程场景,暂停后应该是由父流程决定是否通知,如何将消息通知给 Root Taskflow? - # 问题2:等待确认 / 等待审批场景也需要通知到父流程 -> 向上找到根流程 - # 问题3:子流程、父流程都有通知,以哪一方为准?(继承父流程配置?) - pass - try: _node_timeout_info_update(settings.redis_inst, to_state, node_id, version) except Exception: diff --git a/gcloud/tests/taskflow3/tasks/test_send_taskflow_message.py b/gcloud/tests/taskflow3/tasks/test_send_taskflow_message.py index 7b3eca93af..f41f8664f9 100644 --- a/gcloud/tests/taskflow3/tasks/test_send_taskflow_message.py +++ b/gcloud/tests/taskflow3/tasks/test_send_taskflow_message.py @@ -14,7 +14,6 @@ from django.test import TestCase from mock import MagicMock, call, patch -from gcloud.constants import TaskExtraStatus from gcloud.shortcuts.message import ATOM_FAILED from gcloud.taskflow3.celery.tasks import send_taskflow_message from gcloud.tests.mock_settings import * # noqa @@ -64,50 +63,15 @@ def test_send_taskflow_message__use_root(self): taskflow_relation_model = MagicMock() taskflow_relation_model.objects.get = MagicMock(return_value=taskflow_relation) send_task_flow_message = MagicMock() - get_task_status = MagicMock( - return_value={"result": True, "data": {"state": TaskExtraStatus.PENDING_PROCESSING.value}} - ) with patch(TASKFLOW_TASKS_TASKFLOW_RELATION, taskflow_relation_model): with patch(TASKFLOW_TASKS_TASKFLOW_INSTANCE, taskflow_model): - with patch(TASKFLOW_TASKS_TASK_COMMAND_DISPATCHER_GET_STATUS, get_task_status): - with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message): - send_taskflow_message( - child_taskflow.id, - ATOM_FAILED, - node_name="test", - skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value, - use_root=True, - ) + with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message): + send_taskflow_message( + child_taskflow.id, + ATOM_FAILED, + node_name="test", + use_root=True, + ) - get_task_status.assert_called_once() send_task_flow_message.assert_called_once_with(root_taskflow, ATOM_FAILED, "test") taskflow_model.objects.get.assert_has_calls(calls=[call(id=child_taskflow.id), call(id=root_taskflow.id)]) - - def test_send_taskflow_message__skip_if_status(self): - - root_taskflow, child_taskflow = self.generate_taskflow() - taskflow_model = MagicMock() - taskflow_model.objects.get = MagicMock(side_effect=lambda id: {1: root_taskflow, 2: child_taskflow}[id]) - - taskflow_relation = MagicMock() - taskflow_relation.root_task_id = root_taskflow.id - taskflow_relation_model = MagicMock() - taskflow_relation_model.objects.get = MagicMock(return_value=taskflow_relation) - send_task_flow_message = MagicMock() - # 实际状态是 FAILD - get_task_status = MagicMock(return_value={"result": True, "data": {"state": "FAILED"}}) - with patch(TASKFLOW_TASKS_TASKFLOW_RELATION, taskflow_relation_model): - with patch(TASKFLOW_TASKS_TASKFLOW_INSTANCE, taskflow_model): - with patch(TASKFLOW_TASKS_TASK_COMMAND_DISPATCHER_GET_STATUS, get_task_status): - with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message): - send_taskflow_message( - child_taskflow.id, - ATOM_FAILED, - node_name="test", - skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value, - use_root=True, - ) - - get_task_status.assert_called_once() - send_task_flow_message.assert_not_called() - taskflow_model.objects.get.assert_has_calls(calls=[call(id=child_taskflow.id), call(id=root_taskflow.id)]) diff --git a/pipeline_plugins/components/collections/controller.py b/pipeline_plugins/components/collections/controller.py index 516d03e535..2ff9c7a285 100644 --- a/pipeline_plugins/components/collections/controller.py +++ b/pipeline_plugins/components/collections/controller.py @@ -23,7 +23,6 @@ from pipeline.core.flow.activity import Service, StaticIntervalGenerator from pipeline.core.flow.io import ObjectItemSchema, StringItemSchema -from gcloud.constants import TaskExtraStatus from gcloud.core.models import Project from gcloud.shortcuts.message import PENDING_PROCESSING from gcloud.taskflow3.celery.tasks import send_taskflow_message @@ -41,7 +40,6 @@ def execute(self, data, parent_data): send_taskflow_message.delay( task_id=task_id, msg_type=PENDING_PROCESSING, - skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value, use_root=True, ) return True diff --git a/pipeline_plugins/components/collections/sites/open/bk/approve/v1_0.py b/pipeline_plugins/components/collections/sites/open/bk/approve/v1_0.py index 9eb70ab0de..6ae7039640 100644 --- a/pipeline_plugins/components/collections/sites/open/bk/approve/v1_0.py +++ b/pipeline_plugins/components/collections/sites/open/bk/approve/v1_0.py @@ -21,7 +21,6 @@ from api.collections.itsm import BKItsmClient from gcloud.conf import settings -from gcloud.constants import TaskExtraStatus from gcloud.shortcuts.message import PENDING_PROCESSING from gcloud.taskflow3.celery.tasks import send_taskflow_message from gcloud.utils.handlers import handle_api_error @@ -90,7 +89,6 @@ def execute(self, data, parent_data): send_taskflow_message.delay( task_id=task_id, msg_type=PENDING_PROCESSING, - skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value, use_root=True, ) diff --git a/pipeline_plugins/tests/components/collections/sites/open/bk_test/approve/test_v1_0.py b/pipeline_plugins/tests/components/collections/sites/open/bk_test/approve/test_v1_0.py index 7e15169c86..13e89848ce 100644 --- a/pipeline_plugins/tests/components/collections/sites/open/bk_test/approve/test_v1_0.py +++ b/pipeline_plugins/tests/components/collections/sites/open/bk_test/approve/test_v1_0.py @@ -23,7 +23,6 @@ ScheduleAssertion, ) -from gcloud.constants import TaskExtraStatus from gcloud.shortcuts.message import PENDING_PROCESSING from pipeline_plugins.components.collections.sites.open.bk.approve.v1_0 import ApproveComponent @@ -95,7 +94,6 @@ def __init__(self, create_ticket=None): SEND_TASKFLOW_MESSAGE_CALL = { "task_id": COMMON_PARENT["task_id"], "msg_type": PENDING_PROCESSING, - "skip_if_not_status": TaskExtraStatus.PENDING_PROCESSING.value, "use_root": True, } INPUTS = {