Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 第三方插件节点支持配置最长执行时间 #7487

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,10 @@ def monitor_report_config():
# 节点历史最大执行记录数
MAX_RECORDED_NODE_EXECUTION_TIMES = env.MAX_RECORDED_NODE_EXECUTION_TIMES

# 节点最大执行时间限制(需要插件实现中进行主动失败)
WITHOUT_NODE_MAX_EXECUTION_DAYS_TAG = env.WITHOUT_NODE_MAX_EXECUTION_DAYS_TAG
NODE_MAX_EXECUTION_DAYS = env.NODE_MAX_EXECUTION_DAYS


# engine admin permission settings
def check_engine_admin_permission(request, *args, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions env.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@
# 节点历史最大执行记录数
MAX_RECORDED_NODE_EXECUTION_TIMES = int(os.getenv("BKAPP_MAX_RECORDED_NODE_EXECUTION_TIMES", 5))

# 节点最大执行时间限制(需要插件实现中进行主动失败)
WITHOUT_NODE_MAX_EXECUTION_DAYS_TAG = -1
NODE_MAX_EXECUTION_DAYS = int(os.getenv("BKAPP_NODE_MAX_EXECUTION_DAYS", WITHOUT_NODE_MAX_EXECUTION_DAYS_TAG))

# 获取 PaaS 注入的蓝鲸域名
BKPAAS_BK_DOMAIN = os.getenv("BKPAAS_BK_DOMAIN", "") or os.getenv("BK_DOMAIN", "")

Expand Down
10 changes: 8 additions & 2 deletions pipeline_plugins/components/collections/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def schedule(self, data, parent_data, callback_data=None):
def inputs_format(self):
return [
self.InputItem(
name=_("描述"), key="description", type="string", schema=StringItemSchema(description=_("描述")),
name=_("描述"),
key="description",
type="string",
schema=StringItemSchema(description=_("描述")),
)
]

Expand All @@ -55,7 +58,10 @@ def outputs_format(self):
name=_("API回调数据"),
key="callback_data",
type="object",
schema=ObjectItemSchema(description=_("通过node_callback API接口回调并传入数据,支持dict数据"), property_schemas={},),
schema=ObjectItemSchema(
description=_("通过node_callback API接口回调并传入数据,支持dict数据"),
property_schemas={},
),
),
]

Expand Down
37 changes: 37 additions & 0 deletions pipeline_plugins/components/collections/remote_plugin/v1_0_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
specific language governing permissions and limitations under the License.
"""
import logging
from datetime import datetime, timedelta
from typing import Union

from dateutil import tz
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from pipeline.component_framework.component import Component
from pipeline.core.flow import AbstractIntervalGenerator, Service
from pipeline.core.flow.io import StringItemSchema
from pipeline.eri.runtime import BambooDjangoRuntime

from pipeline_plugins.components.utils.sites.open.utils import get_node_callback_url
from plugin_service.conf import PLUGIN_LOGGER
Expand Down Expand Up @@ -50,6 +54,7 @@ def next(self):

class RemotePluginService(Service):
interval = StepIntervalGenerator()
runtime = BambooDjangoRuntime()

def outputs_format(self):
return [
Expand Down Expand Up @@ -123,6 +128,18 @@ def schedule(self, data, parent_data, callback_data=None):
if plugin_code in settings.REMOTE_PLUGIN_FIX_INTERVAL_CODES:
self.interval.fix_interval = settings.REMOTE_PLUGIN_FIX_INTERVAL

task_start_time = parent_data.get_one_of_inputs("task_start_time")
# 如果插件是轮询模式且任务和节点执行时间均已经过期,则直接失败,通过先计算任务时间减少 DB 查询次数
if (
self.interval
and self._is_start_time_expired(task_start_time)
and self._is_start_time_expired(start_time=self._get_node_start_time())
):
message = _(f"第三方插件执行时间超过最大限制时长:{settings.NODE_MAX_EXECUTION_DAYS} 天")
logger.error(message)
data.set_outputs("ex_data", message)
return False

try:
plugin_client = PluginServiceApiClient(plugin_code)
except PluginServiceException as e:
Expand Down Expand Up @@ -163,6 +180,26 @@ def _inject_result_data_outputs(data, result_data):
for key, output in outputs.items():
data.set_outputs(key, output)

@staticmethod
def _is_start_time_expired(start_time: Union[str, datetime]) -> bool:
if not start_time or settings.NODE_MAX_EXECUTION_DAYS == settings.WITHOUT_NODE_MAX_EXECUTION_DAYS_TAG:
# 如果没有起始时间,无法比较,当作不会过期处理
return False

if isinstance(start_time, str):
# 如果是 str 格式,忽略时区
start_time = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
now_time = datetime.now()
else:
now_time = datetime.now().astimezone(tz.tzutc())

delta_time = now_time - start_time
return delta_time > timedelta(days=settings.NODE_MAX_EXECUTION_DAYS)

def _get_node_start_time(self) -> datetime:
node_state = self.runtime.get_state(self.id)
return node_state.started_time


class RemotePluginComponent(Component):
code = "remote_plugin"
Expand Down
Loading