Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v8.1] Make Job.setNumberOfProcessors safer to use #7194

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 38 additions & 48 deletions src/DIRAC/Interfaces/API/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
COMPONENT_NAME = "/Interfaces/API/Job"


class BadJobParameterError(ValueError):
pass


class Job(API):
"""DIRAC jobs"""

Expand Down Expand Up @@ -517,7 +521,10 @@ def setDestination(self, destination):

#############################################################################
def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=None, maxNumberOfProcessors=None):
"""Helper function.
"""Helper function to set the number of processors required by the job.

The DIRAC_JOB_PROCESSORS environment variable can be used by the job to
determine how many processors are actually assigned.

Example usage:

Expand Down Expand Up @@ -551,75 +558,50 @@ def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=N

>>> job = Job()
>>> job.setNumberOfProcessors(numberOfProcessors=3, maxNumberOfProcessors=4)
will lead to ignore the second parameter
will result in a BadJobParameterError

>>> job = Job()
>>> job.setNumberOfProcessors(numberOfProcessors=3, minNumberOfProcessors=2)
will lead to ignore the second parameter
will result in a BadJobParameterError

:param int processors: number of processors required by the job (exact number, unless a min/max are set)
:param int minNumberOfProcessors: optional min number of processors the job applications can use
:param int maxNumberOfProcessors: optional max number of processors the job applications can use

:return: S_OK/S_ERROR
"""
if numberOfProcessors:
if not minNumberOfProcessors:
nProc = numberOfProcessors
else:
nProc = max(numberOfProcessors, minNumberOfProcessors)
if nProc > 1:
self._addParameter(
self.workflow, "NumberOfProcessors", "JDL", nProc, "Exact number of processors requested"
)
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
"JDL",
nProc,
"Max Number of processors the job applications may use",
)
return S_OK()
:return: S_OK

if maxNumberOfProcessors and not minNumberOfProcessors:
minNumberOfProcessors = 1
:raises BadJobParameterError: If the function arguments are not valid.
"""
# If min and max are the same then that's the same as setting numberOfProcessors
if minNumberOfProcessors and maxNumberOfProcessors and minNumberOfProcessors == maxNumberOfProcessors:
numberOfProcessors = minNumberOfProcessors
minNumberOfProcessors = maxNumberOfProcessors = None

if minNumberOfProcessors and maxNumberOfProcessors and minNumberOfProcessors >= maxNumberOfProcessors:
minNumberOfProcessors = maxNumberOfProcessors
if numberOfProcessors is not None:
if minNumberOfProcessors is not None:
raise BadJobParameterError("minNumberOfProcessors cannot be used with numberOfProcessors")
if maxNumberOfProcessors is not None:
raise BadJobParameterError("maxNumberOfProcessors cannot be used with numberOfProcessors")

if (
minNumberOfProcessors
and maxNumberOfProcessors
and minNumberOfProcessors == maxNumberOfProcessors
and minNumberOfProcessors > 1
):
self._addParameter(
self.workflow,
"NumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Exact number of processors requested",
self.workflow, "NumberOfProcessors", "JDL", numberOfProcessors, "Exact number of processors requested"
)
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
"JDL",
minNumberOfProcessors,
numberOfProcessors,
"Max Number of processors the job applications may use",
)
return S_OK()

# By this point there should be a min
self._addParameter(
self.workflow,
"MinNumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Min Number of processors the job applications may use",
)
if minNumberOfProcessors is None and maxNumberOfProcessors is None:
return S_OK()

# If not set, will be "all"
if maxNumberOfProcessors:
minNumberOfProcessors = minNumberOfProcessors or 1
if maxNumberOfProcessors is not None:
if maxNumberOfProcessors < minNumberOfProcessors:
raise BadJobParameterError("minNumberOfProcessors must be less than or equal to maxNumberOfProcessors")
self._addParameter(
self.workflow,
"MaxNumberOfProcessors",
Expand All @@ -628,6 +610,14 @@ def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=N
"Max Number of processors the job applications may use",
)

self._addParameter(
self.workflow,
"MinNumberOfProcessors",
"JDL",
minNumberOfProcessors,
"Min Number of processors the job applications may use",
)

return S_OK()

#############################################################################
Expand Down
32 changes: 22 additions & 10 deletions src/DIRAC/Interfaces/API/test/Test_JobAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest

from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Interfaces.API.Job import Job
from DIRAC.Interfaces.API.Job import Job, BadJobParameterError


def test_basicJob():
Expand Down Expand Up @@ -74,31 +74,43 @@ def test_SimpleParametricJob():
"proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc",
[
(4, None, None, 4, None, 4),
(4, 2, None, 4, None, 4),
(4, 2, 8, 4, None, 4),
(4, 8, 6, 8, None, 8), # non-sense
(None, 2, 8, None, 2, 8),
(None, 1, None, None, 1, None),
(None, None, 8, None, 1, 8),
(None, 8, 8, 8, None, 8),
(None, 12, 8, 8, None, 8), # non-sense
],
)
def test_setNumberOfProcessors(proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc):
# Arrange
def test_setNumberOfProcessors_successful(proc, minProc, maxProc, expectedProc, expectedMinProc, expectedMaxProc):
job = Job()

# Act
res = job.setNumberOfProcessors(proc, minProc, maxProc)

# Assert
assert res["OK"], res["Message"]
jobDescription = ClassAd(f"[{job._toJDL()}]")
assert expectedProc == jobDescription.getAttributeInt("NumberOfProcessors")
assert expectedMinProc == jobDescription.getAttributeInt("MinNumberOfProcessors")
assert expectedMaxProc == jobDescription.getAttributeInt("MaxNumberOfProcessors")


@pytest.mark.parametrize(
"proc, minProc, maxProc",
[
(4, 2, None),
(4, 2, 8),
(4, 8, 6),
(None, 12, 8),
],
)
def test_setNumberOfProcessors_unsuccessful(proc, minProc, maxProc):
job = Job()
with pytest.raises(BadJobParameterError):
job.setNumberOfProcessors(proc, minProc, maxProc)

jobDescription = ClassAd(f"[{job._toJDL()}]")
assert jobDescription.getAttributeInt("NumberOfProcessors") is None
assert jobDescription.getAttributeInt("MinNumberOfProcessors") is None
assert jobDescription.getAttributeInt("MaxNumberOfProcessors") is None


@pytest.mark.parametrize(
"sites, expectedSites",
[
Expand Down
Loading