Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for ndimage.map_coordinates #237

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

m-albert
Copy link
Collaborator

@m-albert m-albert commented May 30, 2021

This PR suggests a dask-image implementation for ndimage.map_coordinates which covers the case in which the provided coordinates fit into memory and the input array consists of a (potentially large) dask array. See #235.

Implementation details:

  • for every coordinate the corresponding chunk of the input array and the required input slice are determined
  • coordinates are grouped based on associated chunks
  • a task is defined for each group and minimum required input array slice to apply ndimage.map_coordinates
  • output intensities are rearranged to match input order

Some tests are included, plan is to add some more following feedback on the choices of the implementation.

Implementation: pre-map coordinates onto the chunks of 'image' and
execute `ndimage.map_coordinates` for every chunk.
@m-albert
Copy link
Collaborator Author

A good test would be to see how it performs for your use case @jni.

@GenevieveBuckley
Copy link
Collaborator

A good test would be to see how it performs for your use case @jni.

Yes, this would be very interesting to see

@GenevieveBuckley
Copy link
Collaborator

GenevieveBuckley commented Feb 9, 2022

First, my apologies - I keep sitting down to look at this, and then deciding I need to come back later when I feel I can give it enough attention. Then the same thing happens again later.

Tests

I like the structure of the test validation, comparing the results from scipy.ndimage.map_coordinates with the new Dask implementation. I'd prefer each test case have an explicit assert statement at the end of it, the two actual test functions are not immediately clear with what they're checking.

numpy specific code

There is lots of numpy specific code here. That might cause a separate headache later on, with our goal to implement support for other, non-numpy array libraries (eg: cupy). I don't think that's a reason not to do it, especially if there are groups who would use a map_coordinates feature (Juan, possibly Adam Ginsburg). But I would prefer if we could minimise or eliminate calls to np.array/np.asarray. Again, this doesn't have to be your headache now.

Readability

If you could add comments to the main code function, indicating which parts handle each of the four steps you outline in the first comment on this PR thread, I think that would improve readability.

Also on readability, the docstring will need to be fleshed out a bit (but perhaps a bit later, this might be putting the cart before the horse right now).

... I still don't have thoughtful comments on your actual implementation (mostly I haven't fully understood it yet). Will try to sit down and give it another go soon-ish, but no promises!

@astrofrog
Copy link

Just wanted to say that I would find this feature very useful for making https://github.com/astropy/reproject more compatible with dask inputs!

