Skip to content

Commit

Permalink
fix: renew a proxy when it is about to expire
Browse files Browse the repository at this point in the history
  • Loading branch information
martynia committed Oct 26, 2023
1 parent 27fef21 commit 7e094f9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 17 deletions.
71 changes: 57 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
from DIRAC.Core.Utilities.Proxy import executeWithoutServerCertificate
from DIRAC.Core.Utilities.Proxy import getProxy
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
Expand All @@ -34,7 +35,8 @@ class PilotLoggingAgent(AgentModule):
def __init__(self, *args, **kwargs):
"""c'tor"""
super().__init__(*args, **kwargs)
self.clearPilotsDelay = 30
self.clearPilotsDelay = 30 # in days
self.proxyTimeleftLimit = 600 # in seconds

def initialize(self):
"""
Expand All @@ -46,6 +48,8 @@ def initialize(self):
"""
# pilot logs lifetime in days
self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay)
# proxy timeleft limit before we get a new one.
self.proxyTimeleftLimit = self.am_getOption("ProxyTimeleftLimit", self.proxyTimeleftLimit)
# configured VOs
res = getVOs()
if not res["OK"]:
Expand Down Expand Up @@ -76,24 +80,20 @@ def initialize(self):
continue

self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
# download a proxy and save a filename for future use:
# download a proxy and save a file name, userDN and proxyGroup for future use:
result = getDNForUsername(proxyUser)
if not result["OK"]:
self.log.error(f"Could not obtain a DN of user {proxyUser} for VO {vo}, skipped")
continue
userDNs = result["Value"] # a same user may have more than one DN
fd, filename = tempfile.mkstemp(prefix=vo + "__")
print("filename", filename)
os.close(fd)
vomsAttr = getVOMSAttributeForGroup(proxyGroup)
result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename)
userDNs = result["Value"] # the same user may have more than one DN

with tempfile.NamedTemporaryFile(prefix="gridpp" + "__", delete=False) as ntf:
result = self._downloadProxy(vo, userDNs, proxyGroup, ntf.name)

if not result["OK"]:
self.log.error(
f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped"
)
# no proxy, we have no other option than to skip the VO
continue
self.proxyDict[vo] = result["Value"]
self.proxyDict[vo] = {"proxy": result["Value"], "DN": userDNs, "group": proxyGroup}

return S_OK()

Expand All @@ -107,8 +107,13 @@ def execute(self):
voRes = {}
self.log.verbose(f"VOs configured for remote logging: {list(self.proxyDict.keys())}")
originalUserProxy = os.environ.get("X509_USER_PROXY")
for vo, proxy in self.proxyDict.items():
os.environ["X509_USER_PROXY"] = proxy
for vo, elem in self.proxyDict.items():
if self._isProxyExpired(elem["proxy"], self.proxyTimeleftLimit):
result = self._downloadProxy(vo, elem["DN"], elem["group"], elem["proxy"])
if not result["OK"]:
voRes[vo] = result["Message"]
continue
os.environ["X509_USER_PROXY"] = elem["proxy"]
res = self.executeForVO(vo)
if not res["OK"]:
voRes[vo] = res["Message"]
Expand Down Expand Up @@ -215,3 +220,41 @@ def clearOldPilotLogs(self, pilotLogPath):
os.remove(fullpath)
except Exception as excp:
self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp)

def _downloadProxy(self, vo, userDNs, proxyGroup, filename):
"""
Fetch a new proxy and store it in a file filename.
:param str vo: VO to get a proxy for
:param list userDNs: user DN list
:param str proxyGroup: user group
:param str filename: file name to store a proxy
:return: Dirac S_OK or S_ERROR object
:rtype: dict
"""
vomsAttr = getVOMSAttributeForGroup(proxyGroup)
result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename)
if not result["OK"]:
self.log.error(f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped")
return S_ERROR(f"Could not download a proxy, {vo} skipped")
return result

def _isProxyExpired(self, proxyfile, limit):
"""
Check proxy timeleft. If less than a limit, return True.
:param str proxyfile:
:param int limit: timeleft threshold below which a proxy is considered expired.
:return: True or False
:rtype: bool
"""
result = getProxyInfo(proxyfile)
if not result["OK"]:
self.log.error(f"Could not get proxy info {result['Message']}")
return True
timeleft = result["Value"]["secondsLeft"]
self.log.debug(f"Proxy {proxyfile} time left: {timeleft}")
if timeleft < limit:
self.log.info(f"proxy {proxyfile} expired/is about to expire. Will fetch a new one")
return True
return False
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ def pla(mocker, plaBase):
@pytest.mark.parametrize(
"remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes",
[
([True, False], upDict, S_OK(["myDN"]), S_OK(), S_OK("proxyfilename"), {"gridpp": "proxyfilename"}, S_OK()),
(
[True, False],
upDict,
S_OK(["myDN"]),
S_OK(),
S_OK("proxyfilename"),
{"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
S_OK(),
),
([False, False], upDict, S_OK(["myDN"]), S_OK(), S_OK(), {}, S_OK()),
([True, False], upDict, S_ERROR("Could not obtain a DN"), S_OK(), S_OK(), {}, S_OK()),
([True, False], upDict, S_ERROR("Could not download proxy"), S_OK(), S_ERROR("Failure"), {}, S_OK()),
Expand Down Expand Up @@ -110,9 +118,9 @@ def test_initialize(plaBase, remoteLogging, options, getDN, getVOMS, getProxy, r
"proxyDict, execVORes, expectedResult",
[
({}, S_OK(), S_OK()),
({"gridpp": "gridpp_proxyfile"}, S_OK(), S_OK()),
({"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, S_OK(), S_OK()),
(
{"gridpp": "gridpp_proxyfile"},
{"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
S_ERROR("Execute for VO failed"),
S_ERROR("Agent cycle for some VO finished with errors"),
),
Expand All @@ -122,6 +130,8 @@ def test_execute(plaBase, proxyDict, execVORes, expectedResult):
"""Testing a thin version of execute (executeForVO is mocked)"""

plaBase.executeForVO = MagicMock()
plaBase._isProxyExpired = MagicMock()
plaBase._isProxyExpired.return_value = False
plaBase.proxyDict = proxyDict
plaBase.executeForVO.return_value = execVORes
res = plaBase.execute()
Expand Down

0 comments on commit 7e094f9

Please sign in to comment.