Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

优化get_node_data_v2加载速度 #6930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions gcloud/taskflow3/domains/dispatchers/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,35 @@

import logging
from copy import deepcopy
from typing import Optional, List
from typing import List, Optional

from bamboo_engine import exceptions as bamboo_exceptions
from bamboo_engine import api as bamboo_engine_api
from bamboo_engine import exceptions as bamboo_exceptions
from bamboo_engine import states as bamboo_engine_states
from bamboo_engine.eri import ContextValueType

from engine_pickle_obj.context import SystemObject
from gcloud.project_constants.domains.context import get_project_constants_context
from pipeline.engine import states as pipeline_states
from django.utils.translation import ugettext_lazy as _
from opentelemetry import trace
from pipeline.component_framework.library import ComponentLibrary
from pipeline.engine import api as pipeline_api
from pipeline.service import task_service
from pipeline.models import PipelineInstance
from pipeline.engine import exceptions as pipeline_exceptions
from pipeline.engine import models as pipeline_engine_models
from pipeline.parser.context import get_pipeline_context
from pipeline.engine import states as pipeline_states
from pipeline.eri.runtime import BambooDjangoRuntime
from pipeline.log.models import LogEntry
from pipeline.component_framework.library import ComponentLibrary
from pipeline.engine import exceptions as pipeline_exceptions
from opentelemetry import trace
from pipeline.models import PipelineInstance
from pipeline.parser.context import get_pipeline_context
from pipeline.service import task_service

from engine_pickle_obj.context import SystemObject
from gcloud import err_code
from gcloud.utils.handlers import handle_plain_log
from gcloud.project_constants.domains.context import get_project_constants_context
from gcloud.taskflow3.utils import format_pipeline_status
from gcloud.tasktmpl3.domains.constants import preview_node_inputs
from gcloud.utils.handlers import handle_plain_log
from pipeline_web.parser import WebPipelineAdapter
from pipeline_web.parser.format import format_web_data_to_pipeline

from .base import EngineCommandDispatcher, ensure_return_is_dict
from django.utils.translation import ugettext_lazy as _

logger = logging.getLogger("root")

Expand Down Expand Up @@ -554,17 +554,17 @@ def get_node_data_v2(
)

formatted_pipeline = format_web_data_to_pipeline(pipeline_instance.execution_data)
preview_result = bamboo_engine_api.preview_node_inputs(
runtime=runtime,
pipeline=formatted_pipeline,
node_id=self.node_id,
subprocess_stack=subprocess_stack,
root_pipeline_data=root_pipeline_data,
parent_params=root_pipeline_context,
)

