Skip to content

Commit

Permalink
optimization: 任务历史搜索优化 (closed #1856)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt authored and ping15 committed Nov 3, 2023
1 parent 9d5284f commit 52d9351
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 61 deletions.
34 changes: 7 additions & 27 deletions apps/node_man/handlers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from apps.node_man.handlers.cloud import CloudHandler
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.node_man.handlers.host import HostHandler
from apps.node_man.tools import JobTools
from apps.utils import APIModel
from apps.utils.basic import filter_values, to_int_or_default
from apps.utils.local import get_request_username
Expand Down Expand Up @@ -169,30 +170,6 @@ def list(self, params: dict, username: str):
job_ids.add(job_id)
kwargs["id__in"] = job_ids

# 业务权限
search_biz_ids = params.get("bk_biz_id")
all_biz_ids = set(all_biz_info.keys())

if search_biz_ids:
# 字典的 in 比列表性能更高
biz_scope = [bk_biz_id for bk_biz_id in search_biz_ids if bk_biz_id in biz_info]
else:
biz_scope = biz_permission

if not biz_scope:
return {"total": 0, "list": []}

if set(biz_scope) & all_biz_ids == all_biz_ids:
# 查询全部业务且拥有全部业务权限
biz_scope_query_q = Q()
else:
biz_scope_query_q = reduce(
operator.or_, [Q(bk_biz_scope__contains=bk_biz_id) for bk_biz_id in biz_scope], Q()
)
# 仅查询所有业务时,自身创建的 job 可见
if not search_biz_ids:
biz_scope_query_q |= Q(created_by=username)

# ip 搜索
inner_ip_query_q = Q()
if params.get("inner_ip_list"):
Expand All @@ -218,10 +195,13 @@ def list(self, params: dict, username: str):

# 过滤None值并筛选Job
# 此处不过滤空列表(filter_empty=False),job_id, job_type 存在二次解析,若全部值非法得到的是空列表,期望应是查不到数据
job_result = models.Job.objects.filter(biz_scope_query_q, inner_ip_query_q, **filter_values(kwargs))
job_result = JobTools().get_job_queryset_with_biz_scope(
all_biz_info, biz_info, biz_permission, params.get("bk_biz_id"), kwargs
)
if job_result is None:
return {"total": 0, "list": []}

# 过滤没有业务的Job
job_result = job_result.filter(~Q(bk_biz_scope__isnull=True) & ~Q(bk_biz_scope={}))
job_result = job_result.filter(inner_ip_query_q)

# 排序
if params.get("sort"):
Expand Down
49 changes: 33 additions & 16 deletions apps/node_man/handlers/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from apps.node_man.handlers.cloud import CloudHandler
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.node_man.handlers.install_channel import InstallChannelHandler
from apps.node_man.tools import JobTools
from apps.utils import APIModel


Expand Down Expand Up @@ -180,23 +181,40 @@ def fetch_host_condition(self):
]
)

def fetch_job_list_condition(self, job_category):
def fetch_job_list_condition(self, job_category, params=None):
"""
获取任务历史接口的条件
:return: Host接口所有条件
"""
params = params or {}
kwargs = {
"start_time__gte": params.get("start_time"),
"start_time__lte": params.get("end_time"),
}

# 获得业务id与名字的映射关系(用户有权限获取的业务)
biz_permission = list(CmdbHandler().biz_id_name({"action": constants.IamActionType.task_history_view}))
all_biz_info = CmdbHandler().biz_id_name_without_permission()
biz_info = CmdbHandler().biz_id_name({"action": constants.IamActionType.task_history_view})
biz_permission = list(biz_info.keys())

