Skip to content

Commit

Permalink
feat: 优化订阅任务按顺序执行而非抛错 (closed #2447)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed Nov 13, 2024
1 parent f14f01b commit feb708f
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 115 deletions.
18 changes: 18 additions & 0 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]:
# redis Gse Agent 配置缓存
REDIS_AGENT_CONF_KEY_TPL = f"{settings.APP_CODE}:backend:agent:config:" + "{file_name}:str:{sub_inst_id}"

# 更新订阅参数储存redis键名模板
UPDATE_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:update_subscription:params"

# 执行订阅参数储存redis键名模板
RUN_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:run_subscription:params"


class SubscriptionSwithBizAction(enum.EnhanceEnum):
ENABLE = "enable"
Expand Down Expand Up @@ -166,3 +172,15 @@ def needs_batch_request(self) -> bool:
DEFAULT_CLEAN_RECORD_LIMIT = 5000

POWERSHELL_SERVICE_CHECK_SSHD = "powershell -c Get-Service -Name sshd"

# 处理更新订阅任务间隔
UPDATE_SUBSCRIPTION_TASK_INTERVAL = 2 * 60

# 处理执行订阅任务间隔
RUN_SUBSCRIPTION_TASK_INTERVAL = 3 * 60

# 最大订阅任务数量
MAX_SUBSCRIPTION_TASK_COUNT = 50

# 订阅删除时间
SUBSCRIPTION_DELETE_DAYS = 1
115 changes: 115 additions & 0 deletions apps/backend/periodic_tasks/schedule_running_subscription_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import json
from typing import Any, Dict, List

from celery.schedules import crontab
from celery.task import periodic_task
from django.db.models import QuerySet
from django.utils import timezone

from apps.backend import constants
from apps.backend.subscription.handler import SubscriptionHandler
from apps.backend.utils.redis import REDIS_INST
from apps.node_man import models
from common.log import logger


def get_need_clean_subscription_app_code():
"""
获取配置需要清理的appcode
"""
app_codes: List[str] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.NEED_CLEAN_SUBSCRIPTION_APP_CODE.value, default=[]
)
return app_codes


@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_update_subscription():
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
# 先计算出要从redis取数据的长度
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
# 从redis中取出对应长度的数据
update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
# 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失
REDIS_INST.ltrim(name, 0, -length - 1)
# 翻转数据,先进的数据先处理
update_params.reverse()
results = []
if not update_params:
return
for update_param in update_params:
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(update_param.decode())
subscription_id = params["subscription_id"]
try:
result: Dict[str, int] = SubscriptionHandler.update_subscription(params=params)
except Exception as e:
logger.exception(f"{subscription_id} update scription failed with error: {e}")
result = {"subscription_id": subscription_id, "update_result": False}
results.append(result)
logger.info(f" update scription with results: {results}, length -> {len(results)} ")


@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_run_scription():
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
REDIS_INST.ltrim(name, 0, -length - 1)
run_params.reverse()
results = []
if not run_params:
return
for run_param in run_params:
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(run_param.decode())
subscription_id = params["subscription_id"]
scope = params["scope"]
actions = params["actions"]
try:
result: Dict[str, int] = SubscriptionHandler(subscription_id).run(scope=scope, actions=actions)
except Exception as e:
logger.exception(f"{subscription_id} run scription failed with error: {e}")
result = {"subscription_id": subscription_id, "run_result": False}
results.append(result)
logger.info(f"run subscription with results: {results}, length -> {len(results)}")


@periodic_task(
run_every=crontab(hour="3", minute="0", day_of_week="*", day_of_month="*", month_of_year="*"),
queue="default",
options={"queue": "default"},
)
def clean_deleted_subscription():
query_kwargs: Dict[str, Any] = {
"is_deleted": True,
"from_system": "bkmonitorv3",
"deleted_time__range": (
timezone.now() - timezone.timedelta(days=constants.SUBSCRIPTION_DELETE_DAYS),
timezone.now(),
),
}

app_codes = get_need_clean_subscription_app_code()
if app_codes:
query_kwargs.pop("from_system")
query_kwargs["from_system__in"] = app_codes

subscription_qs: QuerySet = models.Subscription.objects.filter(**query_kwargs)
if not subscription_qs.exists():
# 没有需要更新的订阅
return

subscription_ids = list(subscription_qs.values_list("id", flat=True))
subscription_qs.update(nodes=[], is_deleted=False, enable=True)
logger.info(f"set {subscription_ids} nodes be null and enable auto trigger, length -> {len(subscription_ids)}")
6 changes: 6 additions & 0 deletions apps/backend/subscription/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,9 @@ class SubscriptionIncludeGrayBizError(AppBaseException):
ERROR_CODE = 19
MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")
MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")


class SubscriptionNotDeletedCantOperateError(AppBaseException):
ERROR_CODE = 20
MESSAGE = _("订阅未被删除,无法操作")
MESSAGE_TPL = _("订阅ID:{subscription_id}未被删除,无法进行清理操作,可增加参数is_force=true强制操作")
132 changes: 127 additions & 5 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
from __future__ import absolute_import, unicode_literals

import json
import logging
import random
from collections import Counter, defaultdict
Expand All @@ -18,13 +19,15 @@

from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.db.models import Max, Q, QuerySet, Value
from django.utils.translation import get_language
from django.utils.translation import ugettext as _

from apps.backend import constants as backend_constants
from apps.backend.subscription import errors, task_tools, tasks, tools
from apps.backend.subscription.errors import InstanceTaskIsRunning
from apps.backend.utils.pipeline_parser import PipelineParser
from apps.backend.utils.redis import REDIS_INST
from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
Expand Down Expand Up @@ -432,17 +435,19 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i
subscription = models.Subscription.objects.get(id=self.subscription_id)
except models.Subscription.DoesNotExist:
raise errors.SubscriptionNotExist({"subscription_id": self.subscription_id})

