
Cubed blockwise (#357)
* Initial minimal working Cubed example for "blockwise"

* Update minimum cubed version that includes cubed-dev/cubed#448

* Fix mypy errors

* Update documentation with a 'blockwise' example for Cubed
tomwhite authored May 29, 2024
1 parent d68d837 commit 7166db3
Showing 4 changed files with 175 additions and 74 deletions.
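
As a quick orientation before the diffs, here is a hedged sketch of what the change enables, modeled directly on the new test added in tests/test_core.py below; the chunk size and bin edges are illustrative, not prescriptive:

```python
# Sketch: a "blockwise" groupby reduction on a Cubed array, binned by month.
# Mirrors test_group_by_datetime_cubed below; values are illustrative.
import cubed
import pandas as pd
from flox.core import groupby_reduce

t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
data = t.dt.dayofyear
cubedarray = cubed.from_array(data.values, chunks=30)

edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
actual, groups = groupby_reduce(
    cubedarray,
    t.to_numpy(),
    func="mean",
    isbin=True,
    expected_groups=edges,
    method="blockwise",  # newly accepted for Cubed arrays by this commit
)
```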
2 changes: 1 addition & 1 deletion ci/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - cachey
   - cftime
   - codecov
-  - cubed>=0.14.2
+  - cubed>=0.14.3
   - dask-core
   - pandas
   - numpy>=1.22
31 changes: 31 additions & 0 deletions docs/source/user-stories/climatology-hourly-cubed.ipynb
@@ -86,6 +86,37 @@
    "source": [
     "hourly.compute()"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7",
+   "metadata": {},
+   "source": [
+    "## Other climatologies: resampling by month\n",
+    "\n",
+    "This uses the \"blockwise\" strategy."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
+    "monthly"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly.compute()"
+   ]
  }
 ],
 "metadata": {
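
A note on why resampling suits the "blockwise" strategy: blockwise reduces each group entirely within the blocks that hold it, and a sorted time axis makes each month a contiguous run of elements. A minimal illustrative check, with hypothetical names not taken from the notebook:

```python
# Illustrative only: along a sorted time axis, month labels are monotone,
# so each resample group occupies a contiguous run of the array.
import pandas as pd

time = pd.date_range("2021-01-01", "2021-03-31", freq="D")
labels = pd.Series(time.month)
assert labels.is_monotonic_increasing  # contiguous groups suit "blockwise"
```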
182 changes: 109 additions & 73 deletions flox/core.py
@@ -1798,87 +1798,123 @@ def cubed_groupby_agg(
     assert isinstance(axis, Sequence)
     assert all(ax >= 0 for ax in axis)
 
-    inds = tuple(range(array.ndim))
-
-    by_input = by
-
-    # Unifying chunks is necessary for argreductions.
-    # We need to rechunk before zipping up with the index
-    # let's always do it anyway
-    if not is_chunked_array(by):
-        # chunk numpy arrays like the input array
-        chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
-
-        by = cubed.from_array(by, chunks=chunks, spec=array.spec)
-    _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
-
-    # Cubed's groupby_reduction handles the generation of "intermediates", and the
-    # "map-reduce" combination step, so we don't have to do that here.
-    # Only the equivalent of "_simple_combine" is supported, there is no
-    # support for "_grouped_combine".
-    labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
-    do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
-
-    assert do_simple_combine
-    assert method == "map-reduce"
-    assert expected_groups is not None
-    assert reindex is True
-    assert len(axis) == 1  # one axis/grouping
-
-    def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
-        blockwise_method = partial(
-            _get_chunk_reduction(agg.reduction_type),
-            func=agg.chunk,
-            fill_value=agg.fill_value["intermediate"],
-            dtype=agg.dtype["intermediate"],
-            reindex=reindex,
-            user_dtype=agg.dtype["user"],
-            axis=axis,
-            expected_groups=expected_groups,
-            engine=engine,
-            sort=sort,
-        )
-        out = blockwise_method(a, by)
-        # Convert dict to one that cubed understands, dropping groups since they are
-        # known, and the same for every block.
-        return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}
-
-    def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
-        # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
-        # only combine over the dummy axis, to preserve grouping along 'axis'
-        dtype = dict(dtype)
-        out = {}
-        for idx, combine in enumerate(agg.simple_combine):
-            field = f"f{idx}"
-            out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
-        return out
-
-    def _groupby_aggregate(a):
-        # Convert cubed dict to one that _finalize_results works with
-        results = {"groups": expected_groups, "intermediates": a.values()}
-        out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
-        return out[agg.name]
-
-    # convert list of dtypes to a structured dtype for cubed
-    intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
-    dtype = agg.dtype["final"]
-    num_groups = len(expected_groups)
-
-    result = cubed.core.groupby.groupby_reduction(
-        array,
-        by,
-        func=_groupby_func,
-        combine_func=_groupby_combine,
-        aggregate_func=_groupby_aggregate,
-        axis=axis,
-        intermediate_dtype=intermediate_dtype,
-        dtype=dtype,
-        num_groups=num_groups,
-    )
-
-    groups = (expected_groups.to_numpy(),)
-
-    return (result, groups)
+    if method == "blockwise":
+        assert by.ndim == 1
+        assert expected_groups is not None
+
+        def _reduction_func(a, by, axis, start_group, num_groups):
+            # adjust group labels to start from 0 for each chunk
+            by_for_chunk = by - start_group
+            expected_groups_for_chunk = pd.RangeIndex(num_groups)
+
+            axis = (axis,)  # convert integral axis to tuple
+
+            blockwise_method = partial(
+                _reduce_blockwise,
+                agg=agg,
+                axis=axis,
+                expected_groups=expected_groups_for_chunk,
+                fill_value=fill_value,
+                engine=engine,
+                sort=sort,
+                reindex=reindex,
+            )
+            out = blockwise_method(a, by_for_chunk)
+            return out[agg.name]
+
+        num_groups = len(expected_groups)
+        result = cubed.core.groupby.groupby_blockwise(
+            array, by, axis=axis, func=_reduction_func, num_groups=num_groups
+        )
+        groups = (expected_groups.to_numpy(),)
+        return (result, groups)
+
+    else:
+        inds = tuple(range(array.ndim))
+
+        by_input = by
+
+        # Unifying chunks is necessary for argreductions.
+        # We need to rechunk before zipping up with the index
+        # let's always do it anyway
+        if not is_chunked_array(by):
+            # chunk numpy arrays like the input array
+            chunks = tuple(
+                array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)
+            )
+
+            by = cubed.from_array(by, chunks=chunks, spec=array.spec)
+        _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+
+        # Cubed's groupby_reduction handles the generation of "intermediates", and the
+        # "map-reduce" combination step, so we don't have to do that here.
+        # Only the equivalent of "_simple_combine" is supported, there is no
+        # support for "_grouped_combine".
+        labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
+        do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+
+        assert do_simple_combine
+        assert method == "map-reduce"
+        assert expected_groups is not None
+        assert reindex is True
+        assert len(axis) == 1  # one axis/grouping
+
+        def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
+            blockwise_method = partial(
+                _get_chunk_reduction(agg.reduction_type),
+                func=agg.chunk,
+                fill_value=agg.fill_value["intermediate"],
+                dtype=agg.dtype["intermediate"],
+                reindex=reindex,
+                user_dtype=agg.dtype["user"],
+                axis=axis,
+                expected_groups=expected_groups,
+                engine=engine,
+                sort=sort,
+            )
+            out = blockwise_method(a, by)
+            # Convert dict to one that cubed understands, dropping groups since they are
+            # known, and the same for every block.
+            return {
+                f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])
+            }
+
+        def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
+            # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
+            # only combine over the dummy axis, to preserve grouping along 'axis'
+            dtype = dict(dtype)
+            out = {}
+            for idx, combine in enumerate(agg.simple_combine):
+                field = f"f{idx}"
+                out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
+            return out
+
+        def _groupby_aggregate(a):
+            # Convert cubed dict to one that _finalize_results works with
+            results = {"groups": expected_groups, "intermediates": a.values()}
+            out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+            return out[agg.name]
+
+        # convert list of dtypes to a structured dtype for cubed
+        intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
+        dtype = agg.dtype["final"]
+        num_groups = len(expected_groups)
+
+        result = cubed.core.groupby.groupby_reduction(
+            array,
+            by,
+            func=_groupby_func,
+            combine_func=_groupby_combine,
+            aggregate_func=_groupby_aggregate,
+            axis=axis,
+            intermediate_dtype=intermediate_dtype,
+            dtype=dtype,
+            num_groups=num_groups,
+        )
+
+        groups = (expected_groups.to_numpy(),)
+
+        return (result, groups)
 
 
 def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
@@ -2467,9 +2503,9 @@ def groupby_reduce(
         if method is None:
             method = "map-reduce"
 
-        if method != "map-reduce":
+        if method not in ("map-reduce", "blockwise"):
             raise NotImplementedError(
-                "Reduction for Cubed arrays is only implemented for method 'map-reduce'."
+                "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
             )
 
         partial_agg = partial(cubed_groupby_agg, **kwargs)
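
The heart of the new path is `_reduction_func`, which re-bases global group labels so each chunk can be reduced independently before `cubed.core.groupby.groupby_blockwise` assembles the per-chunk results. A minimal sketch of that bookkeeping with made-up values (`start_group` and `num_groups` are supplied by cubed at runtime):

```python
import numpy as np
import pandas as pd

# Suppose this chunk owns global groups 3..5, i.e. cubed calls
# _reduction_func with start_group=3 and num_groups=3 (made-up values).
by = np.array([3, 3, 4, 5, 5])  # global labels seen by this chunk
start_group, num_groups = 3, 3

by_for_chunk = by - start_group  # array([0, 0, 1, 2, 2])
expected_groups_for_chunk = pd.RangeIndex(num_groups)  # RangeIndex(0, 3)

# Every label now lies in [0, num_groups), so _reduce_blockwise can reduce
# the chunk in isolation; no cross-chunk combine step is needed.
print(by_for_chunk, expected_groups_for_chunk)
```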
34 changes: 34 additions & 0 deletions tests/test_core.py
@@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method):
     assert_equal(expected, actual)
 
 
+@requires_cubed
+@pytest.mark.parametrize("method", ["blockwise", "map-reduce"])
+def test_group_by_datetime_cubed(engine, method):
+    kwargs = dict(
+        func="mean",
+        method=method,
+        engine=engine,
+    )
+    t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
+    data = t.dt.dayofyear
+    cubedarray = cubed.from_array(data.values, chunks=30)
+
+    actual, _ = groupby_reduce(cubedarray, t, **kwargs)
+    expected = data.to_numpy().astype(float)
+    assert_equal(expected, actual)
+
+    edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
+    actual, _ = groupby_reduce(
+        cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs
+    )
+    expected = data.resample("ME").mean().to_numpy()
+    assert_equal(expected, actual)
+
+    actual, _ = groupby_reduce(
+        cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])),
+        t.to_numpy(),
+        isbin=True,
+        expected_groups=edges,
+        **kwargs,
+    )
+    expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
+    assert_equal(expected, actual)
+
+
 def test_factorize_values_outside_bins():
     # pd.factorize returns intp
     vals = factorize_(
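
Finally, a hedged sketch of the loosened guard in `groupby_reduce`: with a Cubed array, the two supported strategies go through, while any other method still raises. The array contents and chunking below are illustrative assumptions:

```python
import cubed
import numpy as np
from flox.core import groupby_reduce

array = cubed.from_array(np.arange(6, dtype="float64"), chunks=3)
by = np.array([0, 0, 0, 1, 1, 1])  # sorted labels, aligned with chunks

result, groups = groupby_reduce(array, by, func="sum")  # default "map-reduce"

try:
    groupby_reduce(array, by, func="sum", method="cohorts")
except NotImplementedError as err:
    print(err)  # only 'map-reduce' and 'blockwise' are implemented for Cubed
```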