`ndimage.map_coordinates` for every chunk.
"""

coordinates = np.asarray(coordinates)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently assumes that coordinates is a small enough array that it will fit in memory, but what if both image and coordinates are large arrays and would both not fit in memory? Or is it unlikely that this would be the case?

@m-albert
Copy link
Collaborator Author

m-albert commented Oct 9, 2022

Hey @GenevieveBuckley, first of all I really appreciate you having a look at this PR and giving your feedback and comments! Second of all apologies back for letting this sit still for quite a while now. I had worked on modifications some months back and am only pushing them now.

Tests
I like the structure of the test validation, comparing the results from scipy.ndimage.map_coordinates with the new Dask implementation. I'd prefer each test case have an explicit assert statement at the end of it, the two actual test functions are not immediately clear with what they're checking.

Regarding the assert statements, it's probably not super clear that they're performed within validate_map_coordinates_general, which is called from within a test function. In test_map_coordinates_large_input, I added a comment to clarify that its testing functionality consists in finishing before timeout. Not really sure though whether it makes sense to test large inputs in this way (or at all?), I guess because 'large' is relative and also one would probably avoid heavy lifting during testing.

numpy specific code
There is lots of numpy specific code here. That might cause a separate headache later on, with our goal to implement support for other, non-numpy array libraries (eg: cupy). I don't think that's a reason not to do it, especially if there are groups who would use a map_coordinates feature (Juan, possibly Adam Ginsburg). But I would prefer if we could minimise or eliminate calls to np.array/np.asarray. Again, this doesn't have to be your headache now.

That's a very good point to keep in mind. I eliminated some explicit conversions to numpy arrays, but we'd probably need to rewrite some things when implementing more general support.

Readability
If you could add comments to the main code function, indicating which parts handle each of the four steps you outline in the first comment on this PR thread, I think that would improve readability.

Great idea, I added comments explaining the concrete steps that are performed in the code. That should hopefully make things clearer.

Also on readability, the docstring will need to be fleshed out a bit (but perhaps a bit later, this might be putting the cart before the horse right now).

Definitely, I'm open for suggestions. Actually, would there currently be a way to use @utils._update_wrapper (that copies the original docstring), and add some additional comments to e.g. give additional details about the dask-image wrapping? If not, that might be useful.

@m-albert
Copy link
Collaborator Author

m-albert commented Oct 9, 2022

your actual implementation (mostly I haven't fully understood it yet)

Hope the comments and explanations in the code make it clearer. I'm also linking a part of the previous discussion we had here: #235 (comment)

@m-albert
Copy link
Collaborator Author

m-albert commented Oct 9, 2022

Hey @astrofrog, thanks for your comment :) Yes you're right that this implementation doesn't work with chunked coordinate arrays. Allowing both the image and coordinate arrays to be dask arrays makes the problem much more complex. I guess this is because before the coordinates are present in memory, it's not known which chunks of the image array they need to be mapped onto. See this discussion #235.

I'd tend to think that while it's not necessarily safe to assume the coordinate array to be small in all use cases, it's reasonable to assume the coordinate array is at least smaller than the image array. Because in the contrary case, it's probably more efficient to arrange the coordinates on a regular grid or in some other sensible way. Also, this is true for most of my use cases (all?) of map_coordinates.

What would be your thoughts on this?

@astrofrog
Copy link

For my use case (https://github.com/astropy/reproject), the coordinates might have a similar number of elements as the input array - we are essentially transforming images to different coordinate systems which can involve arbitrary distortions etc. We are currently able to deal with large arrays of coordinates in map_coordinates by just calling map_coordinates several times for different chunks of coordinates and then putting the results together at the end. Since we've already coded this up, it's not critical to me that map_coordinates be able to deal with large arrays of coordinates, but just wanted to bring it up in case it might matter to others, and in case it turns out to be simple here.

@m-albert
Copy link
Collaborator Author

Interesting. In the case of having regular coordinates I was thinking that sth like scipy.interpolate.RegularGridInterpolator might be more appropriate than scipy.ndimage.map_coordinates, but of course that's not the case for nonlinear transformations. And here coordinate arrays can be arbitrarily large.

What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.

I guess this could in principle be done using the dask.distributed functionality of launching tasks from within tasks. For a less brute-force approach, we could also have a look at whether the problem could be expressed as a shuffling one, as suggested by @jakirkham #235 (comment).

@jni
Copy link
Contributor

jni commented Oct 19, 2022

What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.

This makes a lot of sense to me. I can't see why it wouldn't work.

@m-albert
Copy link
Collaborator Author

Thanks for your comment @jni! I'll try to implement this in the next week(s) - extending this PR's map_coordinates implementation by applying it to each chunk of the coordinate array.

@m-albert
Copy link
Collaborator Author

m-albert commented Jan 8, 2023

Happy New Year!

We are currently able to deal with large arrays of coordinates in map_coordinates by just calling map_coordinates several times for different chunks of coordinates and then putting the results together at the end.

What's also true though is that as you mention it's relatively straight forward to split up the coordinate array and call map_coordinates several times. Therefore maybe it makes sense to use the current approach applied separately to each chunk of the coordinate array.

Finally I pushed an implementation of this, which allows also the coordinates to be given as a (potentially large) dask array.

I faced a little difficulty worth mentioning here. Following the approach we discussed above, when both input and coordinates are dask arrays, two rounds of compute are required. For each chunk of the coordinate array

  1. the coordinate values are computed
  2. and are then mapped onto the input array using the discussed approach.

To achieve this I used da.map_blocks in combination with a little hack. Normally, the function used with map_blocks(func..) receives precomputed data chunks. However, in this case, only the coordinate data chunk should be precomputed, while the input array should remain a dask array. So I found the following way to avoid triggering the computation:

output = da.map_blocks(
        _map_single_coordinates_array_chunk,
        delayed(da.Array)(input.dask, input.name, input.chunks, input.dtype),
        coordinates,
        ...)

(Edit: I posted a question regarding this on the dask discourse forum).

Apart from that, I adapted the tests and the implementation is consistent with the scipy output for all combinations of numpy or dask arrays as inputs (without prefiltering). What the tests don't assess though is performance.

@astrofrog Would you be able to test dask_image.ndinterp.map_coordinates with your use case of large coordinate arrays?

@m-albert
Copy link
Collaborator Author

m-albert commented Jan 9, 2023

Actually, would there currently be a way to use @utils._update_wrapper (that copies the original docstring), and add some additional comments to e.g. give additional details about the dask-image wrapping? If not, that might be useful.

Just adding here that I just came across
dask.utils.derived_from(original_klass, version=None, ua_args=None, skipblocks=0, inconsistencies=None) which could be useful to both automatically copy original docstrings and add comments about inconsistencies between dask-image and original functions.

@GenevieveBuckley
Copy link
Collaborator

Happy new year!

I quite like your little hack, it seems like a pretty elegant sidestep around the problem to me. I commented in more detail on the discourse thread.

@jni
Copy link
Contributor

jni commented Jan 10, 2023

Just dropping in to say that I am in awe. 😍 Happy New Year indeed! 😃

As a side note the RTD action failure appears to be unrelated to this PR, probably to do with some sphinx/jinja2 update...

  File "/home/docs/checkouts/readthedocs.org/user_builds/dask-image/conda/237/lib/python3.7/site-packages/sphinx/util/rst.py", line 22, in <module>
    from jinja2 import environmentfilter
ImportError: cannot import name 'environmentfilter' from 'jinja2' (/home/docs/checkouts/readthedocs.org/user_builds/dask-image/conda/237/lib/python3.7/site-packages/jinja2/__init__.py)

@m-albert
Copy link
Collaborator Author

Thanks for having a closer look at the hack and your comments on the dask discourse @GenevieveBuckley :) So far it seems to be an okay solution.

As a side note the RTD action failure appears to be unrelated to this PR, probably to do with some sphinx/jinja2 update...

You're right, pinning jinja2<3.1 seems to fix the problem: readthedocs/readthedocs.org#9038.

@m-albert
Copy link
Collaborator Author

While exploring the performance of dask_image.ndinterp.map_coordinates I found that associating the coordinates to the input array chunks was very inefficient, and optimized this a bit.

Here are some first performance comparisons (run on a macbook m2).

When everything fits into memory:

import dask.array as da
from dask_image import ndinterp
from scipy import ndimage
import numpy as np
from time import perf_counter

class catchtime:
    def __enter__(self):
        self.time = perf_counter()
        return self

    def __exit__(self, type, value, traceback):
        self.time = perf_counter() - self.time
        self.readout = f'Time: {self.time:.3f} seconds'
        print(self.readout)

if __name__ == "__main__":

    interp_order = 3
    dim = 3
    input_extent = 5*10**2
    input_chunk_size = 100
    coordinates_size = 10**7
    coordinates_chunk_size = 10**6

    input_np = np.random.randint(0, 100, size=[input_extent] * dim)
    coordinates_np = np.random.randint(0, input_extent, size=(dim, coordinates_size))

    input = da.from_array(input_np, chunks=[input_chunk_size] * dim)
    coordinates = da.from_array(coordinates_np, chunks=(dim, coordinates_chunk_size))

    print(input_extent, input_chunk_size, coordinates_size, coordinates_chunk_size)

    with catchtime() as t:
        rnp = ndimage.map_coordinates(input_np, coordinates=coordinates_np, order=interp_order) # 13.0 seconds

    with catchtime() as t:
        r = ndinterp.map_coordinates(input, coordinates=coordinates, order=interp_order) # 0.001 seconds

    with catchtime() as t:
        rc = r.compute(scheduler='threads') # 5.7 seconds

So here there seems to be a performance gain.

When things don't fit into memory anymore:

    interp_order = 3
    dim = 3
    input_extent = 5 * 10**3
    input_chunk_size = 10**2
    coordinates_size = 10**10
    coordinates_chunk_size = 10**4

    input = da.random.randint(0, 100, size=[input_extent] * dim, chunks=[input_chunk_size] * dim)
    coordinates = da.random.randint(0, input_extent, size=(dim, coordinates_size), chunks=(dim, coordinates_chunk_size))

    with catchtime() as t:
        r = ndinterp.map_coordinates(input, coordinates=coordinates, order=interp_order) # 0.364 seconds

    with catchtime() as t:
        r[0].compute(scheduler='threads') # 125.381 seconds

The last computation is really slow, but it finishes 😅

@jni
Copy link
Contributor

jni commented Jan 22, 2023

@m-albert I guess the dask diagnostics tools can help figure out where most of the time is going in that situation? (ie constantly spilling to disk, or something else?) I don't have much experience here though.

@m-albert
Copy link
Collaborator Author

m-albert commented Jan 23, 2023

@jni Great idea. This is what comes out of profiling the two cases above: medium-sized arrays and larger-than-memory arrays.

The first case looks okay, on average 400% CPU are used by 8 workers, and memory looks fine I guess. The output array is divided into 10 chunks and interestingly, after finishing to compute the first 8 result chunks, a lot of caching seems to be done. This is probably the input array which is required by all tasks.

The second case is a bit more cryptic to me. Interestingly, for some reason only one worker is used.

When trying to use some of the distributed diagnostics options (as opposed to the local ones) I've noticed that the current implementation doesn't work using a distributed scheduler. I'll have a look into that next.

@jakirkham
Copy link
Member

Maybe it is worth looking at the recent changes to rechunking in Distributed ( dask/distributed#7534 )

@GenevieveBuckley
Copy link
Collaborator

#278 should fix that readthedocs CI failure. Let us know if merging the main branch does not fix that particular problem.

@m-albert
Copy link
Collaborator Author

Thanks for bringing the branch up to date @jakirkham.

Maybe it is worth looking at the recent changes to rechunking in Distributed ( dask/distributed#7534 )

I was reading a bit into shuffling, its problem with dask's centralized scheduler and the recent changes. And think I understand a bit better now the relevance for map_coordinates: In the general case in which each output coordinate chunk depends on many input array chunks, it'd be much more efficient if data flowed directly between workers instead of needing to go through the scheduler, which acts as a bottleneck.

Something I don't quite understand yet is how the new shuttling mechanism (that bypasses the scheduler) would be triggered in the context of arrays. For instance, I was assuming that some degree of direct communication between workers would already be happening by default when using a distributed backend (see here).

@GenevieveBuckley
Copy link
Collaborator

Does that mean it's worth trying to organise a conversation with Hendrik (author of dask/distributed#7534)?

I don't know Hendrik, but perhaps we can work out a way to get your questions answered, particularly since this is still quite a new change in distributed.

@m-albert
Copy link
Collaborator Author

That's interesting. Not entirely sure whether my questions would be super concrete at this point, but a conversation might clarify things a bit!

Something I don't quite understand yet is how the new shuttling mechanism (that bypasses the scheduler) would be triggered in the context of arrays. For instance, I was assuming that some degree of direct communication between workers would already be happening by default when using a distributed backend (see here).

Hm as far as I understand now, direct worker communication might be triggered by the scheduler sometimes, but the new peer to peer mechanism is explicitely implemented for

  1. array rechunking and
  2. dataframe shuffling (which rearranges a dataframe according to column entries).

Maybe there'd be a (very naive) workflow in which a map_coordinates implementation could be mapped onto these two operations, with sth like this:

  1. convert the coordinate dask array into a dask dataframe
  2. determine the input chunk each coordinate maps onto
  3. shuffle the dataframe such that all rows (coordinates) that map onto the same input chunk land in the same partition
  4. apply dask.array.overlap to the input array to make sure coordinates close to chunk borders can be interpolated. overlap can benefit from rechunking, and maybe here p2p could also help?
  5. apply ndimage.map_coordinates on pairs of coordinate partitions and input (overlap) chunks in a hybrid of map_blocks and map_partitions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants