diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index e7845fa5cb1..27b1d6a9303 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -12,9 +12,7 @@ import errno import os import re -import time from datetime import datetime, timedelta -from hashlib import md5 # # from DIRAC from DIRAC import S_ERROR, S_OK @@ -655,7 +653,7 @@ def __removeWMSTasks(self, transJobIDs): jobIDs = [int(j) for j in transJobIDs if int(j)] allRemove = True for jobList in breakListIntoChunks(jobIDs, 500): - res = self.wmsClient.killJob(jobList) + res = self.wmsClient.killJob(jobList, force=True) if res["OK"]: self.log.info(f"Successfully killed {len(jobList)} jobs from WMS") elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res): @@ -721,6 +719,11 @@ def __submitRemovalRequests(self, lfns, transID=0): :param int transID: transformationID, only used in RequestName :returns: S_ERROR/S_OK """ + + # These imports are used only in this function + import time + from hashlib import md5 + for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)): oRequest = Request() requestName = "TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}" diff --git a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py index 37f7d175132..d5d70c653c6 100755 --- a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py @@ -212,17 +212,17 @@ def submitJob(self, jdl, jobDescriptionObject=None): return result - def killJob(self, jobID): + def killJob(self, jobID, force=False): """Kill running job. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - return self.jobManager.killJob(jobID) + return self.jobManager.killJob(jobID, force=force) - def deleteJob(self, jobID): + def deleteJob(self, jobID, force=False): """Delete job(s) (set their status to DELETED) from the WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - return self.jobManager.deleteJob(jobID) + return self.jobManager.deleteJob(jobID, force=force) def removeJob(self, jobID): """Fully remove job(s) from the WMS Job database. diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 540358b4ce2..15d89ee5f29 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -600,7 +600,7 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False): return self._update(cmd) - def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""): + def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False): """Set status of the job specified by its jobID""" # Do not update the LastUpdate time stamp if setting the Stalled status update_flag = True @@ -619,7 +619,7 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""): attrNames.append("ApplicationStatus") attrValues.append(applicationStatus[:255]) - result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag) + result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag, force=force) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index f6f56837e95..577e6a99d89 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -432,14 +432,14 @@ def export_removeJob(self, jobIDs): return S_OK(validJobList) - def __deleteJob(self, jobID): + def __deleteJob(self, jobID, force=False): """Set the job status to "Deleted" and remove the pilot that ran and its logging info if the pilot is finished. :param int jobID: job ID :return: S_OK()/S_ERROR() """ - result = self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting") + result = self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force) if not result["OK"]: return result @@ -475,7 +475,7 @@ def __deleteJob(self, jobID): return S_OK() - def __killJob(self, jobID, sendKillCommand=True): + def __killJob(self, jobID, sendKillCommand=True, force=False): """Kill one job :param int jobID: job ID @@ -488,14 +488,16 @@ def __killJob(self, jobID, sendKillCommand=True): return result self.log.info("Job marked for termination", jobID) - if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination"))["OK"]: + if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[ + "OK" + ]: self.log.warn("Failed to set job Killed status", result["Message"]) if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]: self.log.warn("Failed to delete job from the TaskQueue", result["Message"]) return S_OK() - def _kill_delete_jobs(self, jobIDList, right): + def _kill_delete_jobs(self, jobIDList, right, force=False): """Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary :param list jobIDList: job IDs @@ -535,12 +537,12 @@ def _kill_delete_jobs(self, jobIDList, right): stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING] for jobID in killJobList: - result = self.__killJob(jobID) + result = self.__killJob(jobID, force=force) if not result["OK"]: badIDs.append(jobID) for jobID in deleteJobList: - result = self.__deleteJob(jobID) + result = self.__deleteJob(jobID, force=force) if not result["OK"]: badIDs.append(jobID) @@ -573,7 +575,7 @@ def _kill_delete_jobs(self, jobIDList, right): ########################################################################### types_deleteJob = [] - def export_deleteJob(self, jobIDs): + def export_deleteJob(self, jobIDs, force=False): """Delete jobs specified in the jobIDs list :param list jobIDs: list of job IDs @@ -581,12 +583,12 @@ def export_deleteJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self._kill_delete_jobs(jobIDs, RIGHT_DELETE) + return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force) ########################################################################### types_killJob = [] - def export_killJob(self, jobIDs): + def export_killJob(self, jobIDs, force=False): """Kill jobs specified in the jobIDs list :param list jobIDs: list of job IDs @@ -594,7 +596,7 @@ def export_killJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self._kill_delete_jobs(jobIDs, RIGHT_KILL) + return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force) ########################################################################### types_resetJob = []