Skip to content

Commit

Permalink
Merge pull request #6566 from fstagni/11_removeDN
Browse files Browse the repository at this point in the history
[8.1] WMS: Working with usernames instead of OwnerDNs
  • Loading branch information
fstagni authored Jul 6, 2023
2 parents e68f939 + a71b16d commit 54db53b
Show file tree
Hide file tree
Showing 27 changed files with 384 additions and 526 deletions.
18 changes: 14 additions & 4 deletions docs/source/DeveloperGuide/CodeTesting/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -309,23 +309,33 @@ Running the above might take a while. Supposing you are interested in running on
./integration_tests.py prepare-environment [FLAGS]
./integration_tests.py install-server
which (in some minutes) will give you a fully dockerized server setup (`docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`). Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`.
The first thing to do is that you should first login in the docker container, by doing:
which (in some minutes) will give you a fully dockerized server setup
(`docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`).
Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`.
The first thing to do is to log in to the docker container, by doing:

.. code-block:: bash
./integration_tests.py exec-server
The installations automatically pick up external changes to the DIRAC code and tests)
(The docker installation automatically picks up external changes to the DIRAC code and tests)

Now you can run the test with:

.. code-block:: bash
pytest LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/`

You can also log in to the client and mysql with:

.. code-block:: bash
./integration_tests.py exec-client
./integration_tests.py exec-mysql
Validation and System tests
---------------------------
Expand Down
12 changes: 0 additions & 12 deletions src/DIRAC/Interfaces/API/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,18 +702,6 @@ def setOwnerGroup(self, ownerGroup):
self._addParameter(self.workflow, "OwnerGroup", "JDL", ownerGroup, "User specified owner group.")
return S_OK()

#############################################################################
def setOwnerDN(self, ownerDN):
"""Developer function.
Allows to force expected owner DN of proxy.
"""
if not isinstance(ownerDN, str):
return self._reportError("Expected string for job owner DN", **{"ownerGroup": ownerDN})

self._addParameter(self.workflow, "OwnerDN", "JDL", ownerDN, "User specified owner DN.")
return S_OK()

#############################################################################
def setType(self, jobType):
"""Developer function.
Expand Down
8 changes: 4 additions & 4 deletions src/DIRAC/Interfaces/scripts/dirac_wms_job_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
'MinorStatus': 'Execution Complete',
'OSandboxReadyFlag': 'False',
'Owner': 'vhamar',
'OwnerDN': '/O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar',
'OwnerGroup': 'eela_user',
'RescheduleCounter': '0',
'RescheduleTime': 'None',
Expand All @@ -42,7 +41,8 @@
'UserPriority': '1',
'VerifiedFlag': 'True'}
"""
import DIRAC
from DIRAC import exit as dExit
from DIRAC import gLogger
from DIRAC.Core.Base.Script import Script


Expand All @@ -65,9 +65,9 @@ def main():
exitCode = 2

for error in errorList:
print("ERROR %s: %s" % error)
gLogger.error(f"{error}")

DIRAC.exit(exitCode)
dExit(exitCode)


if __name__ == "__main__":
Expand Down
4 changes: 1 addition & 3 deletions src/DIRAC/Interfaces/scripts/dirac_wms_job_get_jdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
'Executable': '/bin/ls',
'JobID': '1',
'JobName': 'DIRAC_vhamar_602138',
'JobRequirements': '[OwnerDN = /O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar;
OwnerGroup = eela_user;
'JobRequirements': '[OwnerGroup = eela_user;
Setup = EELA-Production;
UserPriority = 1;
CPUTime = 0 ]',
'OutputSandbox': ['std.out', 'std.err'],
'Owner': 'vhamar',
'OwnerDN': '/O=GRID-FR/C=FR/O=CNRS/OU=CPPM/CN=Vanessa Hamar',
'OwnerGroup': 'eela_user',
'Priority': '1'}
"""
Expand Down
12 changes: 4 additions & 8 deletions src/DIRAC/TransformationSystem/Client/WorkflowTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,17 @@ def prepareTransformationTasks(self, transBody, taskDict, owner="", ownerGroup="
ownerDN = res["Value"][0]

if bulkSubmissionFlag:
return self.__prepareTasksBulk(transBody, taskDict, owner, ownerGroup, ownerDN)
return self.__prepareTasksBulk(transBody, taskDict, owner, ownerGroup)
# not a bulk submission
return self.__prepareTasks(transBody, taskDict, owner, ownerGroup, ownerDN)
return self.__prepareTasks(transBody, taskDict, owner, ownerGroup)

def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup):
"""Prepare transformation tasks with a single job object for bulk submission
:param str transBody: transformation job template
:param dict taskDict: dictionary of per task parameters
:param str owner: owner of the transformation
:param str ownerGroup: group of the owner of the transformation
:param str ownerDN: DN of the owner of the transformation
:return: S_OK/S_ERROR with updated taskDict
"""
Expand All @@ -137,7 +136,6 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
self._logVerbose(f"Setting job owner:group to {owner}:{ownerGroup}", transID=transID, method=method)
oJob.setOwner(owner)
oJob.setOwnerGroup(ownerGroup)
oJob.setOwnerDN(ownerDN)

try:
site = oJob.workflow.findParameter("Site").getValue()
Expand Down Expand Up @@ -253,14 +251,13 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
taskDict["BulkJobObject"] = oJob
return S_OK(taskDict)

def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN):
def __prepareTasks(self, transBody, taskDict, owner, ownerGroup):
"""Prepare transformation tasks with a job object per task
:param str transBody: transformation job template
:param dict taskDict: dictionary of per task parameters
:param owner: owner of the transformation
:param str ownerGroup: group of the owner of the transformation
:param str ownerDN: DN of the owner of the transformation
:return: S_OK/S_ERROR with updated taskDict
"""
Expand All @@ -275,7 +272,6 @@ def __prepareTasks(self, transBody, taskDict, owner, ownerGroup, ownerDN):
oJobTemplate = self.jobClass(transBody)
oJobTemplate.setOwner(owner)
oJobTemplate.setOwnerGroup(ownerGroup)
oJobTemplate.setOwnerDN(ownerDN)

try:
site = oJobTemplate.workflow.findParameter("Site").getValue()
Expand Down
48 changes: 25 additions & 23 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from diraccfg import CFG

from DIRAC import S_OK, S_ERROR, gConfig, rootPath, siteName
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
Expand Down Expand Up @@ -202,7 +203,7 @@ def execute(self):
self.matchFailedCount = 0

# Check matcher information returned
matcherParams = ["JDL", "DN", "Group"]
matcherParams = ["JDL", "Owner", "Group"]
matcherInfo = jobRequest["Value"]
jobID = matcherInfo["JobID"]
self.jobReport.setJob(jobID)
Expand All @@ -217,7 +218,7 @@ def execute(self):

jobJDL = matcherInfo["JDL"]
jobGroup = matcherInfo["Group"]
ownerDN = matcherInfo["DN"]
owner = matcherInfo["Owner"]
ceDict = matcherInfo["CEDict"]
matchTime = matcherInfo["matchTime"]

Expand All @@ -242,7 +243,7 @@ def execute(self):
jobType = submissionParams["jobType"]

self.log.verbose("Job request successful: \n", jobRequest["Value"])
self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}")
self.log.info("Received", f"JobID={jobID}, JobType={jobType}, Owner={owner}, JobGroup={jobGroup}")
self.jobCount += 1
self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False)
if "BOINC_JOB_ID" in os.environ:
Expand All @@ -253,6 +254,7 @@ def execute(self):
)

self.jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
ownerDN = getDNForUsername(owner)["Value"]
result_setupProxy = self._setupProxy(ownerDN, jobGroup)
if not result_setupProxy["OK"]:
result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"])
Expand Down Expand Up @@ -472,26 +474,26 @@ def _setupProxy(self, ownerDN, ownerGroup):
self.log.error("Failed to setup proxy", proxyResult["Message"])
return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}")
return S_OK(proxyResult["Value"])
else:
ret = getProxyInfo(disableVOMS=True)
if not ret["OK"]:
self.log.error("Invalid Proxy", ret["Message"])
return S_ERROR("Invalid Proxy")

proxyChain = ret["Value"]["chain"]
if "groupProperties" not in ret["Value"]:
print(ret["Value"])
print(proxyChain.dumpAllToString())
self.log.error("Invalid Proxy", "Group has no properties defined")
return S_ERROR("Proxy has no group properties defined")

groupProps = ret["Value"]["groupProperties"]
if Properties.GENERIC_PILOT in groupProps or Properties.PILOT in groupProps:
proxyResult = self._requestProxyFromProxyManager(ownerDN, ownerGroup)
if not proxyResult["OK"]:
self.log.error("Invalid Proxy", proxyResult["Message"])
return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}")
proxyChain = proxyResult["Value"]

ret = getProxyInfo(disableVOMS=True)
if not ret["OK"]:
self.log.error("Invalid Proxy", ret["Message"])
return S_ERROR("Invalid Proxy")

proxyChain = ret["Value"]["chain"]
if "groupProperties" not in ret["Value"]:
print(ret["Value"])
print(proxyChain.dumpAllToString())
self.log.error("Invalid Proxy", "Group has no properties defined")
return S_ERROR("Proxy has no group properties defined")

groupProps = ret["Value"]["groupProperties"]
if Properties.GENERIC_PILOT in groupProps or Properties.PILOT in groupProps:
proxyResult = self._requestProxyFromProxyManager(ownerDN, ownerGroup)
if not proxyResult["OK"]:
self.log.error("Invalid Proxy", proxyResult["Message"])
return S_ERROR(f"Failed to setup proxy: {proxyResult['Message']}")
proxyChain = proxyResult["Value"]

return S_OK(proxyChain)

Expand Down
63 changes: 26 additions & 37 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import datetime
import os

import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
Expand Down Expand Up @@ -57,6 +57,7 @@ def __init__(self, *args, **kwargs):
self.prodTypes = []
self.removeStatusDelay = {}
self.removeStatusDelayHB = {}
self.maxHBJobsAtOnce = 0

#############################################################################
def initialize(self):
Expand All @@ -80,7 +81,7 @@ def initialize(self):
self.removeStatusDelayHB[JobStatus.DONE] = self.am_getOption("RemoveStatusDelayHB/Done", -1)
self.removeStatusDelayHB[JobStatus.KILLED] = self.am_getOption("RemoveStatusDelayHB/Killed", -1)
self.removeStatusDelayHB[JobStatus.FAILED] = self.am_getOption("RemoveStatusDelayHB/Failed", -1)
self.maxHBJobsAtOnce = self.am_getOption("MaxHBJobsAtOnce", 0)
self.maxHBJobsAtOnce = self.am_getOption("MaxHBJobsAtOnce", self.maxHBJobsAtOnce)

return S_OK()

Expand All @@ -93,7 +94,7 @@ def _getAllowedJobTypes(self):
for jobType in result["Value"]:
if jobType not in self.prodTypes:
cleanJobTypes.append(jobType)
self.log.notice(f"JobTypes to clean {cleanJobTypes}")
self.log.notice("JobTypes to clean", cleanJobTypes)
return S_OK(cleanJobTypes)

def execute(self):
Expand All @@ -102,7 +103,7 @@ def execute(self):
# First, fully remove jobs in JobStatus.DELETED state
result = self.removeDeletedJobs()
if not result["OK"]:
self.log.error(f"Failed to remove jobs with status {JobStatus.DELETED}")
self.log.error("Failed to remove jobs with status", JobStatus.DELETED)

# Second: set the status to JobStatus.DELETED for certain jobs

Expand All @@ -117,8 +118,7 @@ def execute(self):

baseCond = {"JobType": result["Value"]}
# Delete jobs with final status
for status in self.removeStatusDelay:
delay = self.removeStatusDelay[status]
for status, delay in self.removeStatusDelay.items():
if delay < 0:
# Negative delay means don't delete anything...
continue
Expand Down Expand Up @@ -185,26 +185,7 @@ def removeDeletedJobs(self):
if not jobList:
return S_OK()

ownerJobsDict = self._getOwnerJobsDict(jobList)

fail = False
for owner, jobsList in ownerJobsDict.items():
ownerDN = owner.split(";")[0]
ownerGroup = owner.split(";")[1]
self.log.verbose("Attempting to remove jobs", f"(n={len(jobsList)}) for {ownerDN} : {ownerGroup}")
wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup)
result = wmsClient.removeJob(jobsList)
if not result["OK"]:
self.log.error(
"Could not remove jobs",
f"for {ownerDN} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}",
)
fail = True

if fail:
return S_ERROR()

return S_OK()
return self._deleteRemoveJobs(jobList, remove=True)

def deleteJobsByStatus(self, condDict, delay=False):
"""Sets the job status to "DELETED" for jobs in condDict.
Expand Down Expand Up @@ -234,19 +215,29 @@ def deleteJobsByStatus(self, condDict, delay=False):
if not jobList:
return S_OK()

return self._deleteRemoveJobs(jobList)

def _deleteRemoveJobs(self, jobList, remove=False):
"""Delete or removes a jobList"""
ownerJobsDict = self._getOwnerJobsDict(jobList)

fail = False
for owner, jobsList in ownerJobsDict.items():
ownerDN = owner.split(";")[0]
ownerGroup = owner.split(";")[1]
self.log.verbose("Attempting to delete jobs", f"(n={len(jobsList)}) for {ownerDN} : {ownerGroup}")
wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup)
result = wmsClient.deleteJob(jobsList)
user, ownerGroup = owner.split(";", maxsplit=1)
self.log.verbose("Attempting to delete jobs", f"(n={len(jobsList)}) for {user} : {ownerGroup}")
res = getDNForUsername(user)
if not res["OK"]:
self.log.error("No DN found", f"for {user}")
return res
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
if remove:
result = wmsClient.removeJob(jobsList)
else:
result = wmsClient.deleteJob(jobsList)
if not result["OK"]:
self.log.error(
"Could not delete jobs",
f"for {ownerDN} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}",
"Could not {'remove' if remove else 'delete'} jobs",
f"for {user} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}",
)
fail = True

Expand Down Expand Up @@ -279,7 +270,7 @@ def _getOwnerJobsDict(self, jobList):
:returns: a dict with a grouping of them by owner, e.g.{'dn;group': [1, 3, 4], 'dn;group_1': [5], 'dn_1;group': [2]}
"""
res = self.jobDB.getJobsAttributes(jobList, ["OwnerDN", "OwnerGroup"])
res = self.jobDB.getJobsAttributes(jobList, ["Owner", "OwnerGroup"])
if not res["OK"]:
self.log.error("Could not get the jobs attributes", res["Message"])
return res
Expand Down Expand Up @@ -327,8 +318,7 @@ def deleteJobOversizedSandbox(self, jobIDList):
else:
successful[jobID] = lfn

result = {"Successful": successful, "Failed": failed}
return S_OK(result)
return S_OK({"Successful": successful, "Failed": failed})

def __setRemovalRequest(self, lfn, owner, ownerGroup):
"""Set removal request with the given credentials"""
Expand Down Expand Up @@ -369,4 +359,3 @@ def removeHeartBeatLoggingInfo(self, status, delayDays):
self.log.error("Failed to delete from HeartBeatLoggingInfo", result["Message"])
else:
self.log.info("Deleted HeartBeatLogging info")
return
Loading

0 comments on commit 54db53b

Please sign in to comment.