ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556 #671

Merged
8 changes: 8 additions & 0 deletions docs/composition/styles.md
@@ -24,6 +24,14 @@ the recipe pipeline will contain at a minimum the following transforms applied t
* {class}`pangeo_forge_recipes.transforms.OpenURLWithFSSpec`: retrieves each pattern file using the specified URLs.
* {class}`pangeo_forge_recipes.transforms.OpenWithXarray`: load each pattern file into an [`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html).
* {class}`pangeo_forge_recipes.transforms.StoreToZarr`: generate a Zarr store by combining the datasets.
* {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates`: consolidate the dimension coordinates to improve dataset read performance.
* {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata`: calls Zarr's convenience function to consolidate metadata.

```{tip}
If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates` transform, make sure to also chain the {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata` transform onto your recipe, as sketched below.

```
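
A minimal sketch of a recipe that chains these transforms, assuming a {class}`pangeo_forge_recipes.patterns.FilePattern` named `pattern` has been built elsewhere (the store name here is illustrative):

```python
import apache_beam as beam

from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# `pattern` is an illustrative FilePattern describing the source files.
recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        store_name="example.zarr",  # illustrative store name
        combine_dims=pattern.combine_dim_keys,
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
)
```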


## Open with Kerchunk, write to virtual Zarr

10 changes: 9 additions & 1 deletion examples/feedstock/gpcp_from_gcs.py
@@ -3,7 +3,13 @@
import zarr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from pangeo_forge_recipes.transforms import (
ConsolidateDimensionCoordinates,
ConsolidateMetadata,
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)

dates = [
d.to_pydatetime().strftime("%Y%m%d")
@@ -43,5 +49,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
store_name="gpcp.zarr",
combine_dims=pattern.combine_dim_keys,
)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| "Test dataset" >> beam.Map(test_ds)
)
12 changes: 10 additions & 2 deletions examples/feedstock/noaa_oisst.py
@@ -3,7 +3,13 @@
import zarr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from pangeo_forge_recipes.transforms import (
ConsolidateDimensionCoordinates,
ConsolidateMetadata,
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)

dates = pd.date_range("1981-09-01", "2022-02-01", freq="D")

@@ -26,7 +32,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# TODO: see if --setup-file option for runner fixes this
import xarray as xr

ds = xr.open_dataset(store, engine="zarr", chunks={})
ds = xr.open_dataset(store, engine="zarr", consolidated=True, chunks={})
for var in ["anom", "err", "ice", "sst"]:
assert var in ds.data_vars
return store
@@ -40,5 +46,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
store_name="noaa-oisst.zarr",
combine_dims=pattern.combine_dim_keys,
)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| beam.Map(test_ds)
)
41 changes: 41 additions & 0 deletions pangeo_forge_recipes/rechunking.py
@@ -6,6 +6,7 @@

import numpy as np
import xarray as xr
import zarr

from .aggregation import XarraySchema, determine_target_chunks
from .chunk_grid import ChunkGrid
@@ -238,3 +239,43 @@ def _sort_by_speed_of_varying(item):
ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)

return first_index, ds_combined


def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
return list(
set(itertools.chain(*(group[var].attrs.get("_ARRAY_DIMENSIONS", []) for var in group)))
Contributor: I imagine since this is a zarr.Group this iteration isn't very expensive b/c it's on summary data and never really that large?

Contributor (Author): Ehh, I think you're right? Since it's just scanning across vars in groups and looking at the .attrs.

Member: I don't fully understand this function, but the fact that it's the same as we had in the pre-Beam release checks out to me!

)
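
For readers of the thread above, a tiny sketch (not part of this PR) of what the helper collects, using a toy in-memory group:

```python
import zarr

# Toy group: one data variable and one dimension coordinate, both carrying
# the xarray-style _ARRAY_DIMENSIONS attribute.
group = zarr.group()  # in-memory store
temp = group.create_dataset("temp", shape=(4, 3), dtype="f4")
temp.attrs["_ARRAY_DIMENSIONS"] = ["time", "lat"]
time = group.create_dataset("time", shape=(4,), dtype="i4")
time.attrs["_ARRAY_DIMENSIONS"] = ["time"]

# The helper unions every variable's _ARRAY_DIMENSIONS attribute, so here it
# returns ["time", "lat"] (set order is not guaranteed).
print(_gather_coordinate_dimensions(group))
```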


def consolidate_dimension_coordinates(
singleton_target_store: zarr.storage.FSStore,
) -> zarr.storage.FSStore:
"""Consolidate dimension coordinates chunking"""
group = zarr.open_group(singleton_target_store)

dims = (dim for dim in _gather_coordinate_dimensions(group) if dim in group)
for dim in dims:
arr = group[dim]
attrs = dict(arr.attrs)
data = arr[:]

# This will generally use bulk-delete API calls
# config.storage_config.target.rm(dim, recursive=True)

singleton_target_store.fs.rm(singleton_target_store.path + "/" + dim, recursive=True)

new = group.array(
dim,
data,
chunks=arr.shape,
dtype=arr.dtype,
compressor=arr.compressor,
fill_value=arr.fill_value,
order=arr.order,
filters=arr.filters,
overwrite=True,
)

new.attrs.update(attrs)

return singleton_target_store
30 changes: 20 additions & 10 deletions pangeo_forge_recipes/transforms.py
@@ -21,7 +21,7 @@
from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas
from .openers import open_url, open_with_kerchunk, open_with_xarray
from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop
from .rechunking import combine_fragments, split_fragment
from .rechunking import combine_fragments, consolidate_dimension_coordinates, split_fragment
from .storage import CacheFSSpecTarget, FSSpecTarget
from .writers import (
ZarrWriterMixin,
@@ -364,9 +364,14 @@ class PrepareZarrTarget(beam.PTransform):
If chunking is present in the schema for a given dimension, the length of
the first fragment will be used. Otherwise, the dimension will not be chunked.
:param attrs: Extra group-level attributes to inject into the dataset.
:param consolidated_metadata: Bool controlling if xarray.to_zarr()
writes consolidated metadata. Default's to True.
:param encoding: Dictionary describing encoding for xarray.to_zarr()
:param consolidated_metadata: Bool controlling if xarray.to_zarr()
writes consolidated metadata. Defaults to False. In StoreToZarr, we
always default to unconsolidated. This leaves it up to the
user whether or not they want to consolidate with ConsolidateMetadata(). Also,
it prevents a broken/inconsistent state that could arise from metadata being
consolidated here, and then falling out of sync with coordinates if
ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr().
"""

target: str | FSSpecTarget
@@ -386,8 +391,8 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
target_store=store,
target_chunks=self.target_chunks,
attrs=self.attrs,
consolidated_metadata=self.consolidated_metadata,
encoding=self.encoding,
consolidated_metadata=False,
)
return initialized_target

@@ -430,6 +435,13 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return new_fragments


class ConsolidateDimensionCoordinates(beam.PTransform):
Member: Can we add a docstring here?

    def expand(
        self, pcoll: beam.PCollection[zarr.storage.FSStore]
    ) -> beam.PCollection[zarr.storage.FSStore]:
        return pcoll | beam.Map(consolidate_dimension_coordinates)


@dataclass
class CombineReferences(beam.PTransform):
"""Combines Kerchunk references into a single reference dataset.
@@ -573,6 +585,9 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
`store_name` will be appended to this prefix to create a full path.
:param target_chunks: Dictionary mapping dimension names to chunks sizes.
If a dimension is a not named, the chunks will be inferred from the data.
:param consolidate_dimension_coordinates: Whether to rewrite coordinate variables as a
single chunk. We recommend consolidating coordinate variables to avoid
many small read requests to get the coordinates in xarray. Defaults to ``True``.
Comment on lines +588 to +590

Member: Point of clarification for my own understanding: do we know for sure that this makes a meaningful difference for kerchunk datasets?

Contributor (Author): Good question! I don't actually know. Maybe @abarciauskas-bgse has some insight into this?

Contributor: I don't know if there is a significant difference if we consolidate dimension coordinates in kerchunk datasets. I experienced slow performance when working with Zarr stores and saw a relationship between performance (specifically for creating image tiles using rio_tiler's XarrayReader) and the number of coordinate chunks. The notebook which demonstrates this is unfortunately not a brief one (https://github.com/developmentseed/tile-benchmarking/blob/9758c732597c154d1cf7cd796b21c858c3130046/profiling/profile.ipynb), but I could streamline it if we need to do more investigation.

For kerchunk, I would guess there would be a performance impact depending on whether a coordinate's chunks are all read at once or in multiple reads. Even if it is multiple reads, it still might impact performance. Do we know which it is? Or should I investigate/run some more tests?
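
One lightweight way to check (a sketch under assumptions, not part of this PR: a local store path and zarr-python v2) is to count the chunks behind each dimension coordinate:

```python
import zarr

group = zarr.open_group("noaa-oisst.zarr", mode="r")  # hypothetical local store
for name, arr in group.arrays():
    dims = arr.attrs.get("_ARRAY_DIMENSIONS", [])
    if dims == [name]:  # a 1-D coordinate variable named after its dimension
        print(f"{name}: shape={arr.shape} chunks={arr.chunks} -> {arr.nchunks} chunk(s)")
```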

:param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset``
template dataset as its first argument and returns a dynamically generated chunking dict.
If provided, ``target_chunks`` cannot also be passed. You can use this to determine chunking
@@ -581,8 +596,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
out https://github.com/jbusecke/dynamic_chunks
:param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``.
:param attrs: Extra group-level attributes to inject into the dataset.
:param consolidated_metadata: Bool controlling if xarray.to_zarr()
writes consolidated metadata. Default's to True.

:param encoding: Dictionary encoding for xarray.to_zarr().
"""

Expand All @@ -597,7 +611,6 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin):
dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
attrs: Dict[str, str] = field(default_factory=dict)
consolidated_metadata: Optional[bool] = True
encoding: Optional[dict] = field(default_factory=dict)

def __post_init__(self):
@@ -624,7 +637,6 @@ def expand(
target=self.get_full_target(),
target_chunks=target_chunks,
attrs=self.attrs,
consolidated_metadata=self.consolidated_metadata,
encoding=self.encoding,
)
n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
@@ -633,7 +645,5 @@
| beam.combiners.Sample.FixedSizeGlobally(1)
| beam.FlatMap(lambda x: x) # https://stackoverflow.com/a/47146582
)
# TODO: optionally use `singleton_target_store` to
# consolidate metadata and/or coordinate dims here

return singleton_target_store
6 changes: 2 additions & 4 deletions pangeo_forge_recipes/writers.py
@@ -73,7 +73,7 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping:
:param store: Input Store for Zarr
:type store: MutableMapping
:return: Output Store
:rtype: MutableMapping
:rtype: zarr.storage.FSStore
"""

import zarr
@@ -83,12 +83,10 @@ def consolidate_metadata(store: MutableMapping) -> MutableMapping:
"""Creating consolidated metadata for Kerchunk references should not
yield a performance benefit so consolidating metadata is not supported."""
)

if isinstance(store, zarr.storage.FSStore):
zarr.convenience.consolidate_metadata(store)

zc = zarr.open_consolidated(store)
return zc
return store


def store_dataset_fragment(
28 changes: 28 additions & 0 deletions tests/test_end_to_end.py
@@ -14,6 +14,8 @@

from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
ConsolidateDimensionCoordinates,
ConsolidateMetadata,
OpenWithKerchunk,
OpenWithXarray,
StoreToZarr,
@@ -173,3 +175,29 @@ def test_reference_grib(
# various inconsistencies (of dtype casting int to float, etc.). With the right combination of
# options passed to the pipeline, seems like these should pass?
# xr.testing.assert_equal(ds.load(), ds2)


def test_xarray_zarr_consolidate_dimension_coordinates(
netcdf_local_file_pattern_sequential,
pipeline,
tmp_target,
):
pattern = netcdf_local_file_pattern_sequential
with pipeline as p:
(
p
| beam.Create(pattern.items())
| OpenWithXarray(file_type=pattern.file_type)
| StoreToZarr(
target_root=tmp_target,
store_name="subpath",
combine_dims=pattern.combine_dim_keys,
)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
)

path = os.path.join(tmp_target.root_path, "subpath")
ds = xr.open_dataset(path, engine="zarr", consolidated=True, chunks={})

assert ds.time.encoding["chunks"][0] == ds.time.shape[0]
32 changes: 31 additions & 1 deletion tests/test_rechunking.py
@@ -1,11 +1,20 @@
import itertools
import os
import random
from collections import namedtuple
from tempfile import TemporaryDirectory

import numpy as np
import pytest
import xarray as xr
import zarr

from pangeo_forge_recipes.rechunking import GroupKey, combine_fragments, split_fragment
from pangeo_forge_recipes.rechunking import (
GroupKey,
combine_fragments,
consolidate_dimension_coordinates,
split_fragment,
)
from pangeo_forge_recipes.types import CombineOp, Dimension, Index, IndexedPosition, Position

from .conftest import split_up_files_by_variable_and_day
@@ -258,3 +267,24 @@ def test_combine_fragments_errors():
index1 = Index({Dimension("time", CombineOp.CONCAT): IndexedPosition(2)})
with pytest.raises(ValueError, match="are not consistent"):
_ = combine_fragments(group, [(index0, ds), (index1, ds)])


def test_consolidate_dimension_coordinates():
td = TemporaryDirectory()
store_path = os.path.join(td.name, "tmp.zarr")
group = zarr.group(store=store_path, overwrite=True)
group.create(name="data", shape=100, chunks=10, dtype="i4")
group.create(name="time", shape=100, chunks=10, dtype="i4")
group.data[:] = np.random.randn(*group.data.shape)
group.time[:] = np.arange(100)

# If you don't provide these attrs,
# consolidate_dimension_coordinates does not
# raise an error, while Xarray does
group.data.attrs["_ARRAY_DIMENSIONS"] = ["time"]
group.time.attrs["_ARRAY_DIMENSIONS"] = ["time"]

consolidated_zarr = consolidate_dimension_coordinates(zarr.storage.FSStore(store_path))
store = zarr.open(consolidated_zarr)
assert store.time.chunks[0] == 100
assert store.data.chunks[0] == 10
1 change: 0 additions & 1 deletion tests/test_transforms.py
@@ -310,7 +310,6 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1):
combine_dims=pattern.combine_dim_keys,
attrs={},
dynamic_chunking_fn=dynamic_chunking_fn,
consolidated_metadata=True,
**kws,
)
open_store = target_store | OpenZarrStore()
1 change: 0 additions & 1 deletion tests/test_writers.py
@@ -169,7 +169,6 @@ def test_zarr_consolidate_metadata(
target_root=tmp_target,
store_name="store",
combine_dims=pattern.combine_dim_keys,
consolidated_metadata=False,
)
| ConsolidateMetadata()
)