From 8630edf0633efd4b8eb2d28d13a00e9f8a509f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Tue, 9 Jul 2024 21:39:23 +0200 Subject: [PATCH 1/9] MaterializedInput needs a new field where to pass the metadata related to ContentWithMetadata inputs --- wfexs_backend/common.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/wfexs_backend/common.py b/wfexs_backend/common.py index 717b386..a6003bc 100644 --- a/wfexs_backend/common.py +++ b/wfexs_backend/common.py @@ -401,6 +401,32 @@ def _key_fixes(cls) -> "Mapping[str, str]": ] +if TYPE_CHECKING: + + class ContentWithURIsSetup(TypedDict): + """ + headerRows: Number of header rows + rowSep: Line separator + columnSep: Expression used to break a line into its column values + uriColumns: The 0-based numbers of the columns containing URIs to be fetched + """ + + headerRows: "int" + rowSep: "str" + columnSep: "str" + uriColumns: "Sequence[int]" + + +class ContentWithURIsDesc(NamedTuple): + """ + encodingFormat: the kind of content with URIs (currently implemented is text/csv => tabular) + setup: The dictionary describing the setup + """ + + encodingFormat: "str" + setup: "ContentWithURIsSetup" + + class MaterializedInput(NamedTuple): """ name: Name of the input @@ -413,6 +439,7 @@ class MaterializedInput(NamedTuple): secondaryInputs: "Optional[Sequence[MaterializedContent]]" = None autoFilled: "bool" = False implicit: "bool" = False + contentWithURIs: "Optional[ContentWithURIsDesc]" = None if TYPE_CHECKING: From f433a10092ede4a54e23c7ee4a0da3edc361a90d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Tue, 9 Jul 2024 21:39:55 +0200 Subject: [PATCH 2/9] Updated default cwltool version to the latest one --- wfexs_backend/workflow_engines/cwl_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wfexs_backend/workflow_engines/cwl_engine.py b/wfexs_backend/workflow_engines/cwl_engine.py index 478fb74..e16cfc6 100644 --- a/wfexs_backend/workflow_engines/cwl_engine.py +++ b/wfexs_backend/workflow_engines/cwl_engine.py @@ -152,7 +152,7 @@ class CWLWorkflowEngine(WorkflowEngine): DEVEL_CWLTOOL_REPO = CWLTOOL_REPO CWL_UTILS_REPO = CWL_REPO + CWL_UTILS_PYTHON_PACKAGE - DEFAULT_CWLTOOL_VERSION = cast("EngineVersion", "3.1.20240112164112") + DEFAULT_CWLTOOL_VERSION = cast("EngineVersion", "3.1.20240708091337") # DEVEL_CWLTOOL_PACKAGE = f"git+{CWLTOOL_REPO}.git" DEVEL_CWLTOOL_PACKAGE = f"git+{DEVEL_CWLTOOL_REPO}.git" From 0bed8d1d7fc8b56c29da6d41ce907d341db19a7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Tue, 9 Jul 2024 21:42:58 +0200 Subject: [PATCH 3/9] `wfexs_backend.workflow.WF._fetchContentWithURIs` now fills in the newly added `contentWithURIs` attribute located at MaterializedInput --- wfexs_backend/workflow.py | 40 +++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/wfexs_backend/workflow.py b/wfexs_backend/workflow.py index 3ccbe09..93cfa74 100644 --- a/wfexs_backend/workflow.py +++ b/wfexs_backend/workflow.py @@ -49,6 +49,7 @@ from .common import ( ContainerType, + ContentWithURIsDesc, CratableItem, DEFAULT_CONTAINER_TYPE, NoCratableItem, @@ -421,6 +422,11 @@ class WFWarning(UserWarning): pass +ContentWithURIsMIMEs = { + "tabular": "text/csv", +} + + class WF: """ Workflow enaction class @@ -2858,10 +2864,15 @@ def _fetchContentWithURIs( offline: "bool" = False, ignoreCache: "bool" = False, ) -> "Tuple[Sequence[MaterializedInput], int, Sequence[str]]": - tabconf = inputs.get("tabular") - if not isinstance(tabconf, dict): + # Current code for ContentWithURIs is only implemented for + # tabular contents + config_key = "tabular" + + tabconf = inputs.get(config_key) + encoding_format = ContentWithURIsMIMEs.get(config_key) + if not isinstance(tabconf, dict) or not isinstance(encoding_format, str): raise WFException( - f"Content with uris {linearKey} must have 'tabular' declaration" + f"Content with uris {linearKey} must have a declaration of these types: {', '.join(ContentWithURIsMIMEs.keys())}" ) t_newline: "str" = ( @@ -3035,9 +3046,13 @@ def _fetchContentWithURIs( t_secondary_remote_pair.licensed_uri.uri in these_secondary_uris ): - secondary_uri_mapping[ - t_secondary_remote_pair.licensed_uri.uri - ] = t_secondary_remote_pair.local + mapping_key = t_secondary_remote_pair.licensed_uri.uri + else: + mapping_key = cast("URIType", secondary_remote_file) + + secondary_uri_mapping[ + mapping_key + ] = t_secondary_remote_pair.local # Now, reopen each file to replace URLs by paths for i_remote_pair, remote_pair in enumerate(remote_pairs): @@ -3092,11 +3107,24 @@ def _fetchContentWithURIs( else: secondary_remote_pairs = None + # If more than one URI is provided, due some limitations more + # than one MaterializedInput instance is emitted associated to + # the very same linearKey. Each one of them will be represented + # as a collection in the generated Workflow Run RO-Crate theNewInputs.append( MaterializedInput( name=linearKey, values=remote_pairs, secondaryInputs=secondary_remote_pairs, + contentWithURIs=ContentWithURIsDesc( + encodingFormat=encoding_format, + setup={ + "headerRows": t_skiplines, + "rowSep": t_newline, + "columnSep": t_split, + "uriColumns": t_uri_cols, + }, + ), ) ) From 840ef5869b04b2d8c588763c05ae4a2f2e0b1b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Tue, 9 Jul 2024 21:46:54 +0200 Subject: [PATCH 4/9] Generated WRROC now includes the hints to properly represent content with URIs --- wfexs_backend/ro_crate.py | 85 ++++++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 20 deletions(-) diff --git a/wfexs_backend/ro_crate.py b/wfexs_backend/ro_crate.py index b8d0f6f..378604e 100644 --- a/wfexs_backend/ro_crate.py +++ b/wfexs_backend/ro_crate.py @@ -1015,23 +1015,28 @@ def _init_empty_crate_and_ComputerLanguage( RO_licences = self._process_licences(licences) # Add extra terms - # self.crate.metadata.extra_terms.update( - # { - # # "sha256": WORKFLOW_RUN_NAMESPACE + "sha256", - # # # Next ones are experimental - # # ContainerImageAdditionalType.Docker.value: WORKFLOW_RUN_NAMESPACE - # # + ContainerImageAdditionalType.Docker.value, - # # ContainerImageAdditionalType.Singularity.value: WORKFLOW_RUN_NAMESPACE - # # + ContainerImageAdditionalType.Singularity.value, - # # "containerImage": WORKFLOW_RUN_NAMESPACE + "containerImage", - # # "ContainerImage": WORKFLOW_RUN_NAMESPACE + "ContainerImage", - # # "registry": WORKFLOW_RUN_NAMESPACE + "registry", - # # "tag": WORKFLOW_RUN_NAMESPACE + "tag", - # "syntheticOutput": WFEXS_TERMS_NAMESPACE + "syntheticOutput", - # "globPattern": WFEXS_TERMS_NAMESPACE + "globPattern", - # "filledFrom": WFEXS_TERMS_NAMESPACE + "filledFrom", - # } - # ) + self.crate.metadata.extra_terms.update( + { + # "sha256": WORKFLOW_RUN_NAMESPACE + "sha256", + # # Next ones are experimental + # ContainerImageAdditionalType.Docker.value: WORKFLOW_RUN_NAMESPACE + # + ContainerImageAdditionalType.Docker.value, + # ContainerImageAdditionalType.Singularity.value: WORKFLOW_RUN_NAMESPACE + # + ContainerImageAdditionalType.Singularity.value, + # "containerImage": WORKFLOW_RUN_NAMESPACE + "containerImage", + # "ContainerImage": WORKFLOW_RUN_NAMESPACE + "ContainerImage", + # "registry": WORKFLOW_RUN_NAMESPACE + "registry", + # "tag": WORKFLOW_RUN_NAMESPACE + "tag", + # "syntheticOutput": WFEXS_TERMS_NAMESPACE + "syntheticOutput", + # "globPattern": WFEXS_TERMS_NAMESPACE + "globPattern", + # "filledFrom": WFEXS_TERMS_NAMESPACE + "filledFrom", + "contentWithURIs": WFEXS_TERMS_NAMESPACE + "contentWithURIs", + "headerRows": WFEXS_TERMS_NAMESPACE + "headerRows", + "rowSep": WFEXS_TERMS_NAMESPACE + "rowSep", + "columnSep": WFEXS_TERMS_NAMESPACE + "columnSep", + "uriColumns": WFEXS_TERMS_NAMESPACE + "uriColumns", + } + ) self.crate.metadata.extra_contexts.append(WORKFLOW_RUN_CONTEXT) self.crate.metadata.extra_contexts.append(WFEXS_TERMS_CONTEXT) @@ -1442,6 +1447,7 @@ def addWorkflowInputs( itemInValue0 = in_item.values[0] additional_type: "Optional[str]" = None + is_content_with_uris = False # A bool is an instance of int in Python if isinstance(itemInValue0, bool): additional_type = "Boolean" @@ -1459,6 +1465,9 @@ def addWorkflowInputs( ContentKind.ContentWithURIs, ): additional_type = "File" + is_content_with_uris = ( + itemInValue0.kind == ContentKind.ContentWithURIs + ) elif itemInValue0.kind == ContentKind.Directory: additional_type = "Dataset" @@ -1482,6 +1491,24 @@ def addWorkflowInputs( # This one must be a real boolean, as of schema.org formal_parameter["valueRequired"] = value_required + if is_content_with_uris: + assert in_item.contentWithURIs is not None + + formal_parameter["contentWithURIs"] = True + formal_parameter[ + "encodingFormat" + ] = in_item.contentWithURIs.encodingFormat + formal_parameter["headerRows"] = in_item.contentWithURIs.setup[ + "headerRows" + ] + formal_parameter["rowSep"] = in_item.contentWithURIs.setup["rowSep"] + formal_parameter["columnSep"] = in_item.contentWithURIs.setup[ + "columnSep" + ] + formal_parameter["uriColumns"] = in_item.contentWithURIs.setup[ + "uriColumns" + ] + item_signature = cast( "bytes", ComputeDigestFromObject( @@ -1734,9 +1761,27 @@ def addWorkflowInputs( secInputLocalSource = secInput.local # local source secInputURISource = secInput.licensed_uri.uri # uri source - secInputURILicences = ( - secInput.licensed_uri.licences - ) # licences + # Properly curate secondary input licences + secInputURILicences: "Optional[MutableSequence[LicenceDescription]]" = ( + None # licences + ) + + if secInput.licensed_uri.licences is not None: + secInputURILicences = [] + for licence in secInput.licensed_uri.licences: + sec_matched_licence: "Optional[LicenceDescription]" + if isinstance(licence, LicenceDescription): + sec_matched_licence = licence + else: + sec_matched_licence = ( + self.licence_matcher.matchLicence(licence) + ) + if sec_matched_licence is None: + failed_licences.append(licence) + + if sec_matched_licence is not None: + secInputURILicences.append(sec_matched_licence) + if os.path.isfile(secInputLocalSource): # This is needed to avoid including the input the_sec_signature: "Optional[Fingerprint]" = None From a8d8c7097061257a6592dbf1f0bf69d517b19850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Wed, 10 Jul 2024 22:54:59 +0200 Subject: [PATCH 5/9] Implemented identification of ContentWithURIs use case on RO-Crate import --- wfexs_backend/utils/rocrate.py | 274 +++++++++++++++++++++++++-------- 1 file changed, 210 insertions(+), 64 deletions(-) diff --git a/wfexs_backend/utils/rocrate.py b/wfexs_backend/utils/rocrate.py index 9dee279..ce2049f 100644 --- a/wfexs_backend/utils/rocrate.py +++ b/wfexs_backend/utils/rocrate.py @@ -202,6 +202,14 @@ class ROCratePayload(NamedTuple): CONTAINER_SIFIMAGE_SHORT: "Final[str]" = "SIFImage" +ContentWithURIsMIMEs = { + "tabular": "text/csv", +} + + +RevContentWithURIsMIMEs = dict((v, k) for k, v in ContentWithURIsMIMEs.items()) + + class ContainerImageAdditionalType(enum.Enum): Docker = WORKFLOW_RUN_NAMESPACE + CONTAINER_DOCKERIMAGE_SHORT Singularity = WORKFLOW_RUN_NAMESPACE + CONTAINER_SIFIMAGE_SHORT @@ -631,7 +639,7 @@ def identifyROCrate( { ?execution wrterm:containerImage ?source_container . } UNION { - ?entity s:softwareAddOn ?source_container. + ?entity s:softwareAddOn|s:softwareRequirements ?source_container. } ?source_container a wrterm:ContainerImage ; @@ -719,14 +727,20 @@ def identifyROCrate( # This compound query is much faster when each of the UNION components # is evaluated separately OBTAIN_WORKFLOW_INPUTS_SPARQL: "Final[str]" = """\ -SELECT ?input ?name ?inputfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_size ?file_sha256 +SELECT DISTINCT ?input ?name ?inputfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_size ?file_sha256 ?encoding_format ?content_with_uris ?header_rows ?row_sep ?column_sep ?uri_columns WHERE { ?main_entity bsworkflow:input ?inputfp . ?inputfp a bs:FormalParameter ; s:name ?name ; - s:additionalType ?additional_type ; - s:workExample ?input . + s:additionalType ?additional_type . + + { + ?input s:exampleOfWork ?inputfp . + } UNION { + ?inputfp s:workExample ?input . + } + { # A file, which is a schema.org MediaObject ?input @@ -760,7 +774,33 @@ def identifyROCrate( s:hasPart+ ?component . ?component a ?leaf_type . + FILTER NOT EXISTS { ?inputfp wfexsterm:contentWithURIs ?content_with_uris } BIND (?component AS ?fileid) + } UNION { + # A combination of files or directories or property values + ?input + a s:Collection ; + s:mainEntity ?extrapolated_input . + ?inputfp + s:encodingFormat ?encoding_format ; + wfexsterm:contentWithURIs ?content_with_uris ; + wfexsterm:headerRows ?header_rows ; + wfexsterm:rowSep ?row_sep ; + wfexsterm:columnSep ?column_sep . + # It is an all or nothing situation + {{ + SELECT ?inputfp (GROUP_CONCAT(?uri_column ; SEPARATOR=";") AS ?uri_columns) + WHERE { + ?inputfp wfexsterm:uriColumns ?uri_column . + } GROUP BY ?inputfp + }} + ?extrapolate_action + a s:CreateAction ; + s:object ?real_input ; + s:result ?extrapolated_input . + ?real_input a s:MediaObject . + + BIND (?real_input AS ?fileid) } OPTIONAL { ?fileid s:contentUrl ?fileuri . @@ -785,14 +825,20 @@ def identifyROCrate( # This compound query is much faster when each of the UNION components # is evaluated separately OBTAIN_WORKFLOW_ENV_SPARQL: "Final[str]" = """\ -SELECT ?env ?name ?name_env ?envfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size +SELECT DISTINCT ?env ?name ?name_env ?envfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size ?encoding_format ?content_with_uris ?header_rows ?row_sep ?column_sep ?uri_columns WHERE { ?main_entity wrterm:environment ?envfp . ?envfp a bs:FormalParameter ; s:name ?name ; - s:additionalType ?additional_type ; - s:workExample ?env . + s:additionalType ?additional_type . + + { + ?env s:exampleOfWork ?envfp . + } UNION { + ?envfp s:workExample ?env . + } + { # A file, which is a schema.org MediaObject ?env @@ -830,7 +876,33 @@ def identifyROCrate( s:hasPart+ ?component . ?component a ?leaf_type . + FILTER NOT EXISTS { ?envfp wfexsterm:contentWithURIs ?content_with_uris } BIND (?component AS ?fileid) + } UNION { + # A combination of files or directories or property values + ?env + a s:Collection ; + s:mainEntity ?extrapolated_input . + ?envfp + s:encodingFormat ?encoding_format ; + wfexsterm:contentWithURIs ?content_with_uris ; + wfexsterm:headerRows ?header_rows ; + wfexsterm:rowSep ?row_sep ; + wfexsterm:columnSep ?column_sep . + # It is an all or nothing situation + {{ + SELECT ?envfp (GROUP_CONCAT(?uri_column ; SEPARATOR=";") AS ?uri_columns) + WHERE { + ?envfp wfexsterm:uriColumns ?uri_column . + } GROUP BY ?envfp + }} + ?extrapolate_action + a s:CreateAction ; + s:object ?real_input ; + s:result ?extrapolated_input . + ?real_input a s:MediaObject . + + BIND (?real_input AS ?fileid) } OPTIONAL { ?fileid s:contentUrl ?fileuri . @@ -884,32 +956,33 @@ def identifyROCrate( # This compound query is much faster when each of the UNION components # is evaluated separately OBTAIN_EXECUTION_INPUTS_SPARQL: "Final[str]" = """\ -SELECT ?input ?name ?inputfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size +SELECT DISTINCT ?input ?name ?inputfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size ?encoding_format ?content_with_uris ?header_rows ?row_sep ?column_sep ?uri_columns WHERE { ?execution s:object ?input . + ?inputfp + a bs:FormalParameter ; + s:name ?name ; + s:additionalType ?additional_type . + + { + ?input s:exampleOfWork ?inputfp . + } UNION { + ?inputfp s:workExample ?input . + } + { # A file, which is a schema.org MediaObject BIND ( "File" AS ?additional_type ) ?input - a s:MediaObject ; - s:exampleOfWork ?inputfp . + a s:MediaObject . BIND (?input AS ?fileid) - ?inputfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . } UNION { # A directory, which is a schema.org Dataset BIND ( "Dataset" AS ?additional_type ) ?input - a s:Dataset ; - s:exampleOfWork ?inputfp . + a s:Dataset . BIND (?input AS ?fileid) - ?inputfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . - FILTER EXISTS { + FILTER EXISTS { # subquery to determine it is not an empty Dataset SELECT ?dircomp WHERE { @@ -923,28 +996,45 @@ def identifyROCrate( # A single property value, which can be either Integer, Text, Boolean or Float VALUES (?additional_type) { ( "Integer" ) ( "Text" ) ( "Boolean" ) ( "Float" ) } ?input - a s:PropertyValue ; - s:exampleOfWork ?inputfp . + a s:PropertyValue . BIND (?input AS ?fileid) - ?inputfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . } UNION { # A combination of files or directories or property values VALUES (?additional_type) { ( "Integer" ) ( "Text" ) ( "Boolean" ) ( "Float" ) ( "Collection" ) } VALUES ( ?leaf_type ) { ( s:PropertyValue ) ( s:MediaObject ) ( s:Dataset ) } ?input a s:Collection ; - s:exampleOfWork ?inputfp ; s:hasPart+ ?component . - ?inputfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . ?component a ?leaf_type . + FILTER NOT EXISTS { ?inputfp wfexsterm:contentWithURIs ?content_with_uris } BIND (?component AS ?fileid) + } UNION { + # A file which points to other remote files or directories + BIND ( "File" AS ?additional_type ) + ?input + a s:Collection ; + s:mainEntity ?extrapolated_input . + ?inputfp + s:encodingFormat ?encoding_format ; + wfexsterm:contentWithURIs ?content_with_uris ; + wfexsterm:headerRows ?header_rows ; + wfexsterm:rowSep ?row_sep ; + wfexsterm:columnSep ?column_sep . + # It is an all or nothing situation + {{ + SELECT ?inputfp (GROUP_CONCAT(?uri_column ; SEPARATOR=";") AS ?uri_columns) + WHERE { + ?inputfp wfexsterm:uriColumns ?uri_column . + } GROUP BY ?inputfp + }} + ?extrapolate_action + a s:CreateAction ; + s:object ?real_input ; + s:result ?extrapolated_input . + ?real_input a s:MediaObject . + + BIND (?real_input AS ?fileid) } OPTIONAL { ?fileid s:contentUrl ?fileuri . @@ -969,33 +1059,34 @@ def identifyROCrate( # This compound query is much faster when each of the UNION components # is evaluated separately OBTAIN_EXECUTION_ENV_SPARQL: "Final[str]" = """\ -SELECT ?env ?name ?name_env ?envfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size +SELECT DISTINCT ?env ?name ?name_env ?envfp ?additional_type ?fileuri ?filepid ?value ?component ?leaf_type ?fileid ?file_sha256 ?file_size ?encoding_format ?content_with_uris ?header_rows ?row_sep ?column_sep ?uri_columns WHERE { ?execution wrterm:environment ?env . + ?envfp + a bs:FormalParameter ; + s:name ?name ; + s:additionalType ?additional_type . + + { + ?env s:exampleOfWork ?envfp . + } UNION { + ?envfp s:workExample ?env . + } + { # A file, which is a schema.org MediaObject BIND ( "File" AS ?additional_type ) ?env a s:MediaObject ; - s:name ?name_env ; - s:exampleOfWork ?envfp . + s:name ?name_env . BIND (?env AS ?fileid) - ?envfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . } UNION { # A directory, which is a schema.org Dataset BIND ( "Dataset" AS ?additional_type ) ?env a s:Dataset ; - s:name ?name_env ; - s:exampleOfWork ?envfp . + s:name ?name_env . BIND (?env AS ?fileid) - ?envfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . FILTER EXISTS { # subquery to determine it is not an empty Dataset SELECT ?dircomp @@ -1011,13 +1102,8 @@ def identifyROCrate( VALUES (?additional_type) { ( "Integer" ) ( "Text" ) ( "Boolean" ) ( "Float" ) } ?env a s:PropertyValue ; - s:name ?name_env ; - s:exampleOfWork ?envfp . + s:name ?name_env . BIND (?env AS ?fileid) - ?envfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . } UNION { # A combination of files or directories or property values VALUES (?additional_type) { ( "Integer" ) ( "Text" ) ( "Boolean" ) ( "Float" ) ( "Collection" ) } @@ -1025,15 +1111,37 @@ def identifyROCrate( ?env a s:Collection ; s:name ?name_env ; - s:exampleOfWork ?envfp ; s:hasPart+ ?component . - ?envfp - a bs:FormalParameter ; - s:name ?name ; - s:additionalType ?additional_type . ?component a ?leaf_type . + FILTER NOT EXISTS { ?envfp wfexsterm:contentWithURIs ?content_with_uris } BIND (?component AS ?fileid) + } UNION { + # A file which points to other remote files or directories + BIND ( "File" AS ?additional_type ) + ?env + a s:Collection ; + s:mainEntity ?extrapolated_input . + ?envfp + s:encodingFormat ?encoding_format ; + wfexsterm:contentWithURIs ?content_with_uris ; + wfexsterm:headerRows ?header_rows ; + wfexsterm:rowSep ?row_sep ; + wfexsterm:columnSep ?column_sep . + # It is an all or nothing situation + {{ + SELECT ?envfp (GROUP_CONCAT(?uri_column ; SEPARATOR=";") AS ?uri_columns) + WHERE { + ?envfp wfexsterm:uriColumns ?uri_column . + } GROUP BY ?envfp + }} + ?extrapolate_action + a s:CreateAction ; + s:object ?real_input ; + s:result ?extrapolated_input . + ?real_input a s:MediaObject . + + BIND (?real_input AS ?fileid) } OPTIONAL { ?fileid s:contentUrl ?fileuri . @@ -1743,7 +1851,6 @@ def __parseInputsResults( public_name: "str", payload_dir: "Optional[pathlib.Path]" = None, ) -> "Tuple[ParamsBlock, Optional[Sequence[MaterializedInput]]]": - # TODO: implement this params: "MutableParamsBlock" = {} cached_inputs_hash: "MutableMapping[str, MaterializedInput]" = {} for inputrow in qinputsres: @@ -1779,12 +1886,20 @@ def __parseInputsResults( valarr = base.setdefault(param_last, []) # Is it a file or a directory? + # Or even better, a ContentWithURIs if additional_type in ("File", "Dataset"): - kindobj = ( - ContentKind.Directory - if additional_type == "Dataset" - else ContentKind.File - ) + self.logger.error(f"BINGO!! {inputrow}") + if ( + isinstance(inputrow.content_with_uris, rdflib.term.Literal) + and bool(inputrow.content_with_uris.value) + and str(inputrow.encoding_format) in RevContentWithURIsMIMEs + ): + kindobj = ContentKind.ContentWithURIs + elif additional_type == "Dataset": + kindobj = ContentKind.Directory + else: + kindobj = ContentKind.File + valobj = base.setdefault( param_last, { @@ -1792,6 +1907,29 @@ def __parseInputsResults( }, ) + # Time to transfer the additional properties + if kindobj == ContentKind.ContentWithURIs: + assert isinstance(inputrow.header_rows, rdflib.term.Literal) + assert isinstance(inputrow.row_sep, rdflib.term.Literal) + assert isinstance(inputrow.column_sep, rdflib.term.Literal) + assert isinstance(inputrow.uri_columns, rdflib.term.Literal) + + encoding_format_key = RevContentWithURIsMIMEs[ + str(inputrow.encoding_format) + ] + base[param_last].update( + { + encoding_format_key: { + "header-rows": inputrow.header_rows.value, + "row-sep": inputrow.row_sep.value, + "column-sep": inputrow.column_sep.value, + "uri-columns": list( + map(int, inputrow.uri_columns.value.split(";")) + ), + } + } + ) + if isinstance(valobj, dict): the_uri: "str" if inputrow.fileuri is not None: @@ -2504,6 +2642,7 @@ def generateWorkflowMetaFromJSONLD( params: "ParamsBlock" = {} environment: "EnvironmentBlock" = {} outputs: "OutputsBlock" = {} + num_execs: "int" = 0 if retrospective_first: # For the retrospective provenance at least an execution must # be described in the RO-Crate. Once one is chosen, @@ -2524,6 +2663,7 @@ def generateWorkflowMetaFromJSONLD( "mainentity": matched_crate.mainentity, }, ) + num_execs = len(qexecsres) for execrow in qexecsres: assert isinstance( execrow, rdflib.query.ResultRow @@ -2575,8 +2715,10 @@ def generateWorkflowMetaFromJSONLD( public_name=public_name, ) - # Now, let's get the list of input parameters - break + # TODO: deal with more than one execution + # Now, let's check the list of captured input parameters + if len(params) == 0: + break except Exception as e: raise ROCrateToolboxException( f"Unable to perform JSON-LD workflow execution details query over {public_name} (see cascading exceptions)" @@ -2584,6 +2726,10 @@ def generateWorkflowMetaFromJSONLD( # Following the prospective path if len(params) == 0: + if retrospective_first and num_execs > 0: + self.logger.warning( + f"No params found associated to {num_execs} executions" + ) contresult = self._parseContainersFromWorkflow( g, main_entity=matched_crate.mainentity, From a60c2a3c59eb96e3f7f95de915a3123312974ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Wed, 10 Jul 2024 23:06:47 +0200 Subject: [PATCH 6/9] Wired up new parameter to control whether to ignore retrospective provenance when an RO-Crate is being imported --- wfexs_backend/__main__.py | 15 +++++++++++++++ wfexs_backend/wfexs_backend.py | 2 ++ wfexs_backend/workflow.py | 8 +++----- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/wfexs_backend/__main__.py b/wfexs_backend/__main__.py index 219d29b..1ee9d58 100644 --- a/wfexs_backend/__main__.py +++ b/wfexs_backend/__main__.py @@ -256,6 +256,20 @@ def genParserSub( help="Workflow Run RO-Crate describing a previous workflow execution. It can be either a local path or an URI resolvable from WfExS with no authentication", ) + ap_.add_argument( + "--ignore-retrospective-provenance", + dest="retrospective_first", + action="store_false", + default=True, + help="Retrospective provenance is ignored", + ) + ap_.add_argument( + "--prefer-retrospective-provenance", + dest="retrospective_first", + action="store_true", + help="Retrospective provenance is first inspected", + ) + not_restage = command not in (WfExS_Commands.Import, WfExS_Commands.ReStage) ap_.add_argument( "-W", @@ -1483,6 +1497,7 @@ def main() -> None: secure=args.secure, reproducibility_level=ReproducibilityLevel(args.reproducibility_level), strict_reproducibility_level=args.strict_reproducibility_level, + retrospective_first=args.retrospective_first, ) else: print( diff --git a/wfexs_backend/wfexs_backend.py b/wfexs_backend/wfexs_backend.py index 8db0ba8..6f2237c 100644 --- a/wfexs_backend/wfexs_backend.py +++ b/wfexs_backend/wfexs_backend.py @@ -1397,6 +1397,7 @@ def fromPreviousROCrate( paranoidMode: "bool" = False, reproducibility_level: "ReproducibilityLevel" = ReproducibilityLevel.Metadata, strict_reproducibility_level: "bool" = False, + retrospective_first: "bool" = True, ) -> "WF": # Let's check whether it is a local file # or a remote RO-Crate @@ -1430,6 +1431,7 @@ def fromPreviousROCrate( paranoidMode=paranoidMode, reproducibility_level=reproducibility_level, strict_reproducibility_level=strict_reproducibility_level, + retrospective_first=retrospective_first, ) def parseAndValidateSecurityContextFile( diff --git a/wfexs_backend/workflow.py b/wfexs_backend/workflow.py index 93cfa74..1ec31cc 100644 --- a/wfexs_backend/workflow.py +++ b/wfexs_backend/workflow.py @@ -249,6 +249,7 @@ WorkflowRunROCrate, ) from .utils.rocrate import ( + ContentWithURIsMIMEs, ReadROCrateMetadata, ReproducibilityLevel, ) @@ -422,11 +423,6 @@ class WFWarning(UserWarning): pass -ContentWithURIsMIMEs = { - "tabular": "text/csv", -} - - class WF: """ Workflow enaction class @@ -1614,6 +1610,7 @@ def FromPreviousROCrate( paranoidMode: "bool" = False, reproducibility_level: "ReproducibilityLevel" = ReproducibilityLevel.Metadata, strict_reproducibility_level: "bool" = False, + retrospective_first: "bool" = True, ) -> "WF": """ This class method creates a new staged working directory @@ -1641,6 +1638,7 @@ def FromPreviousROCrate( public_name, reproducibility_level=reproducibility_level, strict_reproducibility_level=strict_reproducibility_level, + retrospective_first=retrospective_first, payload_dir=payload_dir, ) From 699e9fc6faede0f43d32bfa33c766730a8cbb10c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Wed, 10 Jul 2024 23:09:20 +0200 Subject: [PATCH 7/9] Removed debug trace --- wfexs_backend/utils/rocrate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/wfexs_backend/utils/rocrate.py b/wfexs_backend/utils/rocrate.py index ce2049f..698ffe5 100644 --- a/wfexs_backend/utils/rocrate.py +++ b/wfexs_backend/utils/rocrate.py @@ -1888,7 +1888,6 @@ def __parseInputsResults( # Is it a file or a directory? # Or even better, a ContentWithURIs if additional_type in ("File", "Dataset"): - self.logger.error(f"BINGO!! {inputrow}") if ( isinstance(inputrow.content_with_uris, rdflib.term.Literal) and bool(inputrow.content_with_uris.value) From 3a22d0eb48d501a9476da3b494e48f97dd0962d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Thu, 11 Jul 2024 04:20:13 +0200 Subject: [PATCH 8/9] Cached container file naming methods redesigned (and the related machinery) Fixed issue introduced in 1.0.0a0 , where injected original container tags could override the ones extracted from the workflow. Original container tags are tied to the way the filenames used for cached SIF containers. --- tests/containers/util.py | 8 +- wfexs_backend/container_factories/__init__.py | 210 ++++++++++-------- .../container_factories/docker_container.py | 85 +++---- .../container_factories/podman_container.py | 88 ++++---- .../singularity_container.py | 90 ++++---- wfexs_backend/workflow_engines/__init__.py | 2 +- wfexs_backend/workflow_engines/cwl_engine.py | 16 +- .../workflow_engines/nextflow_engine.py | 4 +- 8 files changed, 280 insertions(+), 223 deletions(-) diff --git a/tests/containers/util.py b/tests/containers/util.py index ca3968b..e72208b 100644 --- a/tests/containers/util.py +++ b/tests/containers/util.py @@ -22,13 +22,17 @@ ) if TYPE_CHECKING: + from typing import ( + Sequence, + ) + from wfexs_backend.common import ( RelPath, URIType, ) -def simpleTestContainerFileName(imageUrl: "URIType") -> "RelPath": +def simpleTestContainerFileName(imageUrl: "URIType") -> "Sequence[RelPath]": """ This method was borrowed from https://github.com/nextflow-io/nextflow/blob/539a22b68c114c94eaf4a88ea8d26b7bfe2d0c39/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L80 @@ -46,4 +50,4 @@ def simpleTestContainerFileName(imageUrl: "URIType") -> "RelPath": name = name.replace(":", "-").replace("/", "-") - return cast("RelPath", name + extension) + return [cast("RelPath", name + extension)] diff --git a/wfexs_backend/container_factories/__init__.py b/wfexs_backend/container_factories/__init__.py index 30117a8..4987c58 100644 --- a/wfexs_backend/container_factories/__init__.py +++ b/wfexs_backend/container_factories/__init__.py @@ -18,7 +18,7 @@ from __future__ import absolute_import import copy -from dataclasses import dataclass +import dataclasses import json import os import pathlib @@ -80,7 +80,7 @@ # As each workflow engine can have its own naming convention, leave them to # provide it - ContainerFileNamingMethod: TypeAlias = Callable[[URIType], RelPath] + ContainerFileNamingMethod: TypeAlias = Callable[[URIType], Sequence[RelPath]] ContainerLocalConfig: TypeAlias = Mapping[str, Any] @@ -114,7 +114,7 @@ class AbstractImageManifestMetadata(TypedDict): DEFAULT_DOCKER_REGISTRY: "Final[str]" = "docker.io" -@dataclass +@dataclasses.dataclass class Container(ContainerTaggedName): """ origTaggedName: Symbolic name or identifier of the container @@ -322,15 +322,10 @@ def _genTmpContainerPath(self) -> "pathlib.Path": def _genContainerPaths( self, container: "ContainerTaggedName" - ) -> "Tuple[pathlib.Path, pathlib.Path]": - containerFilename = self.simpleFileNameMethod( - cast("URIType", container.origTaggedName) + ) -> "Sequence[Tuple[pathlib.Path, pathlib.Path]]": + return self.genStagedContainersDirPaths( + container, self.engineContainersSymlinkDir ) - containerFilenameMeta = containerFilename + META_JSON_POSTFIX - localContainerPath = self.engineContainersSymlinkDir / containerFilename - localContainerPathMeta = self.engineContainersSymlinkDir / containerFilenameMeta - - return localContainerPath, localContainerPathMeta def _computeFingerprint(self, image_path: "pathlib.Path") -> "Fingerprint": return cast("Fingerprint", ComputeDigestFromFile(image_path.as_posix())) @@ -349,89 +344,115 @@ def _computeCanonicalImagePath( def query( self, container: "ContainerTaggedName" - ) -> "Tuple[bool, pathlib.Path, pathlib.Path, Optional[Fingerprint]]": + ) -> "Tuple[bool, Sequence[Tuple[pathlib.Path, pathlib.Path]], Optional[Fingerprint]]": """ This method checks whether the container snapshot and its metadata are in the caching directory """ - localContainerPath, localContainerPathMeta = self._genContainerPaths(container) - - trusted_copy = False - imageSignature: "Optional[Fingerprint]" = None - if localContainerPath.is_file(): - if localContainerPath.is_symlink(): - # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols - # Path.readlink was added in Python 3.9 - unlinkedContainerPath = pathlib.Path(os.readlink(localContainerPath)) - fsImageSignature = unlinkedContainerPath.name - imageSignature = cast( - "Fingerprint", - fsImageSignature.replace("~", "=") - .replace("-", "/") - .replace("_", "+"), - ) - - # Do not trust paths outside the caching directory - canonicalContainerPath = self.containersCacheDir / fsImageSignature - - trusted_copy = localContainerPath.resolve().samefile( - canonicalContainerPath.resolve() - ) - else: - ( - canonicalContainerPath, - imageSignature, - ) = self._computeCanonicalImagePath(localContainerPath) - - if localContainerPath.samefile(canonicalContainerPath): - trusted_copy = True - elif canonicalContainerPath.is_file(): - canonicalImageSignature = self._computeFingerprint( - canonicalContainerPath + possible_container_paths = self._genContainerPaths(container) + assert len(possible_container_paths) > 0 + + all_imageSignature: "Optional[Fingerprint]" = None + all_trusted_copies = True + for localContainerPath, localContainerPathMeta in possible_container_paths: + trusted_copy = False + imageSignature: "Optional[Fingerprint]" = None + if localContainerPath.is_file(): + if localContainerPath.is_symlink(): + # Some filesystems complain when filenames contain 'equal', 'slash' or 'plus' symbols + # Path.readlink was added in Python 3.9 + unlinkedContainerPath = pathlib.Path( + os.readlink(localContainerPath) + ) + fsImageSignature = unlinkedContainerPath.name + imageSignature = cast( + "Fingerprint", + fsImageSignature.replace("~", "=") + .replace("-", "/") + .replace("_", "+"), ) - trusted_copy = canonicalImageSignature == imageSignature + # Do not trust paths outside the caching directory + canonicalContainerPath = self.containersCacheDir / fsImageSignature - if trusted_copy: - if localContainerPathMeta.is_file(): - try: - with localContainerPathMeta.open(mode="r", encoding="utf-8") as mH: - signaturesAndManifest = cast( - "AbstractImageManifestMetadata", json.load(mH) - ) - imageSignature_in_metadata = signaturesAndManifest.get( - "image_signature" + trusted_copy = localContainerPath.resolve().samefile( + canonicalContainerPath.resolve() + ) + else: + ( + canonicalContainerPath, + imageSignature, + ) = self._computeCanonicalImagePath(localContainerPath) + + if localContainerPath.samefile(canonicalContainerPath): + trusted_copy = True + elif canonicalContainerPath.is_file(): + canonicalImageSignature = self._computeFingerprint( + canonicalContainerPath ) - trusted_copy = imageSignature_in_metadata == imageSignature - except: + + trusted_copy = canonicalImageSignature == imageSignature + + if trusted_copy: + if localContainerPathMeta.is_file(): + try: + with localContainerPathMeta.open( + mode="r", encoding="utf-8" + ) as mH: + signaturesAndManifest = cast( + "AbstractImageManifestMetadata", json.load(mH) + ) + imageSignature_in_metadata = signaturesAndManifest.get( + "image_signature" + ) + trusted_copy = imageSignature_in_metadata == imageSignature + except: + trusted_copy = False + else: trusted_copy = False - else: - trusted_copy = False - return trusted_copy, localContainerPath, localContainerPathMeta, imageSignature + # One failed, all fails + if not trusted_copy or ( + all_imageSignature is not None and all_imageSignature != imageSignature + ): + all_trusted_copies = False + all_imageSignature = None + break + + # To improve checks in the next loop + all_imageSignature = imageSignature + + return all_trusted_copies, possible_container_paths, all_imageSignature def genStagedContainersDirPaths( self, container: "ContainerTaggedName", stagedContainersDir: "pathlib.Path", - ) -> "Tuple[pathlib.Path, pathlib.Path]": - containerFilename = self.simpleFileNameMethod( + ) -> "Sequence[Tuple[pathlib.Path, pathlib.Path]]": + container_filenames = self.simpleFileNameMethod( cast("URIType", container.origTaggedName) ) - containerFilenameMeta = containerFilename + META_JSON_POSTFIX - containerPath = stagedContainersDir / containerFilename + staged_container_filenames: "MutableSequence[Tuple[pathlib.Path, pathlib.Path]]" = ( + [] + ) + for container_filename in container_filenames: + container_filename_meta = container_filename + META_JSON_POSTFIX + + containerPath = stagedContainersDir / container_filename - containerPathMeta = stagedContainersDir / containerFilenameMeta + containerPathMeta = stagedContainersDir / container_filename_meta - return containerPath, containerPathMeta + staged_container_filenames.append((containerPath, containerPathMeta)) + + return staged_container_filenames def transfer( self, container: "ContainerTaggedName", stagedContainersDir: "pathlib.Path", force: "bool" = False, - ) -> "Optional[Tuple[pathlib.Path, pathlib.Path]]": + ) -> "Optional[Sequence[Tuple[pathlib.Path, pathlib.Path]]]": """ This method is used to transfer both the container snapshot and its metadata from the caching directory to stagedContainersDir @@ -439,8 +460,7 @@ def transfer( # First, get the local paths ( trusted_copy, - localContainerPath, - localContainerPathMeta, + local_container_paths, imageSignature, ) = self.query(container) if not trusted_copy: @@ -449,16 +469,18 @@ def transfer( # Last, but not the least important # Hardlink or copy the container and its metadata stagedContainersDir.mkdir(parents=True, exist_ok=True) - containerPath, containerPathMeta = self.genStagedContainersDirPaths( + container_paths = self.genStagedContainersDirPaths( container, stagedContainersDir ) - if force or not containerPath.exists(): - link_or_copy_pathlib(localContainerPath, containerPath) - if force or not containerPathMeta.exists(): - link_or_copy_pathlib(localContainerPathMeta, containerPathMeta) + assert len(container_paths) == len(local_container_paths) + + for local_paths, dest_paths in zip(local_container_paths, container_paths): + for local_path, dest_path in zip(local_paths, dest_paths): + if force or not dest_path.exists(): + link_or_copy_pathlib(local_path, dest_path) - return (containerPath, containerPathMeta) + return container_paths def update( self, @@ -470,9 +492,6 @@ def update( # First, let's remove what it is still there self.invalidate(container) - # Then, get the local paths - localContainerPath, localContainerPathMeta = self._genContainerPaths(container) - # Now, compute the hash canonicalContainerPath, imageSignature = self._computeCanonicalImagePath( image_path @@ -495,21 +514,28 @@ def update( ) # Last, the symbolic links - localContainerPath.symlink_to( - os.path.relpath(canonicalContainerPath, self.engineContainersSymlinkDir) - ) + # to all the possible local paths + possible_container_paths = self._genContainerPaths(container) + assert len(possible_container_paths) > 0 - localContainerPathMeta.symlink_to( - os.path.relpath(canonicalContainerPathMeta, self.engineContainersSymlinkDir) - ) + for localContainerPath, localContainerPathMeta in possible_container_paths: + localContainerPath.symlink_to( + os.path.relpath(canonicalContainerPath, self.engineContainersSymlinkDir) + ) + + localContainerPathMeta.symlink_to( + os.path.relpath( + canonicalContainerPathMeta, self.engineContainersSymlinkDir + ) + ) def invalidate(self, container: "ContainerTaggedName") -> "None": # First, get the local paths - localContainerPath, localContainerPathMeta = self._genContainerPaths(container) - - # Let's remove what it is still there - real_unlink_if_exists(localContainerPath) - real_unlink_if_exists(localContainerPathMeta) + possible_container_paths = self._genContainerPaths(container) + for localContainerPath, localContainerPathMeta in possible_container_paths: + # Let's remove what it is still there + real_unlink_if_exists(localContainerPath) + real_unlink_if_exists(localContainerPathMeta) class ContainerFactory(abc.ABC): @@ -717,7 +743,9 @@ def materializeContainers( continue inj_tag = self.generateCanonicalTag(injectable_container) if contTag == inj_tag: - tag_to_use = injectable_container + tag_to_use = dataclasses.replace( + injectable_container, origTaggedName=tag.origTaggedName + ) self.logger.info(f"Matched injected container {contTag}") break diff --git a/wfexs_backend/container_factories/docker_container.py b/wfexs_backend/container_factories/docker_container.py index 575707d..55c069c 100644 --- a/wfexs_backend/container_factories/docker_container.py +++ b/wfexs_backend/container_factories/docker_container.py @@ -245,8 +245,9 @@ def materializeSingleContainer( fetch_metadata = True trusted_copy = False - localContainerPath: "Optional[pathlib.Path]" = None - localContainerPathMeta: "Optional[pathlib.Path]" = None + local_container_paths: "Optional[Sequence[Tuple[pathlib.Path, pathlib.Path]]]" = ( + None + ) imageSignature: "Optional[Fingerprint]" = None image_id: "Optional[Fingerprint]" = None manifestsImageSignature: "Optional[Fingerprint]" = None @@ -255,12 +256,14 @@ def materializeSingleContainer( if not force: ( trusted_copy, - localContainerPath, - localContainerPathMeta, + local_container_paths, imageSignature, ) = self.cc_handler.query(tag) if trusted_copy: + assert len(local_container_paths) > 0 + # We only need to inspect first provided path + localContainerPath, localContainerPathMeta = local_container_paths[0] try: with open(localContainerPathMeta, mode="r", encoding="utf-8") as mH: signaturesAndManifest = cast( @@ -297,12 +300,10 @@ def materializeSingleContainer( f"Banned remove docker containers in offline mode from {tag_name}" ) - if ( - localContainerPathMeta is not None - and localContainerPath is not None - and ( - os.path.exists(localContainerPathMeta) - or os.path.exists(localContainerPath) + if local_container_paths is not None and any( + map( + lambda lcp: os.path.exists(lcp[0]) or os.path.exists(lcp[1]), + local_container_paths, ) ): self.logger.warning( @@ -417,11 +418,11 @@ def materializeSingleContainer( containers_dir = self.stagedContainersDir # Do not allow overwriting in offline mode - transferred_image = self.cc_handler.transfer( + transferred_images = self.cc_handler.transfer( tag, stagedContainersDir=containers_dir, force=force and not offline ) - assert transferred_image is not None, f"Unexpected cache miss for {tag}" - containerPath, containerPathMeta = transferred_image + assert transferred_images is not None, f"Unexpected cache miss for {tag}" + containerPath, containerPathMeta = transferred_images[0] assert manifestsImageSignature is not None assert manifests is not None @@ -470,7 +471,7 @@ def deploySingleContainer( # These are the paths to the copy of the saved container if containers_dir is None: containers_dir = self.stagedContainersDir - containerPath, containerPathMeta = self.cc_handler.genStagedContainersDirPaths( + container_paths = self.cc_handler.genStagedContainersDirPaths( container, containers_dir ) @@ -479,36 +480,40 @@ def deploySingleContainer( manifests = None manifest = None was_redeployed = False - if ( - not containerPath.is_file() - and isinstance(container, Container) - and container.localPath is not None - ): - # Time to inject the image! - link_or_copy_pathlib(container.localPath, containerPath, force_copy=True) - was_redeployed = True + for containerPath, containerPathMeta in container_paths: + if ( + not containerPath.is_file() + and isinstance(container, Container) + and container.localPath is not None + ): + # Time to inject the image! + link_or_copy_pathlib( + container.localPath, containerPath, force_copy=True + ) + was_redeployed = True - if not containerPath.is_file(): - errmsg = f"Docker saved image {containerPath.name} is not in the staged working dir for {tag_name}" - self.logger.warning(errmsg) - raise ContainerFactoryException(errmsg) + if not containerPath.is_file(): + errmsg = f"Docker saved image {containerPath.name} is not in the staged working dir for {tag_name}" + self.logger.warning(errmsg) + raise ContainerFactoryException(errmsg) - if ( - not containerPathMeta.is_file() - and isinstance(container, Container) - and container.metadataLocalPath is not None - ): - # Time to inject the metadata! - link_or_copy_pathlib( - container.metadataLocalPath, containerPathMeta, force_copy=True - ) - was_redeployed = True + if ( + not containerPathMeta.is_file() + and isinstance(container, Container) + and container.metadataLocalPath is not None + ): + # Time to inject the metadata! + link_or_copy_pathlib( + container.metadataLocalPath, containerPathMeta, force_copy=True + ) + was_redeployed = True - if not containerPathMeta.is_file(): - errmsg = f"Docker saved image metadata {containerPathMeta.name} is not in the staged working dir for {tag_name}" - self.logger.warning(errmsg) - raise ContainerFactoryException(errmsg) + if not containerPathMeta.is_file(): + errmsg = f"Docker saved image metadata {containerPathMeta.name} is not in the staged working dir for {tag_name}" + self.logger.warning(errmsg) + raise ContainerFactoryException(errmsg) + containerPath, containerPathMeta = container_paths[0] try: with containerPathMeta.open(mode="r", encoding="utf-8") as mH: signaturesAndManifest = cast("DockerManifestMetadata", json.load(mH)) diff --git a/wfexs_backend/container_factories/podman_container.py b/wfexs_backend/container_factories/podman_container.py index 1f63538..009eb1c 100644 --- a/wfexs_backend/container_factories/podman_container.py +++ b/wfexs_backend/container_factories/podman_container.py @@ -251,8 +251,9 @@ def materializeSingleContainer( fetch_metadata = True trusted_copy = False - localContainerPath: "Optional[pathlib.Path]" = None - localContainerPathMeta: "Optional[pathlib.Path]" = None + local_container_paths: "Optional[Sequence[Tuple[pathlib.Path, pathlib.Path]]]" = ( + None + ) imageSignature: "Optional[Fingerprint]" = None image_id: "Optional[Fingerprint]" = None manifestsImageSignature: "Optional[Fingerprint]" = None @@ -261,12 +262,14 @@ def materializeSingleContainer( if not force: ( trusted_copy, - localContainerPath, - localContainerPathMeta, + local_container_paths, imageSignature, ) = self.cc_handler.query(tag) if trusted_copy: + assert len(local_container_paths) > 0 + # We only need to inspect first provided path + localContainerPath, localContainerPathMeta = local_container_paths[0] try: with open(localContainerPathMeta, mode="r", encoding="utf-8") as mH: signaturesAndManifest = cast( @@ -303,12 +306,10 @@ def materializeSingleContainer( f"Banned remove podman containers in offline mode from {tag_name}" ) - if ( - localContainerPathMeta is not None - and localContainerPath is not None - and ( - os.path.exists(localContainerPathMeta) - or os.path.exists(localContainerPath) + if local_container_paths is not None and any( + map( + lambda lcp: os.path.exists(lcp[0]) or os.path.exists(lcp[1]), + local_container_paths, ) ): self.logger.warning( @@ -422,11 +423,11 @@ def materializeSingleContainer( containers_dir = self.stagedContainersDir # Do not allow overwriting in offline mode - transferred_image = self.cc_handler.transfer( + transferred_images = self.cc_handler.transfer( tag, stagedContainersDir=containers_dir, force=force and not offline ) - assert transferred_image is not None, f"Unexpected cache miss for {tag}" - containerPath, containerPathMeta = transferred_image + assert transferred_images is not None, f"Unexpected cache miss for {tag}" + containerPath, containerPathMeta = transferred_images[0] assert manifestsImageSignature is not None assert manifests is not None @@ -476,45 +477,50 @@ def deploySingleContainer( # These are the paths to the copy of the saved container if containers_dir is None: containers_dir = self.stagedContainersDir - containerPath, containerPathMeta = self.cc_handler.genStagedContainersDirPaths( + container_paths = self.cc_handler.genStagedContainersDirPaths( container, containers_dir ) + assert len(container_paths) > 0 imageSignature: "Optional[Fingerprint]" = None manifestsImageSignature: "Optional[Fingerprint]" = None manifests = None manifest = None was_redeployed = False - if ( - not containerPath.is_file() - and isinstance(container, Container) - and container.localPath is not None - ): - # Time to inject the image! - link_or_copy_pathlib(container.localPath, containerPath, force_copy=True) - was_redeployed = True - - if not containerPath.is_file(): - errmsg = f"Podman saved image {containerPath.name} is not in the staged working dir for {tag_name}" - self.logger.warning(errmsg) - raise ContainerFactoryException(errmsg) + for containerPath, containerPathMeta in container_paths: + if ( + not containerPath.is_file() + and isinstance(container, Container) + and container.localPath is not None + ): + # Time to inject the image! + link_or_copy_pathlib( + container.localPath, containerPath, force_copy=True + ) + was_redeployed = True - if ( - not containerPathMeta.is_file() - and isinstance(container, Container) - and container.metadataLocalPath is not None - ): - # Time to inject the metadata! - link_or_copy_pathlib( - container.metadataLocalPath, containerPathMeta, force_copy=True - ) - was_redeployed = True + if not containerPath.is_file(): + errmsg = f"Podman saved image {containerPath.name} is not in the staged working dir for {tag_name}" + self.logger.warning(errmsg) + raise ContainerFactoryException(errmsg) - if not containerPathMeta.is_file(): - errmsg = f"FATAL ERROR: Podman saved image metadata {containerPathMeta.name} is not in the staged working dir for {tag_name}" - self.logger.error(errmsg) - raise ContainerFactoryException(errmsg) + if ( + not containerPathMeta.is_file() + and isinstance(container, Container) + and container.metadataLocalPath is not None + ): + # Time to inject the metadata! + link_or_copy_pathlib( + container.metadataLocalPath, containerPathMeta, force_copy=True + ) + was_redeployed = True + + if not containerPathMeta.is_file(): + errmsg = f"FATAL ERROR: Podman saved image metadata {containerPathMeta.name} is not in the staged working dir for {tag_name}" + self.logger.error(errmsg) + raise ContainerFactoryException(errmsg) + containerPath, containerPathMeta = container_paths[0] try: with containerPathMeta.open(mode="r", encoding="utf-8") as mH: signaturesAndManifest = cast("DockerManifestMetadata", json.load(mH)) diff --git a/wfexs_backend/container_factories/singularity_container.py b/wfexs_backend/container_factories/singularity_container.py index 38b0eac..0af1c72 100644 --- a/wfexs_backend/container_factories/singularity_container.py +++ b/wfexs_backend/container_factories/singularity_container.py @@ -504,19 +504,22 @@ def _materializeSingleContainerSing( fetch_metadata = True trusted_copy = False - localContainerPath: "Optional[pathlib.Path]" = None - localContainerPathMeta: "Optional[pathlib.Path]" = None + local_container_paths: "Optional[Sequence[Tuple[pathlib.Path, pathlib.Path]]]" = ( + None + ) imageSignature: "Optional[Fingerprint]" = None fingerprint: "Optional[Fingerprint]" = None if not force: ( trusted_copy, - localContainerPath, - localContainerPathMeta, + local_container_paths, imageSignature, ) = self.cc_handler.query(tag) if trusted_copy: + assert len(local_container_paths) > 0 + # We only need to inspect first provided path + localContainerPath, localContainerPathMeta = local_container_paths[0] try: with localContainerPathMeta.open(mode="r", encoding="utf8") as tcpm: raw_metadata = json.load(tcpm) @@ -643,12 +646,13 @@ def _materializeSingleContainerSing( if fetch_metadata: if offline: raise ContainerFactoryException( - f"Cannot download containers metadata in offline mode from {tag_name} to {localContainerPath}" + f"Cannot download containers metadata in offline mode from {tag_name} to {local_container_paths}" ) if tmpContainerPath is None: - assert localContainerPath is not None + assert local_container_paths is not None tmpContainerPath = self.cc_handler._genTmpContainerPath() + localContainerPath = local_container_paths[0][0] link_or_copy_pathlib(localContainerPath, tmpContainerPath) tmpContainerPathMeta = tmpContainerPath.with_name( tmpContainerPath.name + META_JSON_POSTFIX @@ -728,11 +732,11 @@ def _materializeSingleContainerSing( containers_dir = pathlib.Path(self.stagedContainersDir) # Do not allow overwriting in offline mode - transferred_image = self.cc_handler.transfer( + transferred_images = self.cc_handler.transfer( tag, stagedContainersDir=containers_dir, force=force and not offline ) - assert transferred_image is not None, f"Unexpected cache miss for {tag}" - containerPath, containerPathMeta = transferred_image + assert transferred_images is not None, f"Unexpected cache miss for {tag}" + containerPath, containerPathMeta = transferred_images[0] return Container( origTaggedName=tag_name, @@ -783,7 +787,10 @@ def materializeContainers( continue inj_tag = self.generateCanonicalTag(injectable_container) if singTag == inj_tag: - tag_to_use = injectable_container + # This is needed to respect the locally tagged image + tag_to_use = dataclasses.replace( + injectable_container, origTaggedName=tag.origTaggedName + ) self.logger.info(f"Matched injected container {singTag}") break @@ -834,41 +841,46 @@ def deploySingleContainer( """ if containers_dir is None: containers_dir = self.stagedContainersDir - containerPath, containerPathMeta = self.cc_handler.genStagedContainersDirPaths( + container_paths = self.cc_handler.genStagedContainersDirPaths( container, containers_dir ) + assert len(container_paths) > 0 was_redeployed = False - if ( - not containerPath.is_file() - and isinstance(container, Container) - and container.localPath is not None - ): - # Time to inject the image! - link_or_copy_pathlib(container.localPath, containerPath, force_copy=True) - was_redeployed = True - - if not containerPath.is_file(): - errmsg = f"SIF saved image {containerPath.name} is not in the staged working dir for {container.origTaggedName}" - self.logger.warning(errmsg) - raise ContainerFactoryException(errmsg) - - if ( - not containerPathMeta.is_file() - and isinstance(container, Container) - and container.metadataLocalPath is not None - ): - # Time to inject the metadata! - link_or_copy_pathlib( - container.metadataLocalPath, containerPathMeta, force_copy=True - ) - was_redeployed = True + for containerPath, containerPathMeta in container_paths: + if ( + not containerPath.is_file() + and isinstance(container, Container) + and container.localPath is not None + ): + # Time to inject the image! + link_or_copy_pathlib( + container.localPath, containerPath, force_copy=True + ) + was_redeployed = True + + if not containerPath.is_file(): + errmsg = f"SIF saved image {containerPath.name} is not in the staged working dir for {container.origTaggedName}" + self.logger.warning(errmsg) + raise ContainerFactoryException(errmsg) + + if ( + not containerPathMeta.is_file() + and isinstance(container, Container) + and container.metadataLocalPath is not None + ): + # Time to inject the metadata! + link_or_copy_pathlib( + container.metadataLocalPath, containerPathMeta, force_copy=True + ) + was_redeployed = True - if not containerPathMeta.is_file(): - errmsg = f"SIF saved image metadata {containerPathMeta.name} is not in the staged working dir for {container.origTaggedName}" - self.logger.warning(errmsg) - raise ContainerFactoryException(errmsg) + if not containerPathMeta.is_file(): + errmsg = f"SIF saved image metadata {containerPathMeta.name} is not in the staged working dir for {container.origTaggedName}" + self.logger.warning(errmsg) + raise ContainerFactoryException(errmsg) + containerPath, containerPathMeta = container_paths[0] try: with containerPathMeta.open(mode="r", encoding="utf-8") as mH: signaturesAndManifest = cast("SingularityManifest", json.load(mH)) diff --git a/wfexs_backend/workflow_engines/__init__.py b/wfexs_backend/workflow_engines/__init__.py index 7cde8a3..2874ed2 100644 --- a/wfexs_backend/workflow_engines/__init__.py +++ b/wfexs_backend/workflow_engines/__init__.py @@ -760,7 +760,7 @@ def sideContainers(self) -> "Sequence[ContainerTaggedName]": return list() @abc.abstractmethod - def simpleContainerFileName(self, imageUrl: "URIType") -> "RelPath": + def simpleContainerFileName(self, imageUrl: "URIType") -> "Sequence[RelPath]": """ This method must be implemented to tell which names expect the workflow engine on its container cache directories when an image is locally materialized diff --git a/wfexs_backend/workflow_engines/cwl_engine.py b/wfexs_backend/workflow_engines/cwl_engine.py index e16cfc6..54e016d 100644 --- a/wfexs_backend/workflow_engines/cwl_engine.py +++ b/wfexs_backend/workflow_engines/cwl_engine.py @@ -857,7 +857,7 @@ def sideContainers(self) -> "Sequence[ContainerTaggedName]": """ return self.OPERATIONAL_CONTAINER_TAGS - def simpleContainerFileName(self, imageUrl: "URIType") -> "RelPath": + def simpleContainerFileName(self, imageUrl: "URIType") -> "Sequence[RelPath]": """ This method was borrowed from https://github.com/common-workflow-language/cwltool/blob/5bdb3d3dd47d8d1b3a1685220b4b6ce0f94c055e/cwltool/singularity.py#L107 @@ -865,12 +865,14 @@ def simpleContainerFileName(self, imageUrl: "URIType") -> "RelPath": # match = re.search( # pattern=r"([a-z]*://)", string=imageUrl # ) - img_name = _normalize_image_id(imageUrl) - # candidates.append(img_name) - # sif_name = _normalize_sif_id(dockerRequirement["dockerPull"]) - # candidates.append(sif_name) - - return img_name + candidates: "MutableSequence[RelPath]" = [_normalize_image_id(imageUrl)] + # Next block could be needed in a darker future where either one + # or another file naming style is used depending on some + # obscure reason + # if self.container_factory.containerType == ContainerType.Singularity: + # candidates.append(_normalize_sif_id(imageUrl)) + + return candidates @staticmethod def generateDotWorkflow( diff --git a/wfexs_backend/workflow_engines/nextflow_engine.py b/wfexs_backend/workflow_engines/nextflow_engine.py index 8dcc5ed..461bde2 100644 --- a/wfexs_backend/workflow_engines/nextflow_engine.py +++ b/wfexs_backend/workflow_engines/nextflow_engine.py @@ -1509,7 +1509,7 @@ def materializeWorkflow( containerTags.extend(containerTagsConda) return matWorkflowEngine, containerTags - def simpleContainerFileName(self, imageUrl: "URIType") -> "RelPath": + def simpleContainerFileName(self, imageUrl: "URIType") -> "Sequence[RelPath]": """ This method was borrowed from https://github.com/nextflow-io/nextflow/blob/539a22b68c114c94eaf4a88ea8d26b7bfe2d0c39/modules/nextflow/src/main/groovy/nextflow/container/SingularityCache.groovy#L80 @@ -1527,7 +1527,7 @@ def simpleContainerFileName(self, imageUrl: "URIType") -> "RelPath": name = name.replace(":", "-").replace("/", "-") - return cast("RelPath", name + extension) + return [cast("RelPath", name + extension)] def structureAsNXFParams( self, matInputs: "Sequence[MaterializedInput]", outputsDir: "pathlib.Path" From d44df6f9857fd2a6452273ce879ab104c405ab95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20M=C2=AA=20Fern=C3=A1ndez?= Date: Thu, 11 Jul 2024 04:23:21 +0200 Subject: [PATCH 9/9] Version bump to 1.0.0a1 --- CITATION.cff | 2 +- wfexs_backend/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CITATION.cff b/CITATION.cff index e0469a5..ceed1ac 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -28,4 +28,4 @@ message: "If you use this software, please cite it using these metadata." repository-code: "https://github.com/inab/WfExS-backend" type: software title: "WfExS-backend" -version: 1.0.0a0 +version: 1.0.0a1 diff --git a/wfexs_backend/__init__.py b/wfexs_backend/__init__.py index 5f73692..ff4e1f2 100644 --- a/wfexs_backend/__init__.py +++ b/wfexs_backend/__init__.py @@ -21,7 +21,7 @@ __license__ = "Apache 2.0" # https://www.python.org/dev/peps/pep-0396/ -__version__ = "1.0.0a0" +__version__ = "1.0.0a1" __url__ = "https://github.com/inab/WfExS-backend" __official_name__ = "WfExS-backend"