Skip to content

Commit

Permalink
Rearranged the software requirements of a workflow.
Browse files Browse the repository at this point in the history
As workflows can be run in very disparate environments with different
requirements, the only thing they truly need in order to run is the workflow engine itself.

On the other hand, a CreateAction describing a workflow execution needs
to declare all of its dependencies as instruments: WfExS-backend
itself, the workflow, the workflow engine, the container engine and
the list of needed software containers.
  • Loading branch information
jmfernandez committed Sep 8, 2023
1 parent 71269da commit 41a8163
Showing 1 changed file with 77 additions and 29 deletions.
106 changes: 77 additions & 29 deletions wfexs_backend/ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import rocrate.model.file
import rocrate.model.file_or_dir
import rocrate.model.metadata
import rocrate.model.person
import rocrate.model.softwareapplication
import rocrate.model.creativework
import rocrate.rocrate
Expand Down Expand Up @@ -320,7 +321,12 @@ class FixedDataset(FixedMixin, rocrate.model.dataset.Dataset): # type: ignore[m


class FixedWorkflow(FixedMixin, rocrate.model.computationalworkflow.ComputationalWorkflow): # type: ignore[misc]
pass
TYPES = [
"File",
"SoftwareSourceCode",
"ComputationalWorkflow",
"SoftwareApplication",
]


