-
Notifications
You must be signed in to change notification settings - Fork 377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: 快速分发文件插件支持文件源类型 #7474 #7486
Merged
lTimej
merged 3 commits into
TencentBlueKing:release_3.32.2_by_master
from
lTimej:release_3.32.2_by_master_7474
Jun 12, 2024
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
291 changes: 291 additions & 0 deletions
291
pipeline_plugins/components/collections/sites/open/job/fast_push_file/v3_0.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,291 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community | ||
Edition) available. | ||
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. | ||
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://opensource.org/licenses/MIT | ||
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations under the License. | ||
""" | ||
|
||
import re | ||
from copy import deepcopy | ||
from functools import partial | ||
|
||
from django.utils import translation | ||
from django.utils.translation import ugettext_lazy as _ | ||
from pipeline.component_framework.component import Component | ||
from pipeline.core.flow.io import ArrayItemSchema, BooleanItemSchema, ObjectItemSchema, StringItemSchema | ||
|
||
from gcloud.conf import settings | ||
from gcloud.constants import JobBizScopeType | ||
from gcloud.utils.handlers import handle_api_error | ||
from pipeline_plugins.components.collections.sites.open.job.base import JobScheduleService | ||
from pipeline_plugins.components.collections.sites.open.job.ipv6_base import GetJobTargetServerMixin | ||
from pipeline_plugins.components.utils import chunk_table_data, get_job_instance_url, loose_strip | ||
from pipeline_plugins.components.utils.common import batch_execute_func | ||
|
||
__group_name__ = _("作业平台(JOB)") | ||
|
||
get_client_by_user = settings.ESB_GET_CLIENT_BY_USER | ||
|
||
job_handle_api_error = partial(handle_api_error, __group_name__) | ||
|
||
|
||
class JobFastPushFileService(JobScheduleService, GetJobTargetServerMixin): | ||
need_show_failure_inst_url = True | ||
|
||
def inputs_format(self): | ||
return [ | ||
self.InputItem( | ||
name=_("服务器"), | ||
key="job_server", | ||
type="array", | ||
schema=ArrayItemSchema( | ||
description=_("待分发文件列表"), | ||
item_schema=ObjectItemSchema( | ||
description=_("待分发文件信息"), | ||
property_schemas={ | ||
"ip": StringItemSchema(description=_("机器 IP")), | ||
"files": StringItemSchema(description=_("文件路径")), | ||
"account": StringItemSchema(description=_("执行账户")), | ||
}, | ||
), | ||
), | ||
), | ||
self.InputItem( | ||
name=_("上传限速"), | ||
key="upload_speed_limit", | ||
type="string", | ||
schema=StringItemSchema(description=_("MB/s")), | ||
), | ||
self.InputItem( | ||
name=_("下载限速"), | ||
key="download_speed_limit", | ||
type="string", | ||
schema=StringItemSchema(description=_("MB/s")), | ||
), | ||
self.InputItem( | ||
name=_("目标 IP"), | ||
key="job_ip_list", | ||
type="string", | ||
schema=StringItemSchema(description=_("文件分发目标机器 IP,多个用英文逗号 `,` 分隔")), | ||
), | ||
self.InputItem( | ||
name=_("目标账户"), | ||
key="job_account", | ||
type="string", | ||
schema=StringItemSchema(description=_("文件分发目标机器账户")), | ||
), | ||
self.InputItem( | ||
name=_("目标路径"), | ||
key="job_target_path", | ||
type="string", | ||
schema=StringItemSchema(description=_("文件分发目标路径")), | ||
), | ||
self.InputItem( | ||
name=_("超时时间"), key="job_timeout", type="string", schema=StringItemSchema(description=_("超时时间")) | ||
), | ||
self.InputItem( | ||
name=_("滚动执行"), | ||
key="job_rolling_execute", | ||
type="boolean", | ||
schema=BooleanItemSchema(description=_("是否开启滚动执行")), | ||
), | ||
self.InputItem( | ||
name=_("滚动策略"), | ||
key="job_rolling_expression", | ||
type="string", | ||
schema=StringItemSchema(description=_("滚动策略,仅在滚动执行开启时生效")), | ||
), | ||
self.InputItem( | ||
name=_("滚动机制"), | ||
key="job_rolling_mode", | ||
type="string", | ||
schema=StringItemSchema(description=_("滚动机制,仅在滚动执行开启时生效")), | ||
), | ||
] | ||
|
||
def execute(self, data, parent_data): | ||
executor = parent_data.get_one_of_inputs("executor") | ||
client = get_client_by_user(executor) | ||
if parent_data.get_one_of_inputs("language"): | ||
setattr(client, "language", parent_data.get_one_of_inputs("language")) | ||
translation.activate(parent_data.get_one_of_inputs("language")) | ||
|
||
biz_cc_id = data.get_one_of_inputs("biz_cc_id", parent_data.inputs.biz_cc_id) | ||
job_source_files_origin = data.get_one_of_inputs("job_source_files_origin") | ||
upload_speed_limit = data.get_one_of_inputs("upload_speed_limit") | ||
download_speed_limit = data.get_one_of_inputs("download_speed_limit") | ||
job_timeout = data.get_one_of_inputs("job_timeout") | ||
job_rolling_config = data.get_one_of_inputs("job_rolling_config", {}) | ||
job_rolling_execute = job_rolling_config.get("job_rolling_execute", None) | ||
file_source = [] | ||
if job_source_files_origin == "server": | ||
original_source_files = deepcopy(data.get_one_of_inputs("job_server", [])) | ||
for item in original_source_files: | ||
clean_source_ip_result, server = self.get_target_server_hybrid( | ||
executor, biz_cc_id, data, item["ip"], logger_handle=self.logger | ||
) | ||
if not clean_source_ip_result: | ||
return False | ||
file_source.append( | ||
{ | ||
"file_list": [_file.strip() for _file in item["files"].split("\n") if _file.strip()], | ||
"server": server, | ||
"account": { | ||
"alias": loose_strip(item["account"]), | ||
}, | ||
"file_type": 1, | ||
} | ||
) | ||
elif job_source_files_origin == "file_source": | ||
original_source_files = data.get_one_of_inputs("job_source_files", []) | ||
for item in original_source_files: | ||
file_source.append( | ||
{ | ||
"file_list": [_file.strip() for _file in re.split(r"[, \n]", item["files"]) if _file.strip()], | ||
"file_source_code": item["file_source"], | ||
"file_type": 3, | ||
} | ||
) | ||
else: | ||
data.outputs.ex_data = "只支持服务器和源文件类型" | ||
return False | ||
|
||
select_method = data.get_one_of_inputs("select_method") | ||
break_line = data.get_one_of_inputs("break_line") or "," | ||
job_dispatch_attr = data.get_one_of_inputs("job_dispatch_attr") | ||
attr_list = [] | ||
for attr in job_dispatch_attr: | ||
# 如果用户选择了单行扩展 | ||
if select_method == "auto": | ||
chunk_result = chunk_table_data(attr, break_line) | ||
if not chunk_result["result"]: | ||
data.set_outputs("ex_data", chunk_result["message"]) | ||
return False | ||
attr_list.extend(chunk_result["data"]) | ||
else: | ||
# 非单行扩展的情况无需处理 | ||
attr_list.append(attr) | ||
|
||
# 如果开启了滚动执行,填充rolling_config配置 | ||
if job_rolling_execute: | ||
# 滚动策略 | ||
job_rolling_expression = job_rolling_config.get("job_rolling_expression") | ||
# 滚动机制 | ||
job_rolling_mode = job_rolling_config.get("job_rolling_mode") | ||
rolling_config = {"expression": job_rolling_expression, "mode": job_rolling_mode} | ||
|
||
# 拼装参数列表 | ||
params_list = [] | ||
for attr in attr_list: | ||
# 获取目标IP | ||
original_ip_list = attr["job_ip_list"] | ||
clean_result, target_server = self.get_target_server_hybrid( | ||
executor, biz_cc_id, data, original_ip_list, logger_handle=self.logger | ||
) | ||
if not clean_result: | ||
return False | ||
job_kwargs = { | ||
"bk_scope_type": JobBizScopeType.BIZ.value, | ||
"bk_scope_id": str(biz_cc_id), | ||
"bk_biz_id": biz_cc_id, | ||
"file_source_list": file_source, | ||
"target_server": target_server, | ||
"account_alias": attr["job_account"], | ||
"file_target_path": attr["job_target_path"], | ||
} | ||
if upload_speed_limit: | ||
job_kwargs["upload_speed_limit"] = int(upload_speed_limit) | ||
if download_speed_limit: | ||
job_kwargs["download_speed_limit"] = int(download_speed_limit) | ||
if job_timeout: | ||
job_kwargs["timeout"] = int(job_timeout) | ||
if job_rolling_execute: | ||
job_kwargs["rolling_config"] = rolling_config | ||
|
||
params_list.append(job_kwargs) | ||
task_count = len(params_list) | ||
# 并发请求接口 | ||
job_result_list = batch_execute_func(client.jobv3.fast_transfer_file, params_list, interval_enabled=True) | ||
job_instance_id_list, job_inst_name, job_inst_url = [], [], [] | ||
data.outputs.requests_error = "" | ||
for index, res in enumerate(job_result_list): | ||
job_result = res["result"] | ||
if job_result["result"]: | ||
job_instance_id_list.append(job_result["data"]["job_instance_id"]) | ||
job_inst_name.append(job_result["data"]["job_instance_name"]) | ||
job_inst_url.append(get_job_instance_url(biz_cc_id, job_instance_id_list)) | ||
else: | ||
message = job_handle_api_error("jobv3.fast_transfer_file", params_list[index], job_result) | ||
self.logger.error(message) | ||
data.outputs.requests_error += "{}\n".format(message) | ||
if data.outputs.requests_error: | ||
data.outputs.requests_error = "Request Error:\n{}".format(data.outputs.requests_error) | ||
|
||
# 总任务数 | ||
data.outputs.task_count = task_count | ||
data.outputs.job_instance_id_list = job_instance_id_list | ||
# 批量请求使用 | ||
data.outputs.job_id_of_batch_execute = job_instance_id_list | ||
data.outputs.job_inst_url = [get_job_instance_url(biz_cc_id, job_id) for job_id in job_instance_id_list] | ||
# 请求成功数 | ||
data.outputs.request_success_count = len(job_instance_id_list) | ||
# 执行成功数 | ||
data.outputs.success_count = 0 | ||
# 所有请求都失败,则返回 | ||
if not data.outputs.request_success_count: | ||
data.outputs.ex_data = data.outputs.requests_error | ||
return False | ||
data.outputs.final_res = task_count == len(job_instance_id_list) | ||
return True | ||
|
||
def outputs_format(self): | ||
return [ | ||
self.OutputItem( | ||
name=_("总任务数"), key="task_count", type="string", schema=StringItemSchema(description=_("总任务数")) | ||
), | ||
self.OutputItem( | ||
name=_("分发请求成功数"), | ||
key="request_success_count", | ||
type="string", | ||
schema=StringItemSchema(description=_("分发请求成功数")), | ||
), | ||
self.OutputItem( | ||
name=_("分发成功数"), key="success_count", type="string", schema=StringItemSchema(description=_("上传成功数")) | ||
), | ||
self.OutputItem( | ||
name=_("任务id"), | ||
key="job_instance_id_list", | ||
type="string", | ||
schema=StringItemSchema(description=_("任务id")), | ||
), | ||
self.OutputItem( | ||
name=_("任务url"), key="job_inst_url", type="string", schema=StringItemSchema(description=_("任务url")) | ||
), | ||
self.OutputItem( | ||
name=_("执行失败的任务URL"), | ||
key="failure_inst_url", | ||
type="string", | ||
schema=StringItemSchema(description=_("执行失败的任务URL")), | ||
), | ||
] | ||
|
||
|
||
class JobFastPushFileComponent(Component): | ||
name = _("快速分发文件") | ||
code = "job_fast_push_file" | ||
bound_service = JobFastPushFileService | ||
form = "%scomponents/atoms/job/fast_push_file/v3_0.js" % settings.STATIC_URL | ||
version = "v3.0" | ||
desc = _( | ||
"1. 填参方式支持手动填写和结合模板生成(单行自动扩展)\n" | ||
"2. 使用单行自动扩展模式时,每一行支持填写多个已自定义分隔符或是英文逗号分隔的数据," | ||
'插件后台会自动将其扩展成多行,如 "1,2,3,4" 会被扩展成四行:1 2 3 4\n' | ||
"3. 结合模板生成(单行自动扩展)当有一列有多条数据时,其他列要么也有相等个数的数据,要么只有一条数据\n" | ||
"4. v3.0版本新增文件源的支持,要求作业平台版本>=3.6.0.0" | ||
) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没有修改不需要deepcopy