Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] TransformationCleaningAgent: add Clean With RMS option #7239

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import errno
import os
import re
import time
from datetime import datetime, timedelta
from hashlib import md5

# # from DIRAC
from DIRAC import S_ERROR, S_OK
Expand All @@ -24,7 +26,11 @@
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
from DIRAC.Resources.Storage.StorageElement import StorageElement
Expand Down Expand Up @@ -368,7 +374,7 @@ def _addDirs(cls, transID, newDirs, existingDirs):
# These are the methods for performing the cleaning of catalogs and storage
#

def cleanContent(self, directory):
def cleanContent(self, directory, transID):
"""wipe out everything from catalog under folder :directory:

:param self: self reference
Expand All @@ -384,27 +390,7 @@ def cleanContent(self, directory):
return S_OK()
self.log.info("Attempting to remove possible remnants from the catalog and storage", f"(n={len(filesFound)})")

# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
failed = {}
for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)):
self.log.info("Removing chunk", chunkId)
res = DataManager().removeFile(filesChunk, force=True)
if not res["OK"]:
failed.update(dict.fromkeys(filesChunk, res["Message"]))
failed.update(res["Value"]["Failed"])
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")

realFailure = False
for lfn, reason in failed.items():
if "File does not exist" in str(reason):
self.log.warn(f"File {lfn} not found in some catalog: ")
else:
self.log.error("Failed to remove file found in the catalog", f"{lfn} {reason}")
realFailure = True
if realFailure:
return S_ERROR("Failed to remove some files found in the catalog")
return S_OK()
return self.__submitRemovalRequests(filesFound, transID)

def __getCatalogDirectoryContents(self, directories):
"""get catalog contents under paths :directories:
Expand Down Expand Up @@ -468,7 +454,7 @@ def removeTransformationOutput(self, transID):
directories = res["Value"]
for directory in directories:
if not re.search("/LOG/", directory):
res = self.cleanContent(directory)
res = self.cleanContent(directory, transID)
if not res["OK"]:
return res

Expand Down Expand Up @@ -537,7 +523,7 @@ def cleanTransformation(self, transID):
res = self.cleanTransformationLogFiles(directory)
if not res["OK"]:
return res
res = self.cleanContent(directory)
res = self.cleanContent(directory, transID)
if not res["OK"]:
return res

Expand Down Expand Up @@ -567,19 +553,7 @@ def cleanMetadataCatalogFiles(self, transID):
self.log.info("No files found for transID", transID)
return S_OK()

# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
res = DataManager().removeFile(fileToRemove, force=True)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")

if not res["OK"]:
return res
for lfn, reason in res["Value"]["Failed"].items():
self.log.error("Failed to remove file found in metadata catalog", f"{lfn} {reason}")
if res["Value"]["Failed"]:
return S_ERROR("Failed to remove all files found in the metadata catalog")
self.log.info("Successfully removed all files found in the DFC")
return S_OK()
return self.__submitRemovalRequests(fileToRemove, transID)

#############################################################################
#
Expand Down Expand Up @@ -697,3 +671,51 @@ def __removeWMSTasks(self, transJobIDs):
return S_ERROR("Failed to remove all the request from RequestDB")
self.log.info("Successfully removed all the associated failover requests")
return S_OK()

def __submitRemovalRequests(self, lfns, transID=0):
"""Create removal requests for given lfns.

:param list lfns: list of lfns to be removed
:param int transID: transformationID, only used in RequestName
:returns: S_ERROR/S_OK
"""
for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
oRequest = Request()
requestName = "TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}"
oRequest.RequestName = requestName
oOperation = Operation()
oOperation.Type = "RemoveFile"
oOperation.TargetSE = "All"
resMeta = self.metadataClient.getFileMetadata(lfnList)
if not resMeta["OK"]:
self.log.error("Cannot get file metadata", resMeta["Message"])
return resMeta
if resMeta["Value"]["Failed"]:
self.log.warning(
"Could not get the file metadata of the following, so skipping them:", resMeta["Value"]["Failed"]
)

for lfn, lfnInfo in resMeta["Value"]["Successful"].items():
rarFile = File()
rarFile.LFN = lfn
rarFile.ChecksumType = "ADLER32"
rarFile.Size = lfnInfo["Size"]
rarFile.Checksum = lfnInfo["Checksum"]
rarFile.GUID = lfnInfo["GUID"]
oOperation.addFile(rarFile)

oRequest.addOperation(oOperation)
isValid = RequestValidator().validate(oRequest)
if not isValid["OK"]:
self.log.error("Request is not valid:", isValid["Message"])
return isValid
result = self.reqClient.putRequest(oRequest)
if not result["OK"]:
self.log.error("Failed to submit Request: ", result["Message"])
return result
self.log.info(
"RemoveFiles request %d submitted for %d LFNs" % (result["Value"], len(resMeta["Value"]["Successful"]))
)

# after the for loop
return S_OK()
Loading