Skip to content

Commit

Permalink
Merge pull request #129 from inab/full_circle
Browse files Browse the repository at this point in the history
Partial fix for issue #128
  • Loading branch information
jmfernandez authored Oct 17, 2024
2 parents 98257c2 + 7714606 commit 812c060
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 20 deletions.
53 changes: 50 additions & 3 deletions wfexs_backend/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ class WfExS_Commands(StrDocEnum):
"re-stage",
"Prepare a new staging (working) directory for workflow execution, repeating the fetch of dependencies and contents",
)
TryStage = (
"try-stage",
"Prepare a workflow in a new staging (working) directory for workflow execution, fetching dependencies but no input",
)
MountWorkDir = (
"mount-workdir",
"Mount the encrypted staging directory on secure staging scenarios",
Expand Down Expand Up @@ -342,7 +346,20 @@ def genParserSub(
help="Force secured working directory",
)

if preStageParams or exportParams or command == WfExS_Commands.ReStage:
if command == WfExS_Commands.TryStage:
ap_.add_argument(
"--workflow-uri",
dest="workflowURI",
required=True,
type=str,
help="URI of the workflow to be tried",
)

if (
preStageParams
or exportParams
or command in (WfExS_Commands.ReStage, WfExS_Commands.TryStage)
):
ap_.add_argument(
"-Z",
"--creds-config",
Expand All @@ -362,7 +379,7 @@ def genParserSub(

if (
preStageParams and command not in (WfExS_Commands.ConfigValidate,)
) or command == WfExS_Commands.ReStage:
) or command in (WfExS_Commands.ReStage, WfExS_Commands.TryStage):
ap_.add_argument(
"-n",
"--nickname-prefix",
Expand Down Expand Up @@ -1306,6 +1323,7 @@ def _get_wfexs_argparse_internal(
ap_ll = genParserSub(sp, WfExS_Commands.ListLicences)
ap_cv = genParserSub(sp, WfExS_Commands.ConfigValidate, preStageParams=True)

ap_try = genParserSub(sp, WfExS_Commands.TryStage)
ap_s = genParserSub(sp, WfExS_Commands.Stage, preStageParams=True)

ap_r_s = genParserSub(
Expand Down Expand Up @@ -1568,7 +1586,10 @@ def main() -> None:
file=sys.stderr,
)
sys.exit(1)
elif command != WfExS_Commands.Import and not args.workflowConfigFilename:
elif (
command not in (WfExS_Commands.Import, WfExS_Commands.TryStage)
and not args.workflowConfigFilename
):
print("[ERROR] Workflow config was not provided! Stopping.", file=sys.stderr)
sys.exit(1)
elif command == WfExS_Commands.ConfigValidate:
Expand All @@ -1587,6 +1608,12 @@ def main() -> None:
orcids=op_orcids,
paranoidMode=args.secure,
)
elif command == WfExS_Commands.TryStage:
wfInstance = wfBackend.tryWorkflowURI(
args.workflowURI,
args.securityContextsConfigFilename,
nickname_prefix=args.nickname_prefix,
)
elif command == WfExS_Commands.Import:
wfInstance = wfBackend.fromPreviousROCrate(
args.workflowROCrateFilenameOrURI,
Expand Down Expand Up @@ -1678,6 +1705,26 @@ def main() -> None:
or not isinstance(wfInstance.stageMarshalled, datetime.datetime)
else 0
)
elif command == WfExS_Commands.TryStage:
print(
"\t Instance {} (nickname '{}') (to be inspected later)".format(
wfSetup.instance_id, wfSetup.nickname
)
)
stagedSetup = wfInstance.tryStageWorkflow()
print(
"\t- Instance {} (nickname '{}') is {} ready".format(
wfSetup.instance_id,
wfSetup.nickname,
"NOT" if stagedSetup.is_damaged else "now",
)
)
sys.exit(
1
if stagedSetup.is_damaged
or not isinstance(wfInstance.stageMarshalled, datetime.datetime)
else 0
)

# Depending on the parameters, it might not exist
if getattr(args, "doMaterializedROCrate", None):
Expand Down
18 changes: 18 additions & 0 deletions wfexs_backend/utils/rocrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,24 @@ def extractWorkflowMetadata(
if langrow.workflow_alternate_name is not None:
repo_relpath = str(langrow.workflow_alternate_name)

# A fallback
if repo_relpath is None:
self.logger.warning(
f"Deriving relative path of workflow entry point from entry point location in RO-Crate metadata"
)
main_entity_uri = str(main_entity)
main_entity_parsed_uri = urllib.parse.urlparse(main_entity_uri)
use_main_entity = (
main_entity_parsed_uri.scheme == self.RELATIVE_ROCRATE_SCHEME
)

if use_main_entity:
entity_path = urllib.parse.unquote(main_entity_parsed_uri.path)
if entity_path.startswith("/"):
entity_path = entity_path[1:]

repo_relpath = entity_path

repo_web_url: "Optional[str]" = None
if langrow.workflow_url is not None:
repo_web_url = str(langrow.workflow_url)
Expand Down
47 changes: 34 additions & 13 deletions wfexs_backend/wfexs_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,19 @@ def getDefaultParanoidMode(self) -> "bool":
def enableDefaultParanoidMode(self) -> None:
self.defaultParanoidMode = True

def tryWorkflowURI(
self,
workflow_uri: "str",
securityContextsConfigFilename: "Optional[pathlib.Path]" = None,
nickname_prefix: "Optional[str]" = None,
) -> "WF":
return WF.TryWorkflowURI(
self,
workflow_uri,
securityContextsConfigFilename=securityContextsConfigFilename,
nickname_prefix=nickname_prefix,
)

def fromFiles(
self,
workflowMetaFilename: "pathlib.Path",
Expand Down Expand Up @@ -2122,10 +2135,12 @@ def cacheWorkflow(
web_url=guessedRepo.web_url,
)
else:
repoRelPath: "Optional[str]" = None
(
i_workflow,
cached_putative_path,
metadata_array,
repoRelPath,
) = self.getWorkflowBundleFromURI(
cast("URIType", workflow_id),
offline=offline,
Expand All @@ -2134,17 +2149,17 @@ def cacheWorkflow(

if i_workflow is None:
repoDir = cached_putative_path
repoRelPath: "Optional[str]" = None
if repoDir.is_dir():
if len(parsedRepoURL.fragment) > 0:
frag_qs = urllib.parse.parse_qs(parsedRepoURL.fragment)
subDirArr = frag_qs.get("subdirectory", [])
if len(subDirArr) > 0:
repoRelPath = subDirArr[0]
elif len(metadata_array) > 0:
# Let's try getting a pretty filename
# when the workflow is a single file
repoRelPath = metadata_array[0].preferredName
if not repoRelPath:
if repoDir.is_dir():
if len(parsedRepoURL.fragment) > 0:
frag_qs = urllib.parse.parse_qs(parsedRepoURL.fragment)
subDirArr = frag_qs.get("subdirectory", [])
if len(subDirArr) > 0:
repoRelPath = subDirArr[0]
elif len(metadata_array) > 0:
# Let's try getting a pretty filename
# when the workflow is a single file
repoRelPath = metadata_array[0].preferredName

# It can be either a relative path to a directory or to a file
# It could be even empty!
Expand Down Expand Up @@ -2418,6 +2433,7 @@ def getWorkflowRepoFromTRS(
i_workflow,
self.cacheROCrateFilename,
metadata_array,
_,
) = self.getWorkflowBundleFromURI(
roCrateURL,
expectedEngineDesc=self.RECOGNIZED_TRS_DESCRIPTORS[
Expand Down Expand Up @@ -2607,7 +2623,7 @@ def getWorkflowBundleFromURI(
offline: "bool" = False,
ignoreCache: "bool" = False,
registerInCache: "bool" = True,
) -> "Tuple[Optional[IdentifiedWorkflow], pathlib.Path, Sequence[URIWithMetadata]]":
) -> "Tuple[Optional[IdentifiedWorkflow], pathlib.Path, Sequence[URIWithMetadata], Optional[RelPath]]":
try:
cached_content = self.cacheFetch(
remote_url,
Expand Down Expand Up @@ -2649,16 +2665,21 @@ def getWorkflowBundleFromURI(
roCrateFile,
)

identified_workflow = self.getWorkflowRepoFromROCrateFile(
roCrateFile, expectedEngineDesc
)
return (
self.getWorkflowRepoFromROCrateFile(roCrateFile, expectedEngineDesc),
identified_workflow,
roCrateFile,
cached_content.metadata_array,
identified_workflow.remote_repo.rel_path,
)
else:
return (
None,
cached_content.path,
cached_content.metadata_array,
None,
)

def getWorkflowRepoFromROCrateFile(
Expand Down
61 changes: 57 additions & 4 deletions wfexs_backend/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,32 @@ def FromWorkDir(
fail_ok=fail_ok,
)

@classmethod
def TryWorkflowURI(
cls,
wfexs: "WfExSBackend",
workflow_uri: "str",
securityContextsConfigFilename: "Optional[pathlib.Path]" = None,
nickname_prefix: "Optional[str]" = None,
) -> "WF":
"""
This class method creates a new staged working directory
"""

workflow_meta = {
"workflow_id": workflow_uri,
"workflow_config": {"secure": False},
"params": {},
}

return cls.FromStagedRecipe(
wfexs,
workflow_meta,
securityContextsConfigFilename=securityContextsConfigFilename,
nickname_prefix=nickname_prefix,
reproducibility_level=ReproducibilityLevel.Minimal,
)

@classmethod
def FromFiles(
cls,
Expand Down Expand Up @@ -2052,15 +2078,15 @@ def fetchWorkflow(
)
self.logger.info(
"materialized workflow repository (checkout {}): {}".format(
self.repoEffectiveCheckout, self.workflowDir
self.repoEffectiveCheckout, localWorkflow.dir
)
)

if self.repoRelPath is not None:
if not (self.workflowDir / self.repoRelPath).exists():
if localWorkflow.relPath is not None:
if not (localWorkflow.dir / localWorkflow.relPath).exists():
raise WFException(
"Relative path {} cannot be found in materialized workflow repository {}".format(
self.repoRelPath, self.workflowDir
localWorkflow.relPath, localWorkflow.dir
)
)
# A valid engine must be identified from the fetched content
Expand Down Expand Up @@ -3574,6 +3600,33 @@ def fetchInputs(

return theInputs, lastInput, the_failed_uris

def tryStageWorkflow(
self, offline: "bool" = False, ignoreCache: "bool" = False
) -> "StagedSetup":
"""
This method is here to try materializing and identifying a workflow
"""

# Inputs should be materialized before materializing the workflow itself
# because some workflow systems could need them in order to describe
# some its internal details.
#
# But as we are trying to materialize a bare workflow, no input
# is going to be provided

# This method is called from within setupEngine
# self.fetchWorkflow(self.id, self.version_id, self.trs_endpoint, self.descriptor_type)
# This method is called from within materializeWorkflowAndContainers
# self.setupEngine(offline=offline)
self.materializeWorkflowAndContainers(
offline=offline,
ignoreCache=ignoreCache,
)

self.marshallStage()

return self.getStagedSetup()

def stageWorkDir(
self, offline: "bool" = False, ignoreCache: "bool" = False
) -> "StagedSetup":
Expand Down

0 comments on commit 812c060

Please sign in to comment.