if subscription.is_running():
raise InstanceTaskIsRunning()

if tools.check_subscription_is_disabled(
subscription_identity=f"subscription -> [{subscription.id}]",
scope=subscription.scope,
steps=subscription.steps,
):
raise errors.SubscriptionIncludeGrayBizError()

if subscription.is_running():
params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions})
REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params)
logger.info(f"run subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

subscription_task = models.SubscriptionTask.objects.create(
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
Expand Down Expand Up @@ -670,3 +675,120 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) ->
result.append({"subscription_id": subscription.id, "instances": subscription_result})

return result

def clean_subscription(self, execute_actions: Dict[str, str]):
"""
:param execute_actions: {"bk-beat": "STOP", "exporter": "STOP"}
"""
try:
# 3.调用执行订阅的方法
result = self.run(actions=execute_actions)
except Exception as e:
result = {"result": False, "message": str(e)}
# 4.删除订阅,使用delete()方法才会记录删除时间
models.Subscription.objects.filter(id=self.subscription_id).delete()
return result

@staticmethod
def update_subscription(params: Dict[str, Any]):
scope = params["scope"]
try:
subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False)
except models.Subscription.DoesNotExist:
raise errors.SubscriptionNotExist({"subscription_id": params["subscription_id"]})
# 更新订阅不在序列化器中做校验,因为获取更新订阅的类型 step 需要查一次表
if tools.check_subscription_is_disabled(
subscription_identity=f"subscription -> [{subscription.id}]",
steps=subscription.steps,
scope=scope,
):
raise errors.SubscriptionIncludeGrayBizError()
if subscription.is_running():
REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

with transaction.atomic():
subscription.name = params.get("name", "")
subscription.node_type = scope["node_type"]
subscription.nodes = scope["nodes"]
subscription.bk_biz_id = scope.get("bk_biz_id")
# 避免空列表误判
if scope.get("instance_selector") is not None:
subscription.instance_selector = scope["instance_selector"]
# 策略部署新增
subscription.plugin_name = params.get("plugin_name")
subscription.bk_biz_scope = params.get("bk_biz_scope")
# 指定操作进程用户新增
if params.get("system_account"):
params["operate_info"].insert(0, params["system_account"])
subscription.operate_info = params["operate_info"]
subscription.save()

step_ids: Set[str] = set()
step_id__obj_map: Dict[str, models.SubscriptionStep] = {
step_obj.step_id: step_obj for step_obj in subscription.steps
}
step_objs_to_be_created: List[models.SubscriptionStep] = []
step_objs_to_be_updated: List[models.SubscriptionStep] = []

for index, step_info in enumerate(params["steps"]):

if step_info["id"] in step_id__obj_map:
# 存在则更新
step_obj: models.SubscriptionStep = step_id__obj_map[step_info["id"]]
step_obj.params = step_info["params"]
if "config" in step_info:
step_obj.config = step_info["config"]
step_obj.index = index
step_objs_to_be_updated.append(step_obj)
else:
# 新增场景
try:
step_obj_to_be_created: models.SubscriptionStep = models.SubscriptionStep(
subscription_id=subscription.id,
index=index,
step_id=step_info["id"],
type=step_info["type"],
config=step_info["config"],
params=step_info["params"],
)
except KeyError as e:
logger.warning(
f"update subscription[{subscription.id}] to add step[{step_info['id']}] error: "
f"err_msg -> {e}"
)
raise errors.SubscriptionUpdateError(
{
"subscription_id": subscription.id,
"msg": _("新增订阅步骤[{step_id}] 需要提供 type & config,错误信息 -> {err_msg}").format(
step_id=step_info["id"], err_msg=e
),
}
)
step_objs_to_be_created.append(step_obj_to_be_created)
step_ids.add(step_info["id"])

# 删除更新后不存在的 step
models.SubscriptionStep.objects.filter(
subscription_id=subscription.id, step_id__in=set(step_id__obj_map.keys()) - step_ids
).delete()
models.SubscriptionStep.objects.bulk_update(step_objs_to_be_updated, fields=["config", "params", "index"])
models.SubscriptionStep.objects.bulk_create(step_objs_to_be_created)
# 更新 steps 需要移除缓存
if hasattr(subscription, "_steps"):
delattr(subscription, "_steps")

result = {"subscription_id": subscription.id}

run_immediately = params["run_immediately"]
if run_immediately:
subscription_task = models.SubscriptionTask.objects.create(
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
tasks.run_subscription_task_and_create_instance.delay(
subscription, subscription_task, language=get_language()
)
result["task_id"] = subscription_task.id

return result
6 changes: 6 additions & 0 deletions apps/backend/subscription/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,9 @@ class QueryHostSubscriptionsSerializer(TargetHostSerializer):
class SubscriptionSwitchBizSerializer(serializers.Serializer):
bk_biz_ids = serializers.ListField(child=serializers.IntegerField())
action = serializers.ChoiceField(choices=SubscriptionSwithBizAction.list_choices())


class ClearnSubscriptionSerializer(serializers.Serializer):
subscription_id_list = serializers.ListField(required=True, label=_("订阅ID列表"), child=serializers.IntegerField())
action_type = serializers.ChoiceField(choices=constants.OpType, default="STOP", label=_("执行动作类型"))
is_force = serializers.BooleanField(default=False, label=_("是否强制清理"))
Loading

0 comments on commit feb708f

Please sign in to comment.