Implement support for ndimage.map_coordinates #237

Open · wants to merge 10 commits into base: main
195 changes: 195 additions & 0 deletions dask_image/ndinterp/__init__.py
@@ -5,12 +5,15 @@
from itertools import product
import warnings

from dask import compute, delayed
import dask.array as da
import numpy as np
from dask.base import tokenize
from dask.highlevelgraph import HighLevelGraph
import scipy
from scipy.ndimage import affine_transform as ndimage_affine_transform
from scipy.ndimage import map_coordinates as ndimage_map_coordinates
from scipy.ndimage import labeled_comprehension as ndimage_labeled_comprehension

from ..dispatch._dispatch_ndinterp import (
dispatch_affine_transform,
@@ -381,3 +384,195 @@ def spline_filter1d(
)

return result


def _map_single_coordinates_array_chunk(input, coordinates, order=3,
mode='constant', cval=0.0, prefilter=False):
"""
Central helper function for implementing map_coordinates.

Receives `input` as a dask array and `coordinates` as an already
computed (in-memory) array.

Implementation details and steps:
1) associate each coordinate in `coordinates` with the input chunk
it maps onto
2) for each input chunk that is associated with at least one
coordinate, compute the minimal slice of the input required to map
all coordinates associated with it (note that this slice can extend
beyond the boundaries of that chunk)
3) for each such slice and its associated coordinates, define a dask
task that applies ndimage.map_coordinates
4) rearrange the outputs of ndimage.map_coordinates to match the
order of the input coordinates
"""

# STEP 1: Associate each coordinate in coordinates with
# the chunk it maps to in the input array

# get the input chunks each coordinates maps onto
coords_input_chunk_locations = coordinates.T // np.array(input.chunksize)

# all input chunk locations
input_chunk_locations = np.array([i for i in np.ndindex(input.numblocks)])

# linearize input chunk locations into C-order indices matching the
# enumeration produced by np.ndindex above ('clip' groups out-of-bounds
# coordinates with the nearest edge chunk)
coords_input_chunk_locations_linear = np.ravel_multi_index(
    coords_input_chunk_locations.astype(np.int64).T,
    input.numblocks, mode='clip')
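
# Illustrative example (hypothetical shapes): with input.chunksize == (5, 5)
# and input.numblocks == (2, 3), a coordinate (7.3, 11.9) falls into chunk
# location (1, 2), which linearizes to 1 * 3 + 2 == 5, i.e. its position in
# the np.ndindex enumeration above.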

# determine the input chunks that have coords associated and
# count how many coords map onto each input chunk
chunk_indices_count = np.bincount(coords_input_chunk_locations_linear,
                                  minlength=np.prod(input.numblocks))
required_input_chunk_indices = np.where(chunk_indices_count > 0)[0]
required_input_chunks = input_chunk_locations[required_input_chunk_indices]
coord_rc_count = chunk_indices_count[required_input_chunk_indices]

# inverse mapping: input chunks to coordinates
required_input_chunk_coords_indices = \
[np.where(coords_input_chunk_locations_linear == irc)[0]
for irc in required_input_chunk_indices]

# STEP 2: for each input chunk that has been associated to at least
# one coordinate, calculate the minimal slice required to map all
# coordinates that are associated to it (note that resulting slice
# coordinates can lie outside of the coordinate's chunk)

# determine the slices of the input array that are required for
# mapping all coordinates associated to a given input chunk.
# Note that this slice can be larger than the given chunk when coords
# lie at chunk borders.
# (probably there's a more efficient way to do this)
input_slices_lower = np.array([np.clip(
ndimage_labeled_comprehension(
np.floor(coordinates[dim] - order // 2),
coords_input_chunk_locations_linear,
required_input_chunk_indices,
np.min, np.int64, 0),
0, input.shape[dim] - 1)
for dim in range(input.ndim)])

input_slices_upper = np.array([np.clip(
ndimage_labeled_comprehension(
np.ceil(coordinates[dim] + order // 2) + 1,
coords_input_chunk_locations_linear,
required_input_chunk_indices,
np.max, np.int64, 0),
0, input.shape[dim])
for dim in range(input.ndim)])

input_slices = np.array([input_slices_lower, input_slices_upper])\
.swapaxes(1, 2)
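
# Illustrative example (hypothetical values): with order == 3, a coordinate
# value of 7.3 along one dimension requires input indices from
# floor(7.3 - 3 // 2) == 6 (inclusive) up to ceil(7.3 + 3 // 2) + 1 == 10
# (exclusive) to cover the spline support.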

# STEP 3: For each previously obtained slice and its associated
# coordinates, define a dask task and apply ndimage.map_coordinates

# prepare building dask graph
# define one task per associated input chunk
name = "map_coordinates_chunk-%s" % tokenize(input,
coordinates,
order,
mode,
cval,
prefilter)

keys = [(name, i) for i in range(len(required_input_chunks))]

# pair map_coordinates calls with input slices and mapped coordinates
values = []
for irc in range(len(required_input_chunks)):

ric_slice = [slice(input_slices[0][irc][dim],
input_slices[1][irc][dim])
for dim in range(input.ndim)]
ric_offset = input_slices[0][irc]

values.append((ndimage_map_coordinates,
input[tuple(ric_slice)],
coordinates[:, required_input_chunk_coords_indices[irc]]
- ric_offset[:, None],
None,
order,
mode,
cval,
prefilter))
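
# Illustrative example of a single task value (hypothetical numbers): with
# order=3 and mode='constant', an entry of `values` looks like
# (ndimage_map_coordinates, input[6:12, 5:10],
#  <coords of this chunk minus the slice offset>, None, 3, 'constant',
#  0.0, False)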

# build dask graph
dsk = dict(zip(keys, values))
ar = da.Array(dsk, name, tuple([list(coord_rc_count)]), input.dtype)

# STEP 4: rearrange outputs of ndimage.map_coordinates
# to match input order
orig_order = np.argsort(
[ic for ric_ci in required_input_chunk_coords_indices for ic in ric_ci])

# compute result and reorder
# (ordering first would probably unnecessarily inflate the task graph)
return ar.compute()[orig_order]


def map_coordinates(input, coordinates, order=3,
mode='constant', cval=0.0, prefilter=False):
"""
Wraps ndimage.map_coordinates.

Both the input and coordinate arrays can be dask arrays.

For each chunk of the coordinates array, the coordinates are computed
and mapped onto the required slices of the input array. One task is
defined for each input array chunk that is associated with at least
one coordinate. The outputs of these tasks are then rearranged to
match the order of the input coordinates. For more details see the
docstring of `_map_single_coordinates_array_chunk`.

input : array_like
The input array.
coordinates : array_like
The coordinates at which to sample the input.
order : int, optional
The order of the spline interpolation, default is 3. The order has to
be in the range 0-5.
mode : str, optional
Determines how the input array is extended beyond its boundaries.
Default is 'constant'.
cval : float, optional
Value to fill past edges of input if mode is 'constant'. Default is 0.0.
prefilter : bool, optional
Whether the input should be prefiltered with a spline filter before
interpolation. Default is False.

Comments:
- in case of a small coordinate array, it might make sense to rechunk
it into a single chunk
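
Example (a minimal usage sketch; shapes and values are illustrative):

>>> import numpy as np
>>> import dask.array as da
>>> import dask_image.ndinterp
>>> image = da.random.random((100, 100), chunks=50)
>>> coords = np.array([[10.5, 20.0, 99.9],
...                    [30.2, 40.0, 0.1]])  # shape (ndim, ncoords)
>>> values = dask_image.ndinterp.map_coordinates(image, coords, order=1)
>>> values.compute().shape
(3,)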
"""

# if coordinate array is not a dask array, convert it into one
if type(coordinates) is not da.Array:
coordinates = da.from_array(coordinates, chunks=coordinates.shape)
else:
# make sure indices are not split across chunks, i.e. that there's
# no chunking along the first dimension
if len(coordinates.chunks[0]) > 1:
coordinates = da.rechunk(coordinates,
(-1,) + coordinates.chunks[1:])
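
# Illustrative example (hypothetical chunking): a (3, 1000) coordinates
# array chunked as ((2, 1), (500, 500)) is rechunked to ((3,), (500, 500)).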

# if the input array is not a dask array, convert it into one
if type(input) is not da.Array:
input = da.from_array(input, chunks=input.shape)

# Map each chunk of the coordinates array onto the entire input array.
# 'input' is passed to `_map_single_coordinates_array_chunk` using a bit of
# a dirty trick: it is split into its components and passed as a delayed
# object, which reconstructs the original array when the task is
# executed. Therefore two `compute` calls are required to obtain the
# final result, one of which is performed by
# `_map_single_coordinates_array_chunk`
output = da.map_blocks(
_map_single_coordinates_array_chunk,
delayed(da.Array)(input.dask, input.name, input.chunks, input.dtype),
coordinates,
order=order,
mode=mode,
cval=cval,
prefilter=prefilter,
dtype=input.dtype,
chunks=coordinates.chunks[1:],
drop_axis=0,
)

return output
2 changes: 1 addition & 1 deletion docs/coverage.rst
@@ -145,7 +145,7 @@ This table shows which SciPy ndimage functions are supported by dask-image.
- ✓
* - ``map_coordinates``
- ✓
-
- ✓
* - ``maximum``
- ✓
- ✓
113 changes: 113 additions & 0 deletions tests/test_dask_image/test_ndinterp/test_map_coordinates.py
@@ -0,0 +1,113 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from packaging import version

import dask.array as da
import numpy as np
import pytest
import scipy
import scipy.ndimage

import dask_image.ndinterp

# mode lists for the case with prefilter = False
_supported_modes = ['constant', 'nearest']
_unsupported_modes = ['wrap', 'reflect', 'mirror']

# mode lists for the case with prefilter = True
_supported_prefilter_modes = ['constant']
_unsupported_prefilter_modes = _unsupported_modes + ['nearest']

have_scipy16 = version.parse(scipy.__version__) >= version.parse('1.6.0')

# additional modes are present in SciPy >= 1.6.0
if have_scipy16:
_supported_modes += ['grid-constant']
_unsupported_modes += ['grid-mirror', 'grid-wrap']
_unsupported_prefilter_modes += ['grid-constant', 'grid-mirror',
'grid-wrap']


def validate_map_coordinates_general(n=2,
interp_order=1,
interp_mode='constant',
coord_len=12,
coord_chunksize=6,
coord_offset=0.,
im_shape_per_dim=12,
im_chunksize_per_dim=6,
random_seed=0,
prefilter=False,
):

if interp_order > 1 and interp_mode == 'nearest' and not have_scipy16:
# not clear on the underlying cause, but this fails on older SciPy
pytest.skip("requires SciPy >= 1.6.0")

# define test input
np.random.seed(random_seed)
input = np.random.random([im_shape_per_dim] * n)
input_da = da.from_array(input, chunks=im_chunksize_per_dim)

# define test coordinates
coords = np.random.random((n, coord_len)) * im_shape_per_dim + coord_offset
coords_da = da.from_array(coords, chunks=(n, coord_chunksize))

# ndimage result
mapped_scipy = scipy.ndimage.map_coordinates(
input,
coords,
order=interp_order,
mode=interp_mode,
cval=0.0,
prefilter=prefilter)

# dask-image results
for input_array in [input, input_da]:
for coords_array in [coords, coords_da]:
mapped_dask = dask_image.ndinterp.map_coordinates(
input_array,
coords_array,
order=interp_order,
mode=interp_mode,
cval=0.0,
prefilter=prefilter)

mapped_dask_computed = mapped_dask.compute()

assert np.allclose(mapped_scipy, mapped_dask_computed)


@pytest.mark.parametrize("n",
[1, 2, 3, 4])
@pytest.mark.parametrize("random_seed",
range(2))
def test_map_coordinates_basic(n,
random_seed,
):

kwargs = dict()
kwargs['n'] = n
kwargs['random_seed'] = random_seed

validate_map_coordinates_general(**kwargs)


@pytest.mark.timeout(3)
def test_map_coordinates_large_input():

"""
This test assesses whether relatively large
inputs are processed before timeout.
"""

# define large test image
image_da = da.random.random([1000] * 3, chunks=200)

# define sparse test coordinates
coords = np.random.random((3, 2)) * 1000

# dask-image result
dask_image.ndinterp.map_coordinates(
image_da,
coords).compute()