Skip to content

Commit

Permalink
minor: review fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 19, 2023
1 parent b2d94ac commit e1817da
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 81 deletions.
2 changes: 1 addition & 1 deletion gcloud/core/apis/drf/serilaziers/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_notify_type(self, obj):
return (
notify_type
if isinstance(notify_type, dict)
else {"success": notify_type, "fail": notify_type, "pending_processing": []}
else {"success": notify_type, "fail": notify_type, "pending_processing": notify_type}
)
except Exception as e:
logger.exception(f"[get_notify_type] error: {e}")
Expand Down
25 changes: 1 addition & 24 deletions gcloud/taskflow3/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from gcloud.constants import CallbackStatus
from gcloud.shortcuts.message import send_task_flow_message
from gcloud.taskflow3.domains.callback import TaskCallBacker
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.domains.dispatchers.node import NodeCommandDispatcher
from gcloud.taskflow3.domains.node_timeout_strategy import node_timeout_handler
from gcloud.taskflow3.models import (
Expand All @@ -44,7 +43,7 @@


@task
def send_taskflow_message(task_id, msg_type, node_name="", skip_if_not_status="", use_root=False):
def send_taskflow_message(task_id, msg_type, node_name="", use_root=False):
try:
taskflow = TaskFlowInstance.objects.get(id=task_id)
if use_root and taskflow.is_child_taskflow:
Expand All @@ -55,28 +54,6 @@ def send_taskflow_message(task_id, msg_type, node_name="", skip_if_not_status=""
"send_task_flow_message[taskflow_id=%s] use root taskflow[id=%s] to send message", task_id, root_task_id
)

if skip_if_not_status:
# 满足某个具体状态才发通知
dispatcher = TaskCommandDispatcher(
engine_ver=taskflow.engine_ver,
taskflow_id=taskflow.id,
pipeline_instance=taskflow.pipeline_instance,
project_id=taskflow.project_id,
)
get_task_status_result = dispatcher.get_task_status(with_ex_data=False)
if get_task_status_result.get("result") and get_task_status_result["data"]["state"] == skip_if_not_status:
logger.info(
"send_task_flow_message[taskflow_id=%s] taskflow[id=%s] check status -> %s success.",
task_id,
taskflow.id,
skip_if_not_status,
)
else:
raise ValueError(
f"taskflow[id={taskflow.id}] status not match: actual -> {get_task_status_result}, "
f"expect -> {skip_if_not_status}",
)

send_task_flow_message(taskflow, msg_type, node_name)
except Exception as e:
logger.exception("send_task_flow_message[taskflow_id=%s] send message error: %s" % (task_id, e))
Expand Down
7 changes: 0 additions & 7 deletions gcloud/taskflow3/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,6 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,

_finish_taskflow_and_send_signal(root_id, taskflow_finished, True)

elif to_state == bamboo_engine_states.SUSPENDED and node_id == root_id:
# TODO 发送通知,向上找到根流程,发送通知
# 问题1:独立子流程场景,暂停后应该是由父流程决定是否通知,如何将消息通知给 Root Taskflow?
# 问题2:等待确认 / 等待审批场景也需要通知到父流程 -> 向上找到根流程
# 问题3:子流程、父流程都有通知,以哪一方为准?(继承父流程配置?)
pass

try:
_node_timeout_info_update(settings.redis_inst, to_state, node_id, version)
except Exception:
Expand Down
50 changes: 7 additions & 43 deletions gcloud/tests/taskflow3/tasks/test_send_taskflow_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from django.test import TestCase
from mock import MagicMock, call, patch

from gcloud.constants import TaskExtraStatus
from gcloud.shortcuts.message import ATOM_FAILED
from gcloud.taskflow3.celery.tasks import send_taskflow_message
from gcloud.tests.mock_settings import * # noqa
Expand Down Expand Up @@ -64,50 +63,15 @@ def test_send_taskflow_message__use_root(self):
taskflow_relation_model = MagicMock()
taskflow_relation_model.objects.get = MagicMock(return_value=taskflow_relation)
send_task_flow_message = MagicMock()
get_task_status = MagicMock(
return_value={"result": True, "data": {"state": TaskExtraStatus.PENDING_PROCESSING.value}}
)
with patch(TASKFLOW_TASKS_TASKFLOW_RELATION, taskflow_relation_model):
with patch(TASKFLOW_TASKS_TASKFLOW_INSTANCE, taskflow_model):
with patch(TASKFLOW_TASKS_TASK_COMMAND_DISPATCHER_GET_STATUS, get_task_status):
with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message):
send_taskflow_message(
child_taskflow.id,
ATOM_FAILED,
node_name="test",
skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value,
use_root=True,
)
with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message):
send_taskflow_message(
child_taskflow.id,
ATOM_FAILED,
node_name="test",
use_root=True,
)

