
FileNotFoundError triggered when running a task with a splitter #745

Open
ghisvail opened this issue Mar 29, 2024 · 15 comments
Labels
bug Something isn't working

Comments

@ghisvail
Collaborator

I defined and ran the following workflow:

from pathlib import Path
from pydra import Submitter, Workflow
from pydra.mark import annotate, task


# Producer of T1w images
@task
@annotate({"return": {"t1w_images": list[Path]}})
def read_t1w_images(bids_dir: Path) -> list[Path]:
    return list(bids_dir.rglob("*T1w.nii.gz"))

# Mapped to each T1w image
@task
@annotate({"return": {"smoothed_image": Path}})
def smooth_image(input_image: Path, smoothed_image: Path) -> Path:
    from nilearn.image import load_img, smooth_img

    smoothed_image = (
        smoothed_image
        or Path.cwd() / (input_image.name.split(".", maxsplit=1)[0] + "_smoothed.nii.gz")
    )

    smooth_img(load_img(input_image), fwhm=3).to_filename(smoothed_image)

    return smoothed_image

# Workflow composing both tasks
wf = Workflow(name, input_spec=["bids_dir"], bids_dir=...)
wf.add(read_t1w_images(name="read", bids_dir=wf.lzin.bids_dir))
wf.add(smooth_image(name="smooth").split("input_image", input_image=wf.read.lzout.t1w_images))
wf.set_output({"smoothed_images": wf.smooth.lzout.smoothed_image})

# Run workflow
with Submitter() as sub:
    res = sub(wf)

and get the following error:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/[email protected]/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/[email protected]/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 131, in _main
    prepare(preparation_data)
  File "/usr/local/Cellar/[email protected]/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 246, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/usr/local/Cellar/[email protected]/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 297, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen runpy>", line 285, in run_path
  File "<frozen runpy>", line 254, in _get_code_from_file
FileNotFoundError: [Errno 2] No such file or directory: '/private/var/folders/m3/r24ql8bj2h9_g1d970f9mr40000w73/T/tmpg8px7s00/FunctionTask_9ba1cbf8e6f65aa1f6fa8bb398e2165f/<input>'
Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/pydevconsole.py", line 364, in runcode
    coro = func()
           ^^^^^^
  File "<input>", line 1, in <module>
  File "/Users/ghislain.vaillant/Library/Caches/pypoetry/virtualenvs/clinica-_Ggo9PP8-py3.12/lib/python3.12/site-packages/pydra/engine/core.py", line 461, in __call__
    res = sub(self, environment=environment)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Library/Caches/pypoetry/virtualenvs/clinica-_Ggo9PP8-py3.12/lib/python3.12/site-packages/pydra/engine/submitter.py", line 55, in __call__
    self.loop.run_until_complete(
  File "/usr/local/Cellar/[email protected]/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 685, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Library/Caches/pypoetry/virtualenvs/clinica-_Ggo9PP8-py3.12/lib/python3.12/site-packages/pydra/engine/submitter.py", line 96, in submit_from_call
    await self.expand_runnable(runnable, wait=True, rerun=rerun)  # TODO
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Library/Caches/pypoetry/virtualenvs/clinica-_Ggo9PP8-py3.12/lib/python3.12/site-packages/pydra/engine/submitter.py", line 144, in expand_runnable
    await asyncio.gather(*futures)
  File "/Users/ghislain.vaillant/Library/Caches/pypoetry/virtualenvs/clinica-_Ggo9PP8-py3.12/lib/python3.12/site-packages/pydra/engine/workers.py", line 186, in exec_as_coro
    res = await self.loop.run_in_executor(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

The tasks run fine if I exercise them individually with:

>>> bids_dir = ...
>>> task1 = read_t1w_images(bids_dir=bids_dir)
>>> res1 = task1()
>>> res1.output.t1w_images[0].exists()
True
>>> task2 = smooth_image(input_image=res1.output.t1w_images[0])
>>> res2 = task2()
>>> res2.output.smoothed_image.exists()
True

Not sure whether this is an issue with my definition of the splitter or with its implementation. Either way, letting multiprocessing surface this error does not give much of a clue about what's going on.

@ghisvail ghisvail added the bug Something isn't working label Mar 29, 2024
@tclose
Contributor

tclose commented Mar 30, 2024

Looks like it might be a hashing issue. With the line

[Errno 2] No such file or directory: '/private/var/folders/m3/r24ql8bj2h9_g1d970f9mr40000w73/T/tmpg8px7s00/FunctionTask_9ba1cbf8e6f65aa1f6fa8bb398e2165f/<input>'

does the /private/var/folders/m3/r24ql8bj2h9_g1d970f9mr40000w73/T/tmpg8px7s00/FunctionTask_9ba1cbf8e6f65aa1f6fa8bb398e2165f/ directory exist?

@tclose
Contributor

tclose commented Mar 31, 2024

Seems to be an issue with cross-process hashing, as the serial plugin works.

@tclose
Contributor

tclose commented Mar 31, 2024

Something strange seems to be happening, unless I'm getting the wrong end of the stick with my debugging. The "read" task never seems to get run (I put a print statement in there and can't see an output directory created for it in the cache directory), yet it is dropped from the list of future tasks at some point during the expansion of the workflow.

@ghisvail
Collaborator Author

ghisvail commented Apr 1, 2024

I am glad to hear that it's not an issue with how I used the splitter. I like the new interface better actually.

I am a bit skeptical about this issue though. It is a very simple setup, i.e. one producer of files followed by a task applied to each file, but quite representative of what any linear workflow would look like. The test suite probably needs expanding to cover more realistic use cases than individual tasks and toy workflows.

What do you think?

@tclose
Contributor

tclose commented Apr 2, 2024

> I am glad to hear that it's not an issue with how I used the splitter. I like the new interface better actually.
>
> I am a bit skeptical about this issue though. It is a very simple setup, i.e. one producer of files followed by a task applied to each file, but quite representative of what any linear workflow would look like. The test suite probably needs expanding to cover more realistic use cases than individual tasks and toy workflows.
>
> What do you think?

Yes, definitely needs to be added as a test case. Does it hold if you read a directory of text files and append a value to each file?

This has got me stumped. I have set breakpoints in lots of different places, and the "read" node never seems to be executed using the CF plugin. I can track the execution down to

async def exec_as_coro(self, runnable, rerun=False, environment=None):
    """Run a task (coroutine wrapper)."""
    if isinstance(runnable, TaskBase):
        res = await self.loop.run_in_executor(
            self.pool, runnable._run, rerun, environment
        )
    else:  # it could be tuple that includes pickle files with tasks and inputs
        ind, task_main_pkl, task_orig = runnable
        res = await self.loop.run_in_executor(
            self.pool, load_and_run, task_main_pkl, ind, rerun, environment
        )
    return res

but instead of the first run_in_executor call returning a result, the workflow just hangs and times out at that point. @djarecka @effigies any ideas what could be going on?

@tclose
Contributor

tclose commented Apr 2, 2024

Could it be an environment related issue?

@satra
Contributor

satra commented Apr 2, 2024

i can't replicate this with the current released version of pydra (0.23 on python 3.11.8 on macos m1). here is a simplified form of the code.

from pathlib import Path
from pydra import Submitter, Workflow
from pydra.mark import annotate, task

@task
@annotate({"return": {"t1w_images": list[Path]}})
def read_t1w_images(bids_dir: Path) -> list[Path]:
    return list(bids_dir.rglob("*"))

@task
@annotate({"return": {"smoothed_image": Path}})
def smooth_image(input_image: Path, smoothed_image: Path) -> Path:
    smoothed_image = (
        smoothed_image
        or Path.cwd() / (input_image.name.split(".", maxsplit=1)[0] + "_smoothed")
    )
    with open(smoothed_image, "wt") as fp:
        fp.write(str(smoothed_image))

    return smoothed_image

wf = Workflow("test", input_spec=["bids_dir"], bids_dir=Path.cwd())
wf.add(read_t1w_images(name="read", bids_dir=wf.lzin.bids_dir))
wf.add(smooth_image(name="smooth").split("input_image", input_image=wf.read.lzout.t1w_images))
wf.set_output({"smoothed_images": wf.smooth.lzout.smoothed_image})

with Submitter() as sub:
    res = sub(wf)

@satra
Contributor

satra commented Apr 2, 2024

the above code reads any number of files in your current directory, so just make sure it's not a lot!

@satra
Contributor

satra commented Apr 2, 2024

also i wasn't sure how the second function runs without an input for the smoothed_image arg. in normal python that wouldn't work. do we default positional args to optional now?

@tclose
Contributor

tclose commented Apr 2, 2024

also i wasn't sure how the second function runs without an input for the smoothed_image arg. in normal python that wouldn't work. do we default positional args to optional now?

That is a good point, the smoothed_image arg should be attrs.NOTHING. In my setup it isn't reaching that point (but then I'm not getting the file-not-found error that @ghisvail is, so maybe something else is not working for me).

@satra
Contributor

satra commented Apr 2, 2024

are you unable to run my version of the code either? and are you using the current released version or the main branch?

@tclose
Contributor

tclose commented Apr 2, 2024

Ahhh, that is a bit of a trap for beginners. I was running into a common multi-proc issue explained here & here. Maybe we should think about defaulting to the serial worker so that new users don't run into it.

When I place the workflow code into a main() function and run it with import guards

if __name__ == "__main__":
    main()

the workflow runs as expected 🤦‍♂️
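The trap can be reproduced with the standard library alone (names below are illustrative, not pydra's): under the spawn start method, worker processes re-import the main module, so any top-level code that submits work runs again in every child.

```python
import concurrent.futures


def square(x: int) -> int:
    return x * x


def main() -> list[int]:
    # Submission happens inside main(), never at module top level.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(square, [1, 2, 3]))


if __name__ == "__main__":
    # Without this guard, the spawn start method (the default on macOS and
    # Windows) re-imports this module in each worker, re-triggering pool
    # creation and typically failing with a BrokenProcessPool error.
    print(main())
```

The same guard is what makes the workflow above run: the Submitter's process pool is only created when the script is executed directly, not when workers re-import it.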

@ghisvail I think your problem might be really simple as @satra suggested, i.e. that you just forgot to provide a value for your output smoothed_image path.

In general, we really need to be picking up errors like this and letting the user know. The reason we don't do it when the task is initiated is that we allow the inputs to be set later, but we should probably run a check before the workflow is submitted to verify that all inputs have been provided unless they are explicitly flagged as optional. Probably, specifying the type as Path | attrs.NOTHING would work.
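A hypothetical sketch of such a pre-submission check (none of these names are pydra's actual API; pydra marks unprovided inputs with the attrs.NOTHING sentinel, stubbed here with a plain object so the sketch stays self-contained):

```python
NOTHING = object()  # stand-in for attrs.NOTHING in this self-contained sketch


class Inputs:
    """Toy input spec: any field not passed stays at the NOTHING sentinel."""

    FIELDS = ("input_image", "smoothed_image")

    def __init__(self, **values):
        for name in self.FIELDS:
            setattr(self, name, values.get(name, NOTHING))


def unset_inputs(inputs: Inputs) -> list[str]:
    """Names of inputs never provided; a submitter could raise on these."""
    return [n for n in inputs.FIELDS if getattr(inputs, n) is NOTHING]


inputs = Inputs(input_image="sub-01_T1w.nii.gz")
missing = unset_inputs(inputs)  # ["smoothed_image"]
```

Raising a clear error listing `missing` before submission would avoid surfacing the failure as an opaque multiprocessing traceback.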

@tclose
Contributor

tclose commented Apr 2, 2024

@ghisvail I think that with pydra < 0.23 you probably got into the habit of typing all inputs and outputs as Path, as the File/Directory types were a bit buggy, but for >= 0.23 it is recommended to switch to File/Directory (or more specific file types, e.g. fileformats.medimage.NiftiGz, fileformats.medimage.DicomSeries), as that way the input hashes will correspond to the contents of the files rather than the file paths themselves. Otherwise, changes to the input files will be ignored and a stale cache dir may be used.
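A small stdlib illustration of the difference (not fileformats' actual implementation): hashing the path string cannot see edits to the file, while hashing the bytes can — which is why Path-typed inputs risk hitting a stale cache.

```python
import hashlib
import tempfile
from pathlib import Path


def path_hash(p: Path) -> str:
    # Roughly what happens when an input is typed as a plain Path.
    return hashlib.sha256(str(p).encode()).hexdigest()


def content_hash(p: Path) -> str:
    # What a File-typed input hashes instead: the file's contents.
    return hashlib.sha256(p.read_bytes()).hexdigest()


with tempfile.TemporaryDirectory() as d:
    f = Path(d) / "img.nii.gz"
    f.write_bytes(b"version 1")
    before = (path_hash(f), content_hash(f))
    f.write_bytes(b"version 2")  # file contents change, path does not
    after = (path_hash(f), content_hash(f))

path_hash_is_stale = before[0] == after[0]   # path hash misses the edit
content_hash_is_fresh = before[1] != after[1]  # content hash catches it
```

With a path-based hash, the cached result for "version 1" would be reused for "version 2"; a content-based hash forces a re-run.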

@ghisvail
Collaborator Author

ghisvail commented Apr 2, 2024

Ahhh, that is a bit of a trap for beginners. I was running into a common multi-proc issue explained here & here. Maybe we should think about defaulting to the serial worker so that new users don't run into it.

Indeed. I ran the example code above in the REPL initially, which is a sensible thing for a beginner wanting to try Pydra out to do. Using __main__ guards in this context is not exactly idiomatic. I am in favor of using serial by default too.

that you just forgot to provide a value for your output smoothed_image path.

I'll check it out. Thanks to you both for investigating this.

In general, we really need to be picking up errors like this up and letting the user know. The reason we don't do it when the task is initiated is we allow the inputs to be set later, but we should probably be running a check before the workflow is submitted to check that all inputs have been provided unless they are explicitly flagged as being optional.

Indeed. Surfacing multiprocessing errors when the workflow is in an undesirable state is terrible from a usability POV.

Probably, specifying the type as Path | attrs.NOTHING would work

I tried Path | None first, and ran into #744.

you probably got in the habit of typing all inputs and outputs as Path [...] but >=0.23 it is recommended to switch to using File/Directory (or more specific file-types, e.g. fileformats.medimage.NiftiGz, fileformats.medimage.DicomSeries)

Indeed. For the sake of simplicity, I went for Path, before undertaking a larger refactoring to the recommended types later.

@ghisvail
Collaborator Author

ghisvail commented Apr 3, 2024

In the ideal case, one would wish that:

from __future__ import annotations

@task
def smooth_image(input_image: Path, smoothed_image: Path | None = None) -> Path:
    from nilearn.image import load_img, smooth_img

    smoothed_image = (
        smoothed_image
        or Path.cwd() / (input_image.name.split(".", maxsplit=1)[0] + "_smoothed.nii.gz")
    )

    smooth_img(load_img(input_image), fwhm=3).to_filename(smoothed_image)

    return smoothed_image

just worked, i.e. both the optionality and the None default value were properly handled by Pydra from Python 3.8 onward (hence the __future__ import in the preamble).
