From 7166db391c4fd4f4c7909cd6aff2143a15134c53 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Wed, 29 May 2024 16:15:24 +0100
Subject: [PATCH] Cubed blockwise (#357)

* Initial minimal working Cubed example for "blockwise"

* Update minimum cubed version that includes
  https://github.com/cubed-dev/cubed/pull/448

* Fix mypy errors

* Update documentation with a 'blockwise' example for Cubed

---
 ci/environment.yml                  |   2 +-
 .../climatology-hourly-cubed.ipynb  |  31 +++
 flox/core.py                        | 182 +++++++++++-------
 tests/test_core.py                  |  34 ++++
 4 files changed, 175 insertions(+), 74 deletions(-)

diff --git a/ci/environment.yml b/ci/environment.yml
index 9685a90b..f974053a 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - cachey
   - cftime
   - codecov
-  - cubed>=0.14.2
+  - cubed>=0.14.3
   - dask-core
   - pandas
   - numpy>=1.22
diff --git a/docs/source/user-stories/climatology-hourly-cubed.ipynb b/docs/source/user-stories/climatology-hourly-cubed.ipynb
index 2277ed7f..be80b4f9 100644
--- a/docs/source/user-stories/climatology-hourly-cubed.ipynb
+++ b/docs/source/user-stories/climatology-hourly-cubed.ipynb
@@ -86,6 +86,37 @@
    "source": [
     "hourly.compute()"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7",
+   "metadata": {},
+   "source": [
+    "## Other climatologies: resampling by month\n",
+    "\n",
+    "This uses the \"blockwise\" strategy."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
+    "monthly"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly.compute()"
+   ]
   }
  ],
  "metadata": {
diff --git a/flox/core.py b/flox/core.py
index d84639bc..72f09899 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -1798,87 +1798,123 @@ def cubed_groupby_agg(
     assert isinstance(axis, Sequence)
     assert all(ax >= 0 for ax in axis)
 
-    inds = tuple(range(array.ndim))
+    if method == "blockwise":
+        assert by.ndim == 1
+        assert expected_groups is not None
 
-    by_input = by
+        def _reduction_func(a, by, axis, start_group, num_groups):
+            # adjust group labels to start from 0 for each chunk
+            by_for_chunk = by - start_group
+            expected_groups_for_chunk = pd.RangeIndex(num_groups)
 
-    # Unifying chunks is necessary for argreductions.
-    # We need to rechunk before zipping up with the index
-    # let's always do it anyway
-    if not is_chunked_array(by):
-        # chunk numpy arrays like the input array
-        chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
+            axis = (axis,)  # convert integral axis to tuple
 
-        by = cubed.from_array(by, chunks=chunks, spec=array.spec)
-    _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+            blockwise_method = partial(
+                _reduce_blockwise,
+                agg=agg,
+                axis=axis,
+                expected_groups=expected_groups_for_chunk,
+                fill_value=fill_value,
+                engine=engine,
+                sort=sort,
+                reindex=reindex,
+            )
+            out = blockwise_method(a, by_for_chunk)
+            return out[agg.name]
 
-    # Cubed's groupby_reduction handles the generation of "intermediates", and the
-    # "map-reduce" combination step, so we don't have to do that here.
-    # Only the equivalent of "_simple_combine" is supported, there is no
-    # support for "_grouped_combine".
-    labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
-    do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+        num_groups = len(expected_groups)
+        result = cubed.core.groupby.groupby_blockwise(
+            array, by, axis=axis, func=_reduction_func, num_groups=num_groups
+        )
+        groups = (expected_groups.to_numpy(),)
+        return (result, groups)
 
-    assert do_simple_combine
-    assert method == "map-reduce"
-    assert expected_groups is not None
-    assert reindex is True
-    assert len(axis) == 1  # one axis/grouping
+    else:
+        inds = tuple(range(array.ndim))
 
-    def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
-        blockwise_method = partial(
-            _get_chunk_reduction(agg.reduction_type),
-            func=agg.chunk,
-            fill_value=agg.fill_value["intermediate"],
-            dtype=agg.dtype["intermediate"],
-            reindex=reindex,
-            user_dtype=agg.dtype["user"],
+        by_input = by
+
+        # Unifying chunks is necessary for argreductions.
+        # We need to rechunk before zipping up with the index
+        # let's always do it anyway
+        if not is_chunked_array(by):
+            # chunk numpy arrays like the input array
+            chunks = tuple(
+                array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)
+            )
+
+            by = cubed.from_array(by, chunks=chunks, spec=array.spec)
+        _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+
+        # Cubed's groupby_reduction handles the generation of "intermediates", and the
+        # "map-reduce" combination step, so we don't have to do that here.
+        # Only the equivalent of "_simple_combine" is supported, there is no
+        # support for "_grouped_combine".
+        labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
+        do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+
+        assert do_simple_combine
+        assert method == "map-reduce"
+        assert expected_groups is not None
+        assert reindex is True
+        assert len(axis) == 1  # one axis/grouping
+
+        def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
+            blockwise_method = partial(
+                _get_chunk_reduction(agg.reduction_type),
+                func=agg.chunk,
+                fill_value=agg.fill_value["intermediate"],
+                dtype=agg.dtype["intermediate"],
+                reindex=reindex,
+                user_dtype=agg.dtype["user"],
+                axis=axis,
+                expected_groups=expected_groups,
+                engine=engine,
+                sort=sort,
+            )
+            out = blockwise_method(a, by)
+            # Convert dict to one that cubed understands, dropping groups since they are
+            # known, and the same for every block.
+            return {
+                f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])
+            }
+
+        def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
+            # this is similar to _simple_combine, except the dummy axis and concatenation are handled by cubed
+            # only combine over the dummy axis, to preserve grouping along 'axis'
+            dtype = dict(dtype)
+            out = {}
+            for idx, combine in enumerate(agg.simple_combine):
+                field = f"f{idx}"
+                out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
+            return out
+
+        def _groupby_aggregate(a):
+            # Convert cubed dict to one that _finalize_results works with
+            results = {"groups": expected_groups, "intermediates": a.values()}
+            out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+            return out[agg.name]
+
+        # convert list of dtypes to a structured dtype for cubed
+        intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
+        dtype = agg.dtype["final"]
+        num_groups = len(expected_groups)
+
+        result = cubed.core.groupby.groupby_reduction(
+            array,
+            by,
+            func=_groupby_func,
+            combine_func=_groupby_combine,
+            aggregate_func=_groupby_aggregate,
             axis=axis,
-            expected_groups=expected_groups,
-            engine=engine,
-            sort=sort,
+            intermediate_dtype=intermediate_dtype,
+            dtype=dtype,
+            num_groups=num_groups,
         )
-        out = blockwise_method(a, by)
-        # Convert dict to one that cubed understands, dropping groups since they are
-        # known, and the same for every block.
-        return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}
-
-    def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
-        # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
-        # only combine over the dummy axis, to preserve grouping along 'axis'
-        dtype = dict(dtype)
-        out = {}
-        for idx, combine in enumerate(agg.simple_combine):
-            field = f"f{idx}"
-            out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
-        return out
-
-    def _groupby_aggregate(a):
-        # Convert cubed dict to one that _finalize_results works with
-        results = {"groups": expected_groups, "intermediates": a.values()}
-        out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
-        return out[agg.name]
-
-    # convert list of dtypes to a structured dtype for cubed
-    intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
-    dtype = agg.dtype["final"]
-    num_groups = len(expected_groups)
-
-    result = cubed.core.groupby.groupby_reduction(
-        array,
-        by,
-        func=_groupby_func,
-        combine_func=_groupby_combine,
-        aggregate_func=_groupby_aggregate,
-        axis=axis,
-        intermediate_dtype=intermediate_dtype,
-        dtype=dtype,
-        num_groups=num_groups,
-    )
-    groups = (expected_groups.to_numpy(),)
+        groups = (expected_groups.to_numpy(),)
 
-    return (result, groups)
+        return (result, groups)
 
 
 def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
@@ -2467,9 +2503,9 @@ def groupby_reduce(
         if method is None:
             method = "map-reduce"
 
-        if method != "map-reduce":
+        if method not in ("map-reduce", "blockwise"):
             raise NotImplementedError(
-                "Reduction for Cubed arrays is only implemented for method 'map-reduce'."
+                "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
             )
 
         partial_agg = partial(cubed_groupby_agg, **kwargs)
diff --git a/tests/test_core.py b/tests/test_core.py
index 75c4d9ff..7cedc06d 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method):
     assert_equal(expected, actual)
 
 
+@requires_cubed
+@pytest.mark.parametrize("method", ["blockwise", "map-reduce"])
+def test_group_by_datetime_cubed(engine, method):
+    kwargs = dict(
+        func="mean",
+        method=method,
+        engine=engine,
+    )
+    t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
+    data = t.dt.dayofyear
+    cubedarray = cubed.from_array(data.values, chunks=30)
+
+    actual, _ = groupby_reduce(cubedarray, t, **kwargs)
+    expected = data.to_numpy().astype(float)
+    assert_equal(expected, actual)
+
+    edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
+    actual, _ = groupby_reduce(
+        cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs
+    )
+    expected = data.resample("ME").mean().to_numpy()
+    assert_equal(expected, actual)
+
+    actual, _ = groupby_reduce(
+        cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])),
+        t.to_numpy(),
+        isbin=True,
+        expected_groups=edges,
+        **kwargs,
+    )
+    expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
+    assert_equal(expected, actual)
+
+
 def test_factorize_values_outside_bins():
     # pd.factorize returns intp
     vals = factorize_(
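
A quick way to exercise the new code path end-to-end, as a minimal sketch rather than part of the patch: it mirrors `test_group_by_datetime_cubed` above and assumes flox with this patch applied, plus `cubed>=0.14.3` and `pandas` installed.

```python
# Minimal sketch of the new "blockwise" path for Cubed arrays (not part of
# the patch). Mirrors test_group_by_datetime_cubed above; assumes flox with
# this patch applied, plus cubed>=0.14.3 and pandas.
import cubed
import pandas as pd

from flox.core import groupby_reduce

# A year of daily values; each day's value is its day-of-year.
t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
data = t.dt.dayofyear
cubedarray = cubed.from_array(data.values, chunks=30)

# Bin the daily values into monthly bins; method="blockwise" reduces each
# Cubed block independently via cubed.core.groupby.groupby_blockwise,
# instead of scheduling a map-reduce combine step.
edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
actual, groups = groupby_reduce(
    cubedarray,
    t.to_numpy(),
    func="mean",
    isbin=True,
    expected_groups=edges,
    method="blockwise",
)
print(actual.compute())  # twelve monthly means, evaluated lazily by Cubed
```

The parametrized test above checks that "blockwise" and "map-reduce" agree on this workload, so either method name can be swapped into the call.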