if job_category == "job":
job_type = constants.JOB_TUPLE
else:
job_type = constants.JOB_TYPE_MAP[job_category.split("_")[0]]
# 获得4列的所有值
job_condition = list(
models.Job.objects.filter(job_type__in=job_type)
.values("created_by", "job_type", "status", "bk_biz_scope", "subscription_id")
.distinct()

job_result = JobTools.get_job_queryset_with_biz_scope(
all_biz_info, biz_info, biz_permission, params.get("bk_biz_ids"), kwargs
)
if job_result is None:
return self.filter_empty_children(
[
{"name": _("任务ID"), "id": "job_id"},
{"name": _("IP"), "id": "inner_ip_list"},
]
)

job_result = job_result.filter(job_type__in=job_type).values_list(
"created_by", "job_type", "status", "subscription_id"
)

# 初始化各个条件集合
Expand All @@ -205,13 +223,11 @@ def fetch_job_list_condition(self, job_category):
statuses = set()
subscription_ids = set()

for job in job_condition:
# 判断权限
if set(job["bk_biz_scope"]) - set(biz_permission) == set():
created_bys.add(job["created_by"])
job_types.add(job["job_type"])
statuses.add(job["status"])
subscription_ids.add(job["subscription_id"])
for created_by, job_type, status, subscription_id in job_result:
created_bys.add(created_by)
job_types.add(job_type)
statuses.add(status)
subscription_ids.add(subscription_id)

created_bys_children = [
{"name": created_by, "id": created_by} for created_by in created_bys if created_by != ""
Expand Down Expand Up @@ -469,17 +485,18 @@ def fetch_os_type_children(os_types: Tuple = constants.OsType):
os_type_children.append({"id": os_type, "name": constants.OS_CHN.get(os_type, os_type)})
return os_type_children

def filter_condition(self, category):
def filter_condition(self, category, params=None):
"""
获取过滤条件
:param category: 接口, host, cloud, Job等
:param params: 请求参数的字典
:return: 某接口所有条件
"""

if category == "host":
return self.fetch_host_condition()
elif category == "job":
return self.fetch_job_list_condition("job")
return self.fetch_job_list_condition("job", params=params)
elif category == "agent_job":
return self.fetch_job_list_condition("agent_job")
elif category == "proxy_job":
Expand Down
9 changes: 9 additions & 0 deletions apps/node_man/serializers/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,12 @@ class JobSettingSerializer(serializers.Serializer):
install_download_limit_speed = serializers.IntegerField(label=_("安装下载限速"), max_value=JOB_MAX_VALUE, min_value=0)
parallel_install_number = serializers.IntegerField(label=_("并行安装数"), max_value=JOB_MAX_VALUE, min_value=0)
node_man_log_level = serializers.ChoiceField(label=_("节点管理日志级别"), choices=list(NODE_MAN_LOG_LEVEL))


class FilterConditionSerializer(serializers.Serializer):
    """Optional request parameters for a filter-condition lookup.

    Carries a category name, a list of business IDs and a start/end time
    range. Every field is optional.
    """

    category = serializers.CharField(label=_("分类"), required=False, default="")
    # Use a callable default so each use gets a fresh list rather than one shared mutable object.
    bk_biz_ids = serializers.ListField(label=_("业务列表"), required=False, default=list)

    # Time range
    start_time = serializers.DateTimeField(label=_("起始时间"), required=False)
    end_time = serializers.DateTimeField(label=_("终止时间"), required=False)
127 changes: 125 additions & 2 deletions apps/node_man/tests/test_handlers/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
"""
import random
from unittest.mock import patch
import os

from django.conf import settings
from django.test import override_settings
from django.utils.translation import ugettext as _

from apps.node_man import constants as const
from apps.node_man import tools
Expand All @@ -28,6 +30,8 @@
)
from apps.utils.unittest import testcase

# Number of Job rows created by the large-volume filter-condition test.
# Defaults to 100000; set the environment variable of the same name to override it.
FILTER_CONDITION_JOB_COUNT_FOR_TEST = int(os.environ.get("FILTER_CONDITION_JOB_COUNT_FOR_TEST", 100000))


class TestMeta(testcase.CustomAPITestCase):
def fetch_host_unique_col_count(self, col):
Expand Down Expand Up @@ -58,11 +62,13 @@ def fetch_cloud_unique_col_count(self, col):
"""
return Cloud.objects.values_list(col, flat=True).distinct().count()

def fetch_Job_unique_col_count(self):
def fetch_Job_unique_col_count(self, search_business=None):
"""
返回Job中指定列的唯一值
:return: 唯一值的数量
"""
search_business = search_business or SEARCH_BUSINESS

# 获得4列的所有值
job_condition = list(Job.objects.values("created_by", "job_type", "status", "bk_biz_scope").distinct())

Expand All @@ -73,13 +79,27 @@ def fetch_Job_unique_col_count(self):

for job in job_condition:
# 判断权限
if set(job["bk_biz_scope"]) - {biz["bk_biz_id"] for biz in SEARCH_BUSINESS} == set():
if any(biz_id in {biz["bk_biz_id"] for biz in search_business} for biz_id in set(job["bk_biz_scope"])):
created_bys.add(job["created_by"])
job_types.add(job["job_type"])
statuses.add(job["status"])

return created_bys, job_types, statuses

@staticmethod
def generate_random_biz_scope(sample_biz_ids, min_biz_count=1, max_biz_count=5):
"""
从业务列表中随机获取其中几个业务
:param sample_biz_ids: 业务列表
:param min_biz_count: 最少获取几个业务
:param max_biz_count: 最多获取几个业务
:return: 子业务列表
"""
def wrapper():
return random.sample(sample_biz_ids, k=random.randint(min_biz_count, max_biz_count))

return wrapper

@patch("apps.node_man.handlers.cmdb.client_v2", MockClient)
def test_search(self):
# 服务商搜索接口
Expand Down Expand Up @@ -271,3 +291,106 @@ def test_global_settings__install_default_values(self):
},
}
self.assertDictEqual(kv, {GlobalSettings.KeyEnum.INSTALL_DEFAULT_VALUES.value: expect_install_default_values})

