From 1968c85c6de588f447fe83ce5ad0dc501f2dfc6e Mon Sep 17 00:00:00 2001 From: fstagni Date: Wed, 11 Oct 2023 12:28:15 +0200 Subject: [PATCH] fix: sadly, remove MJF --- src/DIRAC/Core/Utilities/MJF.py | 198 ------------------ .../BatchSystems/TimeLeft/MJFResourceUsage.py | 108 ---------- .../BatchSystems/TimeLeft/TimeLeft.py | 4 - .../TimeLeft/test/Test_TimeLeft.py | 2 - .../JobWrapper/Watchdog.py | 31 --- .../Utilities/JobParameters.py | 74 +------ .../scripts/dirac_wms_get_wn_parameters.py | 9 +- .../WorkloadManagementSystem/Test_TimeLeft.sh | 36 +--- 8 files changed, 11 insertions(+), 451 deletions(-) delete mode 100644 src/DIRAC/Core/Utilities/MJF.py delete mode 100644 src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py diff --git a/src/DIRAC/Core/Utilities/MJF.py b/src/DIRAC/Core/Utilities/MJF.py deleted file mode 100644 index fdd2bc64a2c..00000000000 --- a/src/DIRAC/Core/Utilities/MJF.py +++ /dev/null @@ -1,198 +0,0 @@ -""" The MJF utility calculates the amount of wall clock time - left for a given batch system slot or VM. This is essential for the - 'Filling Mode' where several jobs may be executed in the same slot. - - Machine Job/Features are used following HSF-TN-2016-02 if available. - Otherwise values are filled in using the batch system and CS - information. -""" -import os -import ssl -import time -from urllib.request import urlopen - -import DIRAC - -from DIRAC import gLogger, gConfig - - -class MJF: - """Machine/Job Features methods""" - - mjfKeys = { - "MACHINEFEATURES": ["total_cpu", "hs06", "shutdowntime", "grace_secs"], - "JOBFEATURES": [ - "allocated_cpu", - "hs06_job", - "shutdowntime_job", - "grace_secs_job", - "jobstart_secs", - "job_id", - "wall_limit_secs", - "cpu_limit_secs", - "max_rss_bytes", - "max_swap_bytes", - "scratch_limit_bytes", - ], - } - - ############################################################################# - def __init__(self): - """Standard constructor""" - self.log = gLogger.getSubLogger(self.__class__.__name__) - - capath = DIRAC.Core.Security.Locations.getCAsLocation() - - if not capath: - raise Exception("Unable to find CA files location! Not in /etc/grid-security/certificates/ etc.") - - # Used by urllib when talking to HTTPS web servers - self.context = ssl.create_default_context(capath=capath) - - def updateConfig(self, pilotStartTime=None): - """Populate /LocalSite/MACHINEFEATURES and /LocalSite/JOBFEATURES with MJF values - This is run early in the job to update the configuration file that subsequent DIRAC - scripts read when they start. - """ - - if pilotStartTime: - gConfig.setOptionValue("/LocalSite/JOBFEATURES/jobstart_secs", str(pilotStartTime)) - - for mORj in ["MACHINEFEATURES", "JOBFEATURES"]: - for key in self.mjfKeys[mORj]: - value = self.__fetchMachineJobFeature(mORj, key) - - if value is not None: - gConfig.setOptionValue(f"/LocalSite/{mORj}/{key}", value) - - def getMachineFeature(self, key): - """Returns MACHINEFEATURES/key value saved in /LocalSite configuration by - updateConfigFile() unless MACHINEFEATURES/shutdowntime when we try to fetch - from the source URL itself again in case it changes. - """ - if key == "shutdowntime": - value = self.__fetchMachineJobFeature("MACHINEFEATURES", "shutdowntime") - # If unable to fetch shutdowntime, go back to any value in /LocalSite - # in case HTTP(S) server is down - if value is not None: - return value - - return gConfig.getValue("/LocalSite/MACHINEFEATURES/" + key, None) - - def getIntMachineFeature(self, key): - """Returns MACHINEFEATURES/key as an int or None if not an int or not present""" - value = self.getMachineFeature(key) - - try: - return int(value) - except ValueError: - return None - - def getJobFeature(self, key): - """Returns JOBFEATURES/key value saved in /LocalSite configuration by - updateConfigFile() unless JOBFEATURES/shutdowntime_job when we try to fetch - from the source URL itself again in case it changes. - """ - if key == "shutdowntime_job": - value = self.__fetchMachineJobFeature("JOBFEATURES", "shutdowntime_job") - # If unable to fetch shutdowntime_job, go back to any value in /LocalSite - # in case HTTP(S) server is down - if value is not None: - return value - - return gConfig.getValue("/LocalSite/JOBFEATURES/" + key, None) - - def getIntJobFeature(self, key): - """Returns JOBFEATURES/key as an int or None if not an int or not present""" - value = self.getJobFeature(key) - - try: - return int(value) - except ValueError: - return None - - def __fetchMachineJobFeature(self, mORj, key): - """Returns raw MJF value for a given key, perhaps by HTTP(S), perhaps from a local file - mORj must be MACHINEFEATURES or JOBFEATURES - If the value cannot be found, then return None. There are many legitimate ways for - a site not to provide some MJF values so we don't log errors, failures etc. - """ - if mORj != "MACHINEFEATURES" and mORj != "JOBFEATURES": - raise Exception("Must request MACHINEFEATURES or JOBFEATURES") - - if mORj not in os.environ: - return None - - url = os.environ[mORj] + "/" + key - - # Simple if a file - if url[0] == "/": - try: - with open(url) as fd: - return fd.read().strip() - except Exception: - return None - - # Otherwise make sure it's an HTTP(S) URL - if not url.startswith("http://") and not url.startswith("https://"): - return None - - # We could have used urlopen() for local files too, but we also - # need to check HTTP return code in case we get an HTML error page - # instead of a true key value. - try: - mjfUrl = urlopen(url=url, context=self.context) - # HTTP return codes other than 2xx mean failure - if int(mjfUrl.getcode() / 100) != 2: - return None - return mjfUrl.read().strip() - except Exception: - return None - finally: - try: - mjfUrl.close() - except UnboundLocalError: - pass - - def getWallClockSecondsLeft(self): - """Returns the number of seconds until either the wall clock limit - or the shutdowntime(_job) is reached. - """ - - now = int(time.time()) - secondsLeft = None - jobstartSecs = self.getIntJobFeature("jobstart_secs") - wallLimitSecs = self.getIntJobFeature("wall_limit_secs") - shutdowntimeJob = self.getIntJobFeature("shutdowntime_job") - shutdowntime = self.getIntMachineFeature("shutdowntime") - - # look for local shutdown file - try: - with open("/var/run/shutdown_time") as fd: - shutdowntimeLocal = int(fd.read().strip()) - except (OSError, ValueError): - shutdowntimeLocal = None - - if jobstartSecs is not None and wallLimitSecs is not None: - secondsLeft = jobstartSecs + wallLimitSecs - now - - if shutdowntimeJob is not None: - if secondsLeft is None: - secondsLeft = shutdowntimeJob - now - elif shutdowntimeJob - now < secondsLeft: - secondsLeft = shutdowntimeJob - now - - if shutdowntime is not None: - if secondsLeft is None: - secondsLeft = shutdowntime - now - elif shutdowntime - now < secondsLeft: - secondsLeft = shutdowntime - now - - if shutdowntimeLocal is not None: - if secondsLeft is None: - secondsLeft = shutdowntimeLocal - now - elif shutdowntimeLocal - now < secondsLeft: - secondsLeft = shutdowntimeLocal - now - - # Wall Clock time left or None if unknown - return secondsLeft diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py deleted file mode 100644 index 95ee93c2faa..00000000000 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py +++ /dev/null @@ -1,108 +0,0 @@ -""" The Machine/Job Features TimeLeft utility interrogates the MJF values - for the current CPU and Wallclock consumed, as well as their limits. -""" -import os -import time -from urllib.request import urlopen - -from DIRAC import S_OK, S_ERROR -from DIRAC.Resources.Computing.BatchSystems.TimeLeft.ResourceUsage import ResourceUsage - - -class MJFResourceUsage(ResourceUsage): - """ - This is the MJF plugin of the TimeLeft Utility - """ - - ############################################################################# - def __init__(self): - """Standard constructor""" - super().__init__("MJF", "JOB_ID") - - self.queue = os.environ.get("QUEUE") - - self.log.verbose(f"jobID={self.jobID}, queue={self.queue}") - self.startTime = time.time() - - ############################################################################# - def getResourceUsage(self): - """Returns S_OK with a dictionary containing the entries CPU, CPULimit, - WallClock, WallClockLimit, and Unit for current slot. - """ - - cpuLimit = None - wallClockLimit = None - wallClock = None - jobStartSecs = None - - jobFeaturesPath = None - machineFeaturesPath = None - - # Getting info from JOBFEATURES - try: - # We are not called from TimeLeft.py if these are not set - jobFeaturesPath = os.environ["JOBFEATURES"] - except KeyError: - self.log.warn("$JOBFEATURES is not set") - - if jobFeaturesPath: - try: - wallClockLimit = int(urlopen(jobFeaturesPath + "/wall_limit_secs").read()) - self.log.verbose("wallClockLimit from JF = %d" % wallClockLimit) - except ValueError: - self.log.warn("/wall_limit_secs is unreadable") - except OSError as e: - self.log.exception("Issue with $JOBFEATURES/wall_limit_secs", lException=e) - self.log.warn("Could not determine cpu limit from $JOBFEATURES/wall_limit_secs") - - try: - jobStartSecs = int(urlopen(jobFeaturesPath + "/jobstart_secs").read()) - self.log.verbose("jobStartSecs from JF = %d" % jobStartSecs) - except ValueError: - self.log.warn("/jobstart_secs is unreadable, setting a default") - jobStartSecs = self.startTime - except OSError as e: - self.log.exception("Issue with $JOBFEATURES/jobstart_secs", lException=e) - self.log.warn("Can't open jobstart_secs, setting a default") - jobStartSecs = self.startTime - - try: - cpuLimit = int(urlopen(jobFeaturesPath + "/cpu_limit_secs").read()) - self.log.verbose("cpuLimit from JF = %d" % cpuLimit) - except ValueError: - self.log.warn("/cpu_limit_secs is unreadable") - except OSError as e: - self.log.exception("Issue with $JOBFEATURES/cpu_limit_secs", lException=e) - self.log.warn("Could not determine cpu limit from $JOBFEATURES/cpu_limit_secs") - - wallClock = int(time.time()) - jobStartSecs - - # Getting info from MACHINEFEATURES - try: - # We are not called from TimeLeft.py if these are not set - machineFeaturesPath = os.environ["MACHINEFEATURES"] - except KeyError: - self.log.warn("$MACHINEFEATURES is not set") - - if machineFeaturesPath and jobStartSecs: - try: - shutdownTime = int(urlopen(machineFeaturesPath + "/shutdowntime").read()) - self.log.verbose("shutdownTime from MF = %d" % shutdownTime) - if int(time.time()) + wallClockLimit > shutdownTime: - # reduce wallClockLimit if would overrun shutdownTime - wallClockLimit = shutdownTime - jobStartSecs - except ValueError: - self.log.warn("/shutdowntime is unreadable") - except OSError as e: - self.log.warn("Issue with $MACHINEFEATURES/shutdowntime", repr(e)) - self.log.warn("Could not determine a shutdowntime value from $MACHINEFEATURES/shutdowntime") - - # Reporting - consumed = {"CPU": None, "CPULimit": cpuLimit, "WallClock": wallClock, "WallClockLimit": wallClockLimit} - if cpuLimit and wallClock and wallClockLimit: - self.log.verbose(f"MJF consumed: {str(consumed)}") - return S_OK(consumed) - self.log.info("Could not determine some parameters") - retVal = S_ERROR("Could not determine some parameters") - retVal["Value"] = consumed - return retVal diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py index ba577c0f9e3..30013df9e3d 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py @@ -143,10 +143,6 @@ def __getBatchSystemPlugin(self): name = batchSystem break - if name is None and "MACHINEFEATURES" in os.environ and "JOBFEATURES" in os.environ: - # Only use MJF if legacy batch system information not available for now - name = "MJF" - if name is None: self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported") return S_ERROR("Current batch system is not supported") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py index 3e2521a6f90..5ec8c522392 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py @@ -54,7 +54,6 @@ " 00:00:60.00 6267 40713 25469,14249 12/31-20:52:00 -" ) -MJF_OUT = "0" SLURM_OUT_0 = "12345,86400,24,3600,03:00:00" SLURM_OUT_1 = "12345,86400,24,3600,4-03:00:00" @@ -72,7 +71,6 @@ [ ("LSF", {}, LSF_OUT, 0.0), ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0}, LSF_OUT, 0.0), - ("MJF", {}, MJF_OUT, 0.0), ("SGE", {}, SGE_OUT, 300.0), ("SLURM", {}, SLURM_OUT_0, 432000.0), ("SLURM", {}, SLURM_OUT_1, 432000.0), diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index 69dfd9ec1b2..542296f7d91 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -32,7 +32,6 @@ from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.ConfigurationSystem.Client.Config import gConfig from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemInstance -from DIRAC.Core.Utilities import MJF from DIRAC.Core.Utilities.Os import getDiskSpace from DIRAC.Core.Utilities.Profiler import Profiler from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft @@ -217,7 +216,6 @@ def execute(self): and (time.time() - self.initialValues["StartTime"]) > self.wallClockCheckSeconds * self.wallClockCheckCount ): self.wallClockCheckCount += 1 - self._performWallClockChecks() if self.littleTimeLeft: # if we have gone over enough iterations query again @@ -242,35 +240,6 @@ def execute(self): # self.log.debug('Application thread is alive: checking count is %s' %(self.checkCount)) return S_OK() - ############################################################################# - def _performWallClockChecks(self): - """Watchdog performs the wall clock checks based on MJF. Signals are sent - to processes if we need to stop, but function always returns S_OK() - """ - mjf = MJF.MJF() - - try: - wallClockSecondsLeft = mjf.getWallClockSecondsLeft() - except Exception: - # Just stop if we can't get the wall clock seconds left - return S_OK() - - jobstartSeconds = mjf.getIntJobFeature("jobstart_secs") - if jobstartSeconds is None: - # Just stop if we don't know when the job started - return S_OK() - - if (int(time.time()) > jobstartSeconds + self.stopSigStartSeconds) and ( - wallClockSecondsLeft < self.stopSigFinishSeconds + self.wallClockCheckSeconds - ): - # Need to send the signal! Assume it works to avoid sending the signal more than once - self.log.info("Sending signal to JobWrapper children", f"({self.stopSigNumber})") - self.stopSigSent = True - - kill_proc_tree(self.wrapperPID, includeParent=False) - - return S_OK() - ############################################################################# def _performChecks(self): """The Watchdog checks are performed at a different period to the checking of the diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py index d3cb9a5a8d2..3af8e48b4ba 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py @@ -1,63 +1,11 @@ -""" DIRAC Workload Management System utility module to get available memory and processors from mjf +""" DIRAC Workload Management System utility module to get available memory and processors """ import multiprocessing -import os -from urllib.request import urlopen from DIRAC import gConfig, gLogger from DIRAC.Core.Utilities.List import fromChar -def getJobFeatures(): - features = {} - if "JOBFEATURES" not in os.environ: - return features - for item in ( - "allocated_cpu", - "hs06_job", - "shutdowntime_job", - "grace_secs_job", - "jobstart_secs", - "job_id", - "wall_limit_secs", - "cpu_limit_secs", - "max_rss_bytes", - "max_swap_bytes", - "scratch_limit_bytes", - ): - fname = os.path.join(os.environ["JOBFEATURES"], item) - try: - val = urlopen(fname).read() - except Exception: - val = 0 - features[item] = val - return features - - -def getProcessorFromMJF(): - jobFeatures = getJobFeatures() - if jobFeatures: - try: - return int(jobFeatures["allocated_cpu"]) - except KeyError: - gLogger.error( - "MJF is available but allocated_cpu is not an integer", repr(jobFeatures.get("allocated_cpu")) - ) - return None - - -def getMemoryFromMJF(): - jobFeatures = getJobFeatures() - if jobFeatures: - try: - return int(jobFeatures["max_rss_bytes"]) - except KeyError: - gLogger.error( - "MJF is available but max_rss_bytes is not an integer", repr(jobFeatures.get("max_rss_bytes")) - ) - return None - - def getMemoryFromProc(): meminfo = {i.split()[0].rstrip(":"): int(i.split()[1]) for i in open("/proc/meminfo").readlines()} maxRAM = meminfo["MemTotal"] @@ -72,10 +20,9 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None): Tries to find it in this order: 1) from the /Resources/Computing/CEDefaults/NumberOfProcessors (which is what the pilot fills up) - 2) if not present from JobFeatures - 3) if not present looks in CS for "NumberOfProcessors" Queue or CE option - 4) if not present but there's WholeNode tag, look what the WN provides using multiprocessing.cpu_count() - 5) return 1 + 2) if not present looks in CS for "NumberOfProcessors" Queue or CE option + 3) if not present but there's WholeNode tag, look what the WN provides using multiprocessing.cpu_count() + 4) return 1 """ # 1) from /Resources/Computing/CEDefaults/NumberOfProcessors @@ -84,14 +31,7 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None): if numberOfProcessors: return numberOfProcessors - # 2) from MJF - gLogger.info("Getting numberOfProcessors from MJF") - numberOfProcessors = getProcessorFromMJF() - if numberOfProcessors: - return numberOfProcessors - gLogger.info("NumberOfProcessors could not be found in MJF") - - # 3) looks in CS for "NumberOfProcessors" Queue or CE or site option + # 2) looks in CS for "NumberOfProcessors" Queue or CE or site option if not siteName: siteName = gConfig.getValue("/LocalSite/Site", "") if not gridCE: @@ -116,7 +56,7 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None): if numberOfProcessors: return numberOfProcessors - # 4) looks in CS for tags + # 3) looks in CS for tags gLogger.info(f"Getting tagsfor {siteName}: {gridCE}: {queue}") # Tags of the CE tags = fromChar( @@ -133,7 +73,7 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None): gLogger.info("Found WholeNode tag, using multiprocessing.cpu_count()") return multiprocessing.cpu_count() - # 5) return the default + # 4) return the default return 1 diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py index 07a1042f88a..890ee15b218 100755 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py @@ -2,8 +2,8 @@ """ Determine number of processors and memory for the worker node """ -from DIRAC.Core.Base.Script import Script from DIRAC import gLogger +from DIRAC.Core.Base.Script import Script from DIRAC.WorkloadManagementSystem.Utilities import JobParameters ceName = "" @@ -39,11 +39,8 @@ def main(): gLogger.info("Getting number of processors") numberOfProcessor = JobParameters.getNumberOfProcessors(Site, ceName, Queue) - gLogger.info("Getting memory (RAM) from MJF") - maxRAM = JobParameters.getMemoryFromMJF() - if not maxRAM: - gLogger.info("maxRAM could not be found in MJF, using JobParameters.getMemoryFromProc()") - maxRAM = JobParameters.getMemoryFromProc() + gLogger.info("Getting memory (RAM)") + maxRAM = JobParameters.getMemoryFromProc() gLogger.info("Getting number of GPUs") numberOfGPUs = JobParameters.getNumberOfGPUs(Site, ceName, Queue) diff --git a/tests/Integration/WorkloadManagementSystem/Test_TimeLeft.sh b/tests/Integration/WorkloadManagementSystem/Test_TimeLeft.sh index af9473c3592..f444bf91840 100755 --- a/tests/Integration/WorkloadManagementSystem/Test_TimeLeft.sh +++ b/tests/Integration/WorkloadManagementSystem/Test_TimeLeft.sh @@ -17,7 +17,7 @@ fi SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" ############################################################################### -# Can't find anywhere a batch plugin, not even MJF +# Can't find anywhere a batch plugin dirac-wms-get-queue-cpu-time --cfg "${SCRIPT_DIR}/pilot.cfg" $DEBUG @@ -27,37 +27,3 @@ else echo -e "\nSomething wrong!\n\n" >&2 exit 1 fi - - -############################################################################### -# Found MJF, not reading it (not a directory) - -export MACHINEFEATURES="${SCRIPT_DIR}/sb.cfg" -export JOBFEATURES="${SCRIPT_DIR}/sb.cfg" - -dirac-wms-get-queue-cpu-time --cfg "${SCRIPT_DIR}/pilot.cfg" $DEBUG - -if [[ "${?}" -eq 0 ]]; then - echo -e "\nSuccess\n\n" -else - echo -e "\nSomething wrong!\n\n" >&2 - exit 1 -fi - - -############################################################################### -# Found MJF, gave proper values - -export MACHINEFEATURES=${SCRIPT_DIR}/MJF/ -export JOBFEATURES=${SCRIPT_DIR}/MJF/ - -dirac-wms-get-queue-cpu-time --cfg "${SCRIPT_DIR}/pilot.cfg" $DEBUG - -if [[ "${?}" -eq 0 ]]; then - echo -e "\nSuccess\n\n" -else - echo -e "\nSomething wrong!\n\n" >&2 - exit 1 -fi - -exit 0