From 7a753f970971ae5d70cdb61e04878f9cdb4aca3a Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 28 Feb 2022 16:32:55 -0500 Subject: [PATCH] Delay update of artifacts until final job save --- awx/main/tasks/callback.py | 26 ++++++++++++++++++++------ awx/main/tasks/jobs.py | 15 +++++++-------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index ccd9c39815fc..8721ec67422b 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -21,6 +21,8 @@ class RunnerCallback: event_data_key = 'job_id' + DICT_FIELDS = ('artifacts',) + STR_FIELDS = ('job_explanation', 'job_traceback') def __init__(self, model=None): self.parent_workflow_job_id = None @@ -32,10 +34,26 @@ def __init__(self, model=None): self.safe_env = {} self.event_ct = 0 self.model = model + self.extra_update_fields = {} def update_model(self, pk, _attempt=0, **updates): return update_model(self.model, pk, _attempt=0, **updates) + def delay_update(self, **kwargs): + """Save fields to update after the job finishes. Do a merge of selected fields.""" + for key, value in kwargs.items(): + if key in self.extra_update_fields: + if key in self.DICT_FIELDS: + self.extra_update_fields[key].update(value) + if key in self.STR_FIELDS: + self.extra_update_fields[key] = '\n'.join([str(self.extra_update_fields[key]), str(value)]) + else: + self.extra_update_fields[key] = value + + def get_extra_update_fields(self): + self.extra_update_fields['emitted_events'] = self.event_ct + return self.extra_update_fields + def event_handler(self, event_data): # # ⚠️ D-D-D-DANGER ZONE ⚠️ @@ -137,8 +155,7 @@ def event_handler(self, event_data): Handle artifacts ''' if event_data.get('event_data', {}).get('artifact_data', {}): - self.instance.artifacts = event_data['event_data']['artifact_data'] - self.instance.save(update_fields=['artifacts']) + self.delay_update(artifacts=event_data['event_data']['artifact_data']) return False @@ -204,10 +221,7 @@ def status_handler(self, status_data, runner_config): elif status_data['status'] == 'error': result_traceback = status_data.get('result_traceback', None) if result_traceback: - from awx.main.signals import disable_activity_stream # Circular import - - with disable_activity_stream(): - self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback) + self.delay_update(result_traceback=result_traceback) class RunnerCallbackForProjectUpdate(RunnerCallback): diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 7dea3830141a..56a674f968c2 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -432,7 +432,6 @@ def run(self, pk, **kwargs): self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords self.instance.websocket_emit_status("running") status, rc = 'error', None - extra_update_fields = {} fact_modification_times = {} self.runner_callback.event_ct = 0 @@ -560,19 +559,19 @@ def run(self, pk, **kwargs): if status in ('timeout', 'error'): job_explanation = f"Job terminated due to {status}" - self.instance.job_explanation = self.instance.job_explanation or job_explanation if status == 'timeout': status = 'failed' - extra_update_fields['job_explanation'] = self.instance.job_explanation + if not self.instance.job_explanation: + self.runner_callback.delay_update(job_explanation=job_explanation) # ensure failure notification sends even if playbook_on_stats event is not triggered handle_success_and_failure_notifications.apply_async([self.instance.id]) except ReceptorNodeNotFound as exc: - extra_update_fields['job_explanation'] = str(exc) + self.runner_callback.delay_update(job_explanation=str(exc)) except Exception: # this could catch programming or file system errors - extra_update_fields['result_traceback'] = traceback.format_exc() + self.runner_callback.delay_update(result_traceback=traceback.format_exc()) logger.exception('%s Exception occurred while running task', self.instance.log_format) finally: logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct) @@ -582,14 +581,14 @@ def run(self, pk, **kwargs): except PostRunError as exc: if status == 'successful': status = exc.status - extra_update_fields['job_explanation'] = exc.args[0] + self.runner_callback.delay_update(job_explanation=exc.args[0]) if exc.tb: - extra_update_fields['result_traceback'] = exc.tb + self.runner_callback.delay_update(result_traceback=exc.tb) except Exception: logger.exception('{} Post run hook errored.'.format(self.instance.log_format)) self.instance = self.update_model(pk) - self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, **extra_update_fields) + self.instance = self.update_model(pk, status=status, **self.runner_callback.get_extra_update_fields()) try: self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)