diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 86f22acc4f3..a7a9896b0ec 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -323,6 +323,8 @@ Agents FailedQueueCycleFactor = 10 # How the agent manages the submission of the jobs SubmissionPolicy = JobWrapper + # The CVMFS location to be used for the job execution on the remote site + CVMFSLocation = "/cvmfs/dirac.egi.eu/dirac/pro" # Clean the task after the job is done CleanTask = True } diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py index c82e10a9de3..f2b7a4f0020 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py @@ -3,11 +3,45 @@ class JobExecutionCoordinator: """ - Abstract class for job execution coordinators. + Job Execution Coordinator Class. - This class is responsible for pre-processing and post-processing jobs before and after execution. - It should be implemented by the community job execution coordinator. - It is used by the JobWrapper to handle the execution of jobs. + This class serves as the base class for job execution coordinators, providing + the necessary methods for pre-processing and post-processing jobs before and after + their execution. + + Communities who need to implement specific workflows for job pre-processing + and post-processing in their Dirac extension should inherit from this class and + override the relevant methods with their custom implementations. 
+ + The `JobExecutionCoordinator` class is primarily used by the `JobWrapper` to manage + the execution of jobs, ensuring that all necessary preparations are made before the + job starts, and that any required cleanup or data handling is performed after the + job completes. + + **Example Usage in your Extension:** + + .. code-block:: python + + from DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator import ( + JobExecutionCoordinator as DIRACJobExecutionCoordinator + ) + + class JobExecutionCoordinator(DIRACJobExecutionCoordinator): + def preProcess(self, command, exeEnv): + # Custom pre-processing code here + pass + + def postProcess(self): + # Custom post-processing code here + pass + + In this example, `JobExecutionCoordinator` inherits from `DIRACJobExecutionCoordinator` + and provides custom implementations for the `preProcess` and `postProcess` methods. + + **Methods to Override:** + + - `preProcess(command, exeEnv)` + - `postProcess()` """ def __init__(self, jobArgs: dict, ceArgs: dict) -> None: diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index f40bf3a8db5..b8dc79c9ea6 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -63,7 +63,7 @@ class JobWrapper: def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None): """Standard constructor""" self.initialTiming = os.times() - self.section = "/Systems/WorkloadManagement/JobWrapper" + self.section = os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper") # Create the accounting report self.accountingReport = AccountingJob() # Initialize for accounting @@ -87,6 +87,19 @@ def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None) # self.root is the path the Wrapper is running at self.root = Path.cwd() + # `self.jobIDPath` represents the directory path where the job is being 
executed. + # By default, it is set to `self.root`, which corresponds to the current directory, + # since the `jobID` is not yet assigned. In this scenario, the job runs directly in the current directory. + # + # This default behavior is particularly useful when the JobWrapper is initialized without a `jobID`, + # such as when it is transferred to a remote computing resource for execution. In these cases, + # the JobWrapper on the remote resource is initialized without a `jobID` because the current directory + # already corresponds to the job's directory, which was set up on the resource where the JobWrapper + # was originally created. + # + # However, if a `jobID` is provided (normal use case), `self.jobIDPath` is updated to `self.root/jobID`. + # This indicates that the job will be executed in a specific subdirectory named after the job ID, + # rather than directly in the root directory. self.jobIDPath = self.root result = getCurrentVersion() if result["OK"]: @@ -379,7 +392,6 @@ def preProcess(self): if not (result := self.jobExecutionCoordinator.preProcess(command, exeEnv))["OK"]: self.log.error("Failed to pre-process the job", result["Message"]) - return result return result @@ -576,7 +588,6 @@ def postProcess( if not (result := self.jobExecutionCoordinator.postProcess())["OK"]: self.log.error("Failed to post-process the job", result["Message"]) - return result return result diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py index 77eb6d68342..e3c7f971872 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py @@ -38,7 +38,7 @@ def killJobWrapper(job: JobWrapper) -> int: return 1 -def rescheduleFailedJob(jobID, minorStatus, jobReport: JobReport): +def rescheduleFailedJob(jobID: str, minorStatus: str, jobReport: JobReport): """Function for rescheduling a jobID, 
setting a minorStatus""" rescheduleResult = JobStatus.RESCHEDULED diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index 3272bbd2034..d9679d40627 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -28,8 +28,11 @@ @pytest.fixture -def setup_job_wrapper(): +def setup_job_wrapper(mocker): """Fixture to create a JobWrapper instance with a JobExecutionCoordinator.""" + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) def _setup(jobArgs=None, ceArgs=None): jw = JobWrapper() @@ -187,6 +190,9 @@ def test_preProcess_dirac_jobexec_with_args(setup_job_wrapper): def test_processSuccessfulCommand(mocker): """Test the process method of the JobWrapper class: most common scenario.""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -215,6 +221,9 @@ def test_processSuccessfulCommand(mocker): def test_processSuccessfulDiracJobExec(mocker): """Test the process method of the JobWrapper class: most common scenario with dirac-jobexec.""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -240,6 +249,9 @@ def test_processSuccessfulDiracJobExec(mocker): def test_processFailedCommand(mocker): """Test the process method of the JobWrapper class: the command fails.""" 
mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -271,6 +283,9 @@ def test_processFailedCommand(mocker): def test_processFailedSubprocess(mocker): """Test the process method of the JobWrapper class: the subprocess fails.""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.systemCall") mock_system_call.return_value = S_ERROR("Any problem") mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.getChildPID") @@ -300,6 +315,9 @@ def test_processFailedSubprocess(mocker): def test_processQuickExecutionNoWatchdog(mocker): """Test the process method of the JobWrapper class: the payload is too fast to start the watchdog.""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -309,7 +327,7 @@ def test_processQuickExecutionNoWatchdog(mocker): with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: jw.outputFile = std_out.name jw.errorFile = std_err.name - result = jw.process(command=f"echo hello", env={}) + result = jw.process(command="echo hello", env={}) assert result["OK"] assert result["Value"]["payloadStatus"] == 0 @@ -324,6 +342,9 @@ def test_processQuickExecutionNoWatchdog(mocker): def test_processSubprocessFailureNoPid(mocker): """Test the process 
method of the JobWrapper class: the subprocess fails and no PID is returned.""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) # Test failure in starting the payload process jw = JobWrapper() jw.jobArgs = {} @@ -337,7 +358,7 @@ def test_processSubprocessFailureNoPid(mocker): with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: jw.outputFile = std_out.name jw.errorFile = std_err.name - result = jw.process(command=f"mock_command", env={}) + result = jw.process(command="mock_command", env={}) assert not result["OK"] assert "Payload process could not start after 140 seconds" in result["Message"] @@ -585,6 +606,9 @@ def test_execute(mocker, executable, args, src, expectedResult): The returned value of JobWrapper.execute() is not checked as it can apparently be wrong depending on the shell used. 
""" mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) if src: shutil.copy(os.path.join(src, executable), executable) @@ -651,8 +675,11 @@ def jobIDPath(): @pytest.fixture -def setup_another_job_wrapper(jobIDPath): +def setup_another_job_wrapper(mocker, jobIDPath): """Fixture to create a JobWrapper instance with the jobIDPath.""" + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper(jobIDPath) jw.jobIDPath = Path(str(jobIDPath)) jw.failedFlag = False @@ -853,6 +880,9 @@ def test_processJobOutputs_output_data_upload(mocker, setup_another_job_wrapper) def test_resolveInputData(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs["InputData"] = "" @@ -938,6 +968,9 @@ def test_transferInputSandbox(mocker, setup_another_job_wrapper): ) def test_finalize(mocker, failedFlag, expectedRes, finalStates): mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"Executable": "/bin/ls"} diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py index caf5f6e0951..c200bc834de 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py @@ -76,7 +76,6 @@ def __init__(self, user, userGroup, allInfo=True): self.userName = user self.userGroup = userGroup 
self.userProperties = getPropertiesForGroup(userGroup, []) - self.jobDB = None self.allInfo = allInfo self._permissions = {}