diff --git a/gcloud/contrib/monitor/views.py b/gcloud/contrib/monitor/views.py
index a98a8a146..06f5379dc 100644
--- a/gcloud/contrib/monitor/views.py
+++ b/gcloud/contrib/monitor/views.py
@@ -11,12 +11,15 @@
 specific language governing permissions and limitations under the License.
 """
 
-from django.db import connection
+from django.db.models import Q
 from django.http import JsonResponse
+from django.utils import timezone
 from django.views.decorators.http import require_GET
+from pipeline.eri.models import Schedule, State
 
 from gcloud.iam_auth.intercept import iam_intercept
 from gcloud.iam_auth.view_interceptors.statistics import StatisticsViewInpterceptor
+from gcloud.taskflow3.models import TaskFlowInstance
 
 
 @require_GET
@@ -27,34 +30,30 @@ def get_failed_task(request):
     """
     limit = int(request.GET.get("limit", 100))
     offset = int(request.GET.get("offset", 0))
-    failed_sql = f"""SELECT
-        tt.id AS task_id,
-        cp.NAME AS project_name,
-        pp.NAME AS task_name
-    FROM
-        `taskflow3_taskflowinstance` AS tt,
-        `core_project` AS cp,
-        `pipeline_pipelineinstance` AS pp,
-        `eri_state` AS es
-    WHERE
-        pp.instance_id = es.root_id
-        AND tt.pipeline_instance_id = pp.id
-        AND tt.project_id = cp.id
-        AND pp.is_deleted = 0
-        AND pp.is_expired = 0
-        AND pp.is_finished = 0
-        AND pp.is_revoked = 0
-        AND pp.is_started = 1
-        AND es.NAME = "FAILED"
-    ORDER BY
-        pp.id DESC
-    LIMIT
-        {offset},{limit}"""
-    with connection.cursor() as cursor:
-        cursor.execute(failed_sql)
-        failed_tasks = [
-            {"task_id": item[0], "project_name": item[1], "task_name": item[2]} for item in cursor.fetchall()
-        ]
+    st = timezone.now() - timezone.timedelta(days=30)
+    start_time = request.GET.get("start_time", st)
+    states = State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id")
+    root_ids = [state["root_id"] for state in states]
+    tasks = (
+        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
+        .filter(
+            pipeline_instance__is_deleted=False,
+            pipeline_instance__is_expired=False,
+            pipeline_instance__is_finished=False,
+            pipeline_instance__is_revoked=False,
+            pipeline_instance__is_started=True,
+            pipeline_instance__instance_id__in=root_ids,
+        )
+        .values("id", "project__name", "pipeline_instance__name")[offset : limit + offset]
+    )
+    failed_tasks = [
+        {
+            "task_id": task["id"],
+            "project_name": task["project__name"],
+            "task_name": task["pipeline_instance__name"],
+        }
+        for task in tasks
+    ]
     return JsonResponse({"result": True, "data": failed_tasks})
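Since `.values("root_id")` already returns a queryset, the intermediate `root_ids` list above could equally be left as a subquery, and the `ORDER BY pp.id DESC` that the replaced SQL applied can be restored with an explicit `order_by()`. A minimal sketch of that equivalent formulation, outside the patch (`failed_task_page` is an illustrative helper name):

```python
from pipeline.eri.models import State

from gcloud.taskflow3.models import TaskFlowInstance


def failed_task_page(start_time, offset, limit):
    # Passing the values() queryset straight to __in lets the database
    # evaluate it as a subquery instead of materialising the root_id
    # list in Python first.
    failed_roots = State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id")
    return (
        TaskFlowInstance.objects.filter(
            pipeline_instance__is_deleted=False,
            pipeline_instance__is_expired=False,
            pipeline_instance__is_finished=False,
            pipeline_instance__is_revoked=False,
            pipeline_instance__is_started=True,
            pipeline_instance__instance_id__in=failed_roots,
        )
        # The raw SQL ordered by pp.id DESC; without an order_by() the slice
        # below paginates in whatever order the database happens to return.
        .order_by("-pipeline_instance__id")
        .values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]
    )
```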
@@ -66,64 +65,53 @@ def get_executing_task(request):
     """
     limit = int(request.GET.get("limit", 100))
     offset = int(request.GET.get("offset", 0))
+    st = timezone.now() - timezone.timedelta(days=30)
+    start_time = request.GET.get("start_time", st)
+    failed_states = State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id")
+    failed_root_ids = [state["root_id"] for state in failed_states]
+    # Failed tasks
+    failed_tasks = (
+        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
+        .filter(
+            pipeline_instance__is_deleted=False,
+            pipeline_instance__is_expired=False,
+            pipeline_instance__is_finished=False,
+            pipeline_instance__is_revoked=False,
+            pipeline_instance__is_started=True,
+            pipeline_instance__instance_id__in=failed_root_ids,
+        )
+        .values(
+            "pipeline_instance__id",
+        )[offset : limit + offset]
+    )
+    failed_task_ids = [task["pipeline_instance__id"] for task in failed_tasks]
 
-    def get_data(offset, limit):
-        failed_sql = f"""SELECT
-            pp.id
-        FROM
-            `pipeline_pipelineinstance` AS pp,
-            `eri_state` AS es
-        WHERE
-            pp.instance_id = es.root_id
-            AND pp.is_deleted = 0
-            AND pp.is_expired = 0
-            AND pp.is_finished = 0
-            AND pp.is_revoked = 0
-            AND pp.is_started = 1
-            AND es.NAME = "FAILED"
-        ORDER BY
-            pp.id DESC
-        LIMIT
-            {offset}, {limit}"""
-
-        with connection.cursor() as cursor:
-            cursor.execute(failed_sql)
-            failed_task_ids = [item[0] for item in cursor.fetchall()]
-        no_failed_sql = f"""SELECT
-            pp.id,
-            tt.id AS task_id,
-            cp.NAME AS project_name,
-            pp.NAME AS task_name
-        FROM
-            `taskflow3_taskflowinstance` AS tt,
-            `core_project` AS cp,
-            `pipeline_pipelineinstance` AS pp
-        WHERE
-            tt.pipeline_instance_id = pp.id
-            AND tt.project_id = cp.id
-            AND pp.is_deleted = 0
-            AND pp.is_expired = 0
-            AND pp.is_finished = 0
-            AND pp.is_revoked = 0
-            AND pp.is_started = 1
-        ORDER BY
-            pp.id DESC
-        LIMIT
-            {offset}, {limit}"""
-        with connection.cursor() as cursor:
-            cursor.execute(no_failed_sql)
-            no_failed_tasks = [
-                {"task_id": item[1], "project_name": item[2], "task_name": item[3]}
-                for item in cursor.fetchall()
-                if item[0] not in failed_task_ids
-            ]
-        return no_failed_tasks
-
-    no_failed_tasks = []
-    for i in range(offset, 5 * limit + offset, limit):
-        if len(no_failed_tasks) < limit:
-            no_failed_tasks.extend(get_data(i, limit))
-    return JsonResponse({"result": True, "data": no_failed_tasks})
+    states = State.objects.filter(~Q(name="FAILED")).filter(started_time__gte=start_time).values("root_id")
+    root_ids = [state["root_id"] for state in states]
+    # Non-failed tasks
+    tasks = (
+        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
+        .filter(
+            pipeline_instance__is_deleted=False,
+            pipeline_instance__is_expired=False,
+            pipeline_instance__is_finished=False,
+            pipeline_instance__is_revoked=False,
+            pipeline_instance__is_started=True,
+            pipeline_instance__instance_id__in=root_ids,
+        )
+        .values("id", "project__name", "pipeline_instance__name", "pipeline_instance__id")[offset : limit + offset]
+    )
+    # The set difference leaves only the tasks that are still executing
+    executing_tasks = [
+        {
+            "task_id": task["id"],
+            "project_name": task["project__name"],
+            "task_name": task["pipeline_instance__name"],
+        }
+        for task in tasks
+        if task["pipeline_instance__id"] not in failed_task_ids
+    ]
+    return JsonResponse({"result": True, "data": executing_tasks})
 
 
 @require_GET
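Because the failed and non-failed querysets are sliced independently before the Python-side difference, a page can come back short whenever the two windows interleave. Pushing the difference into the database with `.exclude()` makes the `LIMIT` apply to the final result set. A hedged sketch of that variant, not part of the patch (`executing_task_page` is an illustrative name):

```python
from django.db.models import Q

from pipeline.eri.models import State

from gcloud.taskflow3.models import TaskFlowInstance


def executing_task_page(start_time, offset, limit):
    # Roots with at least one FAILED node, and roots with any non-FAILED node.
    failed_roots = State.objects.filter(name="FAILED", started_time__gte=start_time).values("root_id")
    active_roots = State.objects.filter(~Q(name="FAILED"), started_time__gte=start_time).values("root_id")
    return (
        TaskFlowInstance.objects.filter(
            pipeline_instance__is_deleted=False,
            pipeline_instance__is_expired=False,
            pipeline_instance__is_finished=False,
            pipeline_instance__is_revoked=False,
            pipeline_instance__is_started=True,
            pipeline_instance__instance_id__in=active_roots,
        )
        # Excluding failed roots before slicing means the page boundary is
        # computed on the final set, so no page is silently under-filled.
        .exclude(pipeline_instance__instance_id__in=failed_roots)
        .order_by("-pipeline_instance__id")
        .values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]
    )
```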
@@ -134,23 +122,32 @@ def get_schedule_times(request):
     """
     limit = int(request.GET.get("limit", 100))
     offset = int(request.GET.get("offset", 0))
-    schedule_times_sql = f"""SELECT
-        pp.id,
-        pp.creator,
-        esc.schedule_times
-    FROM
-        eri_schedule AS esc,
-        eri_state AS es,
-        pipeline_pipelineinstance AS pp
-    WHERE
-        esc.node_id = es.node_id
-        AND es.root_id = pp.instance_id
-        AND esc.scheduling = 0
-    ORDER BY
-        esc.schedule_times DESC
-    LIMIT
-        {offset},{limit}"""
-    with connection.cursor() as cursor:
-        cursor.execute(schedule_times_sql)
-        schedule_times = [{"id": item[0], "creator": item[1], "schedule_times": item[2]} for item in cursor.fetchall()]
+    st = timezone.now() - timezone.timedelta(days=30)
+    start_time = request.GET.get("start_time", st)
+    schedules = Schedule.objects.filter(scheduling=False).values("node_id", "schedule_times")
+    schedules = {schedule["node_id"]: schedule["schedule_times"] for schedule in schedules}
+    states = State.objects.filter(started_time__gte=start_time, node_id__in=list(schedules.keys())).values(
+        "node_id", "root_id"
+    )
+    root_ids = {state["root_id"]: schedules[state["node_id"]] for state in states}
+    tasks = (
+        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
+        .filter(pipeline_instance__instance_id__in=list(root_ids.keys()))
+        .values(
+            "id",
+            "project__name",
+            "pipeline_instance__name",
+            "pipeline_instance__creator",
+            "pipeline_instance__instance_id",
+        )[offset : offset + limit]
+    )
+    schedule_times = [
+        {
+            "id": task["id"],
+            "project_name": task["project__name"],
+            "creator": task["pipeline_instance__creator"],
+            "schedule_times": root_ids[task["pipeline_instance__instance_id"]],
+        }
+        for task in tasks
+    ]
     return JsonResponse({"result": True, "data": schedule_times})
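For a quick smoke test of the rewritten views, Django's `RequestFactory` can drive them directly. A minimal sketch, assuming the project's settings are configured and the `iam_intercept` interceptor admits the request; the URL path and parameter values are illustrative:

```python
from django.test import RequestFactory

from gcloud.contrib.monitor import views

factory = RequestFactory()

# start_time is optional; each view defaults to now() - 30 days.
# Note: RequestFactory attaches no user, so in a real test the IAM
# interceptor may need to be patched out or satisfied separately.
request = factory.get("/monitor/failed_tasks/", {"limit": "20", "offset": "0"})
response = views.get_failed_task(request)

print(response.status_code)  # 200
print(response.content)      # b'{"result": true, "data": [...]}'
```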