WIP: Dynamic rechunking option for StoreToZarr #546

Closed · wants to merge 63 commits
Changes from 7 commits
Commits (63)
6346d41
first implementation not working
jbusecke Jul 15, 2023
089dc7a
Save progress from the hack
jbusecke Jul 15, 2023
5fadc03
New even divisor algo + passing tests
jbusecke Jul 16, 2023
d0f3bcb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 16, 2023
1903ca7
Remove commented out old version
jbusecke Jul 16, 2023
62d46ad
merge commit
jbusecke Jul 16, 2023
243c5c3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 16, 2023
4ba4066
Bugfix now scales with ds size
jbusecke Jul 18, 2023
9375136
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 18, 2023
6b89214
rename difference to similarity
jbusecke Jul 18, 2023
f6e86d6
Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r…
jbusecke Jul 18, 2023
b2c48bf
implemented + tested dask.utils.parse_bytes + docstring
jbusecke Jul 18, 2023
b99c4ef
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 18, 2023
bc980f7
Implemented review + renaming + test pass locally
jbusecke Jul 25, 2023
29097e8
Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r…
jbusecke Jul 25, 2023
f3aeaa5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 25, 2023
c913d3b
nightly commit
jbusecke Jul 25, 2023
9a845a1
Added dynamic chunking and checks to StoreToZarr
jbusecke Jul 26, 2023
39ff7e6
Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r…
jbusecke Jul 26, 2023
f608686
some cleaning up + docstring adds
jbusecke Jul 26, 2023
e3d61eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 26, 2023
d0fce99
Fix flake8 for new tests
jbusecke Jul 26, 2023
ee45258
Restored default behavior if no chunking input is given
jbusecke Jul 26, 2023
e97c492
fixed mypy issues
jbusecke Jul 26, 2023
f5bcea8
Add mypy cast comment
jbusecke Jul 26, 2023
427dbe7
Add final test
jbusecke Jul 26, 2023
91eb6a6
Update dynamic_target_chunks.py
jbusecke Jul 28, 2023
a5a182a
assign target chunks as singelton
jbusecke Jul 29, 2023
707a9ca
simplify AsSingleton call
jbusecke Aug 2, 2023
4ad2a69
Go back to the old logic, but fix typo
jbusecke Aug 2, 2023
8df16ba
Print dynamically determined chunks
jbusecke Aug 3, 2023
11de660
Update transforms.py
jbusecke Aug 5, 2023
b8ead76
implement logic for extra and missing dims + tests
jbusecke Aug 5, 2023
eecd8ed
Add default ratio and allow extra dims input on storetozarr
jbusecke Aug 6, 2023
c28ac56
Merge branch 'dynamic_chunks_2' of github.com:jbusecke/pangeo-forge-r…
jbusecke Aug 6, 2023
7347c1b
Fill docstring of StoreToZarr
jbusecke Aug 6, 2023
96c57e8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 6, 2023
edb4099
Update transforms.py
jbusecke Aug 6, 2023
377133b
Rework docstring for dynamic_target_chunks_from_schema
jbusecke Aug 6, 2023
575bec2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 6, 2023
9334e61
Update pangeo_forge_recipes/dynamic_target_chunks.py
jbusecke Aug 6, 2023
5062318
Update pangeo_forge_recipes/transforms.py
jbusecke Aug 6, 2023
d0f2cf6
Update pangeo_forge_recipes/transforms.py
jbusecke Aug 6, 2023
44aceaa
Update pangeo_forge_recipes/transforms.py
jbusecke Aug 6, 2023
ca680c3
Update test_transforms.py
jbusecke Aug 6, 2023
8d32404
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 6, 2023
53b0ae9
Fix tests
jbusecke Aug 6, 2023
3de3168
fix store to zarr call to _get_target_chunks
jbusecke Aug 6, 2023
db0551f
fix flake 8 issues
jbusecke Aug 6, 2023
27bfd36
fix matched error message in test
jbusecke Aug 6, 2023
3ca76df
Fix another test
jbusecke Aug 6, 2023
4838c5f
Add logging step printing the schema
jbusecke Aug 18, 2023
59432fd
Merge branch 'main' into dynamic_chunks_2
jbusecke Aug 18, 2023
d9115f7
Restor cast import
jbusecke Aug 18, 2023
ec8ab95
Try to force test in-line after finished store
jbusecke Aug 18, 2023
25823e6
more tinkering with store return
jbusecke Aug 18, 2023
ad79726
Reverting changes to target_store return
jbusecke Aug 18, 2023
2cc6700
Added fallback algorithm for non-even divisions
jbusecke Aug 22, 2023
db6767b
Attempt to add logging to dyn chunk logic
jbusecke Aug 22, 2023
fa05c9c
Increase range of scaling factors
jbusecke Aug 22, 2023
082979b
More tests + slight algo refactor
jbusecke Aug 22, 2023
e40eb55
Merge remote-tracking branch 'upstream/main' into dynamic_chunks_2
jbusecke Aug 23, 2023
774fff8
Merge remote-tracking branch 'upstream/main' into dynamic_chunks_2
jbusecke Sep 1, 2023
107 changes: 107 additions & 0 deletions pangeo_forge_recipes/dynamic_target_chunks.py
@@ -0,0 +1,107 @@
import itertools
from typing import Dict, List

import numpy as np
import xarray as xr

from pangeo_forge_recipes.aggregation import XarraySchema, schema_to_template_ds


def get_memory_size(ds: xr.Dataset, chunks: Dict[str, int]) -> int:
"""Returns an estimate of memory size based on input chunks.
Currently this applies the chunks input to the dataset, then
iterates through the variables and returns the maximum.
"""
ds_single_chunk = ds.isel({dim: slice(0, chunk) for dim, chunk in chunks.items()})
mem_size = max([ds_single_chunk[var].nbytes for var in ds_single_chunk.data_vars])
return mem_size


def difference(a: np.ndarray, b: np.ndarray) -> np.ndarray:
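"""Euclidean distance between the two vectors."""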
return np.sqrt(np.sum((a - b) ** 2))


def normalize(a: np.ndarray) -> np.ndarray:
"""Convert to a unit vector"""
return a / np.sqrt(np.sum(a**2))


def even_divisor_chunks(n: int) -> List[int]:
"""Returns values that evenly divide n"""
divisors = []
for i in range(1, n + 1):
if n % i == 0:
divisors.append(n // i)
return divisors


def dynamic_target_chunks_from_schema(
schema: XarraySchema,
target_chunk_nbytes: int, # TODO: Accept a str like `100MB`
target_chunk_ratio: Dict[str, int],
nbytes_tolerance: float = 0.2,
) -> Dict[str, int]:
Review thread on this function:

Contributor: I attempted to review this PR but realized I was missing some key context. Could we provide a docstring for this function which explains what this function does and what these parameters are? In particular, I don't understand target_chunk_ratio.

Member: FWIW, I was also confused by that term. So +1 on additional context. 🙏

Reviewer: Would it be slightly clearer to call it target_chunk_aspect_ratio?

Contributor Author: I added a docstring explaining the inputs.


ds = schema_to_template_ds(schema)

if set(target_chunk_ratio.keys()) != set(ds.dims):
raise ValueError(
f"target_chunk_ratio must contain all dimensions in dataset. "
f"Got {target_chunk_ratio.keys()} but expected {list(ds.dims.keys())}"
)

dims, shape = zip(*ds.dims.items())
ratio = [target_chunk_ratio[dim] for dim in dims]
ratio_normalized = normalize(np.array(ratio))

possible_chunks = []
for s, r, dim in zip(shape, ratio, dims):
if r > 0:
# Get a list of all the even divisors
possible_chunks.append(even_divisor_chunks(s))
elif r == -1:
# Always keep this dimension unchunked
possible_chunks.append([s])
else:
raise ValueError(
f"Ratio value can only be larger than 0 or -1. Got {r} for dimension {dim}"
)

combinations = [p for p in itertools.product(*possible_chunks)]
# Check the size of each combination on the dataset
combination_sizes = [
get_memory_size(ds, {dim: chunk for dim, chunk in zip(dims, c)}) for c in combinations
]

# And select a subset with some form of tolerance based on the size requirement
tolerance = nbytes_tolerance * target_chunk_nbytes
combinations_filtered = [
c
for c, s in zip(combinations, combination_sizes)
if abs(s - target_chunk_nbytes) < tolerance
]

# If there are no matches in the range, the user has to increase the tolerance for this to work.
if len(combinations_filtered) == 0:
raise ValueError(
"Could not find any chunk combinations satisfying the size constraint. Consider increasing tolerance"
)

# Now that we have combinations in the desired memory size range, check which one is
# closest to the target chunk ratio. We can think of this as comparing the angle of two
# vectors, so we normalize both (we don't care about the amplitude here).

# Convert each combination into a normalized inverse (a larger ratio means smaller chunks).
ratio_combinations = [normalize(1 / np.array(c)) for c in combinations_filtered]

# Find the closest fit of chunk ratio to the target ratio via Euclidean distance.
ratio_difference = [difference(ratio_normalized, r) for r in ratio_combinations]

combinations_sorted = [c for _, c in sorted(zip(ratio_difference, combinations_filtered))]

# Return the chunk combination with the closest fit
optimal_combination = combinations_sorted[0]

return {dim: chunk for dim, chunk in zip(dims, optimal_combination)}
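
For orientation, here is a minimal usage sketch of the new helper, mirroring the call pattern in the tests added below; the dimension names, sizes, and the 1e6-byte target are illustrative values, not part of this PR.

import dask.array as dsa
import xarray as xr

from pangeo_forge_recipes.aggregation import dataset_to_schema
from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema

# Build a small example dataset and convert it to a schema.
ds = xr.DataArray(
    dsa.random.random([100, 300, 400]), dims=["x", "y", "z"]
).to_dataset(name="data")
schema = dataset_to_schema(ds)

# Ask for ~1 MB chunks, chunking relatively more finely along "z" than along
# "x" and "y" (a ratio of -1 would keep a dimension unchunked).
target_chunks = dynamic_target_chunks_from_schema(
    schema,
    target_chunk_nbytes=1e6,
    target_chunk_ratio={"x": 1, "y": 1, "z": 10},
)
print(target_chunks)  # e.g. {"x": 100, "y": 150, "z": 8}, per the tests below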
1 change: 0 additions & 1 deletion pangeo_forge_recipes/transforms.py
@@ -54,7 +54,6 @@
T = TypeVar("T")
Indexed = Tuple[Index, T]


# TODO: replace with beam.MapTuple?
def _add_keys(func):
"""Convenience decorator to remove and re-add keys to items in a Map"""
63 changes: 63 additions & 0 deletions tests/test_dynamic_target_chunks.py
@@ -0,0 +1,63 @@
import dask.array as dsa
import pytest
import xarray as xr

from pangeo_forge_recipes.aggregation import dataset_to_schema
from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema


class TestDynamicTargetChunks:

Review thread on these tests:

Reviewer: This is potentially a good use case for hypothesis - you could parameterize a unit test with a hypothesis strategy that generates arbitrary (regular) chunks, then assert the property that the returned target chunk size is within the specified tolerance.

Contributor Author: Yeah that is a good idea. I will have to dig into hypothesis a bit to understand how to encode the logic.

Reviewer: This PR to dask might give you what you need (I can also finish it if that would help): dask/dask#9374

Member: Thanks for the thorough tests.

Question: is there a reason you chose to use pytest's test class approach here? I'm not categorically opposed to it, but AFAICT the tests added by this PR would be the only class-based tests in pangeo-forge-recipes, so for stylistic consistency maybe better to write them in the function-based style, unless there's a specific reason not to?

@pytest.fixture
def ds(self) -> xr.Dataset:
return xr.DataArray(dsa.random.random([100, 300, 400]), dims=["x", "y", "z"]).to_dataset(
name="data"
)

@pytest.mark.parametrize(
("target_chunk_ratio", "expected_target_chunks"),
[
# make sure that for the same dataset we get smaller chunksize along a dimension if the ratio is larger
({"x": 1, "y": 1, "z": 10}, {"x": 100, "y": 150, "z": 8}),
({"x": 10, "y": 1, "z": 1}, {"x": 10, "y": 150, "z": 80}),
# test the special case where we want to just chunk along a single dimension
({"x": -1, "y": -1, "z": 1}, {"x": 100, "y": 300, "z": 4}),
],
)
def test_dynamic_rechunking1(self, ds, target_chunk_ratio, expected_target_chunks):
schema = dataset_to_schema(ds)
target_chunks = dynamic_target_chunks_from_schema(
schema, 1e6, target_chunk_ratio=target_chunk_ratio
)
print(target_chunks)
for dim, chunks in expected_target_chunks.items():
assert target_chunks[dim] == chunks

@pytest.mark.parametrize(
"target_chunk_ratio", [{"x": 1, "y": -1, "z": 10}, {"x": 6, "y": -1, "z": 2}]
) # always keep y unchunked, and vary the others
@pytest.mark.parametrize("target_chunk_nbytes", [1e6, 1e7])
def test_dynamic_skip_dimension(self, ds, target_chunk_ratio, target_chunk_nbytes):
# Mark dimension as 'not-to-chunk' with -1
schema = dataset_to_schema(ds)
target_chunks = dynamic_target_chunks_from_schema(
schema, target_chunk_nbytes, target_chunk_ratio=target_chunk_ratio
)
assert target_chunks["y"] == len(ds["y"])

def test_dynamic_rechunking_error_dimension_missing(self, ds):
# make sure that an error is raised if some dimension is not specified
schema = dataset_to_schema(ds)

with pytest.raises(
ValueError, match="target_chunk_ratio must contain all dimensions in dataset."
):
dynamic_target_chunks_from_schema(schema, 1e6, target_chunk_ratio={"x": 1, "z": 10})

def test_dynamic_rechunking_error_dimension_wrong(self, ds):
schema = dataset_to_schema(ds)
with pytest.raises(
ValueError, match="target_chunk_ratio must contain all dimensions in dataset."
):
dynamic_target_chunks_from_schema(
schema, 1e6, target_chunk_ratio={"x": 1, "y_wrong": 1, "z": 10}
)
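
Following up on the hypothesis suggestion in the review thread above, here is a rough sketch of what such a property-based test could look like. It assumes hypothesis is available as a test dependency and hand-rolls a strategy over dimension sizes (rather than using the dask chunk strategy mentioned in dask/dask#9374); the test name and bounds are illustrative.

import dask.array as dsa
import xarray as xr
from hypothesis import given, settings
from hypothesis import strategies as st

from pangeo_forge_recipes.aggregation import dataset_to_schema
from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema


@settings(deadline=None, max_examples=25)
@given(
    nx=st.integers(min_value=10, max_value=200),
    ny=st.integers(min_value=10, max_value=200),
    nz=st.integers(min_value=10, max_value=200),
)
def test_dynamic_rechunking_within_tolerance(nx, ny, nz):
    ds = xr.DataArray(
        dsa.random.random([nx, ny, nz]), dims=["x", "y", "z"]
    ).to_dataset(name="data")
    schema = dataset_to_schema(ds)
    target_chunk_nbytes = 1e6
    nbytes_tolerance = 0.2
    try:
        target_chunks = dynamic_target_chunks_from_schema(
            schema,
            target_chunk_nbytes,
            target_chunk_ratio={"x": 1, "y": 1, "z": 1},
            nbytes_tolerance=nbytes_tolerance,
        )
    except ValueError:
        # Some shapes have no even-divisor combination within the tolerance;
        # the function raises in that case, which is acceptable here.
        return
    # Property: the chosen chunks stay within the requested size tolerance
    # (one float64 data variable, so 8 bytes per element).
    chunk_nbytes = 8 * target_chunks["x"] * target_chunks["y"] * target_chunks["z"]
    assert abs(chunk_nbytes - target_chunk_nbytes) < nbytes_tolerance * target_chunk_nbytes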