From e9ec307c1861806e3f7e6f69020526a0aab7cdc0 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 8 Aug 2024 16:36:52 +0200 Subject: [PATCH] test: added a unit test --- .../Client/JobStatus.py | 11 ++-- .../Service/JobManagerHandler.py | 24 +++++--- .../Service/tests/Test_JobManager.py | 58 +++++++++++++++++++ 3 files changed, 77 insertions(+), 16 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Service/tests/Test_JobManager.py diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py index 2fac939997b..28bc9a75a5e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py @@ -6,6 +6,8 @@ from DIRAC.Core.Utilities.StateMachine import State, StateMachine from DIRAC.Core.Utilities.Decorators import deprecated +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient + #: SUBMITTING = "Submitting" @@ -129,7 +131,7 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito return S_OK() -def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None): +def filterJobStateTransition(jobIDs, candidateState): """Given a list of jobIDs, return a list that are allowed to transition to the given candidate state. """ @@ -138,12 +140,7 @@ def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None): if not isinstance(jobIDs, list): jobIDs = [jobIDs] - if not jobMonitoringClient: - from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient - - jobMonitoringClient = JobMonitoringClient() - - res = jobMonitoringClient.getJobsStatus(jobIDs) + res = JobMonitoringClient().getJobsStatus(jobIDs) if not res["OK"]: return res diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 453cb488fa7..f6f56837e95 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -19,6 +19,7 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_DELETE, @@ -302,6 +303,9 @@ def __getJobList(jobInput): :return : a list of int job IDs """ + if not jobInput: + return [] + if isinstance(jobInput, int): return [jobInput] if isinstance(jobInput, str): @@ -491,7 +495,7 @@ def __killJob(self, jobID, sendKillCommand=True): return S_OK() - def __kill_delete_jobs(self, jobIDList, right): + def _kill_delete_jobs(self, jobIDList, right): """Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary :param list jobIDList: job IDs @@ -501,24 +505,25 @@ def __kill_delete_jobs(self, jobIDList, right): """ jobList = self.__getJobList(jobIDList) if not jobList: - return S_ERROR("Invalid job specification: " + str(jobIDList)) + self.log.warn("No jobs specified") + return S_OK([]) validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right) badIDs = [] + killJobList = [] + deleteJobList = [] if validJobList: - killJobList = [] - deleteJobList = [] # Get the jobs allowed to transition to the Killed state - filterRes = JobStatus.filterJobStateTransition(validJobList, JobStatus.KILLED) + filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED) if not filterRes["OK"]: return filterRes killJobList.extend(filterRes["Value"]) if not right == RIGHT_KILL: # Get the jobs allowed to transition to the Deleted state - filterRes = JobStatus.filterJobStateTransition(validJobList, JobStatus.DELETED) + filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED) if not filterRes["OK"]: return filterRes deleteJobList.extend(filterRes["Value"]) @@ -556,7 +561,8 @@ def __kill_delete_jobs(self, jobIDList, right): result["FailedJobIDs"] = badIDs return result - result = S_OK(validJobList) + jobsList = killJobList if right == RIGHT_KILL else deleteJobList + result = S_OK(jobsList) result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired() if invalidJobList: @@ -575,7 +581,7 @@ def export_deleteJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self.__kill_delete_jobs(jobIDs, RIGHT_DELETE) + return self._kill_delete_jobs(jobIDs, RIGHT_DELETE) ########################################################################### types_killJob = [] @@ -588,7 +594,7 @@ def export_killJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self.__kill_delete_jobs(jobIDs, RIGHT_KILL) + return self._kill_delete_jobs(jobIDs, RIGHT_KILL) ########################################################################### types_resetJob = [] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/tests/Test_JobManager.py b/src/DIRAC/WorkloadManagementSystem/Service/tests/Test_JobManager.py new file mode 100644 index 00000000000..9cc04209565 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Service/tests/Test_JobManager.py @@ -0,0 +1,58 @@ +""" unit test (pytest) of JobManager service +""" + +from unittest.mock import MagicMock +import pytest + +from DIRAC import gLogger + +gLogger.setLevel("DEBUG") + +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( + RIGHT_DELETE, + RIGHT_KILL, +) + +# sut +from DIRAC.WorkloadManagementSystem.Service.JobManagerHandler import JobManagerHandlerMixin + +# mocks +jobPolicy_mock = MagicMock() +jobDB_mock = MagicMock() +jobDB_mock.getJobsAttributes.return_value = {"OK": True, "Value": {}} + + +@pytest.mark.parametrize( + "jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value", + [ + ([], RIGHT_KILL, ([], [], [], []), [], True, []), + ([], RIGHT_DELETE, ([], [], [], []), [], True, []), + (1, RIGHT_KILL, ([], [], [], []), [], True, []), + (1, RIGHT_KILL, ([1], [], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([], [], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([1], [], [], []), [], True, []), + (1, RIGHT_KILL, ([1], [], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [2], [], []), [1], True, [1]), + ([1, 2], RIGHT_KILL, ([1], [2], [], []), [], True, []), + ([1, 2], RIGHT_KILL, ([1,2], [], [], []), [1,2], True, [1,2]), + ], +) +def test___kill_delete_jobs(mocker, jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value): + mocker.patch( + "DIRAC.WorkloadManagementSystem.Service.JobManagerHandler.filterJobStateTransition", + return_value={"OK": True, "Value": filteredJobsList}, + ) + + JobManagerHandlerMixin.log = gLogger + JobManagerHandlerMixin.jobPolicy = jobPolicy_mock + JobManagerHandlerMixin.jobDB = jobDB_mock + JobManagerHandlerMixin.taskQueueDB = MagicMock() + + jobPolicy_mock.evaluateJobRights.return_value = lists + + jm = JobManagerHandlerMixin() + + res = jm._kill_delete_jobs(jobIDs_list, right) + assert res["OK"] == expected_res + assert res["Value"] == expected_value