Profile RucioConMon memory #12089

Open · wants to merge 4 commits into base: master
74 changes: 74 additions & 0 deletions bin/testRucioConMonMem.py
@@ -0,0 +1,74 @@
import os
import sys
import logging

from memory_profiler import profile
from pympler import asizeof
from WMCore.Services.RucioConMon.RucioConMon import RucioConMon

RSE_NAME = "T1_US_FNAL_Disk"
RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged"


def loggerSetup(logLevel=logging.INFO):
    logger = logging.getLogger(__name__)
    outHandler = logging.StreamHandler(sys.stdout)
    outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s"))
    outHandler.setLevel(logLevel)
    logger.addHandler(outHandler)
    logger.setLevel(logLevel)
    return logger


#profileFp = open('getUnmergedFiles.log', 'w+')
#@profile(stream=profileFp)
@profile
def getUnmergedFiles(rucioConMon, logger, compressed=False):
    dirs = set()
    counter = 0
    logger.info("Fetching data from Rucio ConMon for RSE: %s.", RSE_NAME)
    for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed):
        dirPath = _cutPath(lfn)
        dirs.add(dirPath)
        #logger.info(f"Size of dirs object: {asizeof.asizeof(dirs)}")
        counter += 1
    logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}")
    return dirs


def _cutPath(filePath):
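    """
    Cut an LFN down to its directory branch by keeping only the first seven
    path elements, where the leading '/' counts as one element.
    Hypothetical example:
        /store/unmerged/Run2024/MinBias/RAW/v1/000/123/file.root
        -> /store/unmerged/Run2024/MinBias/RAW/v1
    """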
    newPath = []
    root = filePath
    while True:
        root, tail = os.path.split(root)
        if tail:
            newPath.append(tail)
        else:
            newPath.append(root)
            break
    newPath.reverse()
    # Cut/slice the path to the level/element required.
    newPath = newPath[:7]
    # Build the path out of all that is found up to the deepest level in the LFN tree
    finalPath = os.path.join(*newPath)
    return finalPath


def main():
    logger = loggerSetup()
    rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger)

    # Manual toggles: flip the two booleans below to choose which code path
    # (uncompressed json and/or compressed raw) gets profiled in a given run.
    if False:
        zipped = False
        logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
        getUnmergedFiles(rucioConMon, logger, compressed=zipped)

    if True:
        zipped = True
        logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
        getUnmergedFiles(rucioConMon, logger, compressed=zipped)
    logger.info("Done!")


if __name__ == "__main__":
    sys.exit(main())
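For reference, a minimal standalone sketch of the profiling setup this script relies on, assuming only that memory_profiler is installed. It mirrors the commented-out stream redirection above; the function and log-file names here are hypothetical:

from memory_profiler import profile

# Redirect the line-by-line memory report to a file instead of stdout.
profileFp = open('profileReport.log', 'w+')

@profile(stream=profileFp)
def buildDirs(n):
    # Allocations inside a decorated function show up line by line in the report.
    dirs = {"/store/unmerged/dir%05d" % i for i in range(n)}
    return len(dirs)

if __name__ == "__main__":
    buildDirs(100000)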
61 changes: 32 additions & 29 deletions src/python/WMCore/Services/RucioConMon/RucioConMon.py
@@ -15,6 +15,9 @@
 import json
 import logging
 
+#from memory_profiler import profile
+from pympler import asizeof
+
 from WMCore.Services.Service import Service
 from Utils.Utilities import decodeBytesToUnicode
 
@@ -40,14 +43,13 @@ def __init__(self, url, logger=None, configDict=None):
         super(RucioConMon, self).__init__(configDict)
         self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])
 
-    def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
+    def _getResult(self, uri, callname="", clearCache=False, args=None):
         """
         Either fetch data from the cache file or query the data-service
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
         :param args: additional parameters to HTTP request call
-        :param binary: specifies request for binary object from HTTP requests (e.g. zipped content)
         :return: A dictionary
         """
 
@@ -68,31 +70,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False
         if clearCache:
             self.clearCache(cachedApi, args)
         results = '{}' # explicitly define results which will be loaded by json.loads below
-        if binary:
-            with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream:
-                results = gzip.decompress(istream.read())
-            return results
-        else:
-            with self.refreshCache(cachedApi, apiUrl) as istream:
-                results = istream.read()
-
-        results = json.loads(results)
-        return results
+        with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream:
+            results = istream.read()
+        return json.loads(results)
 
-    def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
+    def _getResultZipped(self, uri, callname="", clearCache=True):
         """
-        This method is retrieving a zipped file from the uri privided, instead
-        of the normal json
+        This method retrieves gzipped content, instead of the standard json format.
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
-        :param args: additional parameters to HTTP request call
-        :return: a list of LFNs
+        :return: yields a single record from the data retrieved
         """
-        data = self._getResult(uri, callname, clearCache, args, binary=True)
-        # convert bytes which we received upstream to string
-        data = decodeBytesToUnicode(data)
-        return [f for f in data.split('\n') if f]
+        cachedApi = callname
+        if clearCache:
+            self.clearCache(cachedApi)
+
+        with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream:
+            for line in istream:
+                line = decodeBytesToUnicode(line).replace("\n", "")
+                yield line
 
     def getRSEStats(self):
         """
@@ -104,25 +101,31 @@ def getRSEStats(self):
         rseStats = self._getResult(uri, callname='stats')
         return rseStats
 
+    #profileFp = open('getRSEUnmerged.log', 'w+')
+    #@profile(stream=profileFp)
     def getRSEUnmerged(self, rseName, zipped=False):
         """
         Gets the list of all unmerged files in an RSE
         :param rseName: The RSE whose list of unmerged files to be retrieved
         :param zipped: If True the interface providing the zipped lists will be called
-        :return: A list of unmerged files for the RSE in question
+        :return: a generator of unmerged files for the RSE in question
         """
         # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a
         #       zipped file/stream. Currently we are using the newly provided json API
         #       But in case we figure out the data is too big we may need to
         #       implement the method with the zipped API and use disc cache for
         #       reading/streaming from file. This will prevent any set arithmetic
         #       in the future.
-        if not zipped:
-            uri = "files?rse=%s&format=json" % rseName
-            rseUnmerged = self._getResult(uri, callname=rseName)
-            return rseUnmerged
-        else:
+        if zipped:
             uri = "files?rse=%s&format=raw" % rseName
             callname = '{}.zipped'.format(rseName)
-            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
-            return rseUnmerged
+            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
+        else:
+            uri = "files?rse=%s&format=json" % rseName
+            callname = '{}.json'.format(rseName)
+            rseUnmerged = self._getResult(uri, callname=callname)
+
+        self['logger'].info(f"Size of rseUnmerged object: {asizeof.asizeof(rseUnmerged)}")
+        # now lazily return items
+        for item in rseUnmerged:
+            yield item
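Since both branches above feed the trailing yield loop, callers consume getRSEUnmerged as an iterator regardless of the format requested. A hedged usage sketch, reusing the service URL and RSE name from the test script earlier in this PR:

from WMCore.Services.RucioConMon.RucioConMon import RucioConMon

rucioConMon = RucioConMon("https://cmsweb.cern.ch/rucioconmon/unmerged")
dirs = set()
for lfn in rucioConMon.getRSEUnmerged("T1_US_FNAL_Disk", zipped=True):
    dirs.add(lfn.rsplit("/", 1)[0])  # keep only the directory part of each LFN

Note that for the zipped path rseUnmerged is itself a generator, so the asizeof log above measures the small generator object rather than the volume of data eventually streamed.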