@patch("apps.node_man.handlers.cmdb.get_request_username", return_value="admin")
def test_job_filter_condition_with_large_biz_and_job(self, *args, **kwargs):
    """
    Check the "job" filter conditions against a large number of jobs and businesses.

    Creates FILTER_CONDITION_JOB_COUNT_FOR_TEST jobs with random business scopes
    plus a tenth as many jobs whose scope is an empty dict, queries the
    conditions for a large random subset of businesses, and compares the
    returned statuses, op types, step types and creators with values computed
    directly from the Job table.
    """
    biz_count = 4000
    biz_ids = list(range(1, biz_count + 1))
    create_job(
        FILTER_CONDITION_JOB_COUNT_FOR_TEST,
        generate_bk_biz_scope_func=self.generate_random_biz_scope(sample_biz_ids=biz_ids),
    )

    # Second batch: jobs whose bk_biz_scope is an empty dict.
    create_job(
        FILTER_CONDITION_JOB_COUNT_FOR_TEST // 10,
        start_id=FILTER_CONDITION_JOB_COUNT_FOR_TEST + 1,
        bk_biz_scope={},
    )

    # Sample from the full ID list so the last business can be searched as well.
    params = {"bk_biz_ids": random.sample(biz_ids, k=random.randint(3000, 3900))}
    filter_condition = MetaHandler().filter_condition("job", params)

    id__filter_item_map = {filter_item["id"]: filter_item for filter_item in filter_condition}

    def child_ids(condition_id):
        return {child["id"] for child in id__filter_item_map[condition_id]["children"]}

    api_statuses = child_ids("status")
    api_op_types = child_ids("op_type")
    api_step_types = child_ids("step_type")
    api_created_bys = child_ids("created_by")

    created_bys, job_types, statuses = self.fetch_Job_unique_col_count(
        search_business=[{"bk_biz_id": bk_biz_id, "bk_biz_name": ""} for bk_biz_id in biz_ids]
    )

    job_type_infos = [tools.JobTools.unzip_job_type(job_type) for job_type in job_types]

    # Compare against the translated name, as the sibling nonexistent-biz test does.
    self.assertEqual(id__filter_item_map["job_id"], {"name": _("任务ID"), "id": "job_id"})
    self.assertEqual(api_op_types, {job_type_info["op_type"] for job_type_info in job_type_infos})
    self.assertEqual(api_step_types, {job_type_info["step_type"] for job_type_info in job_type_infos})
    self.assertEqual(api_created_bys, created_bys)
    self.assertEqual(api_statuses, statuses)