get_task_status.assert_called_once()
send_task_flow_message.assert_called_once_with(root_taskflow, ATOM_FAILED, "test")
taskflow_model.objects.get.assert_has_calls(calls=[call(id=child_taskflow.id), call(id=root_taskflow.id)])

def test_send_taskflow_message__skip_if_status(self):

root_taskflow, child_taskflow = self.generate_taskflow()
taskflow_model = MagicMock()
taskflow_model.objects.get = MagicMock(side_effect=lambda id: {1: root_taskflow, 2: child_taskflow}[id])

taskflow_relation = MagicMock()
taskflow_relation.root_task_id = root_taskflow.id
taskflow_relation_model = MagicMock()
taskflow_relation_model.objects.get = MagicMock(return_value=taskflow_relation)
send_task_flow_message = MagicMock()
# 实际状态是 FAILD
get_task_status = MagicMock(return_value={"result": True, "data": {"state": "FAILED"}})
with patch(TASKFLOW_TASKS_TASKFLOW_RELATION, taskflow_relation_model):
with patch(TASKFLOW_TASKS_TASKFLOW_INSTANCE, taskflow_model):
with patch(TASKFLOW_TASKS_TASK_COMMAND_DISPATCHER_GET_STATUS, get_task_status):
with patch(TASKFLOW_TASKS_SEND_TASK_FLOW_MESSAGE, send_task_flow_message):
send_taskflow_message(
child_taskflow.id,
ATOM_FAILED,
node_name="test",
skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value,
use_root=True,
)

get_task_status.assert_called_once()
send_task_flow_message.assert_not_called()
taskflow_model.objects.get.assert_has_calls(calls=[call(id=child_taskflow.id), call(id=root_taskflow.id)])
2 changes: 0 additions & 2 deletions pipeline_plugins/components/collections/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from pipeline.core.flow.activity import Service, StaticIntervalGenerator
from pipeline.core.flow.io import ObjectItemSchema, StringItemSchema

from gcloud.constants import TaskExtraStatus
from gcloud.core.models import Project
from gcloud.shortcuts.message import PENDING_PROCESSING
from gcloud.taskflow3.celery.tasks import send_taskflow_message
Expand All @@ -41,7 +40,6 @@ def execute(self, data, parent_data):
send_taskflow_message.delay(
task_id=task_id,
msg_type=PENDING_PROCESSING,
skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value,
use_root=True,
)
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from api.collections.itsm import BKItsmClient
from gcloud.conf import settings
from gcloud.constants import TaskExtraStatus
from gcloud.shortcuts.message import PENDING_PROCESSING
from gcloud.taskflow3.celery.tasks import send_taskflow_message
from gcloud.utils.handlers import handle_api_error
Expand Down Expand Up @@ -90,7 +89,6 @@ def execute(self, data, parent_data):
send_taskflow_message.delay(
task_id=task_id,
msg_type=PENDING_PROCESSING,
skip_if_not_status=TaskExtraStatus.PENDING_PROCESSING.value,
use_root=True,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
ScheduleAssertion,
)

from gcloud.constants import TaskExtraStatus
from gcloud.shortcuts.message import PENDING_PROCESSING
from pipeline_plugins.components.collections.sites.open.bk.approve.v1_0 import ApproveComponent

Expand Down Expand Up @@ -95,7 +94,6 @@ def __init__(self, create_ticket=None):
SEND_TASKFLOW_MESSAGE_CALL = {
"task_id": COMMON_PARENT["task_id"],
"msg_type": PENDING_PROCESSING,
"skip_if_not_status": TaskExtraStatus.PENDING_PROCESSING.value,
"use_root": True,
}
INPUTS = {
Expand Down

0 comments on commit e1817da

Please sign in to comment.