diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py
new file mode 100644
index 0000000000..7a7940d3d5
--- /dev/null
+++ b/bin/testRucioConMonMem.py
@@ -0,0 +1,74 @@
+import os
+import sys
+import logging
+
+from memory_profiler import profile
+from pympler import asizeof
+from WMCore.Services.RucioConMon.RucioConMon import RucioConMon
+
+RSE_NAME = "T1_US_FNAL_Disk"
+RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged"
+
+
+
+def loggerSetup(logLevel=logging.INFO):
+    logger = logging.getLogger(__name__)
+    outHandler = logging.StreamHandler(sys.stdout)
+    outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s"))
+    outHandler.setLevel(logLevel)
+    logger.addHandler(outHandler)
+    logger.setLevel(logLevel)
+    return logger
+
+
+#profileFp = open('getUnmergedFiles.log', 'w+')
+#@profile(stream=profileFp)
+@profile
+def getUnmergedFiles(rucioConMon, logger, compressed=False):
+    dirs = set()
+    counter = 0
+    logger.info("Fetching data from Rucio ConMon for RSE: %s.", RSE_NAME)
+    for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed):
+        dirPath = _cutPath(lfn)
+        dirs.add(dirPath)
+        #logger.info(f"Size of dirs object: {asizeof.asizeof(dirs)}")
+        counter += 1
+    logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}")
+    return dirs
+
+
+def _cutPath(filePath):
+    newPath = []
+    root = filePath
+    while True:
+        root, tail = os.path.split(root)
+        if tail:
+            newPath.append(tail)
+        else:
+            newPath.append(root)
+            break
+    newPath.reverse()
+    # Cut/slice the path to the level/element required.
+    newPath = newPath[:7]
+    # Build the path out of all that is found up to the deepest level in the LFN tree
+    finalPath = os.path.join(*newPath)
+    return finalPath
+
+
+def main():
+    logger = loggerSetup()
+    rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger)
+    if False:
+        zipped=False
+        logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
+        getUnmergedFiles(rucioConMon, logger, compressed=zipped)
+
+    if True:
+        zipped=True
+        logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
+        getUnmergedFiles(rucioConMon, logger, compressed=zipped)
+    logger.info("Done!")
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py
index 5a695e6727..a7df53f660 100644
--- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py
+++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py
@@ -15,6 +15,9 @@
 import json
 import logging
 
+#from memory_profiler import profile
+from pympler import asizeof
+
 from WMCore.Services.Service import Service
 from Utils.Utilities import decodeBytesToUnicode
 
@@ -40,14 +43,13 @@ def __init__(self, url, logger=None, configDict=None):
         super(RucioConMon, self).__init__(configDict)
         self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])
 
-    def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
+    def _getResult(self, uri, callname="", clearCache=False, args=None):
         """
         Either fetch data from the cache file or query the data-service
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
         :param args: additional parameters to HTTP request call
-        :param binary: specifies request for binary object from HTTP requests (e.g. zipped content)
         :return: A dictionary
         """
 
@@ -68,31 +70,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False
         if clearCache:
             self.clearCache(cachedApi, args)
         results = '{}'  # explicitly define results which will be loaded by json.loads below
-        if binary:
-            with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream:
-                results = gzip.decompress(istream.read())
-                return results
-        else:
-            with self.refreshCache(cachedApi, apiUrl) as istream:
-                results = istream.read()
-
-        results = json.loads(results)
-        return results
+        with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream:
+            results = istream.read()
+        return json.loads(results)
 
-    def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
+    def _getResultZipped(self, uri, callname="", clearCache=True):
         """
-        This method is retrieving a zipped file from the uri privided, instead
-        of the normal json
+        This method retrieves gzipped content, instead of the standard json format.
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
-        :param args: additional parameters to HTTP request call
-        :return: a list of LFNs
+        :return: yields a single record from the data retrieved
         """
-        data = self._getResult(uri, callname, clearCache, args, binary=True)
-        # convert bytes which we received upstream to string
-        data = decodeBytesToUnicode(data)
-        return [f for f in data.split('\n') if f]
+        cachedApi = callname
+        if clearCache:
+            self.clearCache(cachedApi)
+
+        with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream:
+            for line in istream:
+                line = decodeBytesToUnicode(line).replace("\n", "")
+                yield line
 
     def getRSEStats(self):
         """
@@ -104,12 +101,14 @@ def getRSEStats(self):
         rseStats = self._getResult(uri, callname='stats')
         return rseStats
 
+    #profileFp = open('getRSEUnmerged.log', 'w+')
+    #@profile(stream=profileFp)
     def getRSEUnmerged(self, rseName, zipped=False):
         """
         Gets the list of all unmerged files in an RSE
         :param rseName: The RSE whose list of unmerged files to be retrieved
         :param zipped: If True the interface providing the zipped lists will be called
-        :return: A list of unmerged files for the RSE in question
+        :return: a generator of unmerged files for the RSE in question
         """
         # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a
         #       zipped file/stream. Currently we are using the newly provided json API
         #       implement the method with the zipped API and use disc cache for
         #       reading/streaming from file. This will prevent any set arithmetic
         #       in the future.
-        if not zipped:
-            uri = "files?rse=%s&format=json" % rseName
-            rseUnmerged = self._getResult(uri, callname=rseName)
-            return rseUnmerged
-        else:
+        if zipped:
             uri = "files?rse=%s&format=raw" % rseName
             callname = '{}.zipped'.format(rseName)
-            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
-            return rseUnmerged
+            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
+        else:
+            uri = "files?rse=%s&format=json" % rseName
+            callname = '{}.json'.format(rseName)
+            rseUnmerged = self._getResult(uri, callname=callname)
+
+        self['logger'].info(f"Size of rseUnmerged object: {asizeof.asizeof(rseUnmerged)}")
+        # now lazily return items
+        for item in rseUnmerged:
+            yield item
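
For context on the streaming pattern adopted in _getResultZipped and getRSEUnmerged above, here is a minimal, self-contained sketch of the idea, not the WMCore implementation: the file path and helper names (streamLfns, countUniqueDirs, dumpPath) are hypothetical, and it only illustrates how lazily yielding decoded lines from a gzipped dump keeps memory bounded instead of materializing the full list of LFNs.

import gzip

def streamLfns(dumpPath):
    """Hypothetical helper: lazily yield decoded, newline-stripped LFNs from a gzipped dump file."""
    with gzip.open(dumpPath, "rb") as istream:
        for line in istream:  # only one line is held in memory at a time
            lfn = line.decode("utf-8").rstrip("\n")
            if lfn:
                yield lfn

def countUniqueDirs(dumpPath, depth=7):
    """Consume the generator item by item, accumulating only the unique directory prefixes."""
    dirs = set()
    for lfn in streamLfns(dumpPath):
        dirs.add("/".join(lfn.split("/")[:depth]))
    return len(dirs)

The trade-off of the generator approach is that the result can only be iterated once, so callers such as getUnmergedFiles in the test script above accumulate what they actually need (the set of unique directories) while consuming the stream.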