@patch("apps.node_man.handlers.cmdb.get_request_username", return_value="admin")
def test_job_filter_condition_with_time_filter(self, *args, **kwargs):
    """
    Check that start_time / end_time restrict the creators offered as a "job" filter condition.

    Two jobs by different creators get start times two days apart; each case
    queries a time window and checks which creators come back.
    """
    create_job(1, created_by="test1")
    create_job(1, created_by="test2", start_id=2)

    # Passing start_time through create_job has no effect (reason unknown),
    # so it is written with update() after creation.
    Job.objects.filter(id=1).update(start_time="2023-10-01 12:00:00")
    Job.objects.filter(id=2).update(start_time="2023-10-03 12:00:00")

    # Each case: (request params, expected creator names in sorted order).
    cases = [
        ({"start_time": "2023-10-01 12:00:00", "end_time": "2023-10-02 12:00:00"}, ["test1"]),
        ({"start_time": "2023-10-02 12:00:00", "end_time": "2023-10-03 12:00:00"}, ["test2"]),
        ({"start_time": "2023-10-01 12:00:00", "end_time": "2023-10-03 12:00:00"}, ["test1", "test2"]),
        ({}, ["test1", "test2"]),
    ]

    for params, expected_created_bys in cases:
        # subTest reports which time window failed instead of stopping at the first one.
        with self.subTest(params=params):
            filter_condition = MetaHandler().filter_condition("job", params=params)
            created_by_info = next(
                single_condition
                for single_condition in filter_condition
                if single_condition["id"] == "created_by"
            )

            # Comparing the sorted names checks both the count and the values.
            actual_created_bys = sorted(child["name"] for child in created_by_info["children"])
            self.assertEqual(actual_created_bys, expected_created_bys)

@patch("apps.node_man.handlers.cmdb.get_request_username", return_value="admin")
def test_job_filter_condition_with_nonexistent_biz(self, *args, **kwargs):
    """
    Searching only for business IDs that no job uses must return just the
    job-ID and IP conditions.
    """
    number = 1000
    scope_factory = self.generate_random_biz_scope(sample_biz_ids=[1, 2, 3, 4, 5], max_biz_count=3)

    # Jobs scoped to businesses 1-5, followed by jobs with an empty scope.
    create_job(number, generate_bk_biz_scope_func=scope_factory)
    create_job(number // 10, bk_biz_scope={}, start_id=number + 1)

    nonexistent_biz_params = {"bk_biz_ids": [-1, -2]}
    result = MetaHandler().filter_condition("job", params=nonexistent_biz_params)

    expected_conditions = [
        {"name": _("任务ID"), "id": "job_id"},
        {"name": _("IP"), "id": "inner_ip_list"},
    ]
    self.assertEqual(result, expected_conditions)
25 changes: 18 additions & 7 deletions apps/node_man/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,20 +674,31 @@ def create_ap(number):
AccessPoint.objects.bulk_create(ap_to_create)


def create_job(number, id=None, end_time=None, bk_biz_scope=None, task_id_list=None, created_by=None):
def create_job(
number,
id=None,
end_time=None,
bk_biz_scope=None,
task_id_list=None,
created_by=None,
generate_bk_biz_scope_func=None,
batch_size=1000,
start_id=1,
):
job_types = list(chain(*list(constants.JOB_TYPE_MAP.values())))
job_types = [
job_type for job_type in job_types if
tools.JobTools.unzip_job_type(job_type)["op_type"] in constants.OP_TYPE_TUPLE
job_type
for job_type in job_types
if tools.JobTools.unzip_job_type(job_type)["op_type"] in constants.OP_TYPE_TUPLE
]

if bk_biz_scope == {} or bk_biz_scope:
if bk_biz_scope == {} or bk_biz_scope or callable(generate_bk_biz_scope_func):
pass
else:
bk_biz_scope = [[biz["bk_biz_id"] for biz in SEARCH_BUSINESS][random.randint(0, 10)]]

jobs = []
for i in range(1, number + 1):
for i in range(start_id, number + start_id):
job = Job(
id=id or i,
job_type=random.choice(job_types),
Expand All @@ -696,14 +707,14 @@ def create_job(number, id=None, end_time=None, bk_biz_scope=None, task_id_list=N
random.randint(0, len(constants.JobStatusType.get_choices()) - 1)
],
statistics={"success_count": 0, "failed_count": 0, "running_count": 0, "total_count": 0},
bk_biz_scope=bk_biz_scope,
bk_biz_scope=generate_bk_biz_scope_func() if callable(generate_bk_biz_scope_func) else bk_biz_scope,
subscription_id=random.randint(1, 100),
task_id_list=task_id_list or [random.randint(1, 100)],
created_by=created_by or "admin",
end_time=end_time,
)
jobs.append(job)
jobs = Job.objects.bulk_create(jobs)
jobs = Job.objects.bulk_create(jobs, batch_size=batch_size)
job_ids = [job.id for job in jobs]
return job_ids

Expand Down
Loading

0 comments on commit 52d9351

Please sign in to comment.