Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] TransformationCleaningAgent: add Clean With RMS option #7223

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import errno
import os
import re
import time
from datetime import datetime, timedelta
from hashlib import md5

# # from DIRAC
from DIRAC import S_ERROR, S_OK
Expand All @@ -24,7 +26,11 @@
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
from DIRAC.Resources.Storage.StorageElement import StorageElement
Expand Down Expand Up @@ -74,6 +80,7 @@ def __init__(self, *args, **kwargs):
self.logSE = "LogSE"
# # enable/disable execution
self.enableFlag = "True"
self.cleanWithRMS = False

self.dataProcTTypes = ["MCSimulation", "Merge"]
self.dataManipTTypes = ["Replication", "Removal"]
Expand Down Expand Up @@ -113,6 +120,7 @@ def initialize(self):
self.logSE = Operations().getValue("/LogStorage/LogSE", self.logSE)
self.log.info(f"Will remove logs found on storage element: {self.logSE}")

self.cleanWithRMS = self.am_getOption("CleanWithRMS", self.cleanWithRMS)
# # transformation client
self.transClient = TransformationClient()
# # wms client
Expand Down Expand Up @@ -387,6 +395,11 @@ def cleanContent(self, directory):
# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
failed = {}
if self.cleanWithRMS:
Copy link
Contributor Author

@andresailer andresailer Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually this branch (i.e., the cleanContent function) is never used for me, because everything is based on metadata to find files and not on directories.

res = self.__submitRemovalRequests(filesFound, 0)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")
return res

for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)):
self.log.info("Removing chunk", chunkId)
res = DataManager().removeFile(filesChunk, force=True)
Expand Down Expand Up @@ -567,10 +580,13 @@ def cleanMetadataCatalogFiles(self, transID):
self.log.info("No files found for transID", transID)
return S_OK()

# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
res = DataManager().removeFile(fileToRemove, force=True)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")
if self.cleanWithRMS:
res = self.__submitRemovalRequests(fileToRemove, transID)
else:
# Executing with shifter proxy
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
res = DataManager().removeFile(fileToRemove, force=True)
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true")

if not res["OK"]:
return res
Expand Down Expand Up @@ -697,3 +713,51 @@ def __removeWMSTasks(self, transJobIDs):
return S_ERROR("Failed to remove all the request from RequestDB")
self.log.info("Successfully removed all the associated failover requests")
return S_OK()

def __submitRemovalRequests(self, lfns, transID=0):
    """Create removal requests for given lfns.

    The LFNs are processed in chunks of 300. For each chunk the file metadata is
    looked up via ``self.metadataClient``, one request holding a single ``RemoveFile``
    operation (``TargetSE`` set to ``"All"``) is built, validated and submitted through
    ``self.reqClient``. LFNs for which no metadata could be obtained are logged and
    left out of the request.

    Processing stops at the first chunk for which the metadata lookup, the request
    validation or the submission fails, and that error result is returned.

    :param list lfns: list of lfns to be removed
    :param int transID: transformationID, only used in RequestName
    :returns: S_ERROR/S_OK
    """
    for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
        resMeta = self.metadataClient.getFileMetadata(lfnList)
        if not resMeta["OK"]:
            self.log.error("Cannot get file metadata", resMeta["Message"])
            return resMeta
        if resMeta["Value"]["Failed"]:
            self.log.warning(
                "Could not get the file metadata of the following, so skipping them:", resMeta["Value"]["Failed"]
            )

        successful = resMeta["Value"]["Successful"]
        if not successful:
            # Nothing usable in this chunk: do not build a request whose operation has no
            # files, and keep going so the remaining chunks are still handled.
            self.log.warning("No file metadata found for any LFN in chunk, no request submitted:", index)
            continue

        oRequest = Request()
        # The short hash of the current time keeps names distinct between agent cycles.
        # md5 only accepts bytes, hence the explicit encode().
        nameSuffix = md5(repr(time.time()).encode()).hexdigest()[:5]
        oRequest.RequestName = f"TCA_{transID}_{index}_{nameSuffix}"

        oOperation = Operation()
        oOperation.Type = "RemoveFile"
        oOperation.TargetSE = "All"

        for lfn, lfnInfo in successful.items():
            rarFile = File()
            rarFile.LFN = lfn
            rarFile.ChecksumType = "ADLER32"
            rarFile.Size = lfnInfo["Size"]
            rarFile.Checksum = lfnInfo["Checksum"]
            rarFile.GUID = lfnInfo["GUID"]
            oOperation.addFile(rarFile)

        oRequest.addOperation(oOperation)
        isValid = RequestValidator().validate(oRequest)
        if not isValid["OK"]:
            self.log.error("Request is not valid:", isValid["Message"])
            return isValid
        result = self.reqClient.putRequest(oRequest)
        if not result["OK"]:
            self.log.error("Failed to submit Request: ", result["Message"])
            return result
        self.log.info("RemoveFiles request %d submitted for %d LFNs" % (result["Value"], len(successful)))

    # All chunks were handled (or there was nothing to remove)
    return S_OK()
4 changes: 4 additions & 0 deletions src/DIRAC/TransformationSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ Agents
# using the transformation owner for cleanup
shifterProxy=

# If enabled, remove files by submitting requests to the RequestManagementSystem
# instead of removing them directly during the agent run
CleanWithRMS=False

# Which transformation types to clean
# If not filled, transformation types are taken from
# Operations/Transformations/DataManipulation
Expand Down
Loading