class FixedROCrate(rocrate.rocrate.ROCrate): # type: ignore[misc]
Expand Down Expand Up @@ -558,6 +564,18 @@ def __init__(
# This is used to avoid including twice the very same value
# in the RO-Crate
self._item_hash: "MutableMapping[bytes, rocrate.model.entity.Entity]" = {}
self._wf_to_containers: "MutableMapping[str, MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]]" = (
{}
)
self._wf_to_operational_containers: "MutableMapping[str, MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]]" = (
{}
)
self._wf_to_container_sa: "MutableMapping[str, rocrate.model.softwareapplication.SoftwareApplication]" = (
{}
)

# TODO: add agents
self._agents: "MutableSequence[rocrate.model.person.Person]" = []

if len(licences) == 0:
licences = [NoLicenceShort]
Expand Down Expand Up @@ -634,11 +652,22 @@ def __init__(
wf_consolidate_action = self.crate.add(wf_consolidate_action)
wf_consolidate_action["object"] = original_workflow_crate
wf_consolidate_action["result"] = ran_workflow_crate
wf_consolidate_action["instrument"] = self.weng_crate
wf_consolidate_action["agent"] = self.wf_wfexs
instruments: "MutableSequence[rocrate.model.entity.Entity]" = [
self.wf_wfexs,
self.weng_crate,
]
if ran_workflow_crate.id in self._wf_to_operational_containers:
if ran_workflow_crate.id in self._wf_to_container_sa:
instruments.append(self._wf_to_container_sa[ran_workflow_crate.id])
instruments.extend(
self._wf_to_operational_containers[ran_workflow_crate.id]
)
wf_consolidate_action.append_to("instrument", instruments)
wf_consolidate_action.append_to(
"actionStatus", {"@id": "http://schema.org/CompletedActionStatus"}
)
if len(self._agents) > 0:
wf_consolidate_action.append_to("agent", self._agents)
else:
ran_workflow_crate = original_workflow_crate

Expand Down Expand Up @@ -747,8 +776,11 @@ def _add_containers_to_workflow(
containers: "Sequence[Container]",
the_workflow_crate: "FixedWorkflow",
weng_crate: "Optional[rocrate.model.softwareapplication.SoftwareApplication]" = None,
) -> None:
) -> "MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]":
# Operational containers are needed by the workflow engine, not by the workflow
added_containers: "MutableSequence[Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]]" = (
[]
)
if len(containers) > 0:
do_attach = CratableItem.Containers in self.payloads
sa_crate: "Union[rocrate.model.computationalworkflow.ComputationalWorkflow, rocrate.model.softwareapplication.SoftwareApplication]"
Expand All @@ -769,11 +801,12 @@ def _add_containers_to_workflow(
container_type["softwareVersion"] = self.containerEngineVersion

crate_cont_type = self.crate.add(container_type)
the_workflow_crate.append_to(
"softwareRequirements", crate_cont_type
)
self.cached_cts[container.type] = crate_cont_type

# Saving it for later usage when CreateAction are declared
if the_workflow_crate.id not in self._wf_to_container_sa:
self._wf_to_container_sa[the_workflow_crate.id] = crate_cont_type

software_container: "Union[rocrate.model.softwareapplication.SoftwareApplication, SoftwareContainer]"
if do_attach and container.localPath is not None:
the_size = os.stat(container.localPath).st_size
Expand Down Expand Up @@ -842,26 +875,9 @@ def _add_containers_to_workflow(
software_container["softwareRequirements"] = crate_cont_type

crate_cont = self.crate.add(software_container)
added_containers.append(crate_cont)

# TODO: Optimize this
do_append = True
if "softwareRequirements" in sa_crate:
softwareRequirements = sa_crate["softwareRequirements"]
if isinstance(softwareRequirements, list):
softwareRequirements_l = softwareRequirements
else:
softwareRequirements_l = [softwareRequirements]
for req in softwareRequirements_l:
if isinstance(req, rocrate.model.entity.Entity):
req_id = req.id
else:
req_id = req
if req_id == crate_cont.id:
do_append = False
break

if do_append:
sa_crate.append_to("softwareRequirements", crate_cont)
return added_containers

def addWorkflowInputs(
self,
Expand Down Expand Up @@ -1428,16 +1444,30 @@ def _add_workflow_to_crate(
the_workflow_crate["runtimePlatform"] = workflow_engine_version

if materialized_engine.containers is not None:
self._add_containers_to_workflow(
added_containers = self._add_containers_to_workflow(
materialized_engine.containers,
the_workflow_crate,
)
existing_containers = self._wf_to_containers.get(the_workflow_crate.id, [])
for added_container in added_containers:
if added_container not in existing_containers:
existing_containers.append(added_container)
self._wf_to_containers[the_workflow_crate.id] = existing_containers
if materialized_engine.operational_containers is not None:
self._add_containers_to_workflow(
added_operational_containers = self._add_containers_to_workflow(
materialized_engine.operational_containers,
the_workflow_crate,
weng_crate=the_weng_crate,
)
existing_operational_containers = self._wf_to_operational_containers.get(
the_workflow_crate.id, []
)
for added_operational_container in added_operational_containers:
if added_operational_container not in existing_operational_containers:
existing_operational_containers.append(added_operational_container)
self._wf_to_operational_containers[
the_workflow_crate.id
] = existing_operational_containers

if do_attach and (the_uri is not None):
if the_uri.startswith("http") or the_uri.startswith("ftp"):
Expand Down Expand Up @@ -1591,7 +1621,25 @@ def addWorkflowExecution(
)
self.crate.add(crate_action)
self.crate.root_dataset.append_to("mentions", crate_action)
crate_action["instrument"] = self.wf_file
instruments: "MutableSequence[rocrate.model.entity.Entity]" = [
self.wf_wfexs,
self.wf_file,
self.weng_crate,
]
# Adding both operational containers
if self.wf_file.id in self._wf_to_operational_containers:
if self.wf_file.id in self._wf_to_container_sa:
instruments.append(self._wf_to_container_sa[self.wf_file.id])
instruments.extend(self._wf_to_operational_containers[self.wf_file.id])
# and "normal" containers
if self.wf_file.id in self._wf_to_containers:
if (
self.wf_file.id in self._wf_to_container_sa
and self.wf_file.id not in self._wf_to_operational_containers
):
instruments.append(self._wf_to_container_sa[self.wf_file.id])
instruments.extend(self._wf_to_containers[self.wf_file.id])
crate_action.append_to("instrument", instruments)
# subjectOf is not fulfilled as this execution has not public page
if stagedExec.exitVal == 0:
action_status = "http://schema.org/CompletedActionStatus"
Expand Down

0 comments on commit 41a8163

Please sign in to comment.