Skip to content

Commit

Permalink
Delay update of artifacts until final job save
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Mar 1, 2022
1 parent cb57752 commit 7a753f9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
26 changes: 20 additions & 6 deletions awx/main/tasks/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

class RunnerCallback:
event_data_key = 'job_id'
DICT_FIELDS = ('artifacts',)
STR_FIELDS = ('job_explanation', 'job_traceback')

def __init__(self, model=None):
self.parent_workflow_job_id = None
Expand All @@ -32,10 +34,26 @@ def __init__(self, model=None):
self.safe_env = {}
self.event_ct = 0
self.model = model
self.extra_update_fields = {}

def update_model(self, pk, _attempt=0, **updates):
return update_model(self.model, pk, _attempt=0, **updates)

def delay_update(self, **kwargs):
"""Save fields to update after the job finishes. Do a merge of selected fields."""
for key, value in kwargs.items():
if key in self.extra_update_fields:
if key in self.DICT_FIELDS:
self.extra_update_fields[key].update(value)
if key in self.STR_FIELDS:
self.extra_update_fields[key] = '\n'.join([str(self.extra_update_fields[key]), str(value)])
else:
self.extra_update_fields[key] = value

def get_extra_update_fields(self):
self.extra_update_fields['emitted_events'] = self.event_ct
return self.extra_update_fields

def event_handler(self, event_data):
#
# ⚠️ D-D-D-DANGER ZONE ⚠️
Expand Down Expand Up @@ -137,8 +155,7 @@ def event_handler(self, event_data):
Handle artifacts
'''
if event_data.get('event_data', {}).get('artifact_data', {}):
self.instance.artifacts = event_data['event_data']['artifact_data']
self.instance.save(update_fields=['artifacts'])
self.delay_update(artifacts=event_data['event_data']['artifact_data'])

return False

Expand Down Expand Up @@ -204,10 +221,7 @@ def status_handler(self, status_data, runner_config):
elif status_data['status'] == 'error':
result_traceback = status_data.get('result_traceback', None)
if result_traceback:
from awx.main.signals import disable_activity_stream # Circular import

with disable_activity_stream():
self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback)
self.delay_update(result_traceback=result_traceback)


class RunnerCallbackForProjectUpdate(RunnerCallback):
Expand Down
15 changes: 7 additions & 8 deletions awx/main/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ def run(self, pk, **kwargs):
self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords
self.instance.websocket_emit_status("running")
status, rc = 'error', None
extra_update_fields = {}
fact_modification_times = {}
self.runner_callback.event_ct = 0

Expand Down Expand Up @@ -560,19 +559,19 @@ def run(self, pk, **kwargs):

if status in ('timeout', 'error'):
job_explanation = f"Job terminated due to {status}"
self.instance.job_explanation = self.instance.job_explanation or job_explanation
if status == 'timeout':
status = 'failed'

extra_update_fields['job_explanation'] = self.instance.job_explanation
if not self.instance.job_explanation:
self.runner_callback.delay_update(job_explanation=job_explanation)
# ensure failure notification sends even if playbook_on_stats event is not triggered
handle_success_and_failure_notifications.apply_async([self.instance.id])

except ReceptorNodeNotFound as exc:
extra_update_fields['job_explanation'] = str(exc)
self.runner_callback.delay_update(job_explanation=str(exc))
except Exception:
# this could catch programming or file system errors
extra_update_fields['result_traceback'] = traceback.format_exc()
self.runner_callback.delay_update(result_traceback=traceback.format_exc())
logger.exception('%s Exception occurred while running task', self.instance.log_format)
finally:
logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct)
Expand All @@ -582,14 +581,14 @@ def run(self, pk, **kwargs):
except PostRunError as exc:
if status == 'successful':
status = exc.status
extra_update_fields['job_explanation'] = exc.args[0]
self.runner_callback.delay_update(job_explanation=exc.args[0])
if exc.tb:
extra_update_fields['result_traceback'] = exc.tb
self.runner_callback.delay_update(result_traceback=exc.tb)
except Exception:
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))

self.instance = self.update_model(pk)
self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, **extra_update_fields)
self.instance = self.update_model(pk, status=status, **self.runner_callback.get_extra_update_fields())

try:
self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)
Expand Down

0 comments on commit 7a753f9

Please sign in to comment.