Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add pydra.tasks.core sequence tasks #434

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:

build_docs:
docker:
- image: python:3.7.4
- image: python:3.8.6
working_directory: /tmp/gh-pages
environment:
- FSLOUTPUTTYPE: NIFTI
Expand Down
23 changes: 16 additions & 7 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ def __init__(
self.name = name
if not self.input_spec:
raise Exception("No input_spec in class: %s" % self.__class__.__name__)
klass = make_klass(self.input_spec)
klass = self.input_spec
if isinstance(klass, SpecInfo):
klass = make_klass(klass)

self.inputs = klass(
**{
Expand Down Expand Up @@ -161,7 +163,7 @@ def __init__(
raise ValueError(f"Unknown input set {inputs!r}")
inputs = self._input_sets[inputs]

self.inputs = attr.evolve(self.inputs, **inputs)
self.inputs = attr.evolve(self.inputs, **inputs)

# checking if metadata is set properly
self.inputs.check_metadata()
Expand Down Expand Up @@ -198,8 +200,10 @@ def __str__(self):

def __getstate__(self):
state = self.__dict__.copy()
state["input_spec"] = cp.dumps(state["input_spec"])
state["output_spec"] = cp.dumps(state["output_spec"])
if "input_spec" in state:
state["input_spec"] = cp.dumps(state["input_spec"])
if "output_spec" in state:
state["output_spec"] = cp.dumps(state["output_spec"])
inputs = {}
for k, v in attr.asdict(state["inputs"]).items():
if k.startswith("_"):
Expand All @@ -209,9 +213,14 @@ def __getstate__(self):
return state

def __setstate__(self, state):
state["input_spec"] = cp.loads(state["input_spec"])
state["output_spec"] = cp.loads(state["output_spec"])
state["inputs"] = make_klass(state["input_spec"])(**state["inputs"])
if "input_spec" in state:
state["input_spec"] = cp.loads(state["input_spec"])
if "output_spec" in state:
state["output_spec"] = cp.loads(state["output_spec"])
input_spec = state.get("input_spec")
if input_spec is None: # If it is not saved, it should be a class attribute
input_spec = self.input_spec
state["inputs"] = make_klass(input_spec)(**state["inputs"])
self.__dict__.update(state)

def __getattr__(self, name):
Expand Down
4 changes: 4 additions & 0 deletions pydra/mark/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ def task(func):
def decorate(**kwargs):
return FunctionTask(func=func, **kwargs)

decorate.__module__ = func.__module__
decorate.__name__ = func.__name__
decorate.__doc__ = func.__doc__

return decorate
4 changes: 4 additions & 0 deletions pydra/tasks/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Pydra provides a small number of utility tasks that are aimed at common manipulations
of Python objects.
"""
197 changes: 197 additions & 0 deletions pydra/tasks/core/sequences.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import attr
import typing as ty
import pydra
from pydra.engine.specs import BaseSpec, SpecInfo, MultiInputObj, MultiOutputObj
from pydra.engine.core import TaskBase
from pydra.engine.helpers import ensure_list

try:
from typing import Literal
except ImportError: # PY37
from typing_extensions import Literal


@attr.s(kw_only=True)
class SplitInputSpec(BaseSpec):
inlist: ty.List = attr.ib(metadata={"help_string": "List of values to split"})
splits: ty.List[int] = attr.ib(
metadata={
"help_string": "Number of outputs in each split - should add to number of inputs"
}
)
squeeze: bool = attr.ib(
default=False,
metadata={"help_string": "Unfold one-element splits removing the list"},
)


class Split(TaskBase):
"""
Task to split lists into multiple outputs

Examples
--------
>>> from pydra.tasks.core.sequences import Split
>>> sp = Split(name="sp", splits=[5, 4, 3, 2, 1])
>>> out = sp(inlist=list(range(15)))
>>> out.output.out1
[0, 1, 2, 3, 4]
>>> out.output.out2
[5, 6, 7, 8]
>>> out.output.out5
[14]
"""

_task_version = "1"
input_spec = SplitInputSpec

def __init__(self, splits, *args, **kwargs):
self.output_spec = SpecInfo(
name="Outputs",
fields=[(f"out{i + 1}", list) for i in range(len(splits))],
bases=(BaseSpec,),
)
super().__init__(*args, **kwargs)
self.inputs.splits = splits
Comment on lines +49 to +55
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now no way to update output_spec if splits is computed and passed via a workflow. I'm not sure we could determine the output names in order to connect the outputs of this task if the number of splits is not known.

We could instead do something like max_splits and create all out{i} fields with the understanding that anything depending on an output that is not generated will receive a None (or attr.NOTHING...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not following this i think. a task writer should not concern themselves with splits or combines. that's outside of the task's responsibility. it should simply take whatever input is given and work with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task splits a list. It has nothing to do with splitting/combining.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semantics are hard! :)

Copy link
Collaborator

@djarecka djarecka Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now no way to update output_spec if splits is computed and passed via a workflow. I'm not sure we could determine the output names in order to connect the outputs of this task if the number of splits is not known.

This is not exactly what you want, but we have also option to pass all outputs, by using lzout.all_: https://github.com/nipype/pydra/blob/master/pydra/engine/tests/test_workflow.py#L3662

Copy link
Contributor

@satra satra Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in nipype splits creates a named output for the different numbers of splits. an example is the following:

let's say a subject has M T1s and N T2s and i want them to be processed together. i can create a Merge node to aggregate them (merge also produces the relevant output to split them apart), do the processing and then split them back into 2 components (a list of processed T1s and a list of processed T2s. as a workflow designer i know that there are two categories here, what i don't ahead of time is the number. hence being able to set split would be required.

M,N can vary per subject, so cannot be determined up front.

another thought here is that the outputs would come from lzout, if we can't find an attribute at construction time we can check that attribute at runtime. it may still error out, but we should be able to evaluate that connection. @effigies - would that address your question, without having to define max_splits?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think if we could have static and dynamic output specs, then that could work. In the static case we would do name checking, and in the dynamic case we leave it to post-run.

In the specific case of Split(), you need to know how many splits you have, even if you don't know the specific number in each split. We could either do that dynamically by connections or we could do it statically by declaring a number of splits.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if you know the number of splits in Split() you could do it with normal python function. Could someone give me a simple workflow when this could be used. I'm probably missing something important, since I still don't see where I'd like to use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't specifically care about Split(). I've literally never used it. Nonetheless the point is that there are numerous Nipype interfaces where the full set of inputs or outputs isn't known until the interface is instantiated. The Merge() example in this PR is one such that I have used and it's easier to write:

merge3 = Merge(n=3)
merge4 = Merge(n=4)

Than:

@pydra.mark.task
def merge3(in1, in2, in3):
    return in1 + in2 + in3

m3 = merge3()

...

Similarly, to do the same thing with Split() would involve repeatedly writing:

@pydra.mark.task
def custom_split(inlist, splits) -> {"out1": list, "out2": list, "out3": list}:
    # ...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so it is design to mimic these simple python function, but we just want to allow people to use it instead of creating annotation. I'm fine with this, but I'm a bit afraid of using the name Split. Perhaps SplitTask and MergeTask? I know it's longer, but having split and Split - one used as a method, the other as a class can confuse people IMO


def _run_task(self):
self.output_ = {}
left = 0
for i, split in enumerate(self.inputs.splits, 1):
right = left + split
self.output_[f"out{i}"] = self.inputs.inlist[left:right]
left = right


@attr.s(kw_only=True)
class MergeInputSpec(BaseSpec):
axis: Literal["vstack", "hstack"] = attr.ib(
default="vstack",
metadata={
"help_string": "Direction in which to merge, hstack requires same number of elements in each input."
},
)
no_flatten: bool = attr.ib(
default=False,
metadata={
"help_string": "Append to outlist instead of extending in vstack mode."
},
)
ravel_inputs: bool = attr.ib(
default=False,
metadata={"help_string": "Ravel inputs when no_flatten is False."},
)


def _ravel(in_val):
if not isinstance(in_val, list):
return in_val
flat_list = []
for val in in_val:
raveled_val = _ravel(val)
if isinstance(raveled_val, list):
flat_list.extend(raveled_val)
else:
flat_list.append(raveled_val)
return flat_list


class Merge(TaskBase):
"""
Task to merge inputs into a single list

``Merge(1)`` will merge a list of lists

Examples
--------
>>> from pydra.tasks.core.sequences import Merge
>>> mi = Merge(3, name="mi")
>>> mi.inputs.in1 = 1
>>> mi.inputs.in2 = [2, 5]
>>> mi.inputs.in3 = 3
>>> out = mi()
>>> out.output.out
[1, 2, 5, 3]

>>> merge = Merge(1, name="merge")
>>> merge.inputs.in1 = [1, [2, 5], 3]
>>> out = merge()
>>> out.output.out
[1, [2, 5], 3]

>>> merge = Merge(1, name="merge")
>>> merge.inputs.in1 = [1, [2, 5], 3]
>>> merge.inputs.ravel_inputs = True
>>> out = merge()
>>> out.output.out
[1, 2, 5, 3]

>>> merge = Merge(1, name="merge")
>>> merge.inputs.in1 = [1, [2, 5], 3]
>>> merge.inputs.no_flatten = True
>>> out = merge()
>>> out.output.out
[[1, [2, 5], 3]]
"""

_task_version = "1"
output_spec = SpecInfo(name="Outputs", fields=[("out", ty.List)], bases=(BaseSpec,))

def __init__(self, numinputs, *args, **kwargs):
self._numinputs = max(numinputs, 0)
self.input_spec = SpecInfo(
name="Inputs",
fields=[(f"in{i + 1}", ty.List) for i in range(self._numinputs)],
bases=(MergeInputSpec,),
)
super().__init__(*args, **kwargs)

def _run_task(self):
self.output_ = {"out": []}
if self._numinputs < 1:
return

values = [
getattr(self.inputs, f"in{i + 1}")
for i in range(self._numinputs)
if getattr(self.inputs, f"in{i + 1}") is not attr.NOTHING
]

if self.inputs.axis == "vstack":
for value in values:
if isinstance(value, list) and not self.inputs.no_flatten:
self.output_["out"].extend(
_ravel(value) if self.inputs.ravel_inputs else value
)
else:
self.output_["out"].append(value)
else:
lists = [ensure_list(val) for val in values]
self.output_["out"] = [
[val[i] for val in lists] for i in range(len(lists[0]))
]


@pydra.mark.task
def Select(inlist: MultiInputObj, index: MultiInputObj) -> MultiOutputObj:
"""
Task to select specific elements from a list

Examples
--------

>>> from pydra.tasks.core.sequences import Select
>>> sl = Select(name="sl")
>>> sl.inputs.inlist = [1, 2, 3, 4, 5]
>>> sl.inputs.index = [3]
>>> out = sl()
>>> out.output.out
4

>>> sl = Select(name="sl")
>>> out = sl(inlist=[1, 2, 3, 4, 5], index=[3, 4])
>>> out.output.out
[4, 5]

"""
return [inlist[i] for i in index]
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ install_requires =
cloudpickle >= 1.2.2
filelock >= 3.0.0
etelemetry >= 0.2.2
typing_extensions ; python_version < "3.8"

test_requires =
pytest >= 4.4.0, < 6.0.0
Expand Down