Skip to content

Commit

Permalink
test: added a unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Aug 8, 2024
1 parent e445469 commit e9ec307
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 16 deletions.
11 changes: 4 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
from DIRAC.Core.Utilities.Decorators import deprecated

from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient


#:
SUBMITTING = "Submitting"
Expand Down Expand Up @@ -129,7 +131,7 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito
return S_OK()


def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
def filterJobStateTransition(jobIDs, candidateState):
"""Given a list of jobIDs, return a list that are allowed to transition
to the given candidate state.
"""
Expand All @@ -138,12 +140,7 @@ def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]

if not jobMonitoringClient:
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

jobMonitoringClient = JobMonitoringClient()

res = jobMonitoringClient.getJobsStatus(jobIDs)
res = JobMonitoringClient().getJobsStatus(jobIDs)
if not res["OK"]:
return res

Expand Down
24 changes: 15 additions & 9 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
Expand Down Expand Up @@ -302,6 +303,9 @@ def __getJobList(jobInput):
:return : a list of int job IDs
"""

if not jobInput:
return []

if isinstance(jobInput, int):
return [jobInput]
if isinstance(jobInput, str):
Expand Down Expand Up @@ -491,7 +495,7 @@ def __killJob(self, jobID, sendKillCommand=True):

return S_OK()

def __kill_delete_jobs(self, jobIDList, right):
def _kill_delete_jobs(self, jobIDList, right):
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
:param list jobIDList: job IDs
Expand All @@ -501,24 +505,25 @@ def __kill_delete_jobs(self, jobIDList, right):
"""
jobList = self.__getJobList(jobIDList)
if not jobList:
return S_ERROR("Invalid job specification: " + str(jobIDList))
self.log.warn("No jobs specified")
return S_OK([])

validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right)

badIDs = []

killJobList = []
deleteJobList = []
if validJobList:
killJobList = []
deleteJobList = []
# Get the jobs allowed to transition to the Killed state
filterRes = JobStatus.filterJobStateTransition(validJobList, JobStatus.KILLED)
filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED)
if not filterRes["OK"]:
return filterRes
killJobList.extend(filterRes["Value"])

if not right == RIGHT_KILL:
# Get the jobs allowed to transition to the Deleted state
filterRes = JobStatus.filterJobStateTransition(validJobList, JobStatus.DELETED)
filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED)
if not filterRes["OK"]:
return filterRes
deleteJobList.extend(filterRes["Value"])
Expand Down Expand Up @@ -556,7 +561,8 @@ def __kill_delete_jobs(self, jobIDList, right):
result["FailedJobIDs"] = badIDs
return result

result = S_OK(validJobList)
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
result = S_OK(jobsList)
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()

if invalidJobList:
Expand All @@ -575,7 +581,7 @@ def export_deleteJob(self, jobIDs):
:return: S_OK/S_ERROR
"""

return self.__kill_delete_jobs(jobIDs, RIGHT_DELETE)
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE)

###########################################################################
types_killJob = []
Expand All @@ -588,7 +594,7 @@ def export_killJob(self, jobIDs):
:return: S_OK/S_ERROR
"""

return self.__kill_delete_jobs(jobIDs, RIGHT_KILL)
return self._kill_delete_jobs(jobIDs, RIGHT_KILL)

###########################################################################
types_resetJob = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
""" unit test (pytest) of JobManager service
"""

from unittest.mock import MagicMock
import pytest

from DIRAC import gLogger

gLogger.setLevel("DEBUG")

from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
)

# sut
from DIRAC.WorkloadManagementSystem.Service.JobManagerHandler import JobManagerHandlerMixin

# mocks
jobPolicy_mock = MagicMock()
jobDB_mock = MagicMock()
jobDB_mock.getJobsAttributes.return_value = {"OK": True, "Value": {}}


@pytest.mark.parametrize(
"jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value",
[
([], RIGHT_KILL, ([], [], [], []), [], True, []),
([], RIGHT_DELETE, ([], [], [], []), [], True, []),
(1, RIGHT_KILL, ([], [], [], []), [], True, []),
(1, RIGHT_KILL, ([1], [], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([], [], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([1], [], [], []), [], True, []),
(1, RIGHT_KILL, ([1], [], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [2], [], []), [1], True, [1]),
([1, 2], RIGHT_KILL, ([1], [2], [], []), [], True, []),
([1, 2], RIGHT_KILL, ([1,2], [], [], []), [1,2], True, [1,2]),
],
)
def test___kill_delete_jobs(mocker, jobIDs_list, right, lists, filteredJobsList, expected_res, expected_value):
mocker.patch(
"DIRAC.WorkloadManagementSystem.Service.JobManagerHandler.filterJobStateTransition",
return_value={"OK": True, "Value": filteredJobsList},
)

JobManagerHandlerMixin.log = gLogger
JobManagerHandlerMixin.jobPolicy = jobPolicy_mock
JobManagerHandlerMixin.jobDB = jobDB_mock
JobManagerHandlerMixin.taskQueueDB = MagicMock()

jobPolicy_mock.evaluateJobRights.return_value = lists

jm = JobManagerHandlerMixin()

res = jm._kill_delete_jobs(jobIDs_list, right)
assert res["OK"] == expected_res
assert res["Value"] == expected_value

0 comments on commit e9ec307

Please sign in to comment.