Skip to content

Commit

Permalink
fix: apply suggestions to fix the PJA
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Aug 26, 2024
1 parent 2dfe036 commit b03b066
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ Agents
FailedQueueCycleFactor = 10
# How the agent manages the submission of the jobs
SubmissionPolicy = JobWrapper
# The CVMFS location to be used for the job execution on the remote site
CVMFSLocation = "/cvmfs/dirac.egi.eu/dirac/pro"
# Clean the task after the job is done
CleanTask = True
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,45 @@

class JobExecutionCoordinator:
"""
Abstract class for job execution coordinators.
Job Execution Coordinator Class.
This class is responsible for pre-processing and post-processing jobs before and after execution.
It should be implemented by the community job execution coordinator.
It is used by the JobWrapper to handle the execution of jobs.
This class serves as the base class for job execution coordinators, providing
the necessary methods for pre-processing and post-processing jobs before and after
their execution.
Communities who need to implement specific workflows for job pre-processing
and post-processing in their Dirac extension should inherit from this class and
override the relevant methods with their custom implementations.
The `JobExecutionCoordinator` class is primarily used by the `JobWrapper` to manage
the execution of jobs, ensuring that all necessary preparations are made before the
job starts, and that any required cleanup or data handling is performed after the
job completes.
**Example Usage in your Extension:**
.. code-block:: python
from DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator import (
JobExecutionCoordinator as DIRACJobExecutionCoordinator
)
class JobExecutionCoordinator(DIRACJobExecutionCoordinator):
def preProcess(self, command, exeEnv):
# Custom pre-processing code here
pass
def postProcess(self):
# Custom post-processing code here
pass
In this example, `JobExecutionCoordinator` inherits from `DIRACJobExecutionCoordinator`
and provides custom implementations for the `preProcess` and `postProcess` methods.
**Methods to Override:**
- `preProcess(command, exeEnv)`
- `postProcess()`
"""

def __init__(self, jobArgs: dict, ceArgs: dict) -> None:
Expand Down
17 changes: 14 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JobWrapper:
def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None):
"""Standard constructor"""
self.initialTiming = os.times()
self.section = "/Systems/WorkloadManagement/JobWrapper"
self.section = os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper")
# Create the accounting report
self.accountingReport = AccountingJob()
# Initialize for accounting
Expand All @@ -87,6 +87,19 @@ def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None)

# self.root is the path the Wrapper is running at
self.root = Path.cwd()
# `self.jobIDPath` represents the directory path where the job is being executed.
# By default, it is set to `self.root`, which corresponds to the current directory,
# since the `jobID` is not yet assigned. In this scenario, the job runs directly in the current directory.
#
# This default behavior is particularly useful when the JobWrapper is initialized without a `jobID`,
# such as when it is transferred to a remote computing resource for execution. In these cases,
# the JobWrapper on the remote resource is initialized without a `jobID` because the current directory
# already corresponds to the job's directory, which was set up on the resource where the JobWrapper
# was originally created.
#
# However, if a `jobID` is provided (normal use case), `self.jobIDPath` is updated to `self.root/jobID`.
# This indicates that the job will be executed in a specific subdirectory named after the job ID,
# rather than directly in the root directory.
self.jobIDPath = self.root
result = getCurrentVersion()
if result["OK"]:
Expand Down Expand Up @@ -379,7 +392,6 @@ def preProcess(self):

if not (result := self.jobExecutionCoordinator.preProcess(command, exeEnv))["OK"]:
self.log.error("Failed to pre-process the job", result["Message"])
return result

return result

Expand Down Expand Up @@ -576,7 +588,6 @@ def postProcess(

if not (result := self.jobExecutionCoordinator.postProcess())["OK"]:
self.log.error("Failed to post-process the job", result["Message"])
return result

return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def killJobWrapper(job: JobWrapper) -> int:
return 1


def rescheduleFailedJob(jobID, minorStatus, jobReport: JobReport):
def rescheduleFailedJob(jobID: str, minorStatus: str, jobReport: JobReport):
"""Function for rescheduling a jobID, setting a minorStatus"""

rescheduleResult = JobStatus.RESCHEDULED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@


@pytest.fixture
def setup_job_wrapper():
def setup_job_wrapper(mocker):
"""Fixture to create a JobWrapper instance with a JobExecutionCoordinator."""
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)

