feat: use user proxy when interacting with JobWrapper
aldbr committed Aug 22, 2024
1 parent ff08839 commit 2dfe036
Showing 7 changed files with 172 additions and 125 deletions.
2 changes: 2 additions & 0 deletions src/DIRAC/Core/Utilities/Proxy.py
@@ -34,6 +34,7 @@ def undecoratedFunction(foo='bar')
"""
import functools
import os

from DIRAC import gConfig, gLogger, S_ERROR, S_OK
@@ -61,6 +62,7 @@ def executeWithUserProxy(fcn):
:param bool executionLock: flag to execute with a lock for the time of user proxy application ( default False )
"""

@functools.wraps(fcn)
def wrapped_fcn(*args, **kwargs):
userName = kwargs.pop("proxyUserName", "")
userDN = kwargs.pop("proxyUserDN", "")
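
For context, executeWithUserProxy is the mechanism the rest of this commit relies on: the wrapper pops the proxy-related keyword arguments (proxyUserName, proxyUserGroup, ...), retrieves the matching user proxy, and runs the decorated function inside that proxy environment. Below is a minimal illustrative sketch of the calling convention, with a hypothetical agent class, method, user name, and group; only the decorator and its keyword arguments come from DIRAC.

from DIRAC import S_OK
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy


class SomeAgent:
    # Hypothetical class, used only to illustrate the calling convention.
    @executeWithUserProxy
    def doSomething(self, payload):
        # When this body runs, the environment points at the proxy of the user
        # named in the call below, so DIRAC client calls use that identity.
        return S_OK(payload)


agent = SomeAgent()
# The proxy keyword arguments are consumed by the decorator and never reach
# doSomething, hence the "# pylint: disable=unexpected-keyword-arg" comments
# on the preProcessJob/postProcessJob calls later in this diff.
result = agent.doSomething("payload", proxyUserName="jdoe", proxyUserGroup="dirac_user")
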
219 changes: 123 additions & 96 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -26,6 +26,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.Version import getVersion
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.Resources.Computing import ComputingElement
@@ -106,9 +107,10 @@ def initialize(self):
if self.submissionPolicy not in ["Application", "JobWrapper"]:
return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")

result = self._initializeComputingElement("Pool")
if not result["OK"]:
return result
if self.submissionPolicy == "Application":
result = self._initializeComputingElement("Pool")
if not result["OK"]:
return result

# on-the-fly imports
ol = ObjectLoader()
@@ -133,14 +135,15 @@ def beginExecution(self):
return result
self.pilotDN, _ = result["Value"]

# Maximum number of jobs that can be handled at the same time by the agent
# (only for Application submission) Maximum number of jobs that can be handled at the same time by the agent
self.maxJobsToSubmit = self.am_getOption("MaxJobsToSubmit", self.maxJobsToSubmit)
self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit})
if self.submissionPolicy == "Application":
self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit})

self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)
self.cleanTask = self.am_getOption("CleanTask", self.cleanTask)

# Get the location of the CVMFS installation
# (only for JobWrapper submission) Get the location of the CVMFS installation
if self.submissionPolicy == "JobWrapper":
self.cvmfsLocation = self.am_getOption("CVMFSLocation", self.cvmfsLocation)
self.log.info("CVMFS location:", self.cvmfsLocation)
@@ -218,21 +221,23 @@ def execute(self):
if not result["OK"]:
return result

# Get a pilot proxy
cpuTime = 86400 * 3
self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime))
pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime)
if not result["OK"]:
return result
pilotProxy = result["Value"]

for queueName, queueDictionary in queueDictItems:
# Make sure there is no problem with the queue before trying to submit
if not self._allowedToSubmit(queueName):
continue

# Get a working proxy
# Get the CE instance
ce = queueDictionary["CE"]
cpuTime = 86400 * 3
self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime))
pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime)
if not result["OK"]:
return result
proxy = result["Value"]
ce.setProxy(proxy)
ce.setProxy(pilotProxy)

if self.submissionPolicy == "JobWrapper":
# Check errors that could have occurred during job submission and/or execution
@@ -465,6 +470,48 @@ def _checkMatchingIssues(self, jobRequest):

return S_OK()

#############################################################################

@executeWithUserProxy
def preProcessJob(self, job: JobWrapper):
"""Preprocess the job before submission: should be executed with the user proxy associated with the payload
:param JobWrapper job: job to preprocess
"""
if "InputSandbox" in job.jobArgs:
self.log.verbose("Getting the inputSandbox of job", job.jobID)
if not transferInputSandbox(job, job.jobArgs["InputSandbox"]):
return S_ERROR(f"Cannot get input sandbox of job {job.jobID}")
job.jobReport.commit()

if "InputData" in job.jobArgs and job.jobArgs["InputData"]:
self.log.verbose("Getting the inputData of job", job.jobID)
if not resolveInputData(job):
return S_ERROR(f"Cannot get input data of job {job.jobID}")
job.jobReport.commit()

# Preprocess the payload
try:
self.log.verbose("Pre-processing job", job.jobID)
result = job.preProcess()
if not result["OK"]:
self.log.error("JobWrapper failed the preprocessing phase for", f"{job.jobID}: {result['Message']}")
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION)
except Exception:
self.log.exception("JobWrapper failed the preprocessing phase for", job.jobID)
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return S_ERROR(f"JobWrapper failed the preprocessing phase for {job.jobID}")

job.jobReport.commit()
return S_OK(result["Value"])

def _submitJobWrapper(
self,
jobID: str,
@@ -482,7 +529,8 @@ def _submitJobWrapper(
:param jobParams: job parameters
:param resourceParams: resource parameters
:param optimizerParams: optimizer parameters
:param proxyChain: proxy chain
:param owner: owner of the job
:param jobGroup: group of the job
:param processors: number of processors
:return: S_OK
@@ -513,38 +561,12 @@ def _submitJobWrapper(
if not job:
return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")

if "InputSandbox" in jobParams:
self.log.verbose("Getting the inputSandbox of job", jobID)
if not transferInputSandbox(job, jobParams["InputSandbox"], jobReport):
return S_ERROR(f"Cannot get input sandbox of job {jobID}")
jobReport.commit()

if "InputData" in jobParams and jobParams["InputData"]:
self.log.verbose("Getting the inputData of job", jobID)
if not resolveInputData(job, jobReport):
return S_ERROR(f"Cannot get input data of job {jobID}")
jobReport.commit()

# Preprocess the payload
try:
self.log.verbose("Pre-processing job", jobID)
result = job.preProcess()
if not result["OK"]:
self.log.error("JobWrapper failed the preprocessing phase for", f"{jobID}: {result['Message']}")
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION)
except Exception:
self.log.exception("JobWrapper failed the preprocessing phase for", jobID)
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return S_ERROR(f"JobWrapper failed the preprocessing phase for {jobID}")
result = self.preProcessJob( # pylint: disable=unexpected-keyword-arg
job, proxyUserName=job.owner, proxyUserGroup=job.userGroup
)
if not result["OK"]:
return result
payloadParams = result["Value"]
jobReport.commit()

# Dump the remote CFG config into the job directory: it is needed for the JobWrapperTemplate
cfgFilename = Path(job.jobIDPath) / "dirac.cfg"
@@ -622,6 +644,8 @@ def _submitJobWrapper(
time.sleep(self.jobSubmissionDelay)
return S_OK()

#############################################################################

def _checkOutputIntegrity(self, workingDirectory: str):
"""Make sure that output files are not corrupted.
@@ -650,6 +674,55 @@ def _checkOutputIntegrity(self, workingDirectory: str):

return S_OK()

@executeWithUserProxy
def postProcessJob(self, job: JobWrapper, payloadResults):
"""Perform post-processing for a job: should be executed with the user proxy associated with the payload.
:param job: JobWrapper instance
:param payloadResults: Payload results
"""
try:
result = job.postProcess(**payloadResults)
if not result["OK"]:
self.log.error("Failed to post process the job", f"{job.jobID}: {result['Message']}")
if result["Errno"] == DErrno.EWMSRESC:
self.log.warn("Asked to reschedule job")
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
shutil.rmtree(job.jobIDPath)
return

job.jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False)
job.jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
shutil.rmtree(job.jobIDPath)
return
except Exception as exc: # pylint: disable=broad-except
self.log.exception("Job raised exception during post processing phase")
job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
job.jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
shutil.rmtree(job.jobIDPath)
return

if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs:
self.log.verbose("Uploading the outputSandbox/Data of the job")
if not processJobOutputs(job):
shutil.rmtree(job.jobIDPath)
return
job.jobReport.commit()

# Clean job wrapper locally
job.finalize()

def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
"""Check the status of the submitted tasks.
If the task is finished, get the output and post process the job.
@@ -771,48 +844,9 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
continue

payloadResults = result["Value"]
# Post-process the job
try:
result = job.postProcess(**payloadResults)
if not result["OK"]:
self.log.error("Failed to post process the job", f"{jobID}: {result['Message']}")
if result["Errno"] == DErrno.EWMSRESC:
self.log.warn("Asked to reschedule job")
rescheduleResult = rescheduleFailedJob(
jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
shutil.rmtree(job.jobIDPath)
continue

jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False)
jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
shutil.rmtree(job.jobIDPath)
continue
except Exception as exc: # pylint: disable=broad-except
self.log.exception("Job raised exception during post processing phase")
jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
shutil.rmtree(job.jobIDPath)
continue

if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs:
self.log.verbose("Uploading the outputSandbox/Data of the job")
if not processJobOutputs(job, jobReport):
shutil.rmtree(job.jobIDPath)
continue
jobReport.commit()

# Clean job wrapper locally
job.finalize()
self.postProcessJob( # pylint: disable=unexpected-keyword-arg
job, payloadResults, proxyUserName=job.owner, proxyUserGroup=job.userGroup
)

# Clean job in the remote resource
if self.cleanTask:
@@ -837,12 +871,5 @@ def finalize(self):
self.log.error("Problem while trying to get status of the last submitted jobs")
break
time.sleep(int(self.am_getOption("PollingTime")))
else:
for _, queueDictionary in self.queueDict.items():
ce = queueDictionary["CE"]
# Check the submitted JobWrappers a last time
result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"])
if not result["OK"]:
self.log.error("Problem while trying to get status of the last submitted JobWrappers")

return S_OK()
