Skip to content

Commit

Permalink
More lazy extract_levels preprocessor function (#2120)
Browse files Browse the repository at this point in the history
Co-authored-by: Valeriu Predoi <[email protected]>
  • Loading branch information
bouweandela and valeriupredoi authored Feb 21, 2024
1 parent cbf9394 commit 265d2f4
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 54 deletions.
11 changes: 10 additions & 1 deletion esmvalcore/cmor/_fixes/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ def _map_on_filled(function, array):
array = num_module.ma.filled(array, fill_value)

# Apply function and return masked array
array = function(array)
if isinstance(array, da.Array):
array = da.map_blocks(
function,
array,
dtype=array.dtype,
enforce_ndim=True,
meta=da.utils.meta_from_array(array),
)
else:
array = function(array)
return num_module.ma.masked_array(array, mask=mask)


Expand Down
192 changes: 148 additions & 44 deletions esmvalcore/preprocessor/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import warnings
from copy import deepcopy
from decimal import Decimal
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

import dask.array as da
import iris
Expand All @@ -21,7 +22,6 @@
from geopy.geocoders import Nominatim
from iris.analysis import AreaWeighted, Linear, Nearest
from iris.cube import Cube
from iris.util import broadcast_to_shape

from esmvalcore.cmor._fixes.shared import (
add_altitude_from_plev,
Expand Down Expand Up @@ -935,28 +935,90 @@ def _create_cube(src_cube, data, src_levels, levels):
return result


def is_lazy_masked_data(array):
"""Similar to `iris._lazy_data.is_lazy_masked_data`."""
return isinstance(array, da.Array) and isinstance(
da.utils.meta_from_array(array), np.ma.MaskedArray)


def broadcast_to_shape(array, shape, dim_map, chunks=None):
"""Copy of `iris.util.broadcast_to_shape` that allows specifying chunks."""
if isinstance(array, da.Array):
if chunks is not None:
chunks = list(chunks)
for src_idx, tgt_idx in enumerate(dim_map):
# Only use the specified chunks along new dimensions or on
# dimensions that have size 1 in the source array.
if array.shape[src_idx] != 1:
chunks[tgt_idx] = array.chunks[src_idx]
broadcast = partial(da.broadcast_to, shape=shape, chunks=chunks)
else:
broadcast = partial(np.broadcast_to, shape=shape)

n_orig_dims = len(array.shape)
n_new_dims = len(shape) - n_orig_dims
array = array.reshape(array.shape + (1,) * n_new_dims)

# Get dims in required order.
array = np.moveaxis(array, range(n_orig_dims), dim_map)
new_array = broadcast(array)

if np.ma.isMA(array):
# broadcast_to strips masks so we need to handle them explicitly.
mask = np.ma.getmask(array)
if mask is np.ma.nomask:
new_mask = np.ma.nomask
else:
new_mask = broadcast(mask)
new_array = np.ma.array(new_array, mask=new_mask)

elif is_lazy_masked_data(array):
# broadcast_to strips masks so we need to handle them explicitly.
mask = da.ma.getmaskarray(array)
new_mask = broadcast(mask)
new_array = da.ma.masked_array(new_array, new_mask)

return new_array


def _vertical_interpolate(cube, src_levels, levels, interpolation,
extrapolation):
"""Perform vertical interpolation."""
# Determine the source levels and axis for vertical interpolation.
z_axis, = cube.coord_dims(cube.coord(axis='z', dim_coords=True))

# Broadcast the 1d source cube vertical coordinate to fully
# describe the spatial extent that will be interpolated.
src_levels_broadcast = broadcast_to_shape(src_levels.points, cube.shape,
cube.coord_dims(src_levels))
if cube.has_lazy_data():
# Make source levels lazy if cube has lazy data.
src_points = src_levels.lazy_points()
else:
src_points = src_levels.core_points()

# Broadcast the source cube vertical coordinate to fully describe the
# spatial extent that will be interpolated.
src_levels_broadcast = broadcast_to_shape(
src_points,
shape=cube.shape,
chunks=cube.lazy_data().chunks if cube.has_lazy_data() else None,
dim_map=cube.coord_dims(src_levels),
)

# Make the target levels lazy if the input data is lazy.
if cube.has_lazy_data() and isinstance(src_points, da.Array):
levels = da.asarray(levels)

# force mask onto data as nan's
npx = get_array_module(cube.core_data())
data = npx.ma.filled(cube.core_data(), np.nan)

# Now perform the actual vertical interpolation.
new_data = stratify.interpolate(levels,
src_levels_broadcast,
data,
axis=z_axis,
interpolation=interpolation,
extrapolation=extrapolation)
# Perform vertical interpolation.
new_data = stratify.interpolate(
levels,
src_levels_broadcast,
data,
axis=z_axis,
interpolation=interpolation,
extrapolation=extrapolation,
)

# Calculate the mask based on the any NaN values in the interpolated data.
new_data = npx.ma.masked_where(npx.isnan(new_data), new_data)
Expand Down Expand Up @@ -1026,42 +1088,74 @@ def parse_vertical_scheme(scheme):
return scheme, extrap_scheme


def extract_levels(cube,
levels,
scheme,
coordinate=None,
rtol=1e-7,
atol=None):
def _rechunk_aux_factory_dependencies(
cube: iris.cube.Cube,
coord_name: str,
) -> iris.cube.Cube:
"""Rechunk coordinate aux factory dependencies.
This ensures that the resulting coordinate has reasonably sized
chunks that are aligned with the cube data for optimal computational
performance.
"""
# Workaround for https://github.com/SciTools/iris/issues/5457
try:
factory = cube.aux_factory(coord_name)
except iris.exceptions.CoordinateNotFoundError:
return cube

cube = cube.copy()
cube_chunks = cube.lazy_data().chunks
for coord in factory.dependencies.values():
coord_dims = cube.coord_dims(coord)
if coord_dims is not None:
coord = coord.copy()
chunks = tuple(cube_chunks[i] for i in coord_dims)
coord.points = coord.lazy_points().rechunk(chunks)
if coord.has_bounds():
coord.bounds = coord.lazy_bounds().rechunk(chunks + (None, ))
cube.replace_coord(coord)
return cube


def extract_levels(
cube: iris.cube.Cube,
levels: np.typing.ArrayLike | da.Array,
scheme: str,
coordinate: Optional[str] = None,
rtol: float = 1e-7,
atol: Optional[float] = None,
):
"""Perform vertical interpolation.
Parameters
----------
cube : iris.cube.Cube
cube:
The source cube to be vertically interpolated.
levels : ArrayLike
levels:
One or more target levels for the vertical interpolation. Assumed
to be in the same S.I. units of the source cube vertical dimension
coordinate. If the requested levels are sufficiently close to the
levels of the cube, cube slicing will take place instead of
interpolation.
scheme : str
scheme:
The vertical interpolation scheme to use. Choose from
'linear',
'nearest',
'linear_extrapolate',
'nearest_extrapolate'.
coordinate : optional str
coordinate:
The coordinate to interpolate. If specified, pressure levels
(if present) can be converted to height levels and vice versa using
the US standard atmosphere. E.g. 'coordinate = altitude' will convert
existing pressure levels (air_pressure) to height levels (altitude);
'coordinate = air_pressure' will convert existing height levels
(altitude) to pressure levels (air_pressure).
rtol : float
rtol:
Relative tolerance for comparing the levels in `cube` to the requested
levels. If the levels are sufficiently close, the requested levels
will be assigned to the cube and no interpolation will take place.
atol : float
atol:
Absolute tolerance for comparing the levels in `cube` to the requested
levels. If the levels are sufficiently close, the requested levels
will be assigned to the cube and no interpolation will take place.
Expand All @@ -1081,29 +1175,37 @@ def extract_levels(cube,
interpolation, extrapolation = parse_vertical_scheme(scheme)

# Ensure we have a non-scalar array of levels.
levels = np.array(levels, ndmin=1)

# Get the source cube vertical coordinate, if available.
if coordinate:
coord_names = [coord.name() for coord in cube.coords()]
if coordinate not in coord_names:
# Try to calculate air_pressure from altitude coordinate or
# vice versa using US standard atmosphere for conversion.
if coordinate == 'air_pressure' and 'altitude' in coord_names:
# Calculate pressure level coordinate from altitude.
add_plev_from_altitude(cube)
if coordinate == 'altitude' and 'air_pressure' in coord_names:
# Calculate altitude coordinate from pressure levels.
add_altitude_from_plev(cube)
src_levels = cube.coord(coordinate)
if not isinstance(levels, da.Array):
levels = np.array(levels, ndmin=1)

# Try to determine the name of the vertical coordinate automatically
if coordinate is None:
coordinate = cube.coord(axis='z', dim_coords=True).name()

# Add extra coordinates
coord_names = [coord.name() for coord in cube.coords()]
if coordinate in coord_names:
cube = _rechunk_aux_factory_dependencies(cube, coordinate)
else:
src_levels = cube.coord(axis='z', dim_coords=True)
# Try to calculate air_pressure from altitude coordinate or
# vice versa using US standard atmosphere for conversion.
if coordinate == 'air_pressure' and 'altitude' in coord_names:
# Calculate pressure level coordinate from altitude.
cube = _rechunk_aux_factory_dependencies(cube, 'altitude')
add_plev_from_altitude(cube)
if coordinate == 'altitude' and 'air_pressure' in coord_names:
# Calculate altitude coordinate from pressure levels.
cube = _rechunk_aux_factory_dependencies(cube, 'air_pressure')
add_altitude_from_plev(cube)

src_levels = cube.coord(coordinate)

if (src_levels.shape == levels.shape and np.allclose(
src_levels.points,
src_levels.core_points(),
levels,
rtol=rtol,
atol=1e-7 * np.mean(src_levels.points) if atol is None else atol,
atol=1e-7 *
np.mean(src_levels.core_points()) if atol is None else atol,
)):
# Only perform vertical extraction/interpolation if the source
# and target levels are not "similar" enough.
Expand All @@ -1114,7 +1216,9 @@ def extract_levels(cube,
set(levels).issubset(set(src_levels.points)):
# If all target levels exist in the source cube, simply extract them.
name = src_levels.name()
coord_values = {name: lambda cell: cell.point in set(levels)}
coord_values = {
name: lambda cell: cell.point in set(levels) # type: ignore
}
constraint = iris.Constraint(coord_values=coord_values)
result = cube.extract(constraint)
# Ensure the constraint did not fail.
Expand Down
22 changes: 16 additions & 6 deletions tests/integration/preprocessor/_regrid/test_extract_levels.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
"""
Integration tests for the :func:`esmvalcore.preprocessor.regrid.extract_levels`
function.
"""
"""Integration tests for the
:func:`esmvalcore.preprocessor.regrid.extract_levels` function."""

import unittest

import dask.array as da
import iris
import numpy as np

Expand Down Expand Up @@ -76,8 +74,20 @@ def test_interpolation__linear_lazy(self):
levels = [0.5, 1.5]
scheme = 'linear'
cube = self.cube.copy(self.cube.lazy_data())
result = extract_levels(cube, levels, scheme)
coord_name = 'multidimensional_vertical_coord'
coord_points = (
cube.coord('air_pressure').core_points().reshape(3, 1, 1) *
np.ones((3, 2, 2)))
cube.add_aux_coord(
iris.coords.AuxCoord(
da.asarray(coord_points),
long_name=coord_name,
),
[1, 2, 3],
)
result = extract_levels(cube, levels, scheme, coordinate=coord_name)
self.assertTrue(result.has_lazy_data())
self.assertTrue(cube.coord(coord_name).has_lazy_points())
expected = np.ma.array([
[
[[2., 3.], [4., 5.]],
Expand Down
Loading

0 comments on commit 265d2f4

Please sign in to comment.