diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index f5c23cf19b5..97ba45fd339 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -49,6 +49,7 @@ jobs: - uses: actions/setup-python@v5 with: python-version: '3.11' + - uses: cvmfs-contrib/github-action-cvmfs@v4 - name: Installing dependencies run: | python -m pip install \ diff --git a/dirac.cfg b/dirac.cfg index 2ebd6005cf4..f57fae99d25 100644 --- a/dirac.cfg +++ b/dirac.cfg @@ -582,6 +582,11 @@ Systems index_name=a_different_name } } + JobWrapper + { + # Minimum output buffer requested for running jobs + MinOutputDataBufferGB = 5 + } } } Resources diff --git a/integration_tests.py b/integration_tests.py index 9e6aa773d17..f5cc649cb50 100755 --- a/integration_tests.py +++ b/integration_tests.py @@ -779,6 +779,12 @@ def _make_env(flags): env["MYSQL_VER"] = flags.pop("MYSQL_VER", DEFAULT_MYSQL_VER) env["ES_VER"] = flags.pop("ES_VER", DEFAULT_ES_VER) env["IAM_VER"] = flags.pop("IAM_VER", DEFAULT_IAM_VER) + if Path("/cvmfs").is_dir(): + env["CVMFS_DIR"] = "/cvmfs" + else: + # create a directory in tmp + with tempfile.TemporaryDirectory() as tmpdir: + env["CVMFS_DIR"] = tmpdir return env diff --git a/src/DIRAC/WorkloadManagementSystem/Client/DownloadInputData.py b/src/DIRAC/WorkloadManagementSystem/Client/DownloadInputData.py index 3ded1d0eb8e..eae626de7df 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/DownloadInputData.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/DownloadInputData.py @@ -5,7 +5,8 @@ import random import tempfile -from DIRAC import S_ERROR, S_OK, gLogger +from DIRAC import S_ERROR, S_OK, gConfig, gLogger +from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemSection from DIRAC.Core.Utilities.Os import getDiskSpace from DIRAC.Core.Utilities.ReturnValues import returnSingleResult from DIRAC.DataManagementSystem.Utilities.DMSHelpers import DMSHelpers @@ -227,9 +228,9 @@ def __checkDiskSpace(self, totalSize): """ diskSpace = getDiskSpace(self.__getDownloadDir(False)) # MB availableBytes = diskSpace * 1024 * 1024 # bytes - # below can be a configuration option sent via the job wrapper in the future - # Moved from 3 to 5 GB (PhC 130822) for standard output file - bufferGBs = 5.0 + bufferGBs = gConfig.getValue( + os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper", "MinOutputDataBufferGB"), 5.0 + ) data = bufferGBs * 1024 * 1024 * 1024 # bufferGBs in bytes if (data + totalSize) < availableBytes: msg = f"Enough disk space available ({availableBytes} bytes)" diff --git a/src/DIRAC/WorkloadManagementSystem/Client/test/Test_Client_DownloadInputData.py b/src/DIRAC/WorkloadManagementSystem/Client/test/Test_Client_DownloadInputData.py index 04e474cbed9..d92428f8df4 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/test/Test_Client_DownloadInputData.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/test/Test_Client_DownloadInputData.py @@ -113,7 +113,9 @@ def test_DLIDownloadFromBestSE_Fail(dli, mockSE, osPathExists): assert not res["OK"] -def test_DLI_execute(dli, mockSE): +def test_DLI_execute(mocker, dli, mockSE): + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) dli._downloadFromSE = MagicMock(return_value=S_OK({"path": "/local/path/1.txt"})) res = dli.execute(dataToResolve=["/a/lfn/1.txt"]) assert res["OK"] @@ -121,8 +123,10 @@ def test_DLI_execute(dli, mockSE): assert "/a/lfn/1.txt" in res["Value"]["Successful"], res -def test_DLI_execute_getFileMetadata_Fails(dli, mockSE): +def test_DLI_execute_getFileMetadata_Fails(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( {"Successful": {}, "Failed": {"/a/lfn/1.txt": "Error Getting MetaData"}} @@ -137,8 +141,10 @@ def test_DLI_execute_getFileMetadata_Fails(dli, mockSE): assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res -def test_DLI_execute_getFileMetadata_Lost(dli, mockSE): +def test_DLI_execute_getFileMetadata_Lost(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( { @@ -161,8 +167,10 @@ def test_DLI_execute_getFileMetadata_Lost(dli, mockSE): assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res -def test_DLI_execute_getFileMetadata_Unavailable(dli, mockSE): +def test_DLI_execute_getFileMetadata_Unavailable(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( { @@ -185,8 +193,10 @@ def test_DLI_execute_getFileMetadata_Unavailable(dli, mockSE): assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res -def test_DLI_execute_getFileMetadata_Cached(dli, mockSE): +def test_DLI_execute_getFileMetadata_Cached(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( { @@ -209,8 +219,10 @@ def test_DLI_execute_getFileMetadata_Cached(dli, mockSE): assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res -def test_DLI_execute_FirstDownFailed(dli, mockSE): +def test_DLI_execute_FirstDownFailed(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( {"Successful": {"/a/lfn/1.txt": {"Cached": 1, "Accessible": 0}}, "Failed": {}} @@ -224,8 +236,10 @@ def test_DLI_execute_FirstDownFailed(dli, mockSE): assert res["Value"]["Successful"]["/a/lfn/1.txt"]["path"] == "/local/path/1.txt", res -def test_DLI_execute_AllDownFailed(dli, mockSE): +def test_DLI_execute_AllDownFailed(mocker, dli, mockSE): """When getFileMetadata fails for the first SE, we should fall back to the second.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) mockObjectSE = mockSE.return_value mockObjectSE.getFileMetadata.return_value = S_OK( {"Successful": {"/a/lfn/1.txt": {"Cached": 1, "Accessible": 0}}, "Failed": {}} @@ -239,8 +253,10 @@ def test_DLI_execute_AllDownFailed(dli, mockSE): assert res["Value"]["Failed"][0] == "/a/lfn/1.txt", res -def test_DLI_execute_NoLocal(dli, mockSE): +def test_DLI_execute_NoLocal(mocker, dli, mockSE): """Data only at the remote SE.""" + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.getSystemSection", return_value="pippo") + mocker.patch("DIRAC.WorkloadManagementSystem.Client.DownloadInputData.gConfig.getValue", return_value=2) dli = DownloadInputData( { "InputData": [], diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 36bf72b3339..27b87ddacd1 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -378,5 +378,6 @@ JobWrapper # Retry the upload of the output file if only one output SE is defined RetryUpload = False TapeSE = ['-tape', '-RDST', '-RAW'] + MinOutputDataBufferGB = 5 } ##END diff --git a/tests/CI/docker-compose.yml b/tests/CI/docker-compose.yml index 540245d82cb..472da21809a 100644 --- a/tests/CI/docker-compose.yml +++ b/tests/CI/docker-compose.yml @@ -121,6 +121,8 @@ services: user: "${DIRAC_UID}:${DIRAC_GID}" depends_on: - dirac-server + volumes: + - ${CVMFS_DIR}:/cvmfs:ro ulimits: nofile: 8192 pull_policy: always diff --git a/tests/Jenkins/dirac-cfg-update-server.py b/tests/Jenkins/dirac-cfg-update-server.py index 50ac98a3a24..d41f1a78cf6 100644 --- a/tests/Jenkins/dirac-cfg-update-server.py +++ b/tests/Jenkins/dirac-cfg-update-server.py @@ -557,6 +557,8 @@ # to avoid having to wait while testing rescheduling csAPI.setOption("Systems/WorkloadManagement/Production/Executors/Optimizers/JobScheduling/RescheduleDelays", "0") +csAPI.createSection("Systems/WorkloadManagement/Production/JobWrapper/") +csAPI.setOption("Systems/WorkloadManagement/Production/JobWrapper/MinOutputDataBufferGB", 1) # Final action: commit in CS res = csAPI.commit()