Skip to content

Commit

Permalink
fix: minor changes in HTCondor
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Aug 1, 2023
1 parent 71254ad commit 1a1415b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 59 deletions.
78 changes: 44 additions & 34 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
# Transfer all output files, even if the job is failed
# No other files are to be transferred
transfer_output_files = ""
# Transfer outputs, even if the job is failed
should_transfer_files = YES
when_to_transfer_output = ON_EXIT_OR_EVICT
Expand All @@ -70,8 +72,8 @@
on_exit_hold = ExitCode != 0
# A random subcode to identify who put the job on hold
on_exit_hold_subcode = %(holdReasonSubcode)s
# Jobs are then deleted from the system after N days if they are not running
period_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))
# Jobs are then deleted from the system after N days if they are not idle or running
periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))
# Specific options
# ----------------
Expand All @@ -88,52 +90,60 @@
def parseCondorStatus(lines, jobID):
"""parse the condor_q or condor_history output for the job status
:param lines: list of lines from the output of the condor commands, each line is a pair of jobID, statusID, and holdReasonCode
:param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode
:type lines: python:list
:param str jobID: jobID of condor job, e.g.: 123.53
:returns: Status as known by DIRAC, and a reason if the job is being held
"""
jobID = str(jobID)

holdReason = ""
status = None
for line in lines:
l = line.strip().split()

# Make sure the job ID exists
if len(l) < 1 or l[0] != jobID:
continue

# Make sure the status is present and is an integer
try:
status = int(l[1])
except (ValueError, IndexError):
continue
break

if l[0] == jobID:
# A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
if status == 5:

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
status = 3
try:
holdReasonCode = l[2]
holdReasonSubcode = l[3]
holdReason = " ".join(l[4:])
except IndexError:
# This should not happen in theory
# Just set the status to unknown such as
status = -1
holdReasonCode = "undefined"
holdReasonSubcode = "undefined"

# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
elif holdReasonCode == "16":
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)
return ("Unknown", holdReason)
# Stop here if the status is not held (5): result should be found in STATES_MAP
if status != 5:
break

# A job can be held for various reasons,
# we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
status = 3
try:
holdReasonCode = l[2]
holdReasonSubcode = l[3]
holdReason = " ".join(l[4:])
except IndexError:
# This should not happen in theory
# Just set the status to unknown such as
status = None
holdReasonCode = "undefined"
holdReasonSubcode = "undefined"
break

# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
elif holdReasonCode == "16":
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)


class Condor(object):
Expand Down
37 changes: 14 additions & 23 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def __init__(self, ceUniqueID):
#############################################################################

def _DiracToCondorID(self, diracJobID):
"""Convert a DIRAC jobID into an Condor jobID.
Example: https://<ce>/1234/0 becomes 1234.0
"""Convert a DIRAC jobID into a Condor jobID.
Example: htcondorce://<ce>/1234.0 becomes 1234.0
:param str: DIRAC jobID
:return: Condor jobID
Expand All @@ -128,7 +128,7 @@ def _condorToDiracID(self, condorJobIDs):
:param str condorJobIDs: the output of condor_submit
:return: job references such as htcondorce://<CE name>/<clusterID>.<i>
:return: job references such as htcondorce://<ce>/<clusterID>.<i>
"""
clusterIDs = condorJobIDs.split("-")
if len(clusterIDs) != 2:
Expand Down Expand Up @@ -179,7 +179,6 @@ def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=No

# Remote schedd options by default
targetUniverse = "vanilla"
# This is used to remove outputs from the remote schedd
scheddOptions = ""
if self.useLocalSchedd:
targetUniverse = "grid"
Expand Down Expand Up @@ -227,7 +226,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False):
:param list cmd: list of the condor command elements
:param bool keepTokenFile: flag to reuse or not the previously created token file
:return: S_OK/S_ERROR - the result of the executeGridCommand() call
:return: S_OK/S_ERROR - the stdout parameter of the executeGridCommand() call
"""
if not self.token and not self.proxy:
return S_ERROR(f"Cannot execute the command, token and proxy not found: {cmd}")
Expand Down Expand Up @@ -406,8 +405,7 @@ def getJobStatus(self, jobIDList):
# Get all condorIDs so we can just call condor_q and condor_history once
for diracJobID in jobIDList:
diracJobID = diracJobID.split(":::")[0]
condorJobID = self._DiracToCondorID(diracJobID)
condorIDs[diracJobID] = condorJobID
condorIDs[diracJobID] = self._DiracToCondorID(diracJobID)

self.tokenFile = None

Expand All @@ -422,8 +420,7 @@ def getJobStatus(self, jobIDList):
if not result["OK"]:
return result

_qList = result["Value"].split("\n")
qList.extend(_qList)
qList.extend(result["Value"].split("\n"))

condorHistCall = ["condor_history"]
condorHistCall.extend(self.remoteScheddOptions.strip().split(" "))
Expand All @@ -433,21 +430,15 @@ def getJobStatus(self, jobIDList):
if not result["OK"]:
return result

_qList = result["Value"].split("\n")
qList.extend(_qList)
qList.extend(result["Value"].split("\n"))

jobsToCancel = []
for job, jobID in condorIDs.items():
pilotStatus, reason = parseCondorStatus(qList, jobID)
jobStatus, reason = parseCondorStatus(qList, jobID)

if pilotStatus == PilotStatus.ABORTED:
self.log.verbose("Held job", f"{jobID} because: {reason}")
jobsToCancel.append(jobID)
if jobStatus == PilotStatus.ABORTED:
self.log.verbose("Job", f"{jobID} held: {reason}")

resultDict[job] = pilotStatus

# Make sure the pilot stays dead and gets taken out of the condor_q
self.killJob(jobsToCancel)
resultDict[job] = jobStatus

self.tokenFile = None

Expand Down Expand Up @@ -480,7 +471,7 @@ def getJobOutput(self, jobID):

return S_OK((result["Value"]["output"], result["Value"]["error"]))

def _findFile(self, workingDir, fileName, pathToResult):
def _findFile(self, fileName, pathToResult):
"""Find a file in a file system.
:param str workingDir: the name of the directory containing the given file to search for
Expand All @@ -489,7 +480,7 @@ def _findFile(self, workingDir, fileName, pathToResult):
:return: path leading to the file
"""
path = os.path.join(workingDir, pathToResult, fileName)
path = os.path.join(self.workingDirectory, pathToResult, fileName)
if os.path.exists(path):
return S_OK(path)
return S_ERROR(errno.ENOENT, f"Could not find {path}")
Expand Down Expand Up @@ -535,7 +526,7 @@ def __getJobOutput(self, jobID, outTypes):
outputsSuffix = {"output": "out", "error": "err", "logging": "log"}
outputs = {}
for output, suffix in outputsSuffix.items():
resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult)
resOut = self._findFile(f"{condorJobID}.{suffix}", pathToResult)
if not resOut["OK"]:
# Return an error if the output type was targeted, else we continue
if output in outTypes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def test_parseCondorStatus():
}

for jobID, expected in expectedResults.items():
print(jobID, expected)
assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected


Expand Down Expand Up @@ -103,7 +102,6 @@ def test_getJobStatus(mocker):
"htcondorce://condorce.foo.arg/123.2": "Aborted",
"htcondorce://condorce.foo.arg/333.3": "Unknown",
}
print(ret)
assert ret["OK"] is True
assert expectedResults == ret["Value"]

Expand Down

0 comments on commit 1a1415b

Please sign in to comment.