Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v8.0] Introduce Scout Agent and Optimizer #7251

Open
wants to merge 20 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/ScoutingJobStatusAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

qdcampagna marked this conversation as resolved.
Show resolved Hide resolved
""" Agent for the scout job framework: monitors the status of scout jobs
and updates the status of the corresponding main jobs accordingly.
"""

__RCSID__ = "$Id$"
qdcampagna marked this conversation as resolved.
Show resolved Hide resolved

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB


class ScoutingJobStatusAgent(AgentModule):
    """Agent that moves main jobs out of the 'Scouting' status.

    On every cycle the agent selects the jobs whose Status is 'Scouting',
    reads the ``ScoutID`` job parameter of each of them
    (``<lowest jobID>:<highest jobID>`` of the associated scout jobs), counts
    how many of those scout jobs are Done, Failed or Stalled and, as soon as
    one of the configured thresholds is reached, rewrites the Status,
    MinorStatus and ApplicationStatus of the main job and sets its
    ``ScoutFlag`` job parameter to 1.
    """

    def __init__(self, *args, **kwargs):
        """ c'tor
        """
        AgentModule.__init__(self, *args, **kwargs)

        # Database handles, created in initialize()
        self.jobDB = None
        self.logDB = None

    #############################################################################
    def initialize(self):
        """Set the default polling time and create the database handles.

        :return: S_OK()
        """
        self.am_setOption('PollingTime', 120)
        self.jobDB = JobDB()
        self.logDB = JobLoggingDB()

        return S_OK()

    #############################################################################
    def beginExecution(self):
        """Reload the scouting thresholds from the Operations configuration.

        Each criterion exists both as an absolute number of scout jobs
        (``criteriaX``) and as a fraction of ``totalScoutJobs``
        (``criteriaXRate``).  When the absolute number is stricter than the
        one implied by the rate, the rate is lowered to match it.

        :return: S_OK()
        """
        # NOTE(review): a reviewer asked whether these values could be read
        # from the agent options instead of Operations, and pointed out that
        # the agent has to be added to the ConfigTemplate file.
        self.totalScoutJobs = Operations().getValue('WorkloadManagement/Scouting/totalScoutJobs', 10)
        self.criteriaFailedRate = Operations().getValue('WorkloadManagement/Scouting/criteriaFailedRate', 0.5)
        self.criteriaSucceededRate = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceededRate', 0.3)
        self.criteriaStalledRate = Operations().getValue('WorkloadManagement/Scouting/criteriaStalledRate', 1.0)
        self.criteriaFailed = Operations().getValue('WorkloadManagement/Scouting/criteriaFailed',
                                                    int(self.totalScoutJobs * self.criteriaFailedRate))
        self.criteriaSucceeded = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceeded',
                                                       int(self.totalScoutJobs * self.criteriaSucceededRate))
        self.criteriaStalled = Operations().getValue('WorkloadManagement/Scouting/criteriaStalled',
                                                     int(self.totalScoutJobs * self.criteriaStalledRate))

        self.criteriaFailedRate = self.__alignRate(self.criteriaFailedRate, self.criteriaFailed)
        self.criteriaSucceededRate = self.__alignRate(self.criteriaSucceededRate, self.criteriaSucceeded)
        self.criteriaStalledRate = self.__alignRate(self.criteriaStalledRate, self.criteriaStalled)

        self.log.info('Scouting parameters: Total: %s, Succeeded: %s(%s), Failed: %s(%s), Stalled: %s(%s), '
                      % (self.totalScoutJobs,
                         self.criteriaSucceeded, self.criteriaSucceededRate,
                         self.criteriaFailed, self.criteriaFailedRate,
                         self.criteriaStalled, self.criteriaStalledRate))

        return S_OK()

    def __alignRate(self, rate, count):
        """Return ``rate`` lowered to ``count / totalScoutJobs`` when ``count`` is stricter.

        :param rate: configured fraction of scout jobs
        :param count: configured absolute number of scout jobs
        :return: the rate to use, as a float fraction
        """
        total = self.totalScoutJobs
        # Guard against a zero/negative total, which would divide by zero below.
        if total > 0 and int(total * rate) > count:
            # Keep the fraction: truncating it with int() would turn every
            # rate below 1 into 0.
            return float(count) / total
        return rate

    def execute(self):
        """Run one agent cycle over all jobs in the 'Scouting' status.

        :return: S_OK(), or the failed result of the job selection
        """
        # NOTE(review): a reviewer recommended going through getJobs() of the
        # JobMonitoring service instead of querying JobDB directly.
        result = self.jobDB.selectJobs({'Status': 'Scouting'})
        if not result['OK']:
            # Propagate the original error instead of an empty S_ERROR()
            return result

        joblist = result['Value']
        if not joblist:
            self.log.info('No Jobs with scouting status. Skipping this cycle')
            return S_OK()

        self.log.info('Check %s Scouting jobs' % len(joblist))
        self.log.debug('joblist: %s' % (joblist,))

        # Cache of scout statuses, keyed by ScoutID, so that main jobs sharing
        # the same scouts trigger a single evaluation per cycle.
        scoutIDdict = {}
        for jobID in joblist:
            jobKey = int(jobID)
            result = self.jobDB.getJobParameters(jobKey, ['ScoutID'])  # <lowest jobID>:<Highest jobID>
            if not result['OK']:
                self.log.warn(result['Message'])
                continue

            scoutID = (result['Value'].get(jobKey) or {}).get('ScoutID')
            if not scoutID:
                continue

            scoutStatus = scoutIDdict.get(scoutID)
            if not scoutStatus:
                result = self.__getScoutStatus(scoutID)
                if not result['OK']:
                    self.log.warn(result['Message'])
                    continue
                scoutStatus = result['Value']
                scoutIDdict[scoutID] = scoutStatus

            if scoutStatus['Status'] == 'NotComplete':
                self.log.verbose("%s: skipping since corresponding scout does not complete yet." % jobID)
                continue

            result = self.__updateJobStatus(jobID, status=scoutStatus['Status'],
                                            minorstatus=scoutStatus['MinorStatus'],
                                            appstatus=scoutStatus['appstatus'])
            if not result['OK']:
                self.log.warn(result['Message'])

        self.log.info('final scoutIDdict:%s' % scoutIDdict)
        return S_OK()

    def __effectiveCriterion(self, name, criterion, rate, nScoutJobs):
        """Return the threshold to apply to a group of ``nScoutJobs`` scout jobs.

        When the configured absolute ``criterion`` exceeds the number of scout
        jobs actually found, it is scaled down with ``rate`` (minimum 1).

        :param str name: label used in the log message
        :param int criterion: configured absolute threshold
        :param rate: configured fractional threshold
        :param int nScoutJobs: number of scout jobs found for the ScoutID
        :return: threshold to compare the job counts against
        """
        if criterion > nScoutJobs:
            effective = max(int(nScoutJobs * rate), 1)
            # Log the value that is really applied, not the configured one
            self.log.verbose('%s = %s' % (name, effective))
        else:
            effective = criterion
            self.log.debug('%s = %s' % (name, effective))
        return effective

    def __getScoutStatus(self, scoutid):
        """Evaluate the scout jobs identified by ``scoutid``.

        :param str scoutid: ``<lowest jobID>:<highest jobID>`` of the scout jobs
        :return: S_OK(dict) where dict holds 'Status' ('Checking', 'Failed',
                 'Stalled' or 'NotComplete') and, unless the status is
                 'NotComplete', 'MinorStatus' and 'appstatus';
                 S_ERROR if the ID is malformed or the DB query failed
        """
        try:
            ids = scoutid.split(':')
            firstID, lastID = int(ids[0]), int(ids[1])
        except (AttributeError, IndexError, ValueError):
            # A broken parameter must not abort the whole agent cycle
            return S_ERROR('Malformed ScoutID: %s' % (scoutid,))
        scoutjoblist = list(range(firstID, lastID + 1))

        result = self.jobDB.getJobsAttributes(scoutjoblist, ['Status', 'Site'])
        if not result['OK']:
            return S_ERROR(result['Message'])

        jobAttributes = result['Value']
        scoutjobs = list(jobAttributes)

        donejoblist = []
        failedjoblist = []
        stalledjoblist = []
        for scoutjob in scoutjobs:
            status = jobAttributes[scoutjob]['Status']
            if status == 'Done':
                donejoblist.append(scoutjob)
            elif status == 'Failed':
                failedjoblist.append(scoutjob)
            elif status == 'Stalled':
                stalledjoblist.append(scoutjob)

        nScoutJobs = len(scoutjobs)
        criteriaSucceeded = self.__effectiveCriterion('criteriaSucceeded', self.criteriaSucceeded,
                                                      self.criteriaSucceededRate, nScoutJobs)
        criteriaFailed = self.__effectiveCriterion('criteriaFailed', self.criteriaFailed,
                                                   self.criteriaFailedRate, nScoutJobs)
        criteriaStalled = self.__effectiveCriterion('criteriaStalled', self.criteriaStalled,
                                                    self.criteriaStalledRate, nScoutJobs)

        # NOTE(review): a reviewer suggested using the JobStatus constants
        # (e.g. JobStatus.CHECKING) instead of the literal status strings.
        # Success is checked first, so it wins over failure when both
        # thresholds are reached.
        if len(donejoblist) >= criteriaSucceeded:
            self.log.verbose('Scout (ID = %s) are done.' % scoutid)
            scoutStatus = {'Status': 'Checking', 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'}

        elif len(failedjoblist) >= criteriaFailed:
            self.log.verbose('Scout (ID = %s) are failed.' % scoutid)
            msg = 'Failed scout job ' + str(failedjoblist)
            scoutStatus = {'Status': 'Failed', 'MinorStatus': 'Failed in scouting', 'appstatus': msg}

        elif len(stalledjoblist) >= criteriaStalled:
            self.log.verbose('Scout (ID = %s) are stalled.' % scoutid)
            msg = 'Stalled scout job ' + str(stalledjoblist)
            scoutStatus = {'Status': 'Stalled', 'MinorStatus': 'Stalled in scouting', 'appstatus': msg}

        else:
            self.log.verbose('Scout (ID = %s) did not completed.' % scoutid)
            scoutStatus = {'Status': 'NotComplete'}

        return S_OK(scoutStatus)

    def __setAttribute(self, job, name, value):
        """Write one job attribute, keeping the stored value when none is given.

        :param job: job ID
        :param str name: attribute name ('Status', 'MinorStatus', ...)
        :param value: new value; if empty, the current DB value is re-written
        :return: S_OK(value actually written) / S_ERROR
        """
        if not value:
            result = self.jobDB.getJobAttributes(job, [name])
            if result['OK']:
                value = result['Value'][name]

        self.log.verbose("self.jobDB.setJobAttribute(%s,'%s','%s',update=True)" % (job, name, value))
        result = self.jobDB.setJobAttribute(job, name, value, update=True)
        if not result['OK']:
            return S_ERROR(result['Message'])
        return S_OK(value)

    def __updateJobStatus(self, job, status=None, minorstatus=None, appstatus=None):
        """Update the status of a main job in the JobDB and log the change.

        Writes, in this order, ApplicationStatus, MinorStatus, the
        ``ScoutFlag`` job parameter (set to 1) and Status, then adds a record
        to the JobLoggingDB.  Nothing is written when the agent option
        ``Enable`` is false.

        :param job: job ID
        :param str status: new Status (current one is kept if empty)
        :param str minorstatus: new MinorStatus (current one is kept if empty)
        :param str appstatus: new ApplicationStatus (current one is kept if empty)
        :return: S_OK('DisabledMode') when disabled, S_ERROR on a failed DB
                 update, otherwise the result of the logging record insertion
        """
        self.log.info('Job %s set Status="%s", MinorStatus="%s", ApplicationStatus="%s".'
                      % (job, status, minorstatus, appstatus))
        if not self.am_getOption('Enable', True):
            # Must return here: falling through would update the DB anyway
            return S_OK('DisabledMode')

        # Update ApplicationStatus
        result = self.__setAttribute(job, 'ApplicationStatus', appstatus)
        if not result['OK']:
            return result

        # Update MinorStatus
        result = self.__setAttribute(job, 'MinorStatus', minorstatus)
        if not result['OK']:
            return result
        minorstatus = result['Value']

        # Update ScoutFlag
        result = self.jobDB.setJobParameter(int(job), 'ScoutFlag', 1)
        if not result['OK']:
            return S_ERROR(result['Message'])

        # Update Status
        result = self.__setAttribute(job, 'Status', status)
        if not result['OK']:
            return result
        status = result['Value']

        result = self.logDB.addLoggingRecord(job, status=status, minor=minorstatus,
                                             source='ScoutingJobStatusAgent')
        if not result['OK']:
            self.log.warn(result)

        return result
134 changes: 134 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Executor/Scouting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
""" Executor to set status "Scouting" for a main job which has scout jobs
"""

__RCSID__ = "$Id: $"
qdcampagna marked this conversation as resolved.
Show resolved Hide resolved

from DIRAC import S_OK, S_ERROR

from DIRAC.WorkloadManagementSystem.Executor.Base.OptimizerExecutor import OptimizerExecutor
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB


class Scouting(OptimizerExecutor):
    """Optimizer that puts a main job into the 'Scouting' status.

    A job that carries a ``ScoutID`` (either as job parameter or as manifest
    option) and whose ``ScoutFlag`` is not 1 is set to the waiting status
    (default 'Scouting'); every other job is handed to the next optimizer.

    The specific Optimizer must provide the following methods:
      - optimizeJob() - the main method called for each job
    and it can provide:
      - initializeOptimizer() before each execution cycle
    """

    @classmethod
    def initializeOptimizer(cls):
        """ Initialization of the optimizer.
        """
        cls.siteClient = SiteStatus()
        cls.__jobDB = JobDB()
        return S_OK()

    def optimizeJob(self, jid, jobState):
        """Decide whether job ``jid`` has to wait for its scout jobs.

        The scout parameters are taken from the JobParameters when present.
        Otherwise ``ScoutID`` is read from the job manifest and ``ScoutFlag``
        from the AtticJobParameters of the previous reschedule cycle, and both
        are stored as JobParameters.

        :param jid: job ID
        :param jobState: job state object of the job being optimized
        :return: result of setNextOptimizer() when the job is skipped,
                 S_OK() when the job was set to the scouting status,
                 S_ERROR otherwise
        """
        self.jobLog.info('Getting scoutparams from JobParameters')

        result = self.__jobDB.getJobParameters(jid, ['ScoutFlag', 'ScoutID'])
        if not result['OK']:
            return S_ERROR('Could not retrieve scoutparams: %s' % result['Message'])

        if result['Value']:
            scoutparams = result['Value'].get(jid)
            self.jobLog.info('scoutparams: %s' % scoutparams)
            if not scoutparams:
                self.jobLog.info('Skipping optimizer, since scoutparams are abnormal')
                return self.setNextOptimizer(jobState)

            scoutID, scoutFlag = self.__getIDandFlag(scoutparams)
            if not scoutID:
                self.jobLog.info('Skipping optimizers, since this job has not enough scoutparams.')
                return self.setNextOptimizer(jobState)

        else:
            result = jobState.getManifest()
            if not result['OK']:
                return S_ERROR('Could not retrieve job manifest: %s' % result['Message'])
            jobManifest = result['Value']
            scoutID = jobManifest.getOption('ScoutID', None)
            if not scoutID:
                self.jobLog.info('Skipping optimizer, since no scout '
                                 'corresponding to this job group')
                return self.setNextOptimizer(jobState)

            scoutFlag = 0
            result = jobState.getAttribute('RescheduleCounter')
            if not result['OK']:
                return S_ERROR('Could not retrieve RescheduleCounter: %s' % result['Message'])

            rCounter = result['Value']
            if int(rCounter) > 0:
                # The flag of a rescheduled job was archived under the previous cycle
                rCycle = int(rCounter) - 1
                # NOTE(review): the parameter name is passed with embedded double
                # quotes; presumably the DB method inserts it verbatim into the
                # query - confirm against JobDB.getAtticJobParameters.
                result = self.__jobDB.getAtticJobParameters(jid, ['"ScoutFlag"'],
                                                            rescheduleCounter=rCycle)
                self.jobLog.info("From AtticJobParameter: %s" % result)
                if result['OK']:
                    try:
                        scoutFlag = result['Value'].get(rCycle).get('ScoutFlag', 0)
                    except (AttributeError, TypeError):
                        # Nothing archived for that cycle: keep the default flag (0)
                        self.jobLog.verbose('No ScoutFlag archived for reschedule cycle %s' % rCycle)
                else:
                    self.jobLog.info(result['Message'])

            self.jobLog.info('Setting scoutparams (ID:%s, Flag:%s) to JobParamter'
                             % (scoutID, scoutFlag))
            result = self.__setScoutparamsInJobParameters(jid, scoutID, scoutFlag, jobState)
            if not result['OK']:
                self.jobLog.info('Skipping, since failed in setting scoutparams of JobParameters.')
                return self.setNextOptimizer(jobState)

        # A missing or non-numeric flag means the scouts have not completed yet
        try:
            scoutsComplete = int(scoutFlag or 0) == 1
        except (TypeError, ValueError):
            scoutsComplete = False

        if scoutsComplete:
            self.jobLog.info('Skipping optimizer, since corresponding scout jobs complete '
                             '(ScoutFlag = %s)' % scoutFlag)
            return self.setNextOptimizer(jobState)

        self.jobLog.info('Job %s set scouting status' % jid)
        return self.__setScoutingStatus(jobState)

    def __getIDandFlag(self, scoutparams):
        """Extract the scout ID and flag from a job parameter dictionary.

        :param dict scoutparams: job parameters of the job
        :return: tuple (ScoutID, ScoutFlag); a missing entry is returned as None
        """
        scoutID = scoutparams.get('ScoutID')
        scoutFlag = scoutparams.get('ScoutFlag')
        return scoutID, scoutFlag

    def __setScoutparamsInJobParameters(self, jid, scoutID, scoutFlag, jobState=None):
        """Store ``ScoutID`` and ``ScoutFlag`` as JobParameters of job ``jid``.

        :param jid: job ID
        :param scoutID: value of the ScoutID parameter
        :param scoutFlag: value of the ScoutFlag parameter
        :param jobState: unused, kept for backward compatibility of the signature
        :return: S_OK() / failed result of JobDB.setJobParameters
        """
        paramList = [('ScoutID', scoutID), ('ScoutFlag', scoutFlag)]
        result = self.__jobDB.setJobParameters(jid, paramList)
        if not result['OK']:
            self.jobLog.info('Skipping, since failed in recovering scoutparams of JobParameters.')
            return result

        return S_OK()

    def __setScoutingStatus(self, jobState=None):
        """Set the job to the waiting ('Scouting' by default) status.

        The status and minor status are taken from the executor options
        ``WaitingStatus`` and ``WaitingMinorStatus``.

        :param jobState: job state object of the job being optimized
        :return: S_OK() / failed result of the job state calls
        """
        if not jobState:
            # NOTE(review): inside this class the name is mangled to
            # _Scouting__jobData, which nothing in this class assigns, so this
            # fallback presumably fails - confirm against OptimizerExecutor.
            jobState = self.__jobData.jobState

        result = jobState.getStatus()
        if not result['OK']:
            return result

        opName = self.ex_optimizerName()
        result = jobState.setStatus(self.ex_getOption('WaitingStatus', 'Scouting'),
                                    minorStatus=self.ex_getOption('WaitingMinorStatus',
                                                                  'Waiting for Scout Job Completion'),
                                    appStatus="Unknown",
                                    source=opName)
        if not result['OK']:
            return result

        return S_OK()

Loading