Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.1] Refactor JobStatusUtility to reuse the logic in diracx #7189

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 83 additions & 57 deletions src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
"""Utility to set the job status in the jobDB"""

import datetime
from __future__ import annotations

from DIRAC import gLogger, S_OK, S_ERROR
from datetime import datetime
from typing import TYPE_CHECKING, Any

from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB

if TYPE_CHECKING:
from DIRAC.WorkloadManagementSystem.DB.ElasticJobParametersDB import ElasticJobParametersDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB


class JobStatusUtility:
Expand Down Expand Up @@ -81,7 +86,7 @@ def setJobStatus(
if source:
sDict["Source"] = source
if not dateTime:
dateTime = str(datetime.datetime.utcnow())
dateTime = str(datetime.utcnow())
return self.setJobStatusBulk(jobID, {dateTime: sDict}, force=force)
return S_OK()

Expand Down Expand Up @@ -133,60 +138,16 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
updateTimes = sorted(statusDict)
log.debug("*** New call ***", f"Last update time {lastTime} - Sorted new times {updateTimes}")
# Get the status (if any) at the time of the first update
newStat = ""
firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))
for ts, st in timeStamps:
if firstUpdate >= ts:
newStat = st
# Pick up start and end times from all updates
for updTime in updateTimes:
sDict = statusDict[updTime]
newStat = sDict.get("Status", newStat)

if not startTime and newStat == JobStatus.RUNNING:
# Pick up the start date when the job starts running if not existing
startTime = updTime
log.debug("Set job start time", startTime)
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
# Pick up the end time when the job is in a final status
endTime = updTime
log.debug("Set job end time", endTime)
newStartTime, newEndTime = getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict)

# We should only update the status to the last one if its time stamp is more recent than the last update
attrNames = []
attrValues = []
if updateTimes[-1] >= lastTime:
minor = ""
application = ""
# Get the last status values looping on the most recent updateTimes in chronological order
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
sDict = statusDict[updTime]
log.debug("\t", f"Time {updTime} - Statuses {str(sDict)}")
status = sDict.get("Status", currentStatus)
# evaluate the state machine if the status is changing
if not force and status != currentStatus:
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
if not res["OK"]:
return res
newStat = res["Value"]
# If the JobsStateMachine does not accept the candidate, don't update
if newStat != status:
# keeping the same status
log.error(
"Job Status Error",
f"{jobID} can't move from {currentStatus} to {status}: using {newStat}",
)
status = newStat
sDict["Status"] = newStat
# Change the source to indicate this is not what was requested
source = sDict.get("Source", "")
sDict["Source"] = source + "(SM)"
# at this stage status == newStat. Set currentStatus to this new status
currentStatus = newStat

minor = sDict.get("MinorStatus", minor)
application = sDict.get("ApplicationStatus", application)

res = getNewStatus(jobID, updateTimes, lastTime, statusDict, currentStatus, force, log)
if not res["OK"]:
return res
status, minor, application = res["Value"]
log.debug("Final statuses:", f"status '{status}', minor '{minor}', application '{application}'")
if status:
attrNames.append("Status")
Expand All @@ -206,11 +167,13 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
if not result["OK"]:
return result
# Update start and end time if needed
if endTime:
if not endTime and newEndTime:
log.debug("Set job end time", endTime)
result = self.jobDB.setEndExecTime(jobID, endTime)
if not result["OK"]:
return result
if startTime:
if not startTime and newStartTime:
log.debug("Set job start time", startTime)
result = self.jobDB.setStartExecTime(jobID, startTime)
if not result["OK"]:
return result
Expand All @@ -237,3 +200,66 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
return result

return S_OK((attrNames, attrValues))


def getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict):
    """Work out the job start and end times implied by a batch of status updates.

    The status in force at the first update is taken from ``timeStamps``; the
    updates are then replayed in the order given by ``updateTimes`` and the
    first missing start/end time is filled in from the update time at which the
    job is seen as running / in a final state.

    :param startTime: already known start time; only replaced when falsy
    :param endTime: already known end time; only replaced when falsy
    :param updateTimes: update time keys of ``statusDict``, in the order to replay
        them; must not be empty since the first element is always read
    :param timeStamps: iterable of ``(epoch, status)`` pairs
    :param statusDict: mapping of update time to a dict that may hold a ``"Status"`` key
    :return: tuple ``(startTime, endTime)``
    """
    firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))

    # Status at the time of the first update: the last pair (in iteration order)
    # whose time stamp is not later than that update wins.
    statusSoFar = ""
    for stamp, loggedStatus in timeStamps:
        if firstUpdate >= stamp:
            statusSoFar = loggedStatus

    for updTime in updateTimes:
        # An update without "Status" keeps the status carried over so far
        statusSoFar = statusDict[updTime].get("Status", statusSoFar)

        if not startTime and statusSoFar == JobStatus.RUNNING:
            # First time the job is seen running and no start time is known yet
            startTime = updTime
            continue
        if not endTime and statusSoFar in JobStatus.JOB_FINAL_STATES:
            # First time the job is seen in a final state and no end time is known yet
            endTime = updTime

    return startTime, endTime


def getNewStatus(
    jobID: int,
    updateTimes: list[datetime],
    lastTime: datetime,
    statusDict: dict[datetime, Any],
    currentStatus,
    force: bool,
    log,
):
    """Replay the recent status updates and return the resulting statuses.

    Only the entries of ``updateTimes`` that are ``>= lastTime`` are considered,
    in the order given. Unless ``force`` is set, every change of status is
    submitted to ``JobStatus.JobsStateMachine``; when the state machine answers
    with a different status, that status is used instead, the update dict is
    modified in place (``"Status"`` replaced, ``"(SM)"`` appended to
    ``"Source"``) and an error is logged.

    NOTE(review): the annotations declare ``datetime`` time keys, while
    ``setJobStatus`` in the same diff builds its key with
    ``str(datetime.utcnow())`` -- confirm the actual key type with the callers.

    :param jobID: job identifier, only used in the error message
    :param updateTimes: update time keys of ``statusDict``
    :param lastTime: updates older than this are ignored
    :param statusDict: mapping of update time to a dict that may hold ``"Status"``,
        ``"MinorStatus"``, ``"ApplicationStatus"`` and ``"Source"``; may be mutated
    :param currentStatus: status of the job before the updates are applied
    :param force: when true, the state machine is not consulted
    :param log: logger-like object providing ``debug`` and ``error``
    :return: ``S_OK((status, minor, application))``, all empty strings when no
        update qualifies, or the failing result of ``getNextState`` as is
    """
    status = minor = application = ""

    # Get the last status values looping on the most recent updateTimes in chronological order
    recentUpdates = [dt for dt in updateTimes if dt >= lastTime]
    for updTime in recentUpdates:
        sDict = statusDict[updTime]
        log.debug(f"\tTime {updTime} - Statuses {str(sDict)}")
        status = sDict.get("Status", currentStatus)

        # The state machine is only evaluated when the status actually changes
        if not force and status != currentStatus:
            transition = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
            if not transition["OK"]:
                return transition
            newStat = transition["Value"]

            if newStat != status:
                # The candidate was not accepted: use what the state machine proposes
                log.error(
                    f"Job Status Error: {jobID} can't move from {currentStatus} to {status}: using {newStat}",
                )
                status = newStat
                sDict["Status"] = newStat
                # Flag the source to show this is not what was requested
                sDict["Source"] = sDict.get("Source", "") + "(SM)"

            # Here status == newStat: it becomes the reference for the next update
            currentStatus = newStat

        # Minor and application statuses keep their previous value when absent
        minor = sDict.get("MinorStatus", minor)
        application = sDict.get("ApplicationStatus", application)

    return S_OK((status, minor, application))
Loading