From 570d1f691a3b20298d8433f055b5da56008f152f Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 30 Aug 2024 11:34:13 -0400 Subject: [PATCH 1/4] Profile RucioConMon memory fix parameters --- bin/testRucioConMonMem.py | 66 +++++++++++++++++++ .../Services/RucioConMon/RucioConMon.py | 57 ++++++++-------- 2 files changed, 94 insertions(+), 29 deletions(-) create mode 100644 bin/testRucioConMonMem.py diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py new file mode 100644 index 0000000000..602f3c83ff --- /dev/null +++ b/bin/testRucioConMonMem.py @@ -0,0 +1,66 @@ +import os +import sys +import logging +from memory_profiler import profile +from WMCore.Services.RucioConMon.RucioConMon import RucioConMon + +RSE_NAME = "T2_AT_Vienna" +RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged" + +def loggerSetup(logLevel=logging.INFO): + logger = logging.getLogger(__name__) + outHandler = logging.StreamHandler(sys.stdout) + outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s")) + outHandler.setLevel(logLevel) + logger.addHandler(outHandler) + logger.setLevel(logLevel) + return logger + + +profileFp = open('getUnmergedFiles.log', 'w+') +@profile(stream=profileFp) +def getUnmergedFiles(rucioConMon, logger, compressed=False): + dirs = set() + counter = 0 + logger.info("Fetching data from Rucio ConMon for RSE: %s.", RSE_NAME) + for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed): + dirPath = _cutPath(lfn) + dirs.add(dirPath) + counter =+ 1 + logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}") + return dirs + + +def _cutPath(filePath): + newPath = [] + root = filePath + while True: + root, tail = os.path.split(root) + if tail: + newPath.append(tail) + else: + newPath.append(root) + break + newPath.reverse() + # Cut/slice the path to the level/element required. + newPath = newPath[:7] + # Build the path out of all that is found up to the deepest level in the LFN tree + finalPath = os.path.join(*newPath) + return finalPath + + +def main(): + logger = loggerSetup() + zipped=False + rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger) + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) + + zipped=True + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) + logger.info("Done!") + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py index 5a695e6727..bc3305f48e 100644 --- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py +++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py @@ -14,6 +14,7 @@ import gzip import json import logging +from memory_profiler import profile from WMCore.Services.Service import Service from Utils.Utilities import decodeBytesToUnicode @@ -40,14 +41,13 @@ def __init__(self, url, logger=None, configDict=None): super(RucioConMon, self).__init__(configDict) self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint']) - def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False): + def _getResult(self, uri, callname="", clearCache=False, args=None): """ Either fetch data from the cache file or query the data-service :param uri: The endpoint uri :param callname: alias for caller function :param clearCache: parameter to control the cache behavior :param args: additional parameters to HTTP request call - :param binary: specifies request for binary object from HTTP requests (e.g. zipped content) :return: A dictionary """ @@ -68,31 +68,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False if clearCache: self.clearCache(cachedApi, args) results = '{}' # explicitly define results which will be loaded by json.loads below - if binary: - with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream: - results = gzip.decompress(istream.read()) - return results - else: - with self.refreshCache(cachedApi, apiUrl) as istream: - results = istream.read() - - results = json.loads(results) - return results + with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream: + results = istream.read() + return json.loads(results) - def _getResultZipped(self, uri, callname="", clearCache=True, args=None): + def _getResultZipped(self, uri, callname="", clearCache=True): """ - This method is retrieving a zipped file from the uri privided, instead - of the normal json + This method retrieves gzipped content, instead of the standard json format. :param uri: The endpoint uri :param callname: alias for caller function :param clearCache: parameter to control the cache behavior - :param args: additional parameters to HTTP request call - :return: a list of LFNs + :return: yields a single record from the data retrieved """ - data = self._getResult(uri, callname, clearCache, args, binary=True) - # convert bytes which we received upstream to string - data = decodeBytesToUnicode(data) - return [f for f in data.split('\n') if f] + cachedApi = callname + if clearCache: + self.clearCache(cachedApi) + + with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream: + for line in istream: + line = decodeBytesToUnicode(line).replace("\n", "") + yield line def getRSEStats(self): """ @@ -104,12 +99,14 @@ def getRSEStats(self): rseStats = self._getResult(uri, callname='stats') return rseStats + profileFp = open('getRSEUnmerged.log', 'w+') + @profile(stream=profileFp) def getRSEUnmerged(self, rseName, zipped=False): """ Gets the list of all unmerged files in an RSE :param rseName: The RSE whose list of unmerged files to be retrieved :param zipped: If True the interface providing the zipped lists will be called - :return: A list of unmerged files for the RSE in question + :return: a generator of unmerged files for the RSE in question """ # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a # zipped file/stream. Currently we are using the newly provided json API @@ -117,12 +114,14 @@ def getRSEUnmerged(self, rseName, zipped=False): # implement the method with the zipped API and use disc cache for # reading/streaming from file. This will prevent any set arithmetic # in the future. - if not zipped: - uri = "files?rse=%s&format=json" % rseName - rseUnmerged = self._getResult(uri, callname=rseName) - return rseUnmerged - else: + if zipped: uri = "files?rse=%s&format=raw" % rseName callname = '{}.zipped'.format(rseName) - rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True) - return rseUnmerged + rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True) + else: + uri = "files?rse=%s&format=json" % rseName + callname = '{}.json'.format(rseName) + rseUnmerged = self._getResult(uri, callname=callname) + # now lazily return items + for item in rseUnmerged: + yield item From f7ec44ec61f676694775391aaab47b4d3b48d2e1 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 30 Aug 2024 17:52:31 -0400 Subject: [PATCH 2/4] Use pympler to check memory --- bin/testRucioConMonMem.py | 8 +++++--- src/python/WMCore/Services/RucioConMon/RucioConMon.py | 9 ++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py index 602f3c83ff..94659351dc 100644 --- a/bin/testRucioConMonMem.py +++ b/bin/testRucioConMonMem.py @@ -1,7 +1,8 @@ import os import sys import logging -from memory_profiler import profile +#from memory_profiler import profile +from pympler import asizeof from WMCore.Services.RucioConMon.RucioConMon import RucioConMon RSE_NAME = "T2_AT_Vienna" @@ -17,8 +18,8 @@ def loggerSetup(logLevel=logging.INFO): return logger -profileFp = open('getUnmergedFiles.log', 'w+') -@profile(stream=profileFp) +#profileFp = open('getUnmergedFiles.log', 'w+') +#@profile(stream=profileFp) def getUnmergedFiles(rucioConMon, logger, compressed=False): dirs = set() counter = 0 @@ -26,6 +27,7 @@ def getUnmergedFiles(rucioConMon, logger, compressed=False): for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed): dirPath = _cutPath(lfn) dirs.add(dirPath) + #logger.info(f"Size of dirs object: {asizeof.asizeof(dirs)}") counter =+ 1 logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}") return dirs diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py index bc3305f48e..f5c9e96c56 100644 --- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py +++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py @@ -14,7 +14,8 @@ import gzip import json import logging -from memory_profiler import profile +#from memory_profiler import profile +from pympler import asizeof from WMCore.Services.Service import Service from Utils.Utilities import decodeBytesToUnicode @@ -99,8 +100,8 @@ def getRSEStats(self): rseStats = self._getResult(uri, callname='stats') return rseStats - profileFp = open('getRSEUnmerged.log', 'w+') - @profile(stream=profileFp) + #profileFp = open('getRSEUnmerged.log', 'w+') + #@profile(stream=profileFp) def getRSEUnmerged(self, rseName, zipped=False): """ Gets the list of all unmerged files in an RSE @@ -122,6 +123,8 @@ def getRSEUnmerged(self, rseName, zipped=False): uri = "files?rse=%s&format=json" % rseName callname = '{}.json'.format(rseName) rseUnmerged = self._getResult(uri, callname=callname) + + self['logger'].info(f"Size of rseUnmerged object: {asizeof.asizeof(rseUnmerged)}") # now lazily return items for item in rseUnmerged: yield item From 2130a1d3cece204773dfc317dbc62e54c26ca14c Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 6 Sep 2024 15:09:13 -0400 Subject: [PATCH 3/4] use tracemalloc --- bin/testRucioConMonMem.py | 6 +++--- .../WMCore/Services/RucioConMon/RucioConMon.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py index 94659351dc..c321b7049f 100644 --- a/bin/testRucioConMonMem.py +++ b/bin/testRucioConMonMem.py @@ -1,7 +1,7 @@ import os import sys import logging -#from memory_profiler import profile +from memory_profiler import profile from pympler import asizeof from WMCore.Services.RucioConMon.RucioConMon import RucioConMon @@ -18,8 +18,8 @@ def loggerSetup(logLevel=logging.INFO): return logger -#profileFp = open('getUnmergedFiles.log', 'w+') -#@profile(stream=profileFp) +profileFp = open('getUnmergedFiles.log', 'w+') +@profile(stream=profileFp) def getUnmergedFiles(rucioConMon, logger, compressed=False): dirs = set() counter = 0 diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py index f5c9e96c56..d4cccf9a97 100644 --- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py +++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py @@ -14,6 +14,8 @@ import gzip import json import logging + +import tracemalloc #from memory_profiler import profile from pympler import asizeof @@ -41,6 +43,7 @@ def __init__(self, url, logger=None, configDict=None): configDict['logger'] = logger if logger else logging.getLogger() super(RucioConMon, self).__init__(configDict) self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint']) + tracemalloc.start() def _getResult(self, uri, callname="", clearCache=False, args=None): """ @@ -81,14 +84,27 @@ def _getResultZipped(self, uri, callname="", clearCache=True): :param clearCache: parameter to control the cache behavior :return: yields a single record from the data retrieved """ + snapshot1 = tracemalloc.take_snapshot() cachedApi = callname if clearCache: self.clearCache(cachedApi) with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream: + snapshot2 = tracemalloc.take_snapshot() for line in istream: line = decodeBytesToUnicode(line).replace("\n", "") yield line + snapshot3 = tracemalloc.take_snapshot() + + print("Memory stats between snapshot2 and snapshot1") + top_stats = snapshot2.compare_to(snapshot1, 'lineno') + for thisStat in top_stats[:5]: + print(thisStat) + + print("Memory stats between snapshot3 and snapshot2") + top_stats = snapshot3.compare_to(snapshot2, 'lineno') + for thisStat in top_stats[:5]: + print(thisStat) def getRSEStats(self): """ From 3ea5301e7191107e646c114f71207a4e8265c67e Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 6 Sep 2024 18:50:26 -0400 Subject: [PATCH 4/4] remove tracemalloc; measure with memory_profiler; use FNAL --- bin/testRucioConMonMem.py | 26 ++++++++++++------- .../Services/RucioConMon/RucioConMon.py | 15 ----------- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py index c321b7049f..7a7940d3d5 100644 --- a/bin/testRucioConMonMem.py +++ b/bin/testRucioConMonMem.py @@ -1,13 +1,16 @@ import os import sys import logging + from memory_profiler import profile from pympler import asizeof from WMCore.Services.RucioConMon.RucioConMon import RucioConMon -RSE_NAME = "T2_AT_Vienna" +RSE_NAME = "T1_US_FNAL_Disk" RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged" + + def loggerSetup(logLevel=logging.INFO): logger = logging.getLogger(__name__) outHandler = logging.StreamHandler(sys.stdout) @@ -18,8 +21,9 @@ def loggerSetup(logLevel=logging.INFO): return logger -profileFp = open('getUnmergedFiles.log', 'w+') -@profile(stream=profileFp) +#profileFp = open('getUnmergedFiles.log', 'w+') +#@profile(stream=profileFp) +@profile def getUnmergedFiles(rucioConMon, logger, compressed=False): dirs = set() counter = 0 @@ -28,7 +32,7 @@ def getUnmergedFiles(rucioConMon, logger, compressed=False): dirPath = _cutPath(lfn) dirs.add(dirPath) #logger.info(f"Size of dirs object: {asizeof.asizeof(dirs)}") - counter =+ 1 + counter += 1 logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}") return dirs @@ -53,14 +57,16 @@ def _cutPath(filePath): def main(): logger = loggerSetup() - zipped=False rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger) - logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") - getUnmergedFiles(rucioConMon, logger, compressed=zipped) + if False: + zipped=False + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) - zipped=True - logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") - getUnmergedFiles(rucioConMon, logger, compressed=zipped) + if True: + zipped=True + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) logger.info("Done!") diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py index d4cccf9a97..a7df53f660 100644 --- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py +++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py @@ -15,7 +15,6 @@ import json import logging -import tracemalloc #from memory_profiler import profile from pympler import asizeof @@ -43,7 +42,6 @@ def __init__(self, url, logger=None, configDict=None): configDict['logger'] = logger if logger else logging.getLogger() super(RucioConMon, self).__init__(configDict) self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint']) - tracemalloc.start() def _getResult(self, uri, callname="", clearCache=False, args=None): """ @@ -84,27 +82,14 @@ def _getResultZipped(self, uri, callname="", clearCache=True): :param clearCache: parameter to control the cache behavior :return: yields a single record from the data retrieved """ - snapshot1 = tracemalloc.take_snapshot() cachedApi = callname if clearCache: self.clearCache(cachedApi) with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream: - snapshot2 = tracemalloc.take_snapshot() for line in istream: line = decodeBytesToUnicode(line).replace("\n", "") yield line - snapshot3 = tracemalloc.take_snapshot() - - print("Memory stats between snapshot2 and snapshot1") - top_stats = snapshot2.compare_to(snapshot1, 'lineno') - for thisStat in top_stats[:5]: - print(thisStat) - - print("Memory stats between snapshot3 and snapshot2") - top_stats = snapshot3.compare_to(snapshot2, 'lineno') - for thisStat in top_stats[:5]: - print(thisStat) def getRSEStats(self): """