Skip to content

Commit

Permalink
feat: 系统任务执行状态自监控能力支持 (support self-monitoring of system task execution status) #7457
Browse files Browse the repository at this point in the history
  • Loading branch information
lTimej committed Jun 28, 2024
1 parent ea5fbff commit 133a47f
Showing 1 changed file with 102 additions and 105 deletions.
207 changes: 102 additions & 105 deletions gcloud/contrib/monitor/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
specific language governing permissions and limitations under the License.
"""

from django.db import connection
from django.db.models import Q
from django.http import JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_GET
from pipeline.eri.models import Schedule, State

from gcloud.iam_auth.intercept import iam_intercept
from gcloud.iam_auth.view_interceptors.statistics import StatisticsViewInpterceptor
from gcloud.taskflow3.models import TaskFlowInstance


@require_GET
def get_failed_task(request):
    """Return started, unfinished tasks that have at least one FAILED node.

    Query params:
        limit (int): page size, default 100.
        offset (int): page start, default 0.
        start_time: earliest ``State.started_time`` to consider;
            defaults to 30 days before now.
    """
    limit = int(request.GET.get("limit", 100))
    offset = int(request.GET.get("offset", 0))
    # Default monitoring window: the last 30 days of node states.
    st = timezone.now() - timezone.timedelta(days=30)
    start_time = request.GET.get("start_time", st)
    # Root pipeline ids that have at least one FAILED node in the window.
    root_ids = list(
        State.objects.filter(name="FAILED", started_time__gte=start_time)
        .values_list("root_id", flat=True)
        .distinct()
    )
    tasks = (
        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
        .filter(
            pipeline_instance__is_deleted=False,
            pipeline_instance__is_expired=False,
            pipeline_instance__is_finished=False,
            pipeline_instance__is_revoked=False,
            pipeline_instance__is_started=True,
            pipeline_instance__instance_id__in=root_ids,
        )
        # Explicit ordering keeps offset/limit pagination deterministic; the
        # raw SQL this replaced ordered by pipeline id DESC.
        .order_by("-pipeline_instance__id")
        .values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]
    )
    failed_tasks = [
        {
            "task_id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": failed_tasks})


def get_executing_task(request):
    """Return started, unfinished tasks that are executing (no FAILED node).

    Query params:
        limit (int): page size, default 100.
        offset (int): page start, default 0.
        start_time: earliest ``State.started_time`` to consider;
            defaults to 30 days before now.
    """
    limit = int(request.GET.get("limit", 100))
    offset = int(request.GET.get("offset", 0))
    st = timezone.now() - timezone.timedelta(days=30)
    start_time = request.GET.get("start_time", st)
    # Roots with at least one FAILED node: the exclusion set. It must be
    # complete — the previous code sliced this query with the same
    # offset/limit, so failed tasks outside the current page leaked into
    # the "executing" result.
    failed_root_ids = set(
        State.objects.filter(name="FAILED", started_time__gte=start_time).values_list("root_id", flat=True)
    )
    # Roots with at least one non-FAILED node in the window.
    candidate_root_ids = set(
        State.objects.filter(~Q(name="FAILED"), started_time__gte=start_time).values_list(
            "root_id", flat=True
        )
    )
    # Executing == has recent activity but no failure anywhere in the pipeline.
    executing_root_ids = candidate_root_ids - failed_root_ids
    tasks = (
        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
        .filter(
            pipeline_instance__is_deleted=False,
            pipeline_instance__is_expired=False,
            pipeline_instance__is_finished=False,
            pipeline_instance__is_revoked=False,
            pipeline_instance__is_started=True,
            pipeline_instance__instance_id__in=list(executing_root_ids),
        )
        # Deterministic pagination; the raw SQL this replaced ordered by
        # pipeline id DESC.
        .order_by("-pipeline_instance__id")
        .values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]
    )
    executing_tasks = [
        {
            "task_id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": executing_tasks})


@require_GET
def get_schedule_times(request):
    """Return per-task schedule (poll) counts for nodes not currently scheduling.

    Query params:
        limit (int): page size, default 100.
        offset (int): page start, default 0.
        start_time: earliest ``State.started_time`` to consider;
            defaults to 30 days before now.
    """
    limit = int(request.GET.get("limit", 100))
    offset = int(request.GET.get("offset", 0))
    st = timezone.now() - timezone.timedelta(days=30)
    start_time = request.GET.get("start_time", st)
    # node_id -> schedule_times for nodes that are not mid-schedule.
    schedules = dict(Schedule.objects.filter(scheduling=False).values_list("node_id", "schedule_times"))
    states = State.objects.filter(started_time__gte=start_time, node_id__in=list(schedules)).values(
        "node_id", "root_id"
    )
    # root_id -> schedule_times.
    # NOTE(review): if several nodes of one pipeline match, later rows
    # overwrite earlier ones — confirm "any matching node's count" is intended.
    root_ids = {state["root_id"]: schedules[state["node_id"]] for state in states}
    tasks = (
        TaskFlowInstance.objects.select_related("project", "pipeline_instance")
        .filter(pipeline_instance__instance_id__in=list(root_ids))
        # Deterministic pagination. NOTE(review): the raw SQL this replaced
        # ordered by schedule_times DESC, which cannot be expressed at the DB
        # level here — confirm whether that ordering must be restored.
        .order_by("-id")
        .values(
            "id",
            "project__name",
            "pipeline_instance__name",
            "pipeline_instance__creator",
            "pipeline_instance__instance_id",
        )[offset : offset + limit]
    )
    schedule_times = [
        {
            "id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
            # Bug fix: "creator" previously copied the task NAME; the fetched
            # pipeline_instance__creator value was never used.
            "creator": task["pipeline_instance__creator"],
            "schedule_times": root_ids[task["pipeline_instance__instance_id"]],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": schedule_times})

0 comments on commit 133a47f

Please sign in to comment.