Skip to content

Commit

Permalink
Merge pull request #7189 from simon-mazenoux/feat-factorize-jobstatus…
Browse files Browse the repository at this point in the history
…utiliy

[8.1] Refactor JobStatusUtility to reuse the logic in diracx
  • Loading branch information
chrisburr authored Sep 6, 2023
2 parents 262eab1 + 76986e8 commit e20e0f1
Showing 1 changed file with 83 additions and 57 deletions.
140 changes: 83 additions & 57 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Utility to set the job status in the jobDB"""

import datetime
from __future__ import annotations

from DIRAC import gLogger, S_OK, S_ERROR
from datetime import datetime
from typing import TYPE_CHECKING, Any

from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB

if TYPE_CHECKING:
from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB


class JobStatusUtility:
Expand Down Expand Up @@ -81,7 +86,7 @@ def setJobStatus(
if source:
sDict["Source"] = source
if not dateTime:
dateTime = str(datetime.datetime.utcnow())
dateTime = str(datetime.utcnow())
return self.setJobStatusBulk(jobID, {dateTime: sDict}, force=force)
return S_OK()

Expand Down Expand Up @@ -133,60 +138,16 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
updateTimes = sorted(statusDict)
log.debug("*** New call ***", f"Last update time {lastTime} - Sorted new times {updateTimes}")
# Get the status (if any) at the time of the first update
newStat = ""
firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))
for ts, st in timeStamps:
if firstUpdate >= ts:
newStat = st
# Pick up start and end times from all updates
for updTime in updateTimes:
sDict = statusDict[updTime]
newStat = sDict.get("Status", newStat)

if not startTime and newStat == JobStatus.RUNNING:
# Pick up the start date when the job starts running if not existing
startTime = updTime
log.debug("Set job start time", startTime)
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
# Pick up the end time when the job is in a final status
endTime = updTime
log.debug("Set job end time", endTime)
newStartTime, newEndTime = getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict)

# We should only update the status to the last one if its time stamp is more recent than the last update
attrNames = []
attrValues = []
if updateTimes[-1] >= lastTime:
minor = ""
application = ""
# Get the last status values looping on the most recent upupdateTimes in chronological order
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
sDict = statusDict[updTime]
log.debug("\t", f"Time {updTime} - Statuses {str(sDict)}")
status = sDict.get("Status", currentStatus)
# evaluate the state machine if the status is changing
if not force and status != currentStatus:
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
if not res["OK"]:
return res
newStat = res["Value"]
# If the JobsStateMachine does not accept the candidate, don't update
if newStat != status:
# keeping the same status
log.error(
"Job Status Error",
f"{jobID} can't move from {currentStatus} to {status}: using {newStat}",
)
status = newStat
sDict["Status"] = newStat
# Change the source to indicate this is not what was requested
source = sDict.get("Source", "")
sDict["Source"] = source + "(SM)"
# at this stage status == newStat. Set currentStatus to this new status
currentStatus = newStat

minor = sDict.get("MinorStatus", minor)
application = sDict.get("ApplicationStatus", application)

res = getNewStatus(jobID, updateTimes, lastTime, statusDict, currentStatus, force, log)
if not res["OK"]:
return res
status, minor, application = res["Value"]
log.debug("Final statuses:", f"status '{status}', minor '{minor}', application '{application}'")
if status:
attrNames.append("Status")
Expand All @@ -206,11 +167,13 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
if not result["OK"]:
return result
# Update start and end time if needed
if endTime:
if not endTime and newEndTime:
log.debug("Set job end time", endTime)
result = self.jobDB.setEndExecTime(jobID, endTime)
if not result["OK"]:
return result
if startTime:
if not startTime and newStartTime:
log.debug("Set job start time", startTime)
result = self.jobDB.setStartExecTime(jobID, startTime)
if not result["OK"]:
return result
Expand All @@ -237,3 +200,66 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
return result

return S_OK((attrNames, attrValues))


def getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict):
newStat = ""
firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))
for ts, st in timeStamps:
if firstUpdate >= ts:
newStat = st
# Pick up start and end times from all updates
for updTime in updateTimes:
sDict = statusDict[updTime]
newStat = sDict.get("Status", newStat)

if not startTime and newStat == JobStatus.RUNNING:
# Pick up the start date when the job starts running if not existing
startTime = updTime
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
# Pick up the end time when the job is in a final status
endTime = updTime

return startTime, endTime


def getNewStatus(
jobID: int,
updateTimes: list[datetime],
lastTime: datetime,
statusDict: dict[datetime, Any],
currentStatus,
force: bool,
log,
):
status = ""
minor = ""
application = ""
# Get the last status values looping on the most recent upupdateTimes in chronological order
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
sDict = statusDict[updTime]
log.debug(f"\tTime {updTime} - Statuses {str(sDict)}")
status = sDict.get("Status", currentStatus)
# evaluate the state machine if the status is changing
if not force and status != currentStatus:
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
if not res["OK"]:
return res
newStat = res["Value"]
# If the JobsStateMachine does not accept the candidate, don't update
if newStat != status:
# keeping the same status
log.error(
f"Job Status Error: {jobID} can't move from {currentStatus} to {status}: using {newStat}",
)
status = newStat
sDict["Status"] = newStat
# Change the source to indicate this is not what was requested
source = sDict.get("Source", "")
sDict["Source"] = source + "(SM)"
# at this stage status == newStat. Set currentStatus to this new status
currentStatus = newStat

minor = sDict.get("MinorStatus", minor)
application = sDict.get("ApplicationStatus", application)
return S_OK((status, minor, application))

0 comments on commit e20e0f1

Please sign in to comment.