if not preview_result.result:
message = _(f"节点数据请求失败: 请重试, 如多次失败可联系管理员处理. {preview_result.exc} | get_node_data_v2")
try:
preview_inputs = preview_node_inputs(
runtime=runtime,
pipeline=formatted_pipeline,
node_id=self.node_id,
subprocess_stack=subprocess_stack,
root_pipeline_data=root_pipeline_data,
parent_params=root_pipeline_context,
)
except Exception as e:
message = _(f"节点数据请求失败: 请重试, 如多次失败可联系管理员处理. {e} | get_node_data_v2")
logger.error(message)
return {
"result": False,
Expand All @@ -575,9 +575,9 @@ def get_node_data_v2(

if node_info["type"] == "SubProcess":
# remove prefix '${' and subfix '}' in subprocess execution input
inputs = {k[2:-1]: v for k, v in preview_result.data.items()}
inputs = {k[2:-1]: v for k, v in preview_inputs.items()}
else:
inputs = preview_result.data
inputs = preview_inputs

except Exception as err:
return {
Expand Down
78 changes: 71 additions & 7 deletions gcloud/tasktmpl3/domains/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,29 @@
"""
import copy
import logging
import re
from typing import List, Optional

from bamboo_engine.context import Context
from bamboo_engine.eri import ContextValue
from bamboo_engine.eri import ContextValue, NodeType
from bamboo_engine.template import Template
from bamboo_engine.utils.boolrule import BoolRule
from bamboo_engine.utils.constants import VAR_CONTEXT_MAPPING
from pipeline.core.constants import PE
from pipeline.eri.runtime import BambooDjangoRuntime

from pipeline.core.data.expression import ConstantTemplate

from pipeline.core.data import var
from pipeline.core.data.expression import ConstantTemplate
from pipeline.core.data.library import VariableLibrary
from pipeline.eri.runtime import BambooDjangoRuntime

from pipeline_web.graph import get_graph_from_pipeline_tree, get_ordered_necessary_nodes_and_paths_between_nodes
from pipeline_web.graph import (
get_graph_from_pipeline_tree,
get_ordered_necessary_nodes_and_paths_between_nodes,
)
from pipeline_web.parser.format import format_data_to_pipeline_inputs
from pipeline_web.preview_base import PipelineTemplateWebPreviewer

var_pattern = re.compile(r"\${(\w+)}")

logger = logging.getLogger("root")


Expand Down Expand Up @@ -75,6 +81,63 @@ def get_constant_values(constants, extra_data):
return {**constant_values, **hydrated_context}


def preview_node_inputs(
    runtime: BambooDjangoRuntime,
    pipeline: dict,
    node_id: str,
    subprocess_stack: Optional[List[str]] = None,
    root_pipeline_data: Optional[dict] = None,
    parent_params: Optional[dict] = None,
):
    """Resolve and render a node's inputs without executing the node.

    To keep the preview fast, only the context variables actually referenced
    by the values rendered at the current pipeline level are hydrated,
    instead of the whole pipeline context.

    :param runtime: bamboo engine runtime used to resolve context values
    :param pipeline: formatted pipeline tree that (possibly indirectly)
        contains the node
    :param node_id: id of the node whose inputs are previewed
    :param subprocess_stack: ids of the subprocesses on the path to the node,
        outermost first; each recursive call descends one level
    :param root_pipeline_data: root pipeline context data
    :param parent_params: params injected by the parent pipeline, each value
        shaped like ``{"type": ..., "value": ...}``
    :return: mapping of input key to rendered value
    :raises Exception: if the node's type supports no input preview
    """
    # B006: None sentinels instead of shared mutable default arguments.
    subprocess_stack = subprocess_stack or []
    root_pipeline_data = root_pipeline_data or {}
    parent_params = parent_params or {}

    def collect_referenced_keys(values):
        # Gather the "${var}" keys referenced by the given raw values so that
        # only those context variables need to be hydrated.
        keys = set()
        for value in values:
            if isinstance(value, str):
                for var_name in var_pattern.findall(value):
                    keys.add("${" + var_name + "}")
        return keys

    def build_hydrated_context(need_render_context_keys):
        # Build a context restricted to the referenced keys and resolve it.
        context_values = [
            ContextValue(
                key=key,
                type=VAR_CONTEXT_MAPPING[info["type"]],
                value=info["value"],
                code=info.get("custom_type"),
            )
            for key, info in list(pipeline["data"].get("inputs", {}).items()) + list(parent_params.items())
            if key in need_render_context_keys
        ]
        return Context(runtime, context_values, root_pipeline_data).hydrate(deformat=True)

    if subprocess_stack:
        # The node lives inside a subprocess: at this level only the
        # subprocess params have to be rendered; the rendered params become
        # the child's parent_params and the recursion descends one level.
        subprocess_id = subprocess_stack[0]
        subprocess_info = pipeline["activities"][subprocess_id]
        param_data = {key: info["value"] for key, info in subprocess_info["params"].items()}
        # Keys are collected from the params rendered at *this* level; looking
        # up node_id in the current pipeline's activities would fail here,
        # because node_id only exists in the deepest pipeline of the stack.
        hydrated_context = build_hydrated_context(collect_referenced_keys(param_data.values()))
        hydrated_param_data = Template(param_data).render(hydrated_context)
        formatted_param_data = {key: {"value": value, "type": "plain"} for key, value in hydrated_param_data.items()}
        return preview_node_inputs(
            runtime=runtime,
            pipeline=subprocess_info["pipeline"],
            node_id=node_id,
            subprocess_stack=subprocess_stack[1:],
            root_pipeline_data=root_pipeline_data,
            parent_params=formatted_param_data,
        )

    node_info = pipeline["activities"][node_id]
    node_type = node_info["type"]
    if node_type == NodeType.ServiceActivity.value:
        raw_inputs = node_info["component"]["inputs"]
        # Only inputs flagged need_render reference variables that must be
        # hydrated before rendering.
        need_render_values = [item["value"] for item in raw_inputs.values() if item["need_render"]]
    elif node_type == NodeType.SubProcess.value:
        # SubProcess nodes carry their inputs in "params", not in
        # "component.inputs"; every param value may reference variables.
        raw_inputs = node_info["params"]
        need_render_values = [info["value"] for info in raw_inputs.values()]
    else:
        raise Exception(f"can not preview inputs for node type: {node_type}")

    raw_inputs = {key: info["value"] for key, info in raw_inputs.items()}
    hydrated_context = build_hydrated_context(collect_referenced_keys(need_render_values))
    return Template(raw_inputs).render(hydrated_context)


def _system_constants_to_mako_str(value):
"""
将内置系统变量(_system.xxx)转换为可用于mako渲染统计的变量(_system点xxx)
Expand Down Expand Up @@ -217,7 +280,8 @@ def get_task_referenced_constants(pipeline_tree: dict, constants: dict, extra_da
if node_id in unused_nodes:
copy_pipeline_tree[PE.gateways].pop(node_id)
PipelineTemplateWebPreviewer.remove_useless_constants(
exclude_task_nodes_id=unused_nodes, pipeline_tree=copy_pipeline_tree,
exclude_task_nodes_id=unused_nodes,
pipeline_tree=copy_pipeline_tree,
)
referenced_constant_keys = set(copy_pipeline_tree["constants"].keys())
logger.info("[get_task_referenced_constants] referenced_constant_keys: {}".format(referenced_constant_keys))
Expand Down
1 change: 1 addition & 0 deletions gcloud/tests/mock_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
TASKFLOW_DISPATCHERS_NODE_FORMAT_PIPELINE_STATUS = "gcloud.taskflow3.domains.dispatchers.node.format_pipeline_status"
TASKFLOW_DISPATCHERS_NODE_GET_PIPELINE_CONTEXT = "gcloud.taskflow3.domains.dispatchers.node.get_pipeline_context"
TASKFLOW_DISPATCHERS_NODE_SYSTEM_OBJ = "gcloud.taskflow3.domains.dispatchers.node.SystemObject"
TASKFLOW_CUSTOM_PREVIEW_NODE_INPUTS = "gcloud.tasktmpl3.domains.constants.preview_node_inputs"

TASKFLOW_DISPATCHERS_TASK_PIPELINE_MODEL = "gcloud.taskflow3.domains.dispatchers.task.PipelineModel"
TASKFLOW_DISPATCHERS_TASK_BAMBOO_DJANGO_RUNTIME = "gcloud.taskflow3.domains.dispatchers.task.BambooDjangoRuntime"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
specific language governing permissions and limitations under the License.
"""
from bamboo_engine.eri import ContextValue, ContextValueType
from bamboo_engine.exceptions import NotFoundError
from django.test import TestCase

from gcloud import err_code
from gcloud.taskflow3.domains.dispatchers.node import NodeCommandDispatcher
from bamboo_engine.exceptions import NotFoundError

from gcloud.tests.mock import * # noqa
from gcloud.tests.mock_settings import * # noqa

Expand Down Expand Up @@ -90,43 +89,34 @@ def test_act_not_started(self):
dispatcher = NodeCommandDispatcher(engine_ver=2, node_id="node_id")
dispatcher._get_node_info = MagicMock(return_value={"type": "ServiceActivity"})

preview_node_inputs_result = MagicMock()
preview_node_inputs_result.result = True
preview_node_inputs_result.data = "inputs"
preview_node_inputs_result = MagicMock(return_value="inputs")

bamboo_api.preview_node_inputs = MagicMock(return_value=preview_node_inputs_result)
format_outputs = "format_outputs"
dispatcher._format_outputs = MagicMock(return_value=(True, None, format_outputs))

with patch(TASKFLOW_DISPATCHERS_NODE_BAMBOO_RUNTIME, runtime_init):
with patch(TASKFLOW_DISPATCHERS_NODE_BAMBOO_API, bamboo_api):
with patch(TASKFLOW_DISPATCHERS_NODE_GET_PIPELINE_CONTEXT, get_pipeline_context):
with patch(TASKFLOW_DISPATCHERS_NODE_SYSTEM_OBJ, system_obj):
node_data = dispatcher.get_node_data_v2(
username=username,
component_code=component_code,
subprocess_stack=subprocess_stack,
loop=loop,
**kwargs
)
with patch(TASKFLOW_CUSTOM_PREVIEW_NODE_INPUTS, preview_node_inputs_result):
node_data = dispatcher.get_node_data_v2(
username=username,
component_code=component_code,
subprocess_stack=subprocess_stack,
loop=loop,
**kwargs
)

bamboo_api.get_children_states.assert_called_once_with(runtime=runtime, node_id=dispatcher.node_id)
bamboo_api.preview_node_inputs.assert_called_once_with(
runtime=runtime,
pipeline=pipeline_instance.execution_data,
node_id=dispatcher.node_id,
subprocess_stack=subprocess_stack,
root_pipeline_data={},
parent_params={
"${_system}": {"type": "plain", "value": "system_obj"},
"${log_output}": {"type": "plain", "value": "test"},
},
)

dispatcher._get_node_info.assert_called_once_with(
node_id=dispatcher.node_id, pipeline=pipeline_instance.execution_data, subprocess_stack=subprocess_stack
)
dispatcher._format_outputs.assert_called_once_with(
outputs={}, component_code=component_code, pipeline_instance=pipeline_instance, subprocess_stack=["1"]
outputs={},
component_code=component_code,
pipeline_instance=pipeline_instance,
subprocess_stack=["1"],
)
self.assertEqual(
node_data,
Expand Down
Loading