diff --git a/gcloud/analysis_statistics/tasks.py b/gcloud/analysis_statistics/tasks.py index 07c567a0c..fd9ff709e 100644 --- a/gcloud/analysis_statistics/tasks.py +++ b/gcloud/analysis_statistics/tasks.py @@ -17,34 +17,33 @@ from datetime import datetime import ujson as json +from bamboo_engine import api as bamboo_engine_api from celery import task -from celery.task import periodic_task from celery.schedules import crontab -from bamboo_engine import api as bamboo_engine_api - +from celery.task import periodic_task +from django.core.paginator import Paginator from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION from pipeline.contrib.statistics.utils import count_pipeline_tree_nodes from pipeline.core.constants import PE -from pipeline.models import PipelineTemplate from pipeline.engine import api as pipeline_api from pipeline.engine import states from pipeline.engine.utils import calculate_elapsed_time from pipeline.eri.runtime import BambooDjangoRuntime +from pipeline.models import PipelineTemplate -from gcloud.tasktmpl3.models import TaskTemplate -from gcloud.common_template.models import CommonTemplate from gcloud.analysis_statistics import variable from gcloud.analysis_statistics.models import ( + TaskflowExecutedNodeStatistics, TaskflowStatistics, + TemplateCustomVariableSummary, TemplateNodeStatistics, TemplateStatistics, - TaskflowExecutedNodeStatistics, TemplateVariableStatistics, - TemplateCustomVariableSummary, ) -from gcloud.taskflow3.models import TaskFlowInstance +from gcloud.common_template.models import CommonTemplate from gcloud.taskflow3.domains.dispatchers.task import TaskCommandDispatcher - +from gcloud.taskflow3.models import TaskFlowInstance +from gcloud.tasktmpl3.models import TaskTemplate logger = logging.getLogger("celery") @@ -379,26 +378,39 @@ def backfill_template_variable_statistics_task(): custom_variables_records.setdefault(t, {"common": 0, "project": 0})["common"] += 1 # process task template - task_templates = 
TaskTemplate.objects.all() + # Fetch templates in pages to avoid loading them all into memory + paginator = Paginator(TaskTemplate.objects.all().order_by("id"), 500) + processed_count = 0 task_templates_counts = TaskTemplate.objects.all().count() - for i, template in enumerate(task_templates, 1): - logger.info( - "[backfill_template_variable_statistics_task] process {}/{} task template".format(i, task_templates_counts) - ) - if template.is_deleted: - TemplateVariableStatistics.objects.filter(project_id=template.project_id, template_id=template.id).delete() - else: - try: - custom_constants_types = variable.update_statistics( - project_id=template.project_id, template_id=template.id, pipeline_tree=template.pipeline_tree - ) - except Exception: - logger.exception( - "[backfill_template_variable_statistics_task]backfill task template {} failed".format(template.id) + for page_number in paginator.page_range: + page = paginator.page(page_number) + task_templates = page.object_list + + for template in task_templates: + processed_count += 1 + logger.info( + "[backfill_template_variable_statistics_task] process {}/{} task template".format( + processed_count, task_templates_counts ) + ) + if template.is_deleted: + TemplateVariableStatistics.objects.filter( + project_id=template.project_id, template_id=template.id + ).delete() else: - for t in custom_constants_types: - custom_variables_records.setdefault(t, {"common": 0, "project": 0})["project"] += 1 + try: + custom_constants_types = variable.update_statistics( + project_id=template.project_id, template_id=template.id, pipeline_tree=template.pipeline_tree + ) + except Exception: + logger.exception( + "[backfill_template_variable_statistics_task]backfill task template {} failed".format( + template.id + ) + ) + else: + for t in custom_constants_types: + custom_variables_records.setdefault(t, {"common": 0, "project": 0})["project"] += 1 # save summary TemplateCustomVariableSummary.objects.all().delete()