def _setup(jobArgs=None, ceArgs=None):
jw = JobWrapper()
Expand Down Expand Up @@ -187,6 +190,9 @@ def test_preProcess_dirac_jobexec_with_args(setup_job_wrapper):
def test_processSuccessfulCommand(mocker):
"""Test the process method of the JobWrapper class: most common scenario."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
jw = JobWrapper()
jw.jobArgs = {"CPUTime": 100, "Memory": 1}

Expand Down Expand Up @@ -215,6 +221,9 @@ def test_processSuccessfulCommand(mocker):
def test_processSuccessfulDiracJobExec(mocker):
"""Test the process method of the JobWrapper class: most common scenario with dirac-jobexec."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
jw = JobWrapper()
jw.jobArgs = {"CPUTime": 100, "Memory": 1}

Expand All @@ -240,6 +249,9 @@ def test_processSuccessfulDiracJobExec(mocker):
def test_processFailedCommand(mocker):
"""Test the process method of the JobWrapper class: the command fails."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
jw = JobWrapper()
jw.jobArgs = {"CPUTime": 100, "Memory": 1}

Expand Down Expand Up @@ -271,6 +283,9 @@ def test_processFailedCommand(mocker):
def test_processFailedSubprocess(mocker):
"""Test the process method of the JobWrapper class: the subprocess fails."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.systemCall")
mock_system_call.return_value = S_ERROR("Any problem")
mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.getChildPID")
Expand Down Expand Up @@ -300,6 +315,9 @@ def test_processFailedSubprocess(mocker):
def test_processQuickExecutionNoWatchdog(mocker):
"""Test the process method of the JobWrapper class: the payload is too fast to start the watchdog."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
jw = JobWrapper()
jw.jobArgs = {"CPUTime": 100, "Memory": 1}

Expand All @@ -309,7 +327,7 @@ def test_processQuickExecutionNoWatchdog(mocker):
with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err:
jw.outputFile = std_out.name
jw.errorFile = std_err.name
result = jw.process(command=f"echo hello", env={})
result = jw.process(command="echo hello", env={})

assert result["OK"]
assert result["Value"]["payloadStatus"] == 0
Expand All @@ -324,6 +342,9 @@ def test_processQuickExecutionNoWatchdog(mocker):
def test_processSubprocessFailureNoPid(mocker):
"""Test the process method of the JobWrapper class: the subprocess fails and no PID is returned."""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
# Test failure in starting the payload process
jw = JobWrapper()
jw.jobArgs = {}
Expand All @@ -337,7 +358,7 @@ def test_processSubprocessFailureNoPid(mocker):
with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err:
jw.outputFile = std_out.name
jw.errorFile = std_err.name
result = jw.process(command=f"mock_command", env={})
result = jw.process(command="mock_command", env={})
assert not result["OK"]
assert "Payload process could not start after 140 seconds" in result["Message"]

Expand Down Expand Up @@ -585,6 +606,9 @@ def test_execute(mocker, executable, args, src, expectedResult):
The returned value of JobWrapper.execute() is not checked as it can apparently be wrong depending on the shell used.
"""
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value")
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)

if src:
shutil.copy(os.path.join(src, executable), executable)
Expand Down Expand Up @@ -651,8 +675,11 @@ def jobIDPath():


@pytest.fixture
def setup_another_job_wrapper(jobIDPath):
def setup_another_job_wrapper(mocker, jobIDPath):
"""Fixture to create a JobWrapper instance with the jobIDPath."""
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)
jw = JobWrapper(jobIDPath)
jw.jobIDPath = Path(str(jobIDPath))
jw.failedFlag = False
Expand Down Expand Up @@ -853,6 +880,9 @@ def test_processJobOutputs_output_data_upload(mocker, setup_another_job_wrapper)

def test_resolveInputData(mocker):
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock())
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)

jw = JobWrapper()
jw.jobArgs["InputData"] = ""
Expand Down Expand Up @@ -938,6 +968,9 @@ def test_transferInputSandbox(mocker, setup_another_job_wrapper):
)
def test_finalize(mocker, failedFlag, expectedRes, finalStates):
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock())
mocker.patch(
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
)

jw = JobWrapper()
jw.jobArgs = {"Executable": "/bin/ls"}
Expand Down
1 change: 0 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def __init__(self, user, userGroup, allInfo=True):
self.userName = user
self.userGroup = userGroup
self.userProperties = getPropertiesForGroup(userGroup, [])

self.jobDB = None
self.allInfo = allInfo
self._permissions = {}
Expand Down

0 comments on commit b03b066

Please sign in to comment.