
Commit

cont
chaen committed Feb 6, 2024
1 parent 4864ff3 commit a21f366
Showing 3 changed files with 73 additions and 22 deletions.
40 changes: 40 additions & 0 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -38,6 +38,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.DataManagementSystem.private import FTS3Utilities
from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
@@ -106,6 +107,8 @@ def __readConf(self):
# lifetime of the proxy we download to delegate to FTS
self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)

self.useTokens = self.am_getOption("UseTokens", False)

return S_OK()

def initialize(self):
@@ -138,6 +141,40 @@ def beginExecution(self):
self.dataOpSender = DataOperationSender()
return self.__readConf()

def getFTS3TokenContext(self, username, group, ftsServer, threadID):
"""Returns an fts3 context for the given fts server, authenticated with an
access token from the TokenManager instead of a delegated proxy.
The context is cached per thread and reused while the token is still valid.

:param username: user name (currently unused)
:param group: user group (currently unused)
:param ftsServer: address of the fts3 server
:param threadID: thread ID

:returns: S_OK(fts3 context)
"""
log = gLogger.getSubLogger("getFTS3TokenContext")

contextes = self._globalContextCache.setdefault(threadID, DictCache())

idTuple = (ftsServer,)
log.debug(f"Getting context for {idTuple}")

# We need a context (and so a token) valid for at least 10 minutes,
# so that FTS has time to refresh it
if not contextes.exists(idTuple, 10 * 60):
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"fts"],
)
if not res["OK"]:
return res
expires_in = res["Value"]["expires_in"]
access_token = res["Value"]["access_token"]

# We generate the context

res = FTS3Job.generateContext(ftsServer, None, fts_access_token=access_token)

if not res["OK"]:
return res
context = res["Value"]

# we add it to the cache for this thread for the remaining lifetime of the token
contextes.add(idTuple, expires_in, context)

return S_OK(contextes.get(idTuple))

def getFTS3Context(self, username, group, ftsServer, threadID):
"""Returns an fts3 context for a given user, group and fts server
@@ -159,6 +196,9 @@ def getFTS3Context(self, username, group, ftsServer, threadID):
"""

if self.useTokens:
return self.getFTS3TokenContext(username, group, ftsServer, threadID)

log = gLogger.getSubLogger("getFTS3Context")

contextes = self._globalContextCache.setdefault(threadID, DictCache())
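
The token path added above is gated by the new UseTokens option read in __readConf (default False) and reuses the same per-thread DictCache pattern as the proxy-based getFTS3Context: a cached context is only reused if it stays valid for a safety margin, and a freshly built one is cached for the lifetime of its token. Below is a minimal sketch of that caching pattern; buildContext is a hypothetical stand-in for the gTokenManager.getToken + FTS3Job.generateContext sequence shown in the diff, not part of the commit.

# Minimal sketch of the per-thread context cache used by getFTS3TokenContext.
# `buildContext` is a hypothetical callable returning (context, lifetime_in_seconds).
from DIRAC.Core.Utilities.DictCache import DictCache

_globalContextCache = {}


def getCachedContext(ftsServer, threadID, buildContext):
    # one DictCache per thread, created lazily
    contextes = _globalContextCache.setdefault(threadID, DictCache())
    idTuple = (ftsServer,)

    # reuse a cached context only if it is still valid for at least 10 minutes,
    # so that FTS has time to refresh
    if not contextes.exists(idTuple, 10 * 60):
        context, lifetime = buildContext(ftsServer)
        # keep the new context for as long as its token remains valid
        contextes.add(idTuple, lifetime, context)

    return contextes.get(idTuple)
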
1 change: 1 addition & 0 deletions src/DIRAC/DataManagementSystem/Client/FTS3File.py
@@ -21,6 +21,7 @@ class FTS3File(JSerializable):
"Started", # From FTS: File transfer has started
"Not_used", # From FTS: Transfer not being considered yet, waiting for another one (multihop)
"Archiving", # From FTS: file not yet migrated to tape
"Token_prep", # From FTS: When using token, used before Submitted until FTS fetched a refresh token
]

# These are the states that we consider final.
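
The new Token_prep value only extends the list of intermediate file states; it is not a final state, so a file reported in that state by FTS simply stays in the monitoring loop until the refresh token has been fetched. A small illustrative check, assuming FTS3File exposes a FINAL_STATES list alongside ALL_STATES (as the comment above suggests and as FTS3Job does):

# Illustration only: Token_prep must not be treated as a final state.
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File


def isFinalFileState(status):
    # final states are listed in FTS3File.FINAL_STATES; everything else,
    # including the new "Token_prep", keeps being monitored
    return status in FTS3File.FINAL_STATES


print(isFinalFileState("Token_prep"))  # False
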
54 changes: 32 additions & 22 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -30,7 +30,7 @@ class FTS3Job(JSerializable):
to an FTS3Operation
"""

# alter table Jobs CHANGE `status` `status` enum('Submitted','Ready','Active','Finished','Canceled','Failed','Finisheddirty','Staging','Archiving', 'Token_prep') DEFAULT 'Submitted',
# `status` enum('Submitted','Ready','Active','Finished','Canceled','Failed','Finisheddirty','Staging','Archiving', 'Token_prep') DEFAULT 'Submitted',
# START states

# States from FTS doc https://fts3-docs.web.cern.ch/fts3-docs/docs/state_machine.html
@@ -44,7 +44,6 @@
"Finisheddirty", # Some files Failed
"Staging", # One of the files within a job went to Staging state
"Archiving", # From FTS: one of the files within a job went to Archiving state
"Token_prep", # From FTS: When using token, used before Submitted until FTS fetched a refresh token
]

FINAL_STATES = ["Canceled", "Failed", "Finished", "Finisheddirty"]
@@ -425,7 +424,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
log.debug(f"Not preparing transfer for file {ftsFile.lfn}")
continue

srcToken, dstToken = None
srcToken = None
dstToken = None

sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn]
stageURL = allStageURLs.get(ftsFile.lfn)
@@ -484,7 +484,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
trans_metadata["activity"] = self.activity

# Add tokens if both storages support it
if self.__seTokenSupport(hopSrcSEName) and self.__seTokenSupport(hopDstSEName):
if self.__seTokenSupport(hopSrcSEName, self.vo) and self.__seTokenSupport(hopDstSEName, self.vo):
res = srcSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
return res
@@ -496,7 +496,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
)
if not res["OK"]:
return res
srcToken = res["Value"]
srcToken = res["Value"]["access_token"]

res = dstSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
@@ -505,11 +505,11 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"storage.create:/{dstTokenPath}", "offline_access"],
scope=[f"storage.modify:/{dstTokenPath}", "offline_access"],
)
if not res["OK"]:
return res
dstToken = res["Value"]
dstToken = res["Value"]["access_token"]

# because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
# the checksum needs to be lowercase. It does not impact the other
@@ -551,7 +551,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
transfers=transfers,
overwrite=True,
source_spacetoken=source_spacetoken,
spacetoken=target_spacetoken,
destination_spacetoken=target_spacetoken,
bring_online=bring_online,
copy_pin_lifetime=copy_pin_lifetime,
retry=3,
@@ -791,7 +791,7 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc
return S_OK(fileIDsInTheJob)

@staticmethod
def generateContext(ftsServer, ucert, lifetime=25200):
def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200):
"""This method generates an fts3 context
:param ftsServer: address of the fts3 server
@@ -801,21 +801,31 @@ def generateContext(ftsServer, ucert, lifetime=25200):
:returns: an fts3 context
"""
if fts_access_token and ucert:
return S_ERROR("fts_access_token and ucert cannot be both set")

try:
context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)
context = fts3.Context(
endpoint=ftsServer,
ucert=ucert,
request_class=ftsSSLRequest,
verify=False,
fts_access_token=fts_access_token,
)

# Explicitely delegate to be sure we have the lifetime we want
# Note: the delegation will re-happen only when the FTS server
# decides that there is not enough timeleft.
# At the moment, this is 1 hour, which effectively means that if you do
# not submit a job for more than 1h, you have no valid proxy in FTS servers
# anymore, and all the jobs failed. So we force it when
# one third of the lifetime will be left.
# Also, the proxy given as parameter might have less than "lifetime" left
# since it is cached, but it does not matter, because in the FTS3Agent
# we make sure that we renew it often enough
td_lifetime = datetime.timedelta(seconds=lifetime)
fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)
if ucert:
# Explicitly delegate to be sure we have the lifetime we want
# Note: the delegation will re-happen only when the FTS server
# decides that there is not enough time left.
# At the moment, this is 1 hour, which effectively means that if you do
# not submit a job for more than 1h, you have no valid proxy in FTS servers
# anymore, and all the jobs fail. So we force it when
# one third of the lifetime will be left.
# Also, the proxy given as parameter might have less than "lifetime" left
# since it is cached, but it does not matter, because in the FTS3Agent
# we make sure that we renew it often enough
td_lifetime = datetime.timedelta(seconds=lifetime)
fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)

return S_OK(context)
except FTS3ClientException as e:
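
With these changes, generateContext supports two mutually exclusive credential modes: a proxy (ucert), which keeps the explicit delegation to the FTS server, or an access token (fts_access_token), which is attached directly to the fts3 Context and needs no delegation. A hedged usage sketch follows; the endpoint, proxy path and token value are placeholders.

# Sketch of the two ways FTS3Job.generateContext can now be called.
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job

ftsServer = "https://fts3-server.example.org:8446"  # placeholder endpoint

# 1) Proxy-based context (existing behaviour): the proxy is delegated to FTS
#    and re-delegated when less than a third of its lifetime is left.
res = FTS3Job.generateContext(ftsServer, "/tmp/x509up_u1000", lifetime=25200)

# 2) Token-based context (new behaviour): no ucert, hence no delegation; the
#    access token is passed straight to the fts3 Context.
res = FTS3Job.generateContext(ftsServer, None, fts_access_token="eyJ...")

# Passing both credentials is rejected.
res = FTS3Job.generateContext(ftsServer, "/tmp/x509up_u1000", fts_access_token="eyJ...")
assert not res["OK"]
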
