diff --git a/docs/source/DeveloperGuide/CodeTesting/index.rst b/docs/source/DeveloperGuide/CodeTesting/index.rst index d5c7f168c50..630d9f375e0 100644 --- a/docs/source/DeveloperGuide/CodeTesting/index.rst +++ b/docs/source/DeveloperGuide/CodeTesting/index.rst @@ -309,23 +309,33 @@ Running the above might take a while. Supposing you are interested in running on ./integration_tests.py prepare-environment [FLAGS] ./integration_tests.py install-server -which (in some minutes) will give you a fully dockerized server setup (`docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`). Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`. -The first thing to do is that you should first login in the docker container, by doing: +which (in some minutes) will give you a fully dockerized server setup +(`docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`. +Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`, +the first thing to do is that you should first login in the docker container, by doing: .. code-block:: bash ./integration_tests.py exec-server -The installations automatically pick up external changes to the DIRAC code and tests) +(The docker installation automatically picks up external changes to the DIRAC code and tests) Now you can run the test with: .. code-block:: bash - pytest LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py + pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/` +You can also login in client and mysql with: + +.. 
code-block:: bash + + ./integration_tests.py exec-client + ./integration_tests.py exec-mysql + + Validation and System tests --------------------------- diff --git a/src/DIRAC/Interfaces/API/Job.py b/src/DIRAC/Interfaces/API/Job.py index c38bd4798ca..16dda22cf9b 100755 --- a/src/DIRAC/Interfaces/API/Job.py +++ b/src/DIRAC/Interfaces/API/Job.py @@ -702,18 +702,6 @@ def setOwnerGroup(self, ownerGroup): self._addParameter(self.workflow, "OwnerGroup", "JDL", ownerGroup, "User specified owner group.") return S_OK() - ############################################################################# - def setOwnerDN(self, ownerDN): - """Developer function. - - Allows to force expected owner DN of proxy. - """ - if not isinstance(ownerDN, str): - return self._reportError("Expected string for job owner DN", **{"ownerGroup": ownerDN}) - - self._addParameter(self.workflow, "OwnerDN", "JDL", ownerDN, "User specified owner DN.") - return S_OK() - ############################################################################# def setType(self, jobType): """Developer function. 
diff --git a/src/DIRAC/Interfaces/scripts/dirac_wms_job_attributes.py b/src/DIRAC/Interfaces/scripts/dirac_wms_job_attributes.py index c35eff61741..4ddc4b78e10 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_wms_job_attributes.py +++ b/src/DIRAC/Interfaces/scripts/dirac_wms_job_attributes.py @@ -28,7 +28,6 @@ 'MinorStatus': 'Execution Complete', 'OSandboxReadyFlag': 'False', 'Owner': 'vhamar', - 'OwnerDN': '/O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar', 'OwnerGroup': 'eela_user', 'RescheduleCounter': '0', 'RescheduleTime': 'None', @@ -42,7 +41,8 @@ 'UserPriority': '1', 'VerifiedFlag': 'True'} """ -import DIRAC +from DIRAC import exit as dExit +from DIRAC import gLogger from DIRAC.Core.Base.Script import Script @@ -65,9 +65,9 @@ def main(): exitCode = 2 for error in errorList: - print("ERROR %s: %s" % error) + gLogger.error(f"{error}") - DIRAC.exit(exitCode) + dExit(exitCode) if __name__ == "__main__": diff --git a/src/DIRAC/Interfaces/scripts/dirac_wms_job_get_jdl.py b/src/DIRAC/Interfaces/scripts/dirac_wms_job_get_jdl.py index fe170f34977..332df9f97fb 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_wms_job_get_jdl.py +++ b/src/DIRAC/Interfaces/scripts/dirac_wms_job_get_jdl.py @@ -19,14 +19,12 @@ 'Executable': '/bin/ls', 'JobID': '1', 'JobName': 'DIRAC_vhamar_602138', - 'JobRequirements': '[OwnerDN = /O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar; - OwnerGroup = eela_user; + 'JobRequirements': '[OwnerGroup = eela_user; Setup = EELA-Production; UserPriority = 1; CPUTime = 0 ]', 'OutputSandbox': ['std.out', 'std.err'], 'Owner': 'vhamar', - 'OwnerDN': '/O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar', 'OwnerGroup': 'eela_user', 'Priority': '1'} """ diff --git a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py index 452935f0338..e0e1e7aa73a 100644 --- a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py +++ b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py @@ -109,18 +109,17 @@ def 
prepareTransformationTasks(self, transBody, taskDict, owner="", ownerGroup=" ownerDN = res["Value"][0] if bulkSubmissionFlag: - return self.__prepareTasksBulk(transBody, taskDict, owner, ownerGroup, ownerDN) + return self.__prepareTasksBulk(transBody, taskDict, owner, ownerGroup) # not a bulk submission - return self.__prepareTasks(transBody, taskDict, owner, ownerGroup, ownerDN) + return self.__prepareTasks(transBody, taskDict, owner, ownerGroup) - def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): + def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup): """Prepare transformation tasks with a single job object for bulk submission :param str transBody: transformation job template :param dict taskDict: dictionary of per task parameters :param str owner: owner of the transformation :param str ownerGroup: group of the owner of the transformation - :param str ownerDN: DN of the owner of the transformation :return: S_OK/S_ERROR with updated taskDict """ @@ -137,7 +136,6 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): self._logVerbose(f"Setting job owner:group to {owner}:{ownerGroup}", transID=transID, method=method) oJob.setOwner(owner) oJob.setOwnerGroup(ownerGroup) - oJob.setOwnerDN(ownerDN) try: site = oJob.workflow.findParameter("Site").getValue() @@ -253,14 +251,13 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): taskDict["BulkJobObject"] = oJob return S_OK(taskDict) - def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN): + def __prepareTasks(self, transBody, taskDict, owner, ownerGroup): """Prepare transformation tasks with a job object per task :param str transBody: transformation job template :param dict taskDict: dictionary of per task parameters :param owner: owner of the transformation :param str ownerGroup: group of the owner of the transformation - :param str ownerDN: DN of the owner of the transformation :return: S_OK/S_ERROR 
with updated taskDict """ @@ -275,7 +272,6 @@ def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN): oJobTemplate = self.jobClass(transBody) oJobTemplate.setOwner(owner) oJobTemplate.setOwnerGroup(ownerGroup) - oJobTemplate.setOwnerDN(ownerDN) try: site = oJobTemplate.workflow.findParameter("Site").getValue() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index 4d4abdb0a9a..af84b7bb986 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -12,6 +12,7 @@ from diraccfg import CFG from DIRAC import S_OK, S_ERROR, gConfig, rootPath, siteName +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Security.ProxyInfo import getProxyInfo @@ -202,7 +203,7 @@ def execute(self): self.matchFailedCount = 0 # Check matcher information returned - matcherParams = ["JDL", "DN", "Group"] + matcherParams = ["JDL", "Owner", "Group"] matcherInfo = jobRequest["Value"] jobID = matcherInfo["JobID"] self.jobReport.setJob(jobID) @@ -217,7 +218,7 @@ def execute(self): jobJDL = matcherInfo["JDL"] jobGroup = matcherInfo["Group"] - ownerDN = matcherInfo["DN"] + owner = matcherInfo["Owner"] ceDict = matcherInfo["CEDict"] matchTime = matcherInfo["matchTime"] @@ -242,7 +243,7 @@ def execute(self): jobType = submissionParams["jobType"] self.log.verbose("Job request successful: \n", jobRequest["Value"]) - self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}") + self.log.info("Received", f"JobID={jobID}, JobType={jobType}, Owner={owner}, JobGroup={jobGroup}") self.jobCount += 1 self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False) if "BOINC_JOB_ID" in os.environ: @@ -253,6 +254,7 @@ def 
execute(self): ) self.jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False) + ownerDN = getDNForUsername(owner)["Value"][0] result_setupProxy = self._setupProxy(ownerDN, jobGroup) if not result_setupProxy["OK"]: result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) @@ -472,26 +474,26 @@ def _setupProxy(self, ownerDN, ownerGroup): self.log.error("Failed to setup proxy", proxyResult["Message"]) return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}") return S_OK(proxyResult["Value"]) - else: - ret = getProxyInfo(disableVOMS=True) - if not ret["OK"]: - self.log.error("Invalid Proxy", ret["Message"]) - return S_ERROR("Invalid Proxy") - - proxyChain = ret["Value"]["chain"] - if "groupProperties" not in ret["Value"]: - print(ret["Value"]) - print(proxyChain.dumpAllToString()) - self.log.error("Invalid Proxy", "Group has no properties defined") - return S_ERROR("Proxy has no group properties defined") - - groupProps = ret["Value"]["groupProperties"] - if Properties.GENERIC_PILOT in groupProps or Properties.PILOT in groupProps: - proxyResult = self._requestProxyFromProxyManager(ownerDN, ownerGroup) - if not proxyResult["OK"]: - self.log.error("Invalid Proxy", proxyResult["Message"]) - return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}") - proxyChain = proxyResult["Value"] + + ret = getProxyInfo(disableVOMS=True) + if not ret["OK"]: + self.log.error("Invalid Proxy", ret["Message"]) + return S_ERROR("Invalid Proxy") + + proxyChain = ret["Value"]["chain"] + if "groupProperties" not in ret["Value"]: + print(ret["Value"]) + print(proxyChain.dumpAllToString()) + self.log.error("Invalid Proxy", "Group has no properties defined") + return S_ERROR("Proxy has no group properties defined") + + groupProps = ret["Value"]["groupProperties"] + if Properties.GENERIC_PILOT in groupProps or Properties.PILOT in groupProps: + proxyResult = self._requestProxyFromProxyManager(ownerDN, ownerGroup) + if not proxyResult["OK"]: + 
self.log.error("Invalid Proxy", proxyResult["Message"]) + return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}") + proxyChain = proxyResult["Value"] return S_OK(proxyChain) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index 4dbe8b1f236..e10e24e6211 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -25,11 +25,11 @@ import datetime import os -import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Utilities import TimeUtilities from DIRAC.RequestManagementSystem.Client.File import File from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient @@ -57,6 +57,7 @@ def __init__(self, *args, **kwargs): self.prodTypes = [] self.removeStatusDelay = {} self.removeStatusDelayHB = {} + self.maxHBJobsAtOnce = 0 ############################################################################# def initialize(self): @@ -80,7 +81,7 @@ def initialize(self): self.removeStatusDelayHB[JobStatus.DONE] = self.am_getOption("RemoveStatusDelayHB/Done", -1) self.removeStatusDelayHB[JobStatus.KILLED] = self.am_getOption("RemoveStatusDelayHB/Killed", -1) self.removeStatusDelayHB[JobStatus.FAILED] = self.am_getOption("RemoveStatusDelayHB/Failed", -1) - self.maxHBJobsAtOnce = self.am_getOption("MaxHBJobsAtOnce", 0) + self.maxHBJobsAtOnce = self.am_getOption("MaxHBJobsAtOnce", self.maxHBJobsAtOnce) return S_OK() @@ -93,7 +94,7 @@ def _getAllowedJobTypes(self): for jobType in result["Value"]: if jobType not in self.prodTypes: cleanJobTypes.append(jobType) - 
self.log.notice(f"JobTypes to clean {cleanJobTypes}") + self.log.notice("JobTypes to clean", cleanJobTypes) return S_OK(cleanJobTypes) def execute(self): @@ -102,7 +103,7 @@ def execute(self): # First, fully remove jobs in JobStatus.DELETED state result = self.removeDeletedJobs() if not result["OK"]: - self.log.error(f"Failed to remove jobs with status {JobStatus.DELETED}") + self.log.error("Failed to remove jobs with status", JobStatus.DELETED) # Second: set the status to JobStatus.DELETED for certain jobs @@ -117,8 +118,7 @@ def execute(self): baseCond = {"JobType": result["Value"]} # Delete jobs with final status - for status in self.removeStatusDelay: - delay = self.removeStatusDelay[status] + for status, delay in self.removeStatusDelay.items(): if delay < 0: # Negative delay means don't delete anything... continue @@ -185,26 +185,7 @@ def removeDeletedJobs(self): if not jobList: return S_OK() - ownerJobsDict = self._getOwnerJobsDict(jobList) - - fail = False - for owner, jobsList in ownerJobsDict.items(): - ownerDN = owner.split(";")[0] - ownerGroup = owner.split(";")[1] - self.log.verbose("Attempting to remove jobs", f"(n={len(jobsList)}) for {ownerDN} : {ownerGroup}") - wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup) - result = wmsClient.removeJob(jobsList) - if not result["OK"]: - self.log.error( - "Could not remove jobs", - f"for {ownerDN} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}", - ) - fail = True - - if fail: - return S_ERROR() - - return S_OK() + return self._deleteRemoveJobs(jobList, remove=True) def deleteJobsByStatus(self, condDict, delay=False): """Sets the job status to "DELETED" for jobs in condDict. 
@@ -234,19 +215,29 @@ def deleteJobsByStatus(self, condDict, delay=False): if not jobList: return S_OK() + return self._deleteRemoveJobs(jobList) + + def _deleteRemoveJobs(self, jobList, remove=False): + """Delete or removes a jobList""" ownerJobsDict = self._getOwnerJobsDict(jobList) fail = False for owner, jobsList in ownerJobsDict.items(): - ownerDN = owner.split(";")[0] - ownerGroup = owner.split(";")[1] - self.log.verbose("Attempting to delete jobs", f"(n={len(jobsList)}) for {ownerDN} : {ownerGroup}") - wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup) - result = wmsClient.deleteJob(jobsList) + user, ownerGroup = owner.split(";", maxsplit=1) + self.log.verbose("Attempting to delete jobs", f"(n={len(jobsList)}) for {user} : {ownerGroup}") + res = getDNForUsername(user) + if not res["OK"]: + self.log.error("No DN found", f"for {user}") + return res + wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup) + if remove: + result = wmsClient.removeJob(jobsList) + else: + result = wmsClient.deleteJob(jobsList) if not result["OK"]: self.log.error( - "Could not delete jobs", - f"for {ownerDN} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}", + f"Could not {'remove' if remove else 'delete'} jobs", + f"for {user} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}", ) fail = True @@ -279,7 +270,7 @@ def _getOwnerJobsDict(self, jobList): :returns: a dict with a grouping of them by owner, e.g.{'dn;group': [1, 3, 4], 'dn;group_1': [5], 'dn_1;group': [2]} """ - res = self.jobDB.getJobsAttributes(jobList, ["OwnerDN", "OwnerGroup"]) + res = self.jobDB.getJobsAttributes(jobList, ["Owner", "OwnerGroup"]) if not res["OK"]: self.log.error("Could not get the jobs attributes", res["Message"]) return res @@ -327,8 +318,7 @@ def deleteJobOversizedSandbox(self, jobIDList): else: successful[jobID] = lfn - result = {"Successful": successful, "Failed": failed} - return S_OK(result) + 
return S_OK({"Successful": successful, "Failed": failed}) def __setRemovalRequest(self, lfn, owner, ownerGroup): """Set removal request with the given credentials""" @@ -369,4 +359,3 @@ def removeHeartBeatLoggingInfo(self, status, delayDays): self.log.error("Failed to delete from HeartBeatLoggingInfo", result["Message"]) else: self.log.info("Deleted HeartBeatLogging info") - return diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index ebb15b29238..89ce0a1dc9d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -9,22 +9,22 @@ """ -import sys -import re import random +import sys from collections import defaultdict from DIRAC import S_OK, gConfig from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities import DErrno from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager +from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials -from DIRAC.RequestManagementSystem.Client.Request import Request MAX_JOBS_MANAGED = 100 @@ -184,7 +184,7 @@ def execute(self): jobRequest = self._matchAJob(ceDictList) while jobRequest["OK"]: # Check matcher information returned - matcherParams = ["JDL", "DN", "Group"] + matcherParams = ["JDL", "Owner", "Group"] matcherInfo = jobRequest["Value"] jobID = matcherInfo["JobID"] self.jobReport.setJob(jobID) 
@@ -195,7 +195,7 @@ def execute(self): jobJDL = matcherInfo["JDL"] jobGroup = matcherInfo["Group"] - ownerDN = matcherInfo["DN"] + owner = matcherInfo["Owner"] ceDict = matcherInfo["CEDict"] matchTime = matcherInfo["matchTime"] @@ -222,7 +222,7 @@ def execute(self): jobType = submissionParams["jobType"] self.log.verbose("Job request successful: \n", jobRequest["Value"]) - self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}") + self.log.info("Received", f"JobID={jobID}, JobType={jobType}, Owner={owner}, JobGroup={jobGroup}") self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False) self.jobReport.setJobStatus( @@ -230,6 +230,7 @@ def execute(self): ) # Setup proxy + ownerDN = getDNForUsername(owner)["Value"][0] result_setupProxy = self._setupProxy(ownerDN, jobGroup) if not result_setupProxy["OK"]: result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index d81e132a907..3ec867968f0 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -1,14 +1,12 @@ -""" The StalledJobAgent hunts for stalled jobs in the Job database. Jobs in "running" - state not receiving a heart beat signal for more than stalledTime - seconds will be assigned the "Stalled" state. - +"""The StalledJobAgent hunts for stalled jobs in the Job database. Jobs in +"running" state not receiving a heart beat signal for more than stalledTime +seconds will be assigned the "Stalled" state. .. 
literalinclude:: ../ConfigTemplate.cfg :start-after: ##BEGIN StalledJobAgent :end-before: ##END :dedent: 2 :caption: StalledJobAgent options - """ import concurrent.futures import datetime @@ -20,6 +18,7 @@ from DIRAC.Core.Utilities.TimeUtilities import fromString, toEpoch, second from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.ConfigurationSystem.Client.Helpers import cfgPath +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemInstance from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient @@ -31,10 +30,13 @@ class StalledJobAgent(AgentModule): - """Agent for setting Running jobs Stalled, and Stalled jobs Failed. And a few more.""" + """Agent for setting Running jobs Stalled, and Stalled jobs Failed. + + And a few more. + """ def __init__(self, *args, **kwargs): - """c'tor""" + """c'tor.""" super().__init__(*args, **kwargs) self.jobDB = None @@ -42,12 +44,14 @@ def __init__(self, *args, **kwargs): self.matchedTime = 7200 self.rescheduledTime = 600 self.submittingTime = 300 + self.stalledJobsToleranceTime = 0 self.stalledJobsTolerantSites = [] self.stalledJobsToRescheduleSites = [] + self.threadPoolExecutor = None ############################################################################# def initialize(self): - """Sets default parameters""" + """Sets default parameters.""" self.jobDB = JobDB() self.logDB = JobLoggingDB() @@ -56,19 +60,21 @@ def initialize(self): if not self.am_getOption("Enable", True): self.log.info("Stalled Job Agent running in disabled mode") - wms_instance = getSystemInstance("WorkloadManagement") - if not wms_instance: + wmsInstance = getSystemInstance("WorkloadManagement") + if not wmsInstance: return S_ERROR("Can not get the WorkloadManagement system instance") - self.stalledJobsTolerantSites = 
self.am_getOption("StalledJobsTolerantSites", []) - self.stalledJobsToleranceTime = self.am_getOption("StalledJobsToleranceTime", 0) + self.stalledJobsTolerantSites = self.am_getOption("StalledJobsTolerantSites", self.stalledJobsTolerantSites) + self.stalledJobsToleranceTime = self.am_getOption("StalledJobsToleranceTime", self.stalledJobsToleranceTime) - self.stalledJobsToRescheduleSites = self.am_getOption("StalledJobsToRescheduleSites", []) + self.stalledJobsToRescheduleSites = self.am_getOption( + "StalledJobsToRescheduleSites", self.stalledJobsToRescheduleSites + ) self.submittingTime = self.am_getOption("SubmittingTime", self.submittingTime) self.matchedTime = self.am_getOption("MatchedTime", self.matchedTime) self.rescheduledTime = self.am_getOption("RescheduledTime", self.rescheduledTime) - wrapperSection = cfgPath("Systems", "WorkloadManagement", wms_instance, "JobWrapper") + wrapperSection = cfgPath("Systems", "WorkloadManagement", wmsInstance, "JobWrapper") failedTime = self.am_getOption("FailedTimeHours", 6) watchdogCycle = gConfig.getValue(cfgPath(wrapperSection, "CheckingTime"), 30 * 60) @@ -88,14 +94,14 @@ def initialize(self): # setting up the threading maxNumberOfThreads = self.am_getOption("MaxNumberOfThreads", 15) - self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads) + self.log.verbose(f"Multithreaded with {maxNumberOfThreads} threads") self.threadPoolExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=maxNumberOfThreads) return S_OK() ############################################################################# def execute(self): - """The main agent execution method""" + """The main agent execution method.""" # Now we are getting what's going to be checked futures = [] @@ -161,7 +167,7 @@ def execute(self): return S_OK() def finalize(self): - """graceful finalization""" + """Graceful finalization.""" self.log.info("Wait for threads to get empty before terminating the agent") self.threadPoolExecutor.shutdown() @@ 
-169,14 +175,15 @@ def finalize(self): return S_OK() def _execute(self, job_Op): - """ - Doing the actual job. This is run inside the threads + """Doing the actual job. + + This is run inside the threads """ jobID, jobOp = job_Op.split(":") jobID = int(jobID) res = getattr(self, f"{jobOp}")(jobID) if not res["OK"]: - self.log.error(f"Failure executing {jobOp}", "on %d: %s" % (jobID, res["Message"])) + self.log.error(f"Failure executing {jobOp}", f"on {jobID}: {res['Message']}") ############################################################################# def _markStalledJobs(self, jobID): @@ -204,8 +211,8 @@ def _markStalledJobs(self, jobID): ############################################################################# def _failStalledJobs(self, jobID): - """ - Changes the Stalled status to Failed for jobs long in the Stalled status. + """Changes the Stalled status to Failed for jobs long in the Stalled + status. Run inside thread. """ @@ -214,7 +221,7 @@ def _failStalledJobs(self, jobID): # Check if the job pilot is lost result = self._getJobPilotStatus(jobID) if not result["OK"]: - self.log.error("Failed to get pilot status", "for job %d: %s" % (jobID, result["Message"])) + self.log.error("Failed to get pilot status", f"for job {jobID}: {result['Message']}") return result pilotStatus = result["Value"] if pilotStatus != "Running": @@ -223,7 +230,7 @@ def _failStalledJobs(self, jobID): # Verify that there was no sign of life for long enough result = self._getLatestUpdateTime(jobID) if not result["OK"]: - self.log.error("Failed to get job update time", "for job %d: %s" % (jobID, result["Message"])) + self.log.error("Failed to get job update time", f"for job {jobID}: {result['Message']}") return result elapsedTime = toEpoch() - result["Value"] if elapsedTime > self.failedTime: @@ -232,7 +239,9 @@ def _failStalledJobs(self, jobID): # Set the jobs Failed, send them a kill signal in case they are not really dead # and send accounting info if setFailed: - 
self._sendKillCommand(jobID) # always returns None + res = self._sendKillCommand(jobID) + if not res["OK"]: + self.log.error("Failed to kill job", jobID) # For some sites we might want to reschedule rather than fail the jobs if self.stalledJobsToRescheduleSites: @@ -248,7 +257,7 @@ def _failStalledJobs(self, jobID): return S_OK() def _getJobPilotStatus(self, jobID): - """Get the job pilot status""" + """Get the job pilot status.""" result = JobMonitoringClient().getJobParameter(jobID, "Pilot_Reference") if not result["OK"]: return result @@ -260,9 +269,9 @@ def _getJobPilotStatus(self, jobID): result = PilotManagerClient().getPilotInfo(pilotReference) if not result["OK"]: if DErrno.cmpError(result, DErrno.EWMSNOPILOT): - self.log.warn("No pilot found", "for job %d: %s" % (jobID, result["Message"])) + self.log.warn("No pilot found", f"for job {jobID}: {result['Message']}") return S_OK("NoPilot") - self.log.error("Failed to get pilot information", "for job %d: %s" % (jobID, result["Message"])) + self.log.error("Failed to get pilot information", f"for job {jobID}: {result['Message']}") return result pilotStatus = result["Value"][pilotReference]["Status"] @@ -271,8 +280,7 @@ def _getJobPilotStatus(self, jobID): ############################################################################# def _checkJobStalled(self, job, stalledTime): """Compares the most recent of LastUpdateTime and HeartBeatTime against - the stalledTime limit. 
- """ + the stalledTime limit.""" result = self._getLatestUpdateTime(job) if not result["OK"]: return result @@ -289,7 +297,7 @@ def _checkJobStalled(self, job, stalledTime): ############################################################################# def _getLatestUpdateTime(self, job): - """Returns the most recent of HeartBeatTime and LastUpdateTime""" + """Returns the most recent of HeartBeatTime and LastUpdateTime.""" result = self.jobDB.getJobAttributes(job, ["HeartBeatTime", "LastUpdateTime"]) if not result["OK"] or not result["Value"]: self.log.error( @@ -317,7 +325,7 @@ def _getLatestUpdateTime(self, job): ############################################################################# def _updateJobStatus(self, job, status, minorStatus=None, force=False): - """This method updates the job status in the JobDB""" + """This method updates the job status in the JobDB.""" if not self.am_getOption("Enable", True): return S_OK("Disabled") @@ -327,15 +335,13 @@ def _updateJobStatus(self, job, status, minorStatus=None, force=False): self.log.debug(f"self.jobDB.setJobAttribute({job},'Status','{status}',update=True)") result = self.jobDB.setJobAttribute(job, "Status", status, update=True, force=force) if not result["OK"]: - self.log.error("Failed setting Status", "%s for job %d: %s" % (status, job, result["Message"])) + self.log.error("Failed setting Status", f"{status} for job {job}: {result['Message']}") toRet = result if minorStatus: self.log.debug(f"self.jobDB.setJobAttribute({job},'MinorStatus','{minorStatus}',update=True)") result = self.jobDB.setJobAttribute(job, "MinorStatus", minorStatus, update=True) if not result["OK"]: - self.log.error( - "Failed setting MinorStatus", "%s for job %d: %s" % (minorStatus, job, result["Message"]) - ) + self.log.error("Failed setting MinorStatus", f"{minorStatus} for job {job}: {result['Message']}") toRet = result if not minorStatus: # Retain last minor status for stalled jobs @@ -343,7 +349,7 @@ def _updateJobStatus(self, job, 
status, minorStatus=None, force=False): if result["OK"] and "MinorStatus" in result["Value"]: minorStatus = result["Value"]["MinorStatus"] else: - self.log.error("Failed getting MinorStatus", "for job %d: %s" % (job, result["Message"])) + self.log.error("Failed getting MinorStatus", f"for job {job}: {result['Message']}") minorStatus = "idem" toRet = result @@ -355,7 +361,8 @@ def _updateJobStatus(self, job, status, minorStatus=None, force=False): return toRet def _getProcessingType(self, jobID): - """Get the Processing Type from the JDL, until it is promoted to a real Attribute""" + """Get the Processing Type from the JDL, until it is promoted to a real + Attribute.""" processingType = "unknown" result = self.jobDB.getJobJDL(jobID, original=True) if not result["OK"]: @@ -366,8 +373,7 @@ def _getProcessingType(self, jobID): return processingType def _sendAccounting(self, jobID): - """ - Send WMS accounting data for the given job. + """Send WMS accounting data for the given job. Run inside thread. 
""" @@ -447,11 +453,11 @@ def _sendAccounting(self, jobID): if result["OK"]: self.jobDB.setJobAttribute(jobID, "AccountedFlag", "True") else: - self.log.error("Failed to send accounting report", "Job: %d, Error: %s" % (int(jobID), result["Message"])) + self.log.error("Failed to send accounting report", f"for job {jobID}: {result['Message']}") return result def _checkHeartBeat(self, jobID, jobDict): - """Get info from HeartBeat""" + """Get info from HeartBeat.""" result = self.jobDB.getHeartBeatData(jobID) lastCPUTime = 0 lastWallTime = 0 @@ -483,7 +489,7 @@ def _checkHeartBeat(self, jobID, jobDict): return lastCPUTime, lastWallTime, lastHeartBeatTime def _checkLoggingInfo(self, jobID, jobDict): - """Get info from JobLogging""" + """Get info from JobLogging.""" logList = [] result = self.logDB.getJobLoggingInfo(jobID) if result["OK"]: @@ -517,7 +523,8 @@ def _checkLoggingInfo(self, jobID, jobDict): return startTime, endTime def _kickStuckJobs(self): - """Reschedule jobs stuck in initialization status Rescheduled, Matched""" + """Reschedule jobs stuck in initialization status Rescheduled, + Matched.""" message = "" @@ -564,6 +571,7 @@ def _kickStuckJobs(self): def _failSubmittingJobs(self): """Failed Jobs stuck in Submitting Status for a long time. + They are due to a failed bulk submission transaction. 
""" @@ -587,17 +595,18 @@ def _sendKillCommand(self, job): :param int job: ID of job to send kill command """ - ownerDN = self.jobDB.getJobAttribute(job, "OwnerDN") - ownerGroup = self.jobDB.getJobAttribute(job, "OwnerGroup") - if ownerDN["OK"] and ownerGroup["OK"]: - wmsClient = WMSClient( - useCertificates=True, delegatedDN=ownerDN["Value"], delegatedGroup=ownerGroup["Value"] - ) - resKill = wmsClient.killJob(job) - if not resKill["OK"]: - self.log.error("Failed to send kill command to job", f"{job}: {resKill['Message']}") - else: - self.log.error( - "Failed to get ownerDN or Group for job:", - f"{job}: {ownerDN.get('Message', '')}, {ownerGroup.get('Message', '')}", - ) + + res = self.jobDB.getJobAttribute(job, "Owner") + if not res["OK"]: + return res + owner = res["Value"] + + res = self.jobDB.getJobAttribute(job, "OwnerGroup") + if not res["OK"]: + return res + ownerGroup = res["Value"] + + wmsClient = WMSClient( + useCertificates=True, delegatedDN=getDNForUsername(owner)["Value"][0], delegatedGroup=ownerGroup + ) + return wmsClient.killJob(job) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py index a2761b06a36..22a24de8fd0 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py @@ -31,6 +31,7 @@ def sja(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotManagerClient", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getSystemInstance", return_value="/bof/bih") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock()) stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = 
mockAM diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobManifest.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobManifest.py index e284a38e579..882d30658f6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobManifest.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobManifest.py @@ -154,7 +154,7 @@ def check(self): """ Check that the manifest is OK """ - for k in ["Owner", "OwnerDN", "OwnerGroup"]: + for k in ["Owner", "OwnerGroup"]: if k not in self.__manifest: return S_ERROR(f"Missing var {k} in manifest") diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py index 0ebec4d16a7..118d0681b24 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py @@ -97,7 +97,7 @@ def selectJob(self, resourceDescription, credDict): return {} jobID = result["jobId"] - resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup", "Status"]) + resAtt = self.jobDB.getJobAttributes(jobID, ["Status"]) if not resAtt["OK"]: raise RuntimeError("Could not retrieve job attributes") if not resAtt["Value"]: @@ -127,7 +127,7 @@ def selectJob(self, resourceDescription, credDict): if resOpt["OK"]: for key, value in resOpt["Value"].items(): resultDict[key] = value - resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup"]) + resAtt = self.jobDB.getJobAttributes(jobID, ["Owner", "OwnerGroup"]) if not resAtt["OK"]: raise RuntimeError("Could not retrieve job attributes") if not resAtt["Value"]: @@ -141,7 +141,7 @@ def selectJob(self, resourceDescription, credDict): self._updatePilotInfo(resourceDict) self._updatePilotJobMapping(resourceDict, jobID) - resultDict["DN"] = resAtt["Value"]["OwnerDN"] + resultDict["Owner"] = resAtt["Value"]["Owner"] resultDict["Group"] = resAtt["Value"]["OwnerGroup"] resultDict["PilotInfoReportedFlag"] = True diff --git 
a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index d5bf149fdfd..29f63f042b5 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -908,7 +908,6 @@ def insertNewJobIntoDB( self, jdl, owner, - ownerDN, ownerGroup, initialStatus=JobStatus.RECEIVED, initialMinorStatus="Job accepted", @@ -919,7 +918,6 @@ def insertNewJobIntoDB( :param str jdl: job description JDL :param str owner: job owner user name - :param str ownerDN: job owner DN :param str ownerGroup: job owner group :param str initialStatus: optional initial job status (Received by default) :param str initialMinorStatus: optional initial minor job status @@ -929,7 +927,7 @@ def insertNewJobIntoDB( result = jobManifest.load(jdl) if not result["OK"]: return result - jobManifest.setOptionsFromDict({"Owner": owner, "OwnerDN": ownerDN, "OwnerGroup": ownerGroup}) + jobManifest.setOptionsFromDict({"Owner": owner, "OwnerGroup": ownerGroup}) result = jobManifest.check() if not result["OK"]: return result @@ -959,9 +957,6 @@ def insertNewJobIntoDB( jobAttrNames.append("Owner") jobAttrValues.append(owner) - jobAttrNames.append("OwnerDN") - jobAttrValues.append(ownerDN) - jobAttrNames.append("OwnerGroup") jobAttrValues.append(ownerGroup) @@ -993,7 +988,7 @@ def insertNewJobIntoDB( classAdJob.insertAttributeInt("JobID", jobID) result = self.__checkAndPrepareJob( - jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, jobAttrNames, jobAttrValues + jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues ) if not result["OK"]: return result @@ -1081,9 +1076,7 @@ def insertNewJobIntoDB( return retVal - def __checkAndPrepareJob( - self, jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, jobAttrNames, jobAttrValues - ): + def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues): """ Check Consistency of Submitted JDL and set some 
defaults Prepare subJDL with Job Requirements @@ -1092,27 +1085,22 @@ def __checkAndPrepareJob( vo = getVOForGroup(ownerGroup) jdlOwner = classAdJob.getAttributeString("Owner") - jdlOwnerDN = classAdJob.getAttributeString("OwnerDN") jdlOwnerGroup = classAdJob.getAttributeString("OwnerGroup") jdlVO = classAdJob.getAttributeString("VirtualOrganization") if jdlOwner and jdlOwner != owner: error = "Wrong Owner in JDL" - elif jdlOwnerDN and jdlOwnerDN != ownerDN: - error = "Wrong Owner DN in JDL" elif jdlOwnerGroup and jdlOwnerGroup != ownerGroup: error = "Wrong Owner Group in JDL" elif jdlVO and jdlVO != vo: error = "Wrong Virtual Organization in JDL" classAdJob.insertAttributeString("Owner", owner) - classAdJob.insertAttributeString("OwnerDN", ownerDN) classAdJob.insertAttributeString("OwnerGroup", ownerGroup) if vo: classAdJob.insertAttributeString("VirtualOrganization", vo) - classAdReq.insertAttributeString("OwnerDN", ownerDN) classAdReq.insertAttributeString("OwnerGroup", ownerGroup) if vo: classAdReq.insertAttributeString("VirtualOrganization", vo) @@ -1219,7 +1207,6 @@ def rescheduleJob(self, jobID): "VerifiedFlag", "RescheduleCounter", "Owner", - "OwnerDN", "OwnerGroup", ], ) @@ -1299,7 +1286,6 @@ def rescheduleJob(self, jobID): classAdJob, classAdReq, resultDict["Owner"], - resultDict["OwnerDN"], resultDict["OwnerGroup"], jobAttrNames, jobAttrValues, diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql index b5736313bad..721eb27418c 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql @@ -41,7 +41,6 @@ CREATE TABLE `Jobs` ( `Site` VARCHAR(100) NOT NULL DEFAULT 'ANY', `JobName` VARCHAR(128) NOT NULL DEFAULT 'Unknown', `Owner` VARCHAR(64) NOT NULL DEFAULT 'Unknown', - `OwnerDN` VARCHAR(255) NOT NULL DEFAULT 'Unknown', `OwnerGroup` VARCHAR(128) NOT NULL DEFAULT 'Unknown', `SubmissionTime` DATETIME DEFAULT NULL, `RescheduleTime` DATETIME DEFAULT 
NULL, @@ -71,7 +70,6 @@ CREATE TABLE `Jobs` ( KEY `JobSplitType` (`JobSplitType`), KEY `Site` (`Site`), KEY `Owner` (`Owner`), - KEY `OwnerDN` (`OwnerDN`), KEY `OwnerGroup` (`OwnerGroup`), KEY `Status` (`Status`), KEY `MinorStatus` (`MinorStatus`), diff --git a/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py b/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py index 2754a67c1ac..20e12bc6009 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py @@ -1,10 +1,10 @@ """ SandboxMetadataDB class is a front-end to the metadata for sandboxes """ -from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK, gLogger +from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Core.Base.DB import DB -from DIRAC.Core.Utilities import List from DIRAC.Core.Security import Properties -from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.Core.Utilities import List class SandboxMetadataDB(DB): @@ -32,7 +32,6 @@ def __initializeDB(self): "Fields": { "OwnerId": "INTEGER(10) UNSIGNED AUTO_INCREMENT NOT NULL", "Owner": "VARCHAR(32) NOT NULL", - "OwnerDN": "VARCHAR(255) NOT NULL", "OwnerGroup": "VARCHAR(32) NOT NULL", }, "PrimaryKey": "OwnerId", @@ -72,18 +71,13 @@ def __initializeDB(self): return self._createTables(tablesToCreate) - def __registerAndGetOwnerId(self, owner, ownerDN, ownerGroup): + def __registerAndGetOwnerId(self, owner, ownerGroup): """ Get the owner ID and register it if it's not there """ ownerEscaped = self._escapeString(owner)["Value"] - ownerDNEscaped = self._escapeString(ownerDN)["Value"] ownerGroupEscaped = self._escapeString(ownerGroup)["Value"] - sqlCmd = "SELECT OwnerId FROM `sb_Owners` WHERE Owner = {} AND OwnerDN = {} AND OwnerGroup = {}".format( - ownerEscaped, - ownerDNEscaped, - ownerGroupEscaped, - ) + sqlCmd = f"SELECT OwnerId FROM `sb_Owners` WHERE Owner = {ownerEscaped} AND OwnerGroup = 
{ownerGroupEscaped}" result = self._query(sqlCmd) if not result["OK"]: return result @@ -91,10 +85,8 @@ def __registerAndGetOwnerId(self, owner, ownerDN, ownerGroup): if data: return S_OK(data[0][0]) # Its not there, insert it - sqlCmd = "INSERT INTO `sb_Owners` ( OwnerId, Owner, OwnerDN, OwnerGroup ) VALUES ( 0, {}, {}, {} )".format( - ownerEscaped, - ownerDNEscaped, - ownerGroupEscaped, + sqlCmd = ( + f"INSERT INTO `sb_Owners` ( OwnerId, Owner, OwnerGroup ) VALUES ( 0, {ownerEscaped}, {ownerGroupEscaped} )" ) result = self._update(sqlCmd) if not result["OK"]: @@ -106,12 +98,12 @@ def __registerAndGetOwnerId(self, owner, ownerDN, ownerGroup): return S_ERROR("Can't determine owner id after insertion") return S_OK(result["Value"][0][0]) - def registerAndGetSandbox(self, owner, ownerDN, ownerGroup, sbSE, sbPFN, size=0): + def registerAndGetSandbox(self, owner, ownerGroup, sbSE, sbPFN, size=0): """ Register a new sandbox in the metadata catalog Returns ( sbid, newSandbox ) """ - result = self.__registerAndGetOwnerId(owner, ownerDN, ownerGroup) + result = self.__registerAndGetOwnerId(owner, ownerGroup) if not result["OK"]: return result ownerId = result["Value"] @@ -386,13 +378,13 @@ def getSandboxOwner(self, SEName, SEPFN, requesterDN, requesterGroup): :param requestDN: host DN used as credentials :param requesterGroup: group used to use as credentials (should be 'hosts') - :returns: S_OK with tuple (owner, ownerDN, ownerGroup) + :returns: S_OK with tuple (owner, ownerGroup) """ res = self.getSandboxId(SEName, SEPFN, None, requesterGroup, "OwnerId", requesterDN=requesterDN) if not res["OK"]: return res - sqlCmd = "SELECT `Owner`, `OwnerDN`, `OwnerGroup` FROM `sb_Owners` WHERE `OwnerId` = %d" % res["Value"] + sqlCmd = "SELECT `Owner`, `OwnerGroup` FROM `sb_Owners` WHERE `OwnerId` = %d" % res["Value"] res = self._query(sqlCmd) if not res["OK"]: return res diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py 
b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index 7965a077bdb..dd5ea79be74 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -3,21 +3,21 @@ import random import string -from DIRAC import gConfig, S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK, gConfig +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.Base.DB import DB +from DIRAC.Core.Security import Properties from DIRAC.Core.Utilities import List -from DIRAC.Core.Utilities.PrettyPrint import printDict from DIRAC.Core.Utilities.DictCache import DictCache -from DIRAC.Core.Security import Properties -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.Core.Utilities.PrettyPrint import printDict from DIRAC.WorkloadManagementSystem.private.SharesCorrector import SharesCorrector DEFAULT_GROUP_SHARE = 1000 TQ_MIN_SHARE = 0.001 # For checks at insertion time, and not only -singleValueDefFields = ("OwnerDN", "OwnerGroup", "CPUTime") +singleValueDefFields = ("Owner", "OwnerGroup", "CPUTime") multiValueDefFields = ("Sites", "GridCEs", "BannedSites", "Platforms", "JobTypes", "Tags") # Used for matching @@ -58,7 +58,6 @@ def __init__(self, parentLogger=None): self.__groupShares = {} self.__deleteTQWithDelay = DictCache(self.__deleteTQIfEmpty) self.__opsHelper = Operations() - self.__ensureInsertionIsSingle = False self.__sharesCorrector = SharesCorrector(self.__opsHelper) result = self.__initializeDB() if not result["OK"]: @@ -81,9 +80,6 @@ def isSharesCorrectionEnabled(self): def __getCSOption(self, optionName, defValue): return self.__opsHelper.getValue(f"JobScheduling/{optionName}", defValue) - def getValidPilotTypes(self): - return self.__getCSOption("AllPilotTypes", ["private"]) - def __initializeDB(self): """ Create the tables @@ 
-93,20 +89,20 @@ def __initializeDB(self): return result tablesInDB = [t[0] for t in result["Value"]] - tablesToCreate = {} self.__tablesDesc = {} self.__tablesDesc["tq_TaskQueues"] = { "Fields": { "TQId": "INTEGER(11) UNSIGNED AUTO_INCREMENT NOT NULL", - "OwnerDN": "VARCHAR(255) NOT NULL", + "Owner": "VARCHAR(255) NOT NULL", + "OwnerDN": "VARCHAR(255)", "OwnerGroup": "VARCHAR(32) NOT NULL", "CPUTime": "BIGINT(20) UNSIGNED NOT NULL", "Priority": "FLOAT NOT NULL", "Enabled": "TINYINT(1) NOT NULL DEFAULT 0", }, "PrimaryKey": "TQId", - "Indexes": {"TQOwner": ["OwnerDN", "OwnerGroup", "CPUTime"]}, + "Indexes": {"TQOwner": ["Owner", "OwnerGroup", "CPUTime"]}, } self.__tablesDesc["tq_Jobs"] = { @@ -130,9 +126,10 @@ def __initializeDB(self): "ForeignKeys": {"TQId": "tq_TaskQueues.TQId"}, } - for tableName in self.__tablesDesc: + tablesToCreate = {} + for tableName, tableDef in self.__tablesDesc.items(): if tableName not in tablesInDB: - tablesToCreate[tableName] = self.__tablesDesc[tableName] + tablesToCreate[tableName] = tableDef return self._createTables(tablesToCreate) @@ -171,11 +168,9 @@ def _checkTaskQueueDefinition(self, tqDefDict): """ for field in singleValueDefFields: - if field not in tqDefDict: - return S_ERROR(f"Missing mandatory field '{field}' in task queue definition") - if field in ["CPUTime"]: + if field == "CPUTime": if not isinstance(tqDefDict[field], int): - return S_ERROR(f"Mandatory field {field} value type is not valid: {type(tqDefDict[field])}") + return S_ERROR(f"Mandatory field 'CPUTime' value type is not valid: {type(tqDefDict['CPUTime'])}") else: if not isinstance(tqDefDict[field], str): return S_ERROR(f"Mandatory field {field} value type is not valid: {type(tqDefDict[field])}") @@ -265,7 +260,7 @@ def __createTaskQueue(self, tqDefDict, priority=1, connObj=False): ) result = self._update(cmd, conn=connObj) if not result["OK"]: - self.log.error("Can't insert TQ in DB", result["Value"]) + self.log.error("Can't insert TQ in DB", 
result["Message"]) return result if "lastRowId" in result: tqId = result["lastRowId"] @@ -285,9 +280,9 @@ def __createTaskQueue(self, tqDefDict, priority=1, connObj=False): cmd += ", ".join([f"( {tqId}, {str(value)} )" for value in values]) result = self._update(cmd, conn=connObj) if not result["OK"]: - self.log.error("Failed to insert condition", f"{field} {result['Message']}") + self.log.error("Failed to insert condition", f"{field} : {result['Message']}") self.cleanOrphanedTaskQueues(connObj=connObj) - return S_ERROR(f"Can't insert values {str(values)} for field {field}: {result['Message']}") + return S_ERROR(f"Can't insert values {values} for field {field}: {result['Message']}") self.log.info("Created TQ", tqId) return S_OK(tqId) @@ -366,7 +361,7 @@ def insertJob(self, jobId, tqDefDict, jobPriority, skipTQDefCheck=False): tqDefDict = retVal["Value"] tqDefDict["CPUTime"] = self.fitCPUTimeToSegments(tqDefDict["CPUTime"]) self.log.info("Inserting job with requirements", f"({jobId} : {printDict(tqDefDict)})") - retVal = self.__findAndDisableTaskQueue(tqDefDict, skipDefinitionCheck=True, connObj=connObj) + retVal = self.__findAndDisableTaskQueue(tqDefDict, connObj=connObj) if not retVal["OK"]: return retVal tqInfo = retVal["Value"] @@ -387,7 +382,7 @@ def insertJob(self, jobId, tqDefDict, jobPriority, skipTQDefCheck=False): self.log.error("Error inserting job in TQ", f"Job {jobId} TQ {tqId}: {result['Message']}") return result if newTQ: - self.recalculateTQSharesForEntity(tqDefDict["OwnerDN"], tqDefDict["OwnerGroup"], connObj=connObj) + self.recalculateTQSharesForEntity(tqDefDict["Owner"], tqDefDict["OwnerGroup"], connObj=connObj) finally: self.__setTaskQueueEnabled(tqId, True) return S_OK() @@ -423,20 +418,16 @@ def __insertJobInTaskQueue(self, jobId, tqId, jobPriority, checkTQExists=True, c return result return S_OK() - def __generateTQFindSQL(self, tqDefDict, skipDefinitionCheck=False): + def __generateTQFindSQL( + self, + tqDefDict, + ): """ Generate the SQL to 
find a task queue that has exactly the given requirements :param dict tqDefDict: dict for TQ definition :returns: S_OK() / S_ERROR """ - if not skipDefinitionCheck: - tqDefDict = dict(tqDefDict) - result = self._checkTaskQueueDefinition(tqDefDict) - if not result["OK"]: - return result - tqDefDict = result["Value"] - sqlCondList = [] for field in singleValueDefFields: sqlCondList.append(f"`tq_TaskQueues`.{field} = {tqDefDict[field]}") @@ -465,14 +456,14 @@ def __generateTQFindSQL(self, tqDefDict, skipDefinitionCheck=False): # END MAGIC: That was easy ;) return S_OK(" AND ".join(sqlCondList)) - def __findAndDisableTaskQueue(self, tqDefDict, skipDefinitionCheck=False, retries=10, connObj=False): + def __findAndDisableTaskQueue(self, tqDefDict, retries=10, connObj=False): """Disable and find TQ :param dict tqDefDict: dict for TQ definition :returns: S_OK() / S_ERROR """ for _ in range(retries): - result = self.__findSmallestTaskQueue(tqDefDict, skipDefinitionCheck=skipDefinitionCheck, connObj=connObj) + result = self.__findSmallestTaskQueue(tqDefDict, connObj=connObj) if not result["OK"]: return result data = result["Value"] @@ -485,14 +476,14 @@ def __findAndDisableTaskQueue(self, tqDefDict, skipDefinitionCheck=False, retrie return S_OK(data) return S_ERROR("Could not disable TQ") - def __findSmallestTaskQueue(self, tqDefDict, skipDefinitionCheck=False, connObj=False): + def __findSmallestTaskQueue(self, tqDefDict, connObj=False): """ Find a task queue that has at least the given requirements :param dict tqDefDict: dict for TQ definition :returns: S_OK() / S_ERROR """ - result = self.__generateTQFindSQL(tqDefDict, skipDefinitionCheck=skipDefinitionCheck) + result = self.__generateTQFindSQL(tqDefDict) if not result["OK"]: return result @@ -557,7 +548,7 @@ def matchAndGetJob(self, tqMatchDict, numJobsPerTry=50, numQueuesPerTry=10, nega if not tqList: self.log.info("No TQ matches requirements") return S_OK({"matchFound": False, "tqMatch": tqMatchDict}) - for tqId, 
tqOwnerDN, tqOwnerGroup in tqList: + for tqId, tqOwner, tqOwnerGroup in tqList: self.log.verbose("Trying to extract jobs from TQ", tqId) retVal = self._query(prioSQL % tqId, conn=connObj) if not retVal["OK"]: @@ -572,7 +563,7 @@ def matchAndGetJob(self, tqMatchDict, numJobsPerTry=50, numQueuesPerTry=10, nega jobTQList = [(row[0], row[1]) for row in retVal["Value"]] if not jobTQList: self.log.info("Task queue seems to be empty, triggering a cleaning of", tqId) - self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwnerDN, tqOwnerGroup)) + self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwner, tqOwnerGroup)) while jobTQList: jobId, tqId = jobTQList.pop(random.randint(0, len(jobTQList) - 1)) self.log.verbose("Trying to extract job from TQ", f"{jobId} : {tqId}") @@ -689,25 +680,22 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) # Only enabled TQs sqlCondList = [] sqlTables = {"tq_TaskQueues": "tq"} - # If OwnerDN and OwnerGroup are defined only use those combinations that make sense - if "OwnerDN" in tqMatchDict and "OwnerGroup" in tqMatchDict: + # If Owner and OwnerGroup are defined only use those combinations that make sense + if "Owner" in tqMatchDict and "OwnerGroup" in tqMatchDict: groups = tqMatchDict["OwnerGroup"] if not isinstance(groups, (list, tuple)): groups = [groups] - dns = tqMatchDict["OwnerDN"] - if not isinstance(dns, (list, tuple)): - dns = [dns] + owner = tqMatchDict["Owner"] ownerConds = [] for group in groups: if Properties.JOB_SHARING in Registry.getPropertiesForGroup(group.replace('"', "")): ownerConds.append(f"tq.OwnerGroup = {group}") else: - for dn in dns: - ownerConds.append(f"( tq.OwnerDN = {dn} AND tq.OwnerGroup = {group} )") + ownerConds.append(f"( tq.Owner = {owner} AND tq.OwnerGroup = {group} )") sqlCondList.append(" OR ".join(ownerConds)) else: # If not both are defined, just add the ones that are defined - for field in ("OwnerGroup", "OwnerDN"): + for field in ("OwnerGroup", "Owner"): if field in 
tqMatchDict: sqlCondList.append(self.__generateSQLSubCond("tq.%s = %%s" % field, tqMatchDict[field])) # Type of single value conditions @@ -837,7 +825,7 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) sqlCondList.append(self.__generateNotSQL(negativeCond)) # Generate the final query string - tqSqlCmd = "SELECT tq.TQId, tq.OwnerDN, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % ( + tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % ( " AND ".join(sqlCondList) ) @@ -891,7 +879,7 @@ def deleteJob(self, jobId, connObj=False): return S_ERROR(f"Can't delete job: {retVal['Message']}") connObj = retVal["Value"] retVal = self._query( - "SELECT t.TQId, t.OwnerDN, t.OwnerGroup \ + "SELECT t.TQId, t.Owner, t.OwnerGroup \ FROM `tq_TaskQueues` t, `tq_Jobs` j \ WHERE j.JobId = %s AND t.TQId = j.TQId" % jobId, @@ -902,7 +890,7 @@ def deleteJob(self, jobId, connObj=False): data = retVal["Value"] if not data: return S_OK(False) - tqId, tqOwnerDN, tqOwnerGroup = data[0] + tqId, tqOwner, tqOwnerGroup = data[0] self.log.verbose("Deleting job", jobId) retVal = self._update(f"DELETE FROM `tq_Jobs` WHERE JobId = {jobId}", conn=connObj) if not retVal["OK"]: @@ -911,7 +899,7 @@ def deleteJob(self, jobId, connObj=False): # No job deleted return S_OK(False) # Always return S_OK() because job has already been taken out from the TQ - self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwnerDN, tqOwnerGroup)) + self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwner, tqOwnerGroup)) return S_OK(True) def getTaskQueueForJob(self, jobId, connObj=False): @@ -935,34 +923,8 @@ def getTaskQueueForJob(self, jobId, connObj=False): return S_OK(retVal["Value"][0][0]) - def getTaskQueueForJobs(self, jobIDs, connObj=False): - """ - Return TaskQueues for a given list of Jobs - """ - if not connObj: - retVal = self._getConnection() - if not retVal["OK"]: - self.log.error("Can't get TQs for a job list", retVal["Message"]) - return retVal - 
connObj = retVal["Value"] - - cmd = f"SELECT JobId,TQId FROM `tq_Jobs` WHERE JobId IN ({','.join(str(x) for x in jobIDs)}) " - retVal = self._query(cmd, conn=connObj) - - if not retVal["OK"]: - return retVal - - if not retVal["Value"]: - return S_ERROR("Not in TaskQueues") - - resultDict = {} - for jobID, tqID in retVal["Value"]: - resultDict[int(jobID)] = int(tqID) - - return S_OK(resultDict) - def __getOwnerForTaskQueue(self, tqId, connObj=False): - retVal = self._query(f"SELECT OwnerDN, OwnerGroup from `tq_TaskQueues` WHERE TQId={tqId}", conn=connObj) + retVal = self._query(f"SELECT Owner, OwnerGroup from `tq_TaskQueues` WHERE TQId={tqId}", conn=connObj) if not retVal["OK"]: return retVal data = retVal["Value"] @@ -971,16 +933,16 @@ def __getOwnerForTaskQueue(self, tqId, connObj=False): return S_OK(retVal["Value"][0]) def __deleteTQIfEmpty(self, args): - (tqId, tqOwnerDN, tqOwnerGroup) = args + (tqId, tqOwner, tqOwnerGroup) = args retries = 3 while retries: retries -= 1 - result = self.deleteTaskQueueIfEmpty(tqId, tqOwnerDN, tqOwnerGroup) + result = self.deleteTaskQueueIfEmpty(tqId, tqOwner, tqOwnerGroup) if result["OK"]: return self.log.error("Could not delete TQ", f"{tqId}: {result['Message']}") - def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, connObj=False): + def deleteTaskQueueIfEmpty(self, tqId, tqOwner=False, tqOwnerGroup=False, connObj=False): """ Try to delete a task queue if its empty """ @@ -990,14 +952,14 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, conn self.log.error("Can't insert job", retVal["Message"]) return retVal connObj = retVal["Value"] - if not tqOwnerDN or not tqOwnerGroup: + if not tqOwner or not tqOwnerGroup: retVal = self.__getOwnerForTaskQueue(tqId, connObj=connObj) if not retVal["OK"]: return retVal data = retVal["Value"] if not data: return S_OK(False) - tqOwnerDN, tqOwnerGroup = data + tqOwner, tqOwnerGroup = data sqlCmd = f"SELECT TQId FROM `tq_TaskQueues` WHERE 
Enabled >= 1 AND `tq_TaskQueues`.TQId = {tqId} " sqlCmd += "AND `tq_TaskQueues`.TQId not in ( SELECT DISTINCT TQId from `tq_Jobs` )" @@ -1015,47 +977,11 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, conn retVal = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId = {tqId}", conn=connObj) if not retVal["OK"]: return retVal - self.recalculateTQSharesForEntity(tqOwnerDN, tqOwnerGroup, connObj=connObj) + self.recalculateTQSharesForEntity(tqOwner, tqOwnerGroup, connObj=connObj) self.log.info("Deleted empty and enabled TQ", tqId) return S_OK() return S_OK(False) - def deleteTaskQueue(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, connObj=False): - """ - Try to delete a task queue even if it has jobs - """ - self.log.info("Deleting TQ", tqId) - if not connObj: - retVal = self._getConnection() - if not retVal["OK"]: - return S_ERROR(f"Can't insert job: {retVal['Message']}") - connObj = retVal["Value"] - if not tqOwnerDN or not tqOwnerGroup: - retVal = self.__getOwnerForTaskQueue(tqId, connObj=connObj) - if not retVal["OK"]: - return retVal - data = retVal["Value"] - if not data: - return S_OK(False) - tqOwnerDN, tqOwnerGroup = data - sqlCmd = f"DELETE FROM `tq_TaskQueues` WHERE `tq_TaskQueues`.TQId = {tqId}" - retVal = self._update(sqlCmd, conn=connObj) - if not retVal["OK"]: - return S_ERROR(f"Could not delete task queue {tqId}: {retVal['Message']}") - delTQ = retVal["Value"] - sqlCmd = f"DELETE FROM `tq_Jobs` WHERE `tq_Jobs`.TQId = {tqId}" - retVal = self._update(sqlCmd, conn=connObj) - if not retVal["OK"]: - return S_ERROR(f"Could not delete task queue {tqId}: {retVal['Message']}") - for field in multiValueDefFields: - retVal = self._update(f"DELETE FROM `tq_TQTo{field}` WHERE TQId = {tqId}", conn=connObj) - if not retVal["OK"]: - return retVal - if delTQ > 0: - self.recalculateTQSharesForEntity(tqOwnerDN, tqOwnerGroup, connObj=connObj) - return S_OK(True) - return S_OK(False) - def getMatchingTaskQueues(self, tqMatchDict, 
negativeCond=False): """Get the info of the task queues that match a resource""" result = self.matchAndGetTaskQueue(tqMatchDict, numQueuesToGet=0, negativeCond=negativeCond) @@ -1063,16 +989,6 @@ def getMatchingTaskQueues(self, tqMatchDict, negativeCond=False): return result return self.retrieveTaskQueues([tqTuple[0] for tqTuple in result["Value"]]) - def getNumTaskQueues(self): - """ - Get the number of task queues in the system - """ - sqlCmd = "SELECT COUNT( TQId ) FROM `tq_TaskQueues`" - retVal = self._query(sqlCmd) - if not retVal["OK"]: - return retVal - return S_OK(retVal["Value"][0][0]) - def retrieveTaskQueues(self, tqIdList=None): """ Get all the task queues @@ -1173,22 +1089,20 @@ def recalculateTQSharesForAll(self): self.recalculateTQSharesForEntity("all", group) return S_OK() - def recalculateTQSharesForEntity(self, userDN, userGroup, connObj=False): + def recalculateTQSharesForEntity(self, user, userGroup, connObj=False): """ - Recalculate the shares for a userDN/userGroup combo + Recalculate the shares for a user/userGroup combo """ - self.log.info("Recalculating shares", f"for {userDN}@{userGroup} TQs") + self.log.info("Recalculating shares", f"for {user}@{userGroup} TQs") if userGroup in self.__groupShares: share = self.__groupShares[userGroup] else: share = float(DEFAULT_GROUP_SHARE) if Properties.JOB_SHARING in Registry.getPropertiesForGroup(userGroup): - # If group has JobSharing just set prio for that entry, userDN is irrelevant - return self.__setPrioritiesForEntity(userDN, userGroup, share, connObj=connObj) + # If group has JobSharing just set prio for that entry, user is irrelevant + return self.__setPrioritiesForEntity(user, userGroup, share, connObj=connObj) - selSQL = "SELECT OwnerDN, COUNT(OwnerDN) FROM `tq_TaskQueues` WHERE OwnerGroup='%s' GROUP BY OwnerDN" % ( - userGroup - ) + selSQL = f"SELECT Owner, COUNT(Owner) FROM `tq_TaskQueues` WHERE OwnerGroup='{userGroup}' GROUP BY Owner" result = self._query(selSQL, conn=connObj) if not 
result["OK"]: return result @@ -1208,26 +1122,26 @@ def recalculateTQSharesForEntity(self, userDN, userGroup, connObj=False): owners = dict(data) # IF the user is already known and has more than 1 tq, the rest of the users don't need to be modified # (The number of owners didn't change) - if userDN in owners and owners[userDN] > 1: - return self.__setPrioritiesForEntity(userDN, userGroup, entitiesShares[userDN], connObj=connObj) + if user in owners and owners[user] > 1: + return self.__setPrioritiesForEntity(user, userGroup, entitiesShares[user], connObj=connObj) # Oops the number of owners may have changed so we recalculate the prio for all owners in the group - for userDN in owners: - self.__setPrioritiesForEntity(userDN, userGroup, entitiesShares[userDN], connObj=connObj) + for user in owners: + self.__setPrioritiesForEntity(user, userGroup, entitiesShares[user], connObj=connObj) return S_OK() - def __setPrioritiesForEntity(self, userDN, userGroup, share, connObj=False, consolidationFunc="AVG"): + def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consolidationFunc="AVG"): """ - Set the priority for a userDN/userGroup combo given a splitted share + Set the priority for a user/userGroup combo given a splitted share """ - self.log.info("Setting priorities", f"to {userDN}@{userGroup} TQs") + self.log.info("Setting priorities", f"to {user}@{userGroup} TQs") tqCond = [f"t.OwnerGroup='{userGroup}'"] allowBgTQs = gConfig.getValue(f"/Registry/Groups/{userGroup}/AllowBackgroundTQs", False) if Properties.JOB_SHARING not in Registry.getPropertiesForGroup(userGroup): - res = self._escapeString(userDN) + res = self._escapeString(user) if not res["OK"]: return res userDN = res["Value"] - tqCond.append(f"t.OwnerDN= {userDN} ") + tqCond.append(f"t.Owner= {userDN} ") tqCond.append("t.TQId = j.TQId") if consolidationFunc == "AVG": selectSQL = "SELECT j.TQId, SUM( j.RealPriority )/COUNT(j.RealPriority) \ FROM `tq_TaskQueues` t, `tq_Jobs` j \ WHERE %s GROUP BY t.TQId" % " AND ".join(tqCond) @@ -1298,8 +1212,8 @@ def __setPrioritiesForEntity(self,
userDN, userGroup, share, connObj=False, cons prioDict[prio].append(tqId) # Execute updates - for prio in prioDict: - tqList = ", ".join([str(tqId) for tqId in prioDict[prio]]) + for prio, tqs in prioDict.items(): + tqList = ", ".join([str(tqId) for tqId in tqs]) updateSQL = f"UPDATE `tq_TaskQueues` SET Priority={prio:.4f} WHERE TQId in ( {tqList} )" self._update(updateSQL, conn=connObj) return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index b294a6254eb..150d740f386 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -12,11 +12,12 @@ from pydantic import ValidationError from DIRAC import S_OK, S_ERROR +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.DISET.MessageClient import MessageClient -from DIRAC.Core.Utilities.DErrno import EWMSJDL, EWMSSUBM from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from DIRAC.Core.Utilities.DErrno import EWMSJDL, EWMSSUBM from DIRAC.Core.Utilities.JDL import jdlToBaseJobDescriptionModel from DIRAC.Core.Utilities.JEncode import strToIntDict from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader @@ -27,13 +28,14 @@ from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( - JobPolicy, - RIGHT_SUBMIT, - RIGHT_RESCHEDULE, RIGHT_DELETE, RIGHT_KILL, + RIGHT_RESCHEDULE, RIGHT_RESET, + RIGHT_SUBMIT, + JobPolicy, ) +from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength MAX_PARAMETRIC_JOBS = 20 @@ -76,14 +78,14 @@ def initializeHandler(cls, serviceInfoDict): def 
initializeRequest(self): credDict = self.getRemoteCredentials() - self.ownerDN = credDict["DN"] self.ownerGroup = credDict["group"] self.userProperties = credDict["properties"] self.owner = credDict["username"] self.peerUsesLimitedProxy = credDict["isLimitedProxy"] self.maxParametricJobs = self.srv_getCSOption("MaxParametricJobs", MAX_PARAMETRIC_JOBS) - self.jobPolicy = JobPolicy(self.ownerDN, self.ownerGroup, self.userProperties) + self.jobPolicy = JobPolicy(self.owner, self.ownerGroup, self.userProperties) self.jobPolicy.jobDB = self.jobDB + self.ownerDN = getDNForUsername(self.owner)["Value"][0] return S_OK() def __sendJobsToOptimizationMind(self, jids): @@ -199,7 +201,6 @@ def export_submitJob(self, jobDesc): result = self.jobDB.insertNewJobIntoDB( jobDescription, self.owner, - self.ownerDN, self.ownerGroup, initialStatus=initialStatus, initialMinorStatus=initialMinorStatus, @@ -208,7 +209,7 @@ def export_submitJob(self, jobDesc): return result jobID = result["JobID"] - self.log.info(f'Job added to the JobDB", "{jobID} for {self.ownerDN}/{self.ownerGroup}') + self.log.info(f'Job added to the JobDB", "{jobID} for {self.owner}/{self.ownerGroup}') self.jobLoggingDB.addLoggingRecord( jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager" diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index d7365634314..41f75b76eae 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -32,11 +32,6 @@ def initializeHandler(cls, svcInfoDict): return result cls.jobLoggingDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") - if not result["OK"]: - return result - cls.taskQueueDB = result["Value"](parentLogger=cls.log) - except RuntimeError as excp: return 
S_ERROR(f"Can't connect to DB: {excp}") @@ -318,17 +313,17 @@ def export_getJobPageSummaryWeb(self, selectDict, sortList, startItem, maxItems, # initialize jobPolicy credDict = self.getRemoteCredentials() - ownerDN = credDict["DN"] + owner = credDict["username"] ownerGroup = credDict["group"] operations = Operations(group=ownerGroup) globalJobsInfo = operations.getValue("/Services/JobMonitoring/GlobalJobsInfo", True) - jobPolicy = JobPolicy(ownerDN, ownerGroup, globalJobsInfo) + jobPolicy = JobPolicy(owner, ownerGroup, globalJobsInfo) jobPolicy.jobDB = self.jobDB result = jobPolicy.getControlledUsers(RIGHT_GET_INFO) if not result["OK"]: return result if not result["Value"]: - return S_ERROR(f"User and group combination has no job rights ({ownerDN!r}, {ownerGroup!r})") + return S_ERROR(f"User and group combination has no job rights ({owner!r}, {ownerGroup!r})") if result["Value"] != "ALL": selectDict[("Owner", "OwnerGroup")] = result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py index e9f81dd32f8..81b34ff6f70 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py @@ -1,14 +1,14 @@ """ JobPolicy encapsulates authorization rules for different groups with respect to job related operations """ -from DIRAC import S_OK, S_ERROR, gLogger -from DIRAC.Core.Security import Properties +from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.ConfigurationSystem.Client.Helpers.Registry import ( - getUsernameForDN, getGroupsForUser, getPropertiesForGroup, + getUsernameForDN, getUsersInGroup, ) +from DIRAC.Core.Security import Properties RIGHT_GET_JOB = "GetJob" RIGHT_GET_INFO = "GetInfo" @@ -73,12 +73,8 @@ class JobPolicy: - def __init__(self, userDN, userGroup, allInfo=True): - self.userDN = userDN - self.userName = "" - result = getUsernameForDN(userDN) - if result["OK"]: - self.userName = result["Value"] + def 
__init__(self, user, userGroup, allInfo=True): + self.userName = user self.userGroup = userGroup self.userProperties = getPropertiesForGroup(userGroup, []) self.jobDB = None @@ -95,7 +91,7 @@ def getUserRightsForJob(self, jobID, owner=None, group=None): if not result["OK"]: return result elif result["Value"]: - owner = result["Value"]["OwnerDN"] + owner = result["Value"]["Owner"] group = result["Value"]["OwnerGroup"] else: return S_ERROR("Job not found") @@ -135,8 +131,8 @@ def _getUserJobPolicy(self): self._permissions[right] = True def getJobPolicy(self, jobOwner="", jobOwnerGroup=""): - """Get the job operations rights for a job owned by jobOwnerDN/jobOwnerGroup - for a user with userDN/userGroup. + """Get the job operations rights for a job owned by jobOwner/jobOwnerGroup + for a user with user/userGroup. Returns a dictionary of various operations rights """ permDict = dict(self._permissions) @@ -154,7 +150,7 @@ def getJobPolicy(self, jobOwner="", jobOwnerGroup=""): return S_OK(permDict) def evaluateJobRights(self, jobList, right): - """Get access rights to jobID for the user ownerDN/ownerGroup""" + """Get access rights to jobID for the user owner/ownerGroup""" validJobList = [] invalidJobList = [] nonauthJobList = [] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index f89f931e77d..bcf6fe5199d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -96,7 +96,7 @@ def export_getPilotOutput(self, pilotReference): pilotDict = result["Value"][pilotReference] - owner = pilotDict["OwnerDN"] + ownerDN = pilotDict["OwnerDN"] group = pilotDict["OwnerGroup"] # FIXME: What if the OutputSandBox is not StdOut and StdErr, what do we do with other files? 
@@ -108,7 +108,7 @@ def export_getPilotOutput(self, pilotReference): resultDict = {} resultDict["StdOut"] = stdout resultDict["StdErr"] = error - resultDict["OwnerDN"] = owner + resultDict["OwnerDN"] = ownerDN resultDict["OwnerGroup"] = group resultDict["FileList"] = [] return S_OK(resultDict) @@ -146,7 +146,7 @@ def export_getPilotOutput(self, pilotReference): resultDict = {} resultDict["StdOut"] = stdout resultDict["StdErr"] = error - resultDict["OwnerDN"] = owner + resultDict["OwnerDN"] = ownerDN resultDict["OwnerGroup"] = group resultDict["FileList"] = [] shutil.rmtree(ce.ceParameters["WorkingDirectory"]) @@ -325,7 +325,7 @@ def export_killPilot(cls, pilotRefList): if isinstance(pilotRefList, str): pilotRefs = [pilotRefList] - # Regroup pilots per site and per owner + # Regroup pilots per site and per ownerDN pilotRefDict = {} for pilotReference in pilotRefs: result = cls.pilotAgentsDB.getPilotInfo(pilotReference) @@ -333,9 +333,11 @@ def export_killPilot(cls, pilotRefList): return S_ERROR("Failed to get info for pilot " + pilotReference) pilotDict = result["Value"][pilotReference] - owner = pilotDict["OwnerDN"] + ownerDN = pilotDict["OwnerDN"] group = pilotDict["OwnerGroup"] - queue = "@@@".join([owner, group, pilotDict["GridSite"], pilotDict["DestinationSite"], pilotDict["Queue"]]) + queue = "@@@".join( + [ownerDN, group, pilotDict["GridSite"], pilotDict["DestinationSite"], pilotDict["Queue"]] + ) gridType = pilotDict["GridType"] pilotRefDict.setdefault(queue, {}) pilotRefDict[queue].setdefault("PilotList", []) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py index 00727dbf6a3..8f20b96ad33 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py @@ -7,23 +7,23 @@ :dedent: 2 :caption: SandboxStore options """ +import hashlib import os -import time -import threading import 
tempfile -import hashlib +import threading +import time -from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security import Locations, Properties, X509Certificate from DIRAC.Core.Utilities.File import mkDir from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.DataManagementSystem.Client.DataManager import DataManager from DIRAC.DataManagementSystem.Service.StorageElementHandler import getDiskSpace +from DIRAC.RequestManagementSystem.Client.File import File +from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request -from DIRAC.RequestManagementSystem.Client.Operation import Operation -from DIRAC.RequestManagementSystem.Client.File import File from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.Core.Utilities.File import getGlobbedTotalSize @@ -175,7 +175,6 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""): fSize = getGlobbedTotalSize(hdPath) result = self.sandboxDB.registerAndGetSandbox( credDict["username"], - credDict["DN"], credDict["group"], self.__seNameToUse, sbPath, @@ -224,7 +223,7 @@ def transfer_bulkFromClient(self, fileId, token, _fileSize, fileHelper): return S_OK(f"SB:{seName}|{sePFN}") result = self.sandboxDB.registerAndGetSandbox( - credDict["username"], credDict["DN"], credDict["group"], seName, sePFN, fileHelper.getTransferedBytes() + credDict["username"], credDict["group"], seName, sePFN, fileHelper.getTransferedBytes() ) if not result["OK"]: self.__secureUnlinkFile(tmpFilePath) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSUtilities.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSUtilities.py index dde68b3552d..e18ec02cbbe 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSUtilities.py +++ 
b/src/DIRAC/WorkloadManagementSystem/Service/WMSUtilities.py @@ -46,8 +46,7 @@ def getPilotCE(pilotDict): if not result["OK"]: shutil.rmtree(queueDict["WorkingDirectory"]) return result - ce = result["Value"] - return S_OK(ce) + return result def getPilotProxy(pilotDict): @@ -56,16 +55,15 @@ def getPilotProxy(pilotDict): :param dict pilotDict: pilot parameters :return: S_OK/S_ERROR with proxy as Value """ - owner = pilotDict["OwnerDN"] + ownerDN = pilotDict["OwnerDN"] group = pilotDict["OwnerGroup"] groupVOMS = getGroupOption(group, "VOMSRole", group) - result = gProxyManager.getPilotProxyFromVOMSGroup(owner, groupVOMS) + result = gProxyManager.getPilotProxyFromVOMSGroup(ownerDN, groupVOMS) if not result["OK"]: - gLogger.error("Could not get proxy:", f"User \"{owner}\" Group \"{groupVOMS}\" : {result['Message']}") + gLogger.error("Could not get proxy:", f"User \"{ownerDN}\" Group \"{groupVOMS}\" : {result['Message']}") return S_ERROR("Failed to get the pilot's owner proxy") - proxy = result["Value"] - return S_OK(proxy) + return result def setPilotCredentials(ce, pilotDict): diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index 914bd018b2f..1e930487d37 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -182,7 +182,6 @@ def test_submitJob(jobType, inputData, expectedSite): # # Check that the JDL contains some fields # assert jobDescription.lookupAttribute("Owner") is True # assert jobDescription.lookupAttribute("OwnerGroup") is True - # assert jobDescription.lookupAttribute("OwnerDN") is True # assert jobDescription.lookupAttribute("CPUTime") is True # assert jobDescription.lookupAttribute("Priority") is True # assert jobDescription.lookupAttribute("JobID") is True @@ -193,7 +192,6 @@ def test_submitJob(jobType, inputData, expectedSite): # resourceDescription = { # "OwnerGroup": 
jobDescription.getAttributeString("OwnerGroup"), - # "OwnerDN": jobDescription.getAttributeString("OwnerDN"), # "VirtualOrganization": jobDescription.getAttributeString("VirtualOrganization"), # "CPUTime": jobDescription.getAttributeInt("CPUTime"), # "DIRACVersion": "pippo", @@ -313,7 +311,6 @@ def test_WMSClient_rescheduleJob(): # Check that the JDL contains some fields assert jobDescription.lookupAttribute("Owner") is True assert jobDescription.lookupAttribute("OwnerGroup") is True - assert jobDescription.lookupAttribute("OwnerDN") is True assert jobDescription.lookupAttribute("CPUTime") is True assert jobDescription.lookupAttribute("Priority") is True assert jobDescription.lookupAttribute("JobID") is True @@ -325,7 +322,6 @@ def test_WMSClient_rescheduleJob(): # resourceDescription = { # "OwnerGroup": jobDescription.getAttributeString("OwnerGroup"), - # "OwnerDN": jobDescription.getAttributeString("OwnerDN"), # "VirtualOrganization": jobDescription.getAttributeString("VirtualOrganization"), # "CPUTime": jobDescription.getAttributeInt("CPUTime"), # "DIRACVersion": "pippo", @@ -376,7 +372,6 @@ def test_WMSClient_rescheduleJob(): assert jobDescription.lookupAttribute("Owner") is True assert jobDescription.lookupAttribute("OwnerGroup") is True - assert jobDescription.lookupAttribute("OwnerDN") is True assert jobDescription.lookupAttribute("JobID") is True finally: diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py index 6fea463e9ce..bfa539de2c4 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py @@ -44,7 +44,6 @@ "std.out" }; Owner = "owner"; - OwnerDN = "/DN/OF/owner"; OwnerGroup = "ownerGroup"; Priority = 1; StdError = "std.err"; @@ -83,7 +82,7 @@ def test_insertNewJobIntoDB(jobDB): """Test the insertNewJobIntoDB method""" # Act - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + 
res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") # Assert assert res["OK"], res["Message"] @@ -108,7 +107,7 @@ def test_insertNewJobIntoDB(jobDB): def test_removeJobFromDB(jobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -123,7 +122,7 @@ def test_getJobJDL_original(jobDB: JobDB): """Test of the getJobJDL method with the original parameter set to True""" # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -140,7 +139,7 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): """Test of the getJobJDL method with the original parameter set to True""" # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = int(res["JobID"]) @@ -169,7 +168,6 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): JobRequirements = [ CPUTime = 86400; - OwnerDN = "/DN/OF/owner"; OwnerGroup = "ownerGroup"; UserPriority = 1; VirtualOrganization = "vo"; @@ -183,7 +181,6 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): "std.out" }}; Owner = "owner"; - OwnerDN = "/DN/OF/owner"; OwnerGroup = "ownerGroup"; Priority = 1; StdError = "std.err"; @@ -195,11 +192,11 @@ def test_getJobJDL_nonOriginal(jobDB: JobDB): def test_getJobsAttributes(jobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_1 = int(res["JobID"]) - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_2 = int(res["JobID"]) @@ 
-216,7 +213,7 @@ def test_getJobsAttributes(jobDB): def test_rescheduleJob(jobDB): # Arrange - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -263,7 +260,7 @@ def test_getCounters(jobDB): def test_heartBeatLogging(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -305,7 +302,7 @@ def test_heartBeatLogging(jobDB): def test_getJobParameters(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID = res["JobID"] @@ -323,10 +320,10 @@ def test_getJobParameters(jobDB): def test_setJobsMajorStatus(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_1 = res["JobID"] - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_2 = res["JobID"] @@ -369,10 +366,10 @@ def test_setJobsMajorStatus(jobDB): def test_attributes(jobDB): - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_1 = res["JobID"] - res = jobDB.insertNewJobIntoDB(jdl, "owner", "/DN/OF/owner", "ownerGroup") + res = jobDB.insertNewJobIntoDB(jdl, "owner", "ownerGroup") assert res["OK"], res["Message"] jobID_2 = res["JobID"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py b/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py index 531d621b28c..2d1087d47de 100644 
--- a/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py @@ -117,9 +117,9 @@ def test_MySQLandES_jobParameters(): # So now we are using the ES backend # This will still be in MySQL, but first it will look if it's in ES - res = jobMonitoringClient.getJobParameter(jobID, "ParName-fromMySQL") - assert res["OK"], res["Message"] - assert res["Value"] == {"ParName-fromMySQL": "ParValue-fromMySQL"}, res["Value"] + _checkWithRetries( + jobMonitoringClient.getJobParameter, (jobID, "ParName-fromMySQL"), {"ParName-fromMySQL": "ParValue-fromMySQL"} + ) # Now we insert (in ES) res = jobStateUpdateClient.setJobParameter(jobID, "ParName-fromES", "ParValue-fromES") diff --git a/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py b/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py index 07e831795cc..a06dc352cd5 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_SandboxMetadataDB.py @@ -17,25 +17,24 @@ def test_SandboxMetadataDB(): owner = "adminusername" ownerDN = "/C=ch/O=DIRAC/OU=DIRAC CI/CN=ciuser" ownerGroup = "dirac_admin" - enSetup = "enSetup" sbSE = "ProductionSandboxSE" sbPFN = "/sb/pfn/1.tar.bz2" - res = smDB.registerAndGetSandbox(owner, ownerDN, ownerGroup, sbSE, sbPFN, 123) + res = smDB.registerAndGetSandbox(owner, ownerGroup, sbSE, sbPFN, 123) assert res["OK"], res["Message"] sbId, newSandbox = res["Value"] print(f"sbId:{sbId}") print(f"newSandbox:{newSandbox}") assignTo = {owner: [(f"SB:{sbSE}|{sbPFN}", ownerGroup)]} - res = smDB.assignSandboxesToEntities(assignTo, owner, ownerGroup, enSetup) + res = smDB.assignSandboxesToEntities(assignTo, owner, ownerGroup) assert res["OK"], res["Message"] assert res["Value"] == 1 res = smDB.getSandboxOwner(sbSE, sbPFN, ownerDN, ownerGroup) assert res["OK"], res["Message"] - assert res["Value"] == (owner, 
ownerDN, ownerGroup) + assert res["Value"] == (owner, ownerGroup) res = smDB.getSandboxId(sbSE, sbPFN, owner, ownerGroup) assert res["OK"], res["Message"] @@ -48,5 +47,4 @@ def test_SandboxMetadataDB(): assert res["OK"], res["Message"] res = smDB.getUnusedSandboxes() - print(res) assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py index 634bf3a08b8..717240be35a 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py @@ -23,15 +23,14 @@ tqDB = TaskQueueDB() -def test_basicChain(): +def test_basiChain(): """a basic put - remove""" - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 50000} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} result = tqDB.insertJob(123, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([123]) + result = tqDB.getTaskQueueForJob(123) assert result["OK"] - assert 123 in result["Value"] - tq = result["Value"][123] + tq = result["Value"] result = tqDB.deleteJob(123) assert result["OK"] result = tqDB.cleanOrphanedTaskQueues() @@ -42,16 +41,15 @@ def test_basicChain(): def test_chainWithParameter(): """put - remove with parameters""" - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 50000} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} # first job result = tqDB.insertJob(123, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([123]) + result = tqDB.getTaskQueueForJob(123) assert result["OK"] - tq = result["Value"][123] - result = tqDB.deleteTaskQueue(tq) - assert result["OK"] is False # This will fail because of the foreign key + tq = result["Value"] + result = tqDB.cleanOrphanedTaskQueues() assert result["OK"] result = tqDB.deleteTaskQueueIfEmpty(tq) # this won't delete anything @@ -60,17 +58,15 
@@ def test_chainWithParameter(): # second job result = tqDB.insertJob(125, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([125]) - tq = result["Value"][125] - result = tqDB.deleteTaskQueue(tq) - assert result["OK"] is False # This will fail because of the foreign key + result = tqDB.getTaskQueueForJob(125) + tq = result["Value"] result = tqDB.deleteTaskQueueIfEmpty(tq) # this won't delete anything, as both 123 and 125 are in assert result["OK"] # but still it won't fail assert result["Value"] is False result = tqDB.retrieveTaskQueues() assert result["OK"] assert list(result["Value"].values())[0] == { - "OwnerDN": "/my/DN", + "Owner": "userName", "Jobs": 2, "OwnerGroup": "myGroup", "CPUTime": 86400, @@ -92,42 +88,42 @@ def test_chainWithParameter(): def test_chainWithSites(): """put - remove with parameters including sites""" tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Sites": ["LCG.CERN.ch"], } result = tqDB.insertJob(201, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([201]) - tq_job1 = result["Value"][201] + result = tqDB.getTaskQueueForJob(201) + tq_job1 = result["Value"] result = tqDB.insertJob(2011, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([2011]) - tq_job11 = result["Value"][2011] + result = tqDB.getTaskQueueForJob(2011) + tq_job11 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Sites": ["CLOUD.IN2P3.fr"], } result = tqDB.insertJob(203, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([203]) - tq_job2 = result["Value"][203] + result = tqDB.getTaskQueueForJob(203) + tq_job2 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Sites": ["LCG.CERN.ch", "CLOUD.IN2P3.fr"], } result = tqDB.insertJob(203, tqDefDict, 10) assert result["OK"] - result = 
tqDB.getTaskQueueForJobs([203]) - tq_job3 = result["Value"][203] + result = tqDB.getTaskQueueForJob(203) + tq_job3 = result["Value"] # matching # this should match everything @@ -166,26 +162,26 @@ def test_chainWithSites(): def test_chainWithBannedSites(): """put - remove with parameters including Banned sites""" tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "BannedSites": ["LCG.CERN.ch", "CLOUD.IN2P3.fr"], } result = tqDB.insertJob(127, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([127]) - tq_job1 = result["Value"][127] + result = tqDB.getTaskQueueForJob(127) + tq_job1 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "BannedSites": ["CLOUD.IN2P3.fr", "DIRAC.Test.org"], } result = tqDB.insertJob(128, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([128]) - tq_job2 = result["Value"][128] + result = tqDB.getTaskQueueForJob(128) + tq_job2 = result["Value"] # matching # this should match everything @@ -249,57 +245,57 @@ def test_chainWithPlatforms(): # and of course what runs on rhel family does not run on debian family tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["centos7"], } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([1]) - tq_job1 = result["Value"][1] + result = tqDB.getTaskQueueForJob(1) + tq_job1 = result["Value"] assert tq_job1 > 0 result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([2]) - tq_job2 = result["Value"][2] + result = tqDB.getTaskQueueForJob(2) + tq_job2 = result["Value"] assert tq_job1 == tq_job2 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["ubuntu"], } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] - result = 
tqDB.getTaskQueueForJobs([3]) - tq_job3 = result["Value"][3] + result = tqDB.getTaskQueueForJob(3) + tq_job3 = result["Value"] assert tq_job3 == tq_job1 + 1 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["centos7", "slc6"], } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([4]) - tq_job4 = result["Value"][4] + result = tqDB.getTaskQueueForJob(4) + tq_job4 = result["Value"] assert tq_job4 == tq_job3 + 1 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["debian", "ubuntu"], } result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([5]) - tq_job5 = result["Value"][5] + result = tqDB.getTaskQueueForJob(5) + tq_job5 = result["Value"] assert tq_job5 == tq_job4 + 1 # We should be in this situation (TQIds are obviously invented): @@ -367,11 +363,11 @@ def test_chainWithPlatforms(): # Now we insert a TQ without platform - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 5000} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} result = tqDB.insertJob(6, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([6]) - tq_job6 = result["Value"][6] + result = tqDB.getTaskQueueForJob(6) + tq_job6 = result["Value"] assert tq_job6 == tq_job5 + 1 # matching for this one @@ -423,11 +419,11 @@ def test_chainWithPlatforms(): # Now we insert a TQ with platform "ANY" (same as no platform) - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platform": "ANY"} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platform": "ANY"} result = tqDB.insertJob(7, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([7]) - tq_job7 = result["Value"][7] + result = tqDB.getTaskQueueForJob(7) + tq_job7 = result["Value"] assert tq_job7 == tq_job6 # 
would be inserted in the same TQ # matching for this one @@ -493,70 +489,70 @@ def test_chainWithTags(): # 6 : MultiProcessor, 17Processors tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["MultiProcessor"], } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([1]) - tq_job1 = result["Value"][1] + result = tqDB.getTaskQueueForJob(1) + tq_job1 = result["Value"] assert tq_job1 > 0 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["SingleProcessor"], } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([2]) - tq_job2 = result["Value"][2] + result = tqDB.getTaskQueueForJob(2) + tq_job2 = result["Value"] assert tq_job2 > tq_job1 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["SingleProcessor", "MultiProcessor"], } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([3]) - tq_job3 = result["Value"][3] + result = tqDB.getTaskQueueForJob(3) + tq_job3 = result["Value"] assert tq_job3 > tq_job2 tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["MultiProcessor", "GPU"], } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([4]) - tq_job4 = result["Value"][4] + result = tqDB.getTaskQueueForJob(4) + tq_job4 = result["Value"] assert tq_job4 > tq_job3 - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 5000} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([5]) - tq_job5 = result["Value"][5] + result = tqDB.getTaskQueueForJob(5) + tq_job5 = result["Value"] assert tq_job5 > tq_job4 tqDefDict = { - "OwnerDN": 
"/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["MultiProcessor", "17Processors"], } result = tqDB.insertJob(6, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([6]) - tq_job6 = result["Value"][6] + result = tqDB.getTaskQueueForJob(6) + tq_job6 = result["Value"] assert tq_job6 > tq_job5 # We should be in this situation (TQIds are obviously invented): @@ -717,33 +713,33 @@ def test_chainWithTagsAndPlatforms(): # platform only tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["centos7"], } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([1]) - tq_job1 = result["Value"][1] + result = tqDB.getTaskQueueForJob(1) + tq_job1 = result["Value"] assert tq_job1 > 0 # Tag only tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Tags": ["MultiProcessor"], } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([2]) - tq_job2 = result["Value"][2] + result = tqDB.getTaskQueueForJob(2) + tq_job2 = result["Value"] assert tq_job2 > tq_job1 # Platforms and Tag tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["centos7"], @@ -751,13 +747,13 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([3]) - tq_job3 = result["Value"][3] + result = tqDB.getTaskQueueForJob(3) + tq_job3 = result["Value"] assert tq_job3 > tq_job2 # Tag and another platform tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platforms": ["slc6"], @@ -765,8 +761,8 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([4]) - tq_job4 = result["Value"][4] + result = 
tqDB.getTaskQueueForJob(4) + tq_job4 = result["Value"] assert tq_job4 > tq_job3 # We should be in this situation (TQIds are obviously invented): @@ -854,7 +850,7 @@ def test_ComplexMatching(): # Let's first insert few jobs (no tags, for now, and always a platform) tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "admin", "CPUTime": 5000, "Sites": ["Site_1", "Site_2"], @@ -862,11 +858,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([1]) - tq_job1 = result["Value"][1] + result = tqDB.getTaskQueueForJob(1) + tq_job1 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "prod", "CPUTime": 5000, "Sites": ["Site_1"], @@ -874,11 +870,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([2]) - tq_job2 = result["Value"][2] + result = tqDB.getTaskQueueForJob(2) + tq_job2 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "user", "CPUTime": 5000, "Sites": ["Site_2"], @@ -886,11 +882,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([3]) - tq_job3 = result["Value"][3] + result = tqDB.getTaskQueueForJob(3) + tq_job3 = result["Value"] tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "user", "CPUTime": 5000, "Sites": ["Site_1", "Site_2"], @@ -898,8 +894,8 @@ def test_ComplexMatching(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([4]) - tq_job4 = result["Value"][4] + result = tqDB.getTaskQueueForJob(4) + tq_job4 = result["Value"] # now let's try some matching @@ -1019,15 +1015,15 @@ def test_ComplexMatching(): # now inserting one without platform, and try again tqDefDict = { - "OwnerDN": "/my/DN", + "Owner": "userName", "OwnerGroup": "user", "CPUTime": 
5000, "Sites": ["Site_1", "Site_2"], } result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] - result = tqDB.getTaskQueueForJobs([5]) - tq_job5 = result["Value"][5] + result = tqDB.getTaskQueueForJob(5) + tq_job5 = result["Value"] result = tqDB.matchAndGetTaskQueue( { @@ -1108,16 +1104,13 @@ def test_ComplexMatching(): def test_TQ(): """test of various functions""" - tqDefDict = {"OwnerDN": "/my/DN", "OwnerGroup": "myGroup", "CPUTime": 50000} + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} tqDB.insertJob(123, tqDefDict, 10) - result = tqDB.getNumTaskQueues() - assert result["OK"] - assert result["Value"] == 1 result = tqDB.retrieveTaskQueues() assert result["OK"] assert list(result["Value"].values())[0] == { - "OwnerDN": "/my/DN", + "Owner": "userName", "Jobs": 1, "OwnerGroup": "myGroup", "CPUTime": 86400,