diff --git a/wfexs_backend/ro_crate.py b/wfexs_backend/ro_crate.py index c8c8e492..2e805d9a 100644 --- a/wfexs_backend/ro_crate.py +++ b/wfexs_backend/ro_crate.py @@ -87,6 +87,7 @@ import rocrate.model.file import rocrate.model.file_or_dir import rocrate.model.metadata +import rocrate.model.person import rocrate.model.softwareapplication import rocrate.model.creativework import rocrate.rocrate @@ -320,7 +321,12 @@ class FixedDataset(FixedMixin, rocrate.model.dataset.Dataset): # type: ignore[m class FixedWorkflow(FixedMixin, rocrate.model.computationalworkflow.ComputationalWorkflow): # type: ignore[misc] - pass + TYPES = [ + "File", + "SoftwareSourceCode", + "ComputationalWorkflow", + "SoftwareApplication", + ] class FixedROCrate(rocrate.rocrate.ROCrate): # type: ignore[misc] @@ -558,6 +564,18 @@ def __init__( # This is used to avoid including twice the very same value # in the RO-Crate self._item_hash: "MutableMapping[bytes, rocrate.model.entity.Entity]" = {} + self._wf_to_containers: "MutableMapping[str, MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]]" = ( + {} + ) + self._wf_to_operational_containers: "MutableMapping[str, MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]]" = ( + {} + ) + self._wf_to_container_sa: "MutableMapping[str, rocrate.model.softwareapplication.SoftwareApplication]" = ( + {} + ) + + # TODO: add agents + self._agents: "MutableSequence[rocrate.model.person.Person]" = [] if len(licences) == 0: licences = [NoLicenceShort] @@ -634,11 +652,22 @@ def __init__( wf_consolidate_action = self.crate.add(wf_consolidate_action) wf_consolidate_action["object"] = original_workflow_crate wf_consolidate_action["result"] = ran_workflow_crate - wf_consolidate_action["instrument"] = self.weng_crate - wf_consolidate_action["agent"] = self.wf_wfexs + instruments: "MutableSequence[rocrate.model.entity.Entity]" = [ + self.wf_wfexs, + self.weng_crate, + ] + if ran_workflow_crate.id in self._wf_to_operational_containers: + if ran_workflow_crate.id in self._wf_to_container_sa: + instruments.append(self._wf_to_container_sa[ran_workflow_crate.id]) + instruments.extend( + self._wf_to_operational_containers[ran_workflow_crate.id] + ) + wf_consolidate_action.append_to("instrument", instruments) wf_consolidate_action.append_to( "actionStatus", {"@id": "http://schema.org/CompletedActionStatus"} ) + if len(self._agents) > 0: + wf_consolidate_action.append_to("agent", self._agents) else: ran_workflow_crate = original_workflow_crate @@ -747,8 +776,11 @@ def _add_containers_to_workflow( containers: "Sequence[Container]", the_workflow_crate: "FixedWorkflow", weng_crate: "Optional[rocrate.model.softwareapplication.SoftwareApplication]" = None, - ) -> None: + ) -> "MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]": # Operational containers are needed by the workflow engine, not by the workflow + added_containers: "MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]" = ( + [] + ) if len(containers) > 0: do_attach = CratableItem.Containers in self.payloads sa_crate: "Union[rocrate.model.computationalworkflow.ComputationalWorkflow, rocrate.model.softwareapplication.SoftwareApplication]" @@ -769,11 +801,12 @@ def _add_containers_to_workflow( container_type["softwareVersion"] = self.containerEngineVersion crate_cont_type = self.crate.add(container_type) - the_workflow_crate.append_to( - "softwareRequirements", crate_cont_type - ) self.cached_cts[container.type] = crate_cont_type + # Saving it for later usage when CreateAction are declared + if the_workflow_crate.id not in self._wf_to_container_sa: + self._wf_to_container_sa[the_workflow_crate.id] = crate_cont_type + software_container: "Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]" if do_attach and container.localPath is not None: the_size = os.stat(container.localPath).st_size @@ -842,26 +875,9 @@ def _add_containers_to_workflow( software_container["softwareRequirements"] = crate_cont_type crate_cont = self.crate.add(software_container) + added_containers.append(crate_cont) - # TODO: Optimize this - do_append = True - if "softwareRequirements" in sa_crate: - softwareRequirements = sa_crate["softwareRequirements"] - if isinstance(softwareRequirements, list): - softwareRequirements_l = softwareRequirements - else: - softwareRequirements_l = [softwareRequirements] - for req in softwareRequirements_l: - if isinstance(req, rocrate.model.entity.Entity): - req_id = req.id - else: - req_id = req - if req_id == crate_cont.id: - do_append = False - break - - if do_append: - sa_crate.append_to("softwareRequirements", crate_cont) + return added_containers def addWorkflowInputs( self, @@ -1428,16 +1444,30 @@ def _add_workflow_to_crate( the_workflow_crate["runtimePlatform"] = workflow_engine_version if materialized_engine.containers is not None: - self._add_containers_to_workflow( + added_containers = self._add_containers_to_workflow( materialized_engine.containers, the_workflow_crate, ) + existing_containers = self._wf_to_containers.get(the_workflow_crate.id, []) + for added_container in added_containers: + if added_container not in existing_containers: + existing_containers.append(added_container) + self._wf_to_containers[the_workflow_crate.id] = existing_containers if materialized_engine.operational_containers is not None: - self._add_containers_to_workflow( + added_operational_containers = self._add_containers_to_workflow( materialized_engine.operational_containers, the_workflow_crate, weng_crate=the_weng_crate, ) + existing_operational_containers = self._wf_to_operational_containers.get( + the_workflow_crate.id, [] + ) + for added_operational_container in added_operational_containers: + if added_operational_container not in existing_operational_containers: + existing_operational_containers.append(added_operational_container) + self._wf_to_operational_containers[ + the_workflow_crate.id + ] = existing_operational_containers if do_attach and (the_uri is not None): if the_uri.startswith("http") or the_uri.startswith("ftp"): @@ -1591,7 +1621,25 @@ def addWorkflowExecution( ) self.crate.add(crate_action) self.crate.root_dataset.append_to("mentions", crate_action) - crate_action["instrument"] = self.wf_file + instruments: "MutableSequence[rocrate.model.entity.Entity]" = [ + self.wf_wfexs, + self.wf_file, + self.weng_crate, + ] + # Adding both operational containers + if self.wf_file.id in self._wf_to_operational_containers: + if self.wf_file.id in self._wf_to_container_sa: + instruments.append(self._wf_to_container_sa[self.wf_file.id]) + instruments.extend(self._wf_to_operational_containers[self.wf_file.id]) + # and "normal" containers + if self.wf_file.id in self._wf_to_containers: + if ( + self.wf_file.id in self._wf_to_container_sa + and self.wf_file.id not in self._wf_to_operational_containers + ): + instruments.append(self._wf_to_container_sa[self.wf_file.id]) + instruments.extend(self._wf_to_containers[self.wf_file.id]) + crate_action.append_to("instrument", instruments) # subjectOf is not fulfilled as this execution has not public page if stagedExec.exitVal == 0: action_status = "http://schema.org/CompletedActionStatus"