ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556 #671

Merged

Changes from 4 commits
40 changes: 40 additions & 0 deletions pangeo_forge_recipes/rechunking.py
@@ -6,6 +6,7 @@

import numpy as np
import xarray as xr
import zarr

from .aggregation import XarraySchema, determine_target_chunks
from .chunk_grid import ChunkGrid
@@ -238,3 +239,42 @@ def _sort_by_speed_of_varying(item):
ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)

return first_index, ds_combined


def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
    return list(
        set(itertools.chain(*(group[var].attrs.get("_ARRAY_DIMENSIONS", []) for var in group)))

Contributor:
I imagine that since this is a zarr.Group, this iteration isn't very expensive, because it's over summary data and never really that large?

Contributor Author:
Ehh, I think you're right? It's just scanning across the vars in the group and looking at their .attrs.

Member:
I don't fully understand this function, but the fact that it's the same as what we had in the pre-Beam release checks out to me!


    )
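
For orientation, here is a minimal sketch of what the helper collects: the union of every member array's `_ARRAY_DIMENSIONS` attribute. The in-memory group below is hypothetical (it mirrors the new test further down), and importing the private helper directly is assumed to be acceptable for illustration.

import zarr

from pangeo_forge_recipes.rechunking import _gather_coordinate_dimensions

# Hypothetical in-memory group with two arrays carrying xarray's
# _ARRAY_DIMENSIONS convention.
group = zarr.group()
group.create(name="temp", shape=(4, 3), chunks=(2, 3), dtype="f8")
group.create(name="time", shape=(4,), chunks=(2,), dtype="i4")
group["temp"].attrs["_ARRAY_DIMENSIONS"] = ["time", "lat"]
group["time"].attrs["_ARRAY_DIMENSIONS"] = ["time"]

# The helper returns the union of all dimension names; order is unspecified
# because a set is used internally.
print(sorted(_gather_coordinate_dimensions(group)))  # ['lat', 'time']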


def consolidate_dimension_coordinates(
    singleton_target_store: zarr.storage.FSStore,
) -> zarr.storage.FSStore:
    """Consolidate each dimension coordinate variable into a single chunk."""
    group = zarr.open_group(singleton_target_store)

    dims = (dim for dim in _gather_coordinate_dimensions(group) if dim in group)
    for dim in dims:
        arr = group[dim]
        attrs = dict(arr.attrs)
        data = arr[:]

        # This will generally use bulk-delete API calls
        # config.storage_config.target.rm(dim, recursive=True)

        singleton_target_store.fs.rm(singleton_target_store.path + "/" + dim, recursive=True)

        new = group.array(
            dim,
            data,
            chunks=arr.shape,
            dtype=arr.dtype,
            compressor=arr.compressor,
            fill_value=arr.fill_value,
            order=arr.order,
            filters=arr.filters,
            overwrite=True,
        )

        new.attrs.update(attrs)
    return singleton_target_store
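
As a usage sketch (mirroring the new test in tests/test_rechunking.py below, with a hypothetical temporary path and a local filesystem target), the function rewrites each dimension coordinate as one chunk in place and returns the same store:

import os
import tempfile

import numpy as np
import zarr

from pangeo_forge_recipes.rechunking import consolidate_dimension_coordinates

# Build a small on-disk store whose "time" coordinate is split into ten chunks.
tmpdir = tempfile.mkdtemp()
store_path = os.path.join(tmpdir, "example.zarr")
group = zarr.group(store=store_path, overwrite=True)
group.create(name="time", shape=100, chunks=10, dtype="i4")
group["time"][:] = np.arange(100)
group["time"].attrs["_ARRAY_DIMENSIONS"] = ["time"]

# After consolidation, the coordinate is stored as a single 100-element chunk.
consolidated = consolidate_dimension_coordinates(zarr.storage.FSStore(store_path))
assert zarr.open(consolidated)["time"].chunks == (100,)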
15 changes: 12 additions & 3 deletions pangeo_forge_recipes/transforms.py
@@ -21,7 +21,7 @@
from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
from .openers import open_url, open_with_kerchunk, open_with_xarray
from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
from .rechunking import combine_fragments, split_fragment
from .rechunking import combine_fragments, consolidate_dimension_coordinates, split_fragment
from .storage import CacheFSSpecTarget, FSSpecTarget
from .writers import ZarrWriterMixin, store_dataset_fragment, write_combined_reference

@@ -412,6 +412,11 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return new_fragments


class ConsolidateDimensionCoordinates(beam.PTransform):

Member:
Can we add a docstring here?

    def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollection:
        return pcoll | beam.Map(consolidate_dimension_coordinates)
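
Regarding the docstring request above, one possible wording (an editor's sketch, not part of the PR) that describes what the transform expects and returns:

class ConsolidateDimensionCoordinates(beam.PTransform):
    """Rechunk each dimension coordinate of a Zarr store into a single chunk.

    Expects a PCollection containing a single ``zarr.storage.FSStore``
    (e.g. the singleton emitted by ``StoreToZarr``) and returns the same
    store after its dimension coordinates have been consolidated.
    """

    def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollection:
        return pcoll | beam.Map(consolidate_dimension_coordinates)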


@dataclass
class CombineReferences(beam.PTransform):
"""Combines Kerchunk references into a single reference dataset.
@@ -572,6 +577,9 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
        `store_name` will be appended to this prefix to create a full path.
    :param target_chunks: Dictionary mapping dimension names to chunk sizes.
        If a dimension is not named, the chunks will be inferred from the data.
    :param consolidate_dimension_coordinates: Whether to rewrite each dimension coordinate
        variable as a single chunk. We recommend consolidating coordinate variables to avoid
        many small read requests when xarray fetches the coordinates. Defaults to ``False``.
Comment on lines +588 to +590

Member:
Point of clarification for my own understanding: do we know for sure that this makes a meaningful difference for kerchunk datasets?

Contributor Author:
Good question! I don't actually know. Maybe @abarciauskas-bgse has some insight into this?

Contributor:
I don't know whether there is a significant difference if we consolidate dimension coordinates in kerchunk datasets. I experienced slow performance when working with zarr stores and saw a relationship between performance (specifically for creating image tiles with rio_tiler's XarrayReader) and the number of coordinate chunks. The notebook that demonstrates this is unfortunately not a brief one (https://github.com/developmentseed/tile-benchmarking/blob/9758c732597c154d1cf7cd796b21c858c3130046/profiling/profile.ipynb), but I could streamline it if we need to do more investigation.

For kerchunk, I would guess the performance impact depends on whether a coordinate's chunks are read all at once or in multiple reads. If it's multiple reads, it might still hurt performance. Do we know which it is? Or should I investigate and run some more tests?
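
One way to answer that question empirically would be to count how many chunk references a coordinate has in the generated kerchunk reference file. A rough sketch, assuming a reference JSON at a hypothetical path and a coordinate named "time":

import json

# Load a kerchunk reference file (the path is illustrative).
with open("reference.json") as f:
    refs = json.load(f)["refs"]

# Chunk keys look like "time/0", "time/1", ...; metadata keys end in
# ".zarray"/".zattrs" and start with a dot after the slash.
time_chunk_keys = [
    key for key in refs
    if key.startswith("time/") and not key.split("/")[-1].startswith(".")
]

# One entry means the coordinate can be fetched in a single read;
# many entries mean many small reads.
print(f"time coordinate has {len(time_chunk_keys)} chunk reference(s)")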

    :param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset``
        template dataset as its first argument and returns a dynamically generated chunking dict.
        If provided, ``target_chunks`` cannot also be passed. You can use this to determine chunking
@@ -593,6 +601,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
    dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
    dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
    attrs: Dict[str, str] = field(default_factory=dict)
    consolidate_dimension_coordinates: bool = False

    def __post_init__(self):
        if self.target_chunks and self.dynamic_chunking_fn:
@@ -625,7 +634,7 @@ def expand(
            | beam.combiners.Sample.FixedSizeGlobally(1)
            | beam.FlatMap(lambda x: x)  # https://stackoverflow.com/a/47146582
        )
        # TODO: optionally use `singleton_target_store` to
        # consolidate metadata and/or coordinate dims here
        if self.consolidate_dimension_coordinates:
            singleton_target_store = singleton_target_store | ConsolidateDimensionCoordinates()

Member:
Suggested change
        # TODO: optionally use `singleton_target_store` to
        # consolidate metadata and/or coordinate dims here
        if self.consolidate_dimension_coordinates:
            singleton_target_store = singleton_target_store | ConsolidateDimensionCoordinates()

I went back and forth on this, but IMHO I think the right path here is to not do any consolidation by default, and leave it to users to compose modularly, i.e.:

recipe = (
    ...
    | StoreToZarr()
    | ConsolidateCoordinateDimensions()
    | ConsolidateMetadata()
)

This is more verbose for users, but it is also more explicit about what they can expect, it adheres to Beam's composable ethos, and it is less API creep for us to maintain in StoreToZarr.

Happy to discuss dissenting views here, but as of now this feels like the right path to me.
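
For concreteness, a sketch of the composition being proposed. `ConsolidateDimensionCoordinates` is the transform this PR adds (the comment above spells it `ConsolidateCoordinateDimensions`), `ConsolidateMetadata` is only proposed and is omitted here, and `pattern`/`target_root` are assumed to be a FilePattern and a target path as in the tests below:

import apache_beam as beam

from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    OpenWithXarray,
    StoreToZarr,
)

# `pattern` and `target_root` are assumed to be defined as in the tests below.
with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            target_root=target_root,
            store_name="output.zarr",
            combine_dims=pattern.combine_dim_keys,
        )
        # StoreToZarr emits a singleton FSStore PCollection, which the
        # consolidation transform consumes and passes through.
        | ConsolidateDimensionCoordinates()
    )

With the flag-based approach in this PR, the same effect comes from passing `consolidate_dimension_coordinates=True` to `StoreToZarr` instead.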

Contributor Author:
Would love to chat more about this! I also like the composability, but I wonder about the trade-off for transformations that should be done on almost every recipe but might get overlooked by a newer recipe developer.

Contributor:
If you have a chat, I would love to attend if it fits on my schedule.

Member:
Thanks for weighing in! I am not 100% sure which design is best here, and appreciate the lively discussion so we can land on the best solution. (Here and in the comment thread above.)

Quoting the comment above: "…done on almost every recipe, but might get overlooked by a newer recipe developer."

This is a small point, but one that just came to mind: in my experience, for smaller zarr stores, consolidation doesn't really matter / have a performance impact. Therefore, in some sense do we care if newer recipe developers (which presumably implies smaller-scale as well?) overlook this?

We absolutely should have a design discussion on this, possibly before the next coordination meeting (to expedite the process). I will try to first outline what I see as the various options in a comment on this thread to solicit async feedback.

Contributor Author:
Awesome! Happy to chat before. I won't be able to make it to the next coordination meeting, so this week or next before Friday would be ideal for me.


        return singleton_target_store
29 changes: 29 additions & 0 deletions tests/test_end_to_end.py
@@ -8,6 +8,7 @@
import numpy as np
import pytest
import xarray as xr
import zarr
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from fsspec.implementations.reference import ReferenceFileSystem
@@ -172,3 +173,31 @@ def test_reference_grib(
    # various inconsistencies (of dtype casting int to float, etc.). With the right combination of
    # options passed to the pipeline, seems like these should pass?
    # xr.testing.assert_equal(ds.load(), ds2)


@pytest.mark.parametrize("consolidate_dimension_coordinates", [False, True])
def test_xarray_zarr_consolidate_dimension_coordinates(
    netcdf_local_file_pattern_sequential,
    pipeline,
    tmp_target_url,
    consolidate_dimension_coordinates,
):
    pattern = netcdf_local_file_pattern_sequential
    with pipeline as p:
        (
            p
            | beam.Create(pattern.items())
            | OpenWithXarray(file_type=pattern.file_type)
            | StoreToZarr(
                target_root=tmp_target_url,
                store_name="subpath",
                combine_dims=pattern.combine_dim_keys,
                consolidate_dimension_coordinates=consolidate_dimension_coordinates,
            )
        )

    store = zarr.open(os.path.join(tmp_target_url, "subpath"))
    if consolidate_dimension_coordinates:
        assert store.time.chunks[0] == store.time.shape[0]
    else:
        assert store.time.chunks[0] != store.time.shape[0]
33 changes: 32 additions & 1 deletion tests/test_rechunking.py
@@ -1,11 +1,20 @@
import itertools
import os
import random
from collections import namedtuple
from tempfile import TemporaryDirectory

import numpy as np
import pytest
import xarray as xr
import zarr

from pangeo_forge_recipes.rechunking import GroupKey, combine_fragments, split_fragment
from pangeo_forge_recipes.rechunking import (
    GroupKey,
    combine_fragments,
    consolidate_dimension_coordinates,
    split_fragment,
)
from pangeo_forge_recipes.types import CombineOp, Dimension, Index, IndexedPosition, Position

from .conftest import split_up_files_by_variable_and_day
@@ -258,3 +267,25 @@ def test_combine_fragments_errors():
    index1 = Index({Dimension("time", CombineOp.CONCAT): IndexedPosition(2)})
    with pytest.raises(ValueError, match="are not consistent"):
        _ = combine_fragments(group, [(index0, ds), (index1, ds)])


def test_consolidate_dimension_coordinates():
    td = TemporaryDirectory()
    store_path = os.path.join(td.name, "tmp.zarr")
    group = zarr.group(store=store_path, overwrite=True)
    group.create(name="data", shape=100, chunks=10, dtype="i4")
    group.create(name="time", shape=100, chunks=10, dtype="i4")
    group.data[:] = np.random.randn(*group.data.shape)
    group.time[:] = np.arange(100)

    # Without these attrs, consolidate_dimension_coordinates does not
    # raise an error, although xarray would when opening the store.
    group.attrs["_ARRAY_DIMENSIONS"] = ["time"]
    group.data.attrs["_ARRAY_DIMENSIONS"] = ["time"]
    group.time.attrs["_ARRAY_DIMENSIONS"] = ["time"]

    consolidated_zarr = consolidate_dimension_coordinates(zarr.storage.FSStore(store_path))
    store = zarr.open(consolidated_zarr)
    assert store.time.chunks[0] == 100
    assert store.data.chunks[0] == 10
1 change: 1 addition & 0 deletions tests/test_transforms.py
@@ -310,6 +310,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
combine_dims=pattern.combine_dim_keys,
attrs={},
dynamic_chunking_fn=dynamic_chunking_fn,
consolidate_dimension_coordinates=False,
**kws,
)
open_store = target_store | OpenZarrStore()