From 7af5cfc74bb2495a56889970bfebb43b1d9aa5f6 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Mon, 12 Aug 2024 15:40:16 +0200 Subject: [PATCH] feat (TS): RequestTasks use getBulkRequestStatus --- .../Service/ReqManagerHandler.py | 2 +- .../Client/RequestTasks.py | 80 +++++++++++++------ 2 files changed, 57 insertions(+), 25 deletions(-) diff --git a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py index ca616c17306..2c1753dd7ad 100755 --- a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py +++ b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py @@ -347,7 +347,7 @@ def export_getBulkRequestStatus(cls, requestIDs): """get requests statuses given their ids""" res = cls.__requestDB.getBulkRequestStatus(requestIDs) if not res["OK"]: - gLogger.error(f"getRequestStatus: {res['Message']}") + gLogger.error("getRequestStatus", res["Message"]) return res types_getRequestFileStatus = [int, [str, list]] diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py index 762b85d9d06..a5538eaec1f 100644 --- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py +++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py @@ -317,26 +317,49 @@ def getSubmittedTaskStatus(self, taskDicts): Check if tasks changed status, and return a list of tasks per new status """ updateDict = {} - badRequestID = 0 + externalIDs = [ + int(taskDict["ExternalID"]) + for taskDict in taskDicts + if taskDict["ExternalID"] and int(taskDict["ExternalID"]) + ] + + # Count how many tasks don't have an valid external ID + badRequestID = len(taskDicts) - len(externalIDs) + + res = self.requestClient.getBulkRequestStatus(externalIDs) + if not res["OK"]: + # We need a transformationID for the log, and although we expect a single one, + # do things ~ properly + tids = list({taskDict["TransformationID"] for taskDict in taskDicts}) + try: + tid = tids[0] + except IndexError: + tid = 0 + + self._logWarn( + "getSubmittedTaskStatus: Failed to get bulk requestIDs", + res["Message"], + transID=tid, + ) + return S_OK({}) + new_statuses = res["Value"] + for taskDict in taskDicts: oldStatus = taskDict["ExternalStatus"] # ExternalID is normally a string - if taskDict["ExternalID"] and int(taskDict["ExternalID"]): - newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"]) - if not newStatus["OK"]: - log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn - log( - "getSubmittedTaskStatus: Failed to get requestID for request", - newStatus["Message"], - transID=taskDict["TransformationID"], - ) - else: - newStatus = newStatus["Value"] - # We don't care updating the tasks to Assigned while the request is being processed - if newStatus != oldStatus and newStatus != "Assigned": - updateDict.setdefault(newStatus, []).append(taskDict["TaskID"]) + + newStatus = new_statuses.get(taskDict["ExternalID"]) + if not newStatus: + self._logVerbose( + "getSubmittedTaskStatus: Failed to get requestID for request", + f"No such RequestID {taskDict['ExternalID']}", + transID=taskDict["TransformationID"], + ) else: - badRequestID += 1 + # We do not update the tasks status if the Request is Assigned, as it is a very temporary status + if newStatus != oldStatus and newStatus != "Assigned": + updateDict.setdefault(newStatus, []).append(taskDict["TaskID"]) + if badRequestID: self._logWarn("%d requests have identifier 0" % badRequestID) return S_OK(updateDict) @@ -363,26 +386,35 @@ def getSubmittedFileStatus(self, fileDicts): requestFiles = {} for taskDict in res["Value"]: taskID = taskDict["TaskID"] - externalID = taskDict["ExternalID"] + externalID = int(taskDict["ExternalID"]) # Only consider tasks that are submitted, ExternalID is a string if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID): requestFiles[externalID] = taskFiles[taskID] + res = self.requestClient.getBulkRequestStatus(list(requestFiles)) + if not res["OK"]: + self._logWarn( + "Failed to get request status", + res["Message"], + transID=transID, + method="getSubmittedFileStatus", + ) + return S_OK({}) + reqStatuses = res["Value"] + updateDict = {} for requestID, lfnList in requestFiles.items(): # We only take request in final state to avoid race conditions # https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414 - reqStatus = self.requestClient.getRequestStatus(requestID) - if not reqStatus["OK"]: - log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn - log( + reqStatus = reqStatuses.get(requestID) + if not reqStatus: + self._logVerbose( "Failed to get request status", - reqStatus["Message"], + f"Request {requestID} does not exist", transID=transID, method="getSubmittedFileStatus", ) continue - reqStatus = reqStatus["Value"] if reqStatus not in Request.FINAL_STATES: continue @@ -398,7 +430,7 @@ def getSubmittedFileStatus(self, fileDicts): continue # If we are here, it means the Request is in a final state. - # In principle, you could expect everyfile also be in a final state + # In principle, you could expect every file also be in a final state # but this is only true for simple Request. # Hence, the file is marked as PROCESSED only if the file status is Done # In any other case, we mark it problematic