Commit 46b6ad1
Merge branch 'Open-EO:main' into main
clausmichele authored Oct 11, 2023
2 parents e0a60d6 + 2c8f1cc
Showing 6 changed files with 103 additions and 8 deletions.
7 changes: 7 additions & 0 deletions openeo_processes_dask/process_implementations/__init__.py
@@ -15,6 +15,13 @@
"Did not load machine learning processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, ml]`"
)

try:
from .experimental import *
except ImportError as e:
logger.warning(
"Did not experimental processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, experimental]`"
)

import rioxarray as rio # Required for the .rio accessor on xarrays.

import openeo_processes_dask.process_implementations.cubes._xr_interop
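The new experimental block mirrors the guarded import already used for the ml extra just above it: the optional subpackage is imported inside try/except ImportError so the base install keeps working when the extra is absent. A minimal standalone sketch of the same pattern (the import path is taken from this repo; adapt as needed):

    import importlib
    import logging

    logger = logging.getLogger(__name__)

    try:
        # Optional subpackage; importable only when the matching extra is installed.
        experimental = importlib.import_module(
            "openeo_processes_dask.process_implementations.experimental"
        )
    except ImportError:
        experimental = None
        logger.warning(
            "Did not load experimental processes due to missing dependencies: "
            "Install them like this: "
            "`pip install openeo-processes-dask[implementations, experimental]`"
        )

The except branch only fires if importing the subpackage itself fails, e.g. because one of its dependencies is missing, and the warning tells the user which extra to install.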
openeo_processes_dask/process_implementations/cubes/reduce.py
@@ -54,9 +54,8 @@ def reduce_spatial(
spatial_dims = data.openeo.spatial_dims if data.openeo.spatial_dims else None
return data.reduce(
reducer,
dimension=spatial_dims,
dim=spatial_dims,
keep_attrs=True,
context=context,
positional_parameters=positional_parameters,
named_parameters=named_parameters,
)
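The one-word rename here is a real bug fix: xarray's DataArray.reduce selects the dimensions to collapse via its dim keyword and forwards unrecognized keyword arguments on to the reducer function, so the old dimension=spatial_dims would have been handed to the reducer rather than selecting the spatial axes. A minimal sketch of the corrected call:

    import numpy as np
    import xarray as xr

    da = xr.DataArray(np.random.rand(3, 4, 2), dims=("x", "y", "t"))

    # Collapse both spatial dimensions at once; keep_attrs preserves metadata.
    reduced = da.reduce(np.nanmean, dim=("x", "y"), keep_attrs=True)
    print(reduced.dims)  # ('t',)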
44 changes: 40 additions & 4 deletions openeo_processes_dask/process_implementations/ml/curve_fitting.py
@@ -25,15 +25,39 @@ def fit_curve(
raise DimensionNotAvailable(
f"Provided dimension ({dimension}) not found in data.dims: {data.dims}"
)
bands_required = False
if "bands" in data.dims:
if len(data["bands"].values) == 1:
bands_required = data["bands"].values[0]

try:
# Try parsing as datetime first
dates = data[dimension].values
dates = np.asarray(dates, dtype=np.datetime64)
except ValueError:
dates = np.asarray(data[dimension].values)

if np.issubdtype(dates.dtype, np.datetime64):
timestep = [
(
(np.datetime64(x) - np.datetime64("1970-01-01", "s"))
/ np.timedelta64(1, "s")
)
for x in dates
]
data[dimension] = np.array(timestep)
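Converting datetime64 labels to seconds since the Unix epoch gives the optimizer behind DataArray.curvefit (scipy.optimize.curve_fit) plain floats to fit against, since it cannot handle datetime64 coordinates directly. The list comprehension above is equivalent to this vectorized check:

    import numpy as np

    dates = np.array(["2021-01-01", "2021-07-01"], dtype="datetime64[s]")
    seconds = (dates - np.datetime64("1970-01-01", "s")) / np.timedelta64(1, "s")
    print(seconds)  # [1.6094592e+09 1.6250976e+09]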

dims_before = list(data.dims)

# In the spec, parameters is a list, but xr.curvefit requires names for them,
# so we do this to generate names locally
parameters = {f"param_{i}": v for i, v in enumerate(parameters)}

chunking = {key: "auto" for key in data.dims if key != dimension}
chunking[dimension] = -1

# The dimension along which to fit the curves cannot be chunked!
rechunked_data = data.chunk({dimension: -1})
rechunked_data = data.chunk(chunking)
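Compared to the old data.chunk({dimension: -1}), the new chunking dict also sets every other dimension to "auto", so dask can parallelize across pixels while the fit dimension stays in a single chunk; curve fitting needs all samples along that axis at once. A small illustration, assuming a dask-backed DataArray:

    import numpy as np
    import xarray as xr

    da = xr.DataArray(np.random.rand(100, 50, 50), dims=("t", "x", "y")).chunk(
        {"t": 10, "x": 25, "y": 25}
    )

    # Single chunk along the fit dimension, automatic chunking elsewhere.
    chunking = {"x": "auto", "y": "auto", "t": -1}
    print(da.chunk(chunking).chunksizes["t"])  # (100,)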

def wrapper(f):
def _wrap(*args, **kwargs):
@@ -61,11 +85,15 @@ def _wrap(*args, **kwargs):
.drop_dims(["cov_i", "cov_j"])
.to_array()
.squeeze()
.transpose(*expected_dims_after)
)

fit_result.attrs = data.attrs
fit_result = fit_result.rio.write_crs(rechunked_data.rio.crs)
if bands_required and "bands" not in fit_result.dims:
fit_result = fit_result.assign_coords(**{"bands": bands_required})
fit_result = fit_result.expand_dims(dim="bands")

fit_result = fit_result.transpose(*expected_dims_after)

return fit_result
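The .squeeze() in the pipeline above drops a length-1 bands dimension, so for single-band inputs the band name is recorded up front (bands_required) and reinstated on the result. In isolation, the reinstatement looks like this (coordinate value illustrative):

    import numpy as np
    import xarray as xr

    fit_result = xr.DataArray(np.random.rand(5, 5), dims=("x", "y"))

    # Re-attach the band that squeeze() removed, as a length-1 dimension.
    fit_result = fit_result.assign_coords(bands="B02")
    fit_result = fit_result.expand_dims(dim="bands")
    print(fit_result.dims)  # ('bands', 'x', 'y')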

@@ -79,6 +107,7 @@ def predict_curve(
):
labels_were_datetime = False
dims_before = list(parameters.dims)
initial_labels = labels

try:
# Try parsing as datetime first
@@ -87,8 +116,15 @@
labels = np.asarray(labels)

if np.issubdtype(labels.dtype, np.datetime64):
labels = labels.astype(int)
labels_were_datetime = True
timestep = [
(
(np.datetime64(x) - np.datetime64("1970-01-01", "s"))
/ np.timedelta64(1, "s")
)
for x in labels
]
labels = np.array(timestep)

# This is necessary to pipe the arguments correctly through @process
def wrapper(f):
@@ -122,6 +158,6 @@ def _wrap(*args, **kwargs):
predictions = predictions.assign_coords({dimension: labels.data})

if labels_were_datetime:
predictions[dimension] = pd.DatetimeIndex(predictions[dimension].values)
predictions[dimension] = initial_labels

return predictions
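predict_curve now keeps a reference to the caller's labels (initial_labels) and, after predicting on the numeric epoch-seconds axis, writes the original labels back onto the output coordinate. Since labels are converted to seconds rather than the nanoseconds that the old astype(int) produced, rebuilding datetimes with pd.DatetimeIndex would be off by a factor of 1e9; restoring the saved labels sidesteps that entirely. A sketch of the restore step (names illustrative):

    import numpy as np
    import xarray as xr

    labels = np.array(["2021-01-01", "2021-02-01"], dtype="datetime64[ns]")
    seconds = (
        labels.astype("datetime64[s]") - np.datetime64("1970-01-01", "s")
    ) / np.timedelta64(1, "s")

    predictions = xr.DataArray(np.zeros(2), dims=("t",), coords={"t": seconds})
    # Overwrite the numeric fit axis with the caller's original labels.
    predictions["t"] = labels
    print(predictions["t"].dtype)  # datetime64[ns]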
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "openeo-processes-dask"
version = "2023.9.1"
version = "2023.10.3"
description = "Python implementations of many OpenEO processes, dask-friendly by default."
authors = ["Lukas Weidenholzer <[email protected]>", "Sean Hoyal <[email protected]>", "Valentina Hutter <[email protected]>"]
maintainers = ["EODC Staff <[email protected]>"]
19 changes: 19 additions & 0 deletions tests/test_ml.py
@@ -83,6 +83,13 @@ def fitFunction(x, parameters):
assert len(result.coords["y"]) == len(origin_cube.coords["y"])
assert len(result.coords["param"]) == len(parameters)

origin_cube_B02 = origin_cube.sel(bands=["B02"])
result_B02 = fit_curve(
origin_cube_B02, parameters=parameters, function=_process, dimension="t"
)
assert "bands" in result_B02.dims
assert result_B02["bands"].values == "B02"

labels = dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0])
predictions = predict_curve(
result,
@@ -94,3 +101,15 @@ def fitFunction(x, parameters):
assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels)
assert "param" not in predictions.dims
assert result.rio.crs == predictions.rio.crs

labels = [0, 1, 2, 3]
predictions = predict_curve(
result,
_process,
origin_cube.openeo.temporal_dims[0],
labels=labels,
).compute()

assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels)
assert "param" not in predictions.dims
assert result.rio.crs == predictions.rio.crs
36 changes: 35 additions & 1 deletion tests/test_reduce.py
@@ -5,7 +5,10 @@
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension
from openeo_processes_dask.process_implementations.cubes.reduce import (
reduce_dimension,
reduce_spatial,
)
from tests.general_checks import general_output_checks
from tests.mockdata import create_fake_rastercube

@@ -39,3 +42,34 @@ def test_reduce_dimension(
)

xr.testing.assert_equal(output_cube, input_cube.mean(dim="t"))


@pytest.mark.parametrize("size", [(30, 30, 20, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_reduce_spatial(
temporal_interval, bounding_box, random_raster_data, process_registry
):
input_cube = create_fake_rastercube(
data=random_raster_data,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["B02", "B03", "B04", "B08"],
backend="dask",
)

_process = partial(
process_registry["sum"].implementation,
ignore_nodata=True,
data=ParameterReference(from_parameter="data"),
)

output_cube = reduce_spatial(data=input_cube, reducer=_process)

general_output_checks(
input_cube=input_cube,
output_cube=output_cube,
verify_attrs=False,
verify_crs=True,
)

xr.testing.assert_equal(output_cube, input_cube.sum(dim=["x", "y"]))
