
ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556 (#671)

Merged

Conversation

norlandrhagen (Contributor):

Reviving this from a clean slate: https://github.com/norlandrhagen/pangeo-forge-recipes/pull/new/consolidate_dimension_coords. This PR only intends to address issue #556.

@norlandrhagen (Contributor, Author):

Unit and end-to-end tests added! Big thanks to @jbusecke for help here. Both of us went back to Zarr 101 to figure out how to create a dataset without Xarray.

@norlandrhagen changed the title from "[WIP] ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556" to "ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556" on Jan 18, 2024
@norlandrhagen norlandrhagen marked this pull request as ready for review January 18, 2024 22:44
@jbusecke (Contributor):

Big thanks back! This was really fun, and I am excited to see this moving.

@ranchodeluxe (Contributor) left a comment:

Makes sense to me (but I don't know much 😆). Like the tests. Left a couple of non-blocking comments.

tests/test_end_to_end.py (outdated review thread, resolved)

def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
    return list(
        set(itertools.chain(*(group[var].attrs.get("_ARRAY_DIMENSIONS", []) for var in group)))
    )

Contributor:
I imagine since this is a zarr.Group this iteration isn't very expensive b/c it's on summary data and never really that large?

Contributor (Author):

Ehh, I think you're right? It's just scanning across the vars in the group and looking at their .attrs.

Member:

I don't fully understand this function but the fact that it's the same as we had in the pre-Beam release checks out to me!
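For context, here is a minimal sketch of what this helper returns for a hand-built Zarr group. This is not code from the PR; it assumes zarr v2 semantics, where xarray records dimension names in the "_ARRAY_DIMENSIONS" attribute:

import itertools
from typing import List

import numpy as np
import zarr

def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
    return list(
        set(itertools.chain(*(group[var].attrs.get("_ARRAY_DIMENSIONS", []) for var in group)))
    )

# Build a small group without xarray, roughly the way the unit tests do.
group = zarr.group()  # in-memory store
time = group.array("time", np.arange(12), chunks=3)
time.attrs["_ARRAY_DIMENSIONS"] = ["time"]
tos = group.array("tos", np.random.rand(12, 4), chunks=(3, 4))
tos.attrs["_ARRAY_DIMENSIONS"] = ["time", "lat"]

print(_gather_coordinate_dimensions(group))  # e.g. ['time', 'lat'] (set order not guaranteed)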


@jbusecke (Contributor) left a comment:

Thanks @norlandrhagen. I'll try this in the wild with CMIP6 in a few.

You can probably delete the TODO comments here too.

tests/test_rechunking.py (outdated review thread, resolved)
@jbusecke (Contributor):

⚠️ This failed with all of my CMIP test files, with an error like this:

Error message from worker: Traceback (most recent call last):
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/conventions.py", line 436, in decode_cf_variables
    new_vars[k] = decode_cf_variable(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/conventions.py", line 287, in decode_cf_variable
    var = times.CFDatetimeCoder(use_cftime=use_cftime).decode(var, name=name)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/coding/times.py", line 832, in decode
    dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/coding/times.py", line 209, in _decode_cf_datetime_dtype
    [first_n_items(values, 1) or [0], last_item(values) or [0]]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/formatting.py", line 90, in first_n_items
    return np.ravel(to_duck_array(array))[:n_desired]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/pycompat.py", line 140, in to_duck_array
    return np.asarray(data)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 487, in __array__
    return np.asarray(self.get_duck_array(), dtype=dtype)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 490, in get_duck_array
    return self.array.get_duck_array()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 554, in get_duck_array
    array = self.array[self.key]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/zarr.py", line 92, in __getitem__
    return array[key.tuple]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 844, in __getitem__
    result = self.get_basic_selection(pure_selection, fields=fields)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 970, in get_basic_selection
    return self._get_basic_selection_nd(selection=selection, out=out, fields=fields)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 1012, in _get_basic_selection_nd
    return self._get_selection(indexer=indexer, out=out, fields=fields)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 1388, in _get_selection
    self._chunk_getitems(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 2228, in _chunk_getitems
    self._process_chunk(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 2141, in _process_chunk
    chunk = self._decode_chunk(cdata)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/zarr/core.py", line 2427, in _decode_chunk
    chunk = chunk.reshape(expected_shape or self._chunks, order=self._order)
ValueError: cannot reshape array of size 1980 into shape (180,)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
  File "feedstock/recipe.py", line 79, in _test_open_store
  File "feedstock/recipe.py", line 74, in _get_dataset
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/api.py", line 572, in open_dataset
    backend_ds = backend.open_dataset(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/zarr.py", line 1026, in open_dataset
    ds = store_entrypoint.open_dataset(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/store.py", line 46, in open_dataset
    vars, attrs, coord_names = conventions.decode_cf_variables(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/conventions.py", line 447, in decode_cf_variables
    raise type(e)(f"Failed to decode variable {k!r}: {e}") from e
ValueError: Failed to decode variable 'time': cannot reshape array of size 1980 into shape (180,)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 300, in _execute
    response = task()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 375, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 995, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
  File "feedstock/recipe.py", line 79, in _test_open_store
  File "feedstock/recipe.py", line 74, in _get_dataset
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/api.py", line 572, in open_dataset
    backend_ds = backend.open_dataset(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/zarr.py", line 1026, in open_dataset
    ds = store_entrypoint.open_dataset(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/store.py", line 46, in open_dataset
    vars, attrs, coord_names = conventions.decode_cf_variables(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/conventions.py", line 447, in decode_cf_variables
    raise type(e)(f"Failed to decode variable {k!r}: {e}") from e
ValueError: Failed to decode variable 'time': cannot reshape array of size 1980 into shape (180,) [while running 'Creating CMIP6.CMIP.CCCma.CanESM5.historical.r37i1p2f1.Omon.tos.gn.v20190429|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr|Copy|Logging to non-QC table|TestDataset|Logging to QC table/TestDataset/Testing - Open Store-ptransform-66']

Seems like we need some more test cases with time arrays encoded as datetime and cftime?

@norlandrhagen (Contributor, Author):

Thanks for giving it a test drive @jbusecke! Yeah, seems like it. Maybe we can add another test in our Zarr 102 class next.

@jbusecke (Contributor):

Would be happy to pair on this, maybe tomorrow (Wednesday), in case that works for you.

@norlandrhagen (Contributor, Author):

That would be great! I'm a bit busy with ESIP today through Thursday, but I'm free all of Friday or next week.

@cisaacstern (Member):

Looks like we are waiting on resolution of the error @jbusecke hit here, correct?

@norlandrhagen (Contributor, Author):

@cisaacstern yup! @jbusecke and I are going to work on this tomorrow.

@jbusecke (Contributor):

OK, with the current state of this PR I am getting successful writes of CMIP6 stores like this one:
'gs://cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529/'

import gcsfs

# List the objects under the store's "time" coordinate to check how many chunks it has.
fs = gcsfs.GCSFileSystem()
fs.ls('gs://cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529/time')

shows that there is only one time chunk:

['cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529/time/.zarray',
 'cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529/time/.zattrs',
 'cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529/time/0']
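An equivalent check (a sketch, not from the thread; it assumes zarr v2 and anonymous GCS access) is to open the group and look at the coordinate's chunk count directly:

import fsspec
import zarr

# Open the store read-only and count the chunks backing the "time" coordinate.
mapper = fsspec.get_mapper(
    "gs://cmip6/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1/historical/r29i1p1f2/Omon/tos/gr1/v20200529",
    token="anon",
)
group = zarr.open_group(mapper, mode="r")
print(group["time"].nchunks)  # 1 after coordinate consolidation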

@norlandrhagen (Contributor, Author):

@cisaacstern this PR seems to be working now!

@cisaacstern (Member) left a comment:

Thanks so much @norlandrhagen and @jbusecke for your work on this!

I think this is very close. As I went through this, I did end up feeling rather strongly that @sbquinlan's earlier suggestion to favor modularity/composability over boolean flag configuration is much more in the design spirit of Beam, and potentially a lower maintenance burden for us as developers as well. All of the "big" changes I suggest here relate to that.

Curious to know your thoughts.

Comment on lines 280 to 282
# Open question: Should this always be called
# or should we have the option of ConsolidateMetadata
zarr.consolidate_metadata(singleton_target_store)
Member:

Suggested change: delete the lines above (the open question comment and the unconditional zarr.consolidate_metadata call).

IMO this is too opinionated, and it should be up to the user if they want to consolidate coordinates and metadata, or just coordinates.

I'm using as a guide here the fact that in the pre-Beam releases, these options were not coupled either, see:

if config.consolidate_dimension_coordinates:
    logger.info("Consolidating dimension coordinate arrays")
    target_mapper = config.storage_config.target.get_mapper()
    group = zarr.open(target_mapper, mode="a")
    # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/214
    # filter out the dims from the array metadata not in the Zarr group
    # to handle coordinateless dimensions.
    dims = (dim for dim in _gather_coordinate_dimensions(group) if dim in group)
    for dim in dims:
        arr = group[dim]
        attrs = dict(arr.attrs)
        data = arr[:]
        # This will generally use bulk-delete API calls
        config.storage_config.target.rm(dim, recursive=True)
        new = group.array(
            dim,
            data,
            chunks=arr.shape,
            dtype=arr.dtype,
            compressor=arr.compressor,
            fill_value=arr.fill_value,
            order=arr.order,
            filters=arr.filters,
            overwrite=True,
        )
        new.attrs.update(attrs)

if config.consolidate_zarr:
    logger.info("Consolidating Zarr metadata")
    target_mapper = config.storage_config.target.get_mapper()
    zarr.consolidate_metadata(target_mapper)

Contributor (Author):

This was the fix @jbusecke and I came up with yesterday to get around the errors reported in the CMIP6 pipeline. I think that when the coordinate dims are consolidated in this PR, the metadata has to be updated afterwards. @jbusecke might have a better way to explain this though!

Before the consolidated_metadata option was introduced, all of the StoreToZarr recipes were consolidating metadata by default.
From the docs: "The default (consolidated=None) means write consolidated metadata and attempt to read consolidated metadata for existing stores (falling back to non-consolidated)."
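A small sketch of the failure mode being described here (my own illustration, assuming zarr v2; not code from the PR):

import numpy as np
import zarr

store = {}  # a plain dict works as an in-memory zarr v2 store
group = zarr.open_group(store, mode="w")
time = group.array("time", np.arange(1980), chunks=180)  # 11 chunks of 180
time.attrs["_ARRAY_DIMENSIONS"] = ["time"]
zarr.consolidate_metadata(store)  # .zmetadata now records chunks=(180,)

# Rewriting the coordinate as a single chunk (what ConsolidateDimensionCoordinates does)
# leaves the consolidated metadata stale: it still advertises chunks=(180,), while the
# lone chunk on disk now holds 1980 values. Opening the store via the stale .zmetadata
# then fails with "cannot reshape array of size 1980 into shape (180,)", as in the
# traceback above.
new = group.array("time", np.arange(1980), chunks=1980, overwrite=True)
new.attrs["_ARRAY_DIMENSIONS"] = ["time"]

zarr.consolidate_metadata(store)  # re-consolidating brings .zmetadata back in sync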

Member:

Yep, that makes total sense @norlandrhagen!

Apologies the review here is a little disjointed. Down in #671 (comment) I mention my proposed fix for this, which is that we should also remove the consolidate_metadata flag from StoreToZarr, and always default consolidated=False in StoreToZarr.

Then it is up to the user if they want to do:

StoreToZarr() | ConsolidateCoordinateDimensions()

or

StoreToZarr() | ConsolidateCoordinateDimensions() | ConsolidateMetadata()

(☝️ seems like this is what @jbusecke would want to do)

or

StoreToZarr() | ConsolidateMetadata()

But in order to make these options possible, we need to start from an unconsolidated store emitted by StoreToZarr(), because as you observe here, if StoreToZarr emits a consolidated store, and coordinate dims are subsequently consolidated without then re-consolidating metadata, we end up in a broken state.

Always emitting an unconsolidated store from StoreToZarr() should resolve this.

Does that make sense?
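For concreteness, a full recipe in this composable style might look something like the sketch below. This is my own illustration, not code from this PR: the file URLs, target path, and store name are hypothetical, and the coordinate-consolidation transform is the ConsolidateDimensionCoordinates class added by this PR:

import apache_beam as beam
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# Hypothetical two-file pattern concatenated along time.
pattern = pattern_from_file_sequence(
    ["https://example.com/tos_000.nc", "https://example.com/tos_001.nc"],
    concat_dim="time",
)

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        target_root="gs://example-bucket/output",  # hypothetical target
        store_name="tos.zarr",
        combine_dims=pattern.combine_dim_keys,
    )
    | ConsolidateDimensionCoordinates()  # rewrite dimension coordinates as single chunks
    | ConsolidateMetadata()              # then write consolidated .zmetadata
)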

Contributor (Author):

No worries!

It does fix it! However, then we are defaulting to non-consolidated coords, which seems like the wrong approach for most recipes. I think this is fine if everyone knows that they should probably add the ConsolidateDimensionCoordinates transform to their recipe, but I could see it commonly slipping through the cracks. Maybe this comes down to what kind of guard rails / default assumptions we want to have 🤷.

Contributor:

I share the concern that requiring users to explicitly chain consolidation stages (in a specific order) will lead to errors on the user side.

Maybe we could consider renaming StoreToZarr -> StoreToZarrUnconsolidated (or something else) and subclassing that as the new StoreToZarr (this would change the default behavior).

Alternatively, make a new class StoreToZarrConsolidated that subclasses the current StoreToZarr; this would not change the default behavior, but it would probably need a thorough rewrite of the docs to make sure users use consolidated storage by default.



@@ -430,6 +430,11 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return new_fragments


class ConsolidateDimensionCoordinates(beam.PTransform):
Member:

Can we add a docstring here?

pangeo_forge_recipes/transforms.py (outdated review thread, resolved)
Comment on lines +582 to +584
    :param consolidate_dimension_coordinates: Whether to rewrite coordinate variables as a
        single chunk. We recommend consolidating coordinate variables to avoid
        many small read requests to get the coordinates in xarray. Defaults to ``True``.
Member:

Point of clarification for my own understanding: do we know for sure that this makes a meaningful difference for kerchunk datasets?

Contributor (Author):

Good question! I don't actually know. Maybe @abarciauskas-bgse has some insight into this?

Contributor:

I don't know if there is a significant difference if we consolidate dimension coordinates in kerchunk datasets. I did experience slow performance when working with Zarr stores, and saw a relationship between performance (specifically for creating image tiles using rio_tiler's XarrayReader) and the number of coordinate chunks. The notebook which demonstrates this is unfortunately not a brief one (https://github.com/developmentseed/tile-benchmarking/blob/9758c732597c154d1cf7cd796b21c858c3130046/profiling/profile.ipynb), but I could streamline it if we need to do more investigation.

For kerchunk, I would guess the performance impact depends on whether a coordinate's chunks are read all at once or in multiple reads. Even with multiple reads, it still might impact performance. Do we know which it is? Or should I investigate / run some more tests?

pyproject.toml (outdated)
@@ -42,7 +42,7 @@ dependencies = [
 [project.optional-dependencies]
 test = [
     "click",
-    "pytest",
+    "pytest<8.0.0",
Member:

Looks like this needs to be rebased onto main, since didn't this fix already go in?

Comment on lines 637 to 647:

    # TODO: optionally use `singleton_target_store` to
    # consolidate metadata and/or coordinate dims here
    if self.consolidate_dimension_coordinates:
        singleton_target_store = singleton_target_store | ConsolidateDimensionCoordinates()

Member:

Suggested change: delete these lines.

I went back and forth on this, but IMO the right path here is to not do any consolidation by default, and leave it to users to compose modularly, i.e.:

recipe = (
    ...
    | StoreToZarr()
    | ConsolidateCoordinateDimensions()
    | ConsolidateMetadata()
)

This is more verbose for users, but it is also more explicit about what they can expect, it adheres to the composable ethos of Beam, and it's less API creep for us to maintain in StoreToZarr.

Happy to discuss dissenting views here, but as of now this feels like the right path to me.

Contributor (Author):

Would love to chat more about this! I also like the composability, but I wonder about the trade-off for transformations that should be done on almost every recipe, but might get overlooked by a newer recipe developer.

Contributor:

If you have a chat, I would love to attend if it fits on my schedule.

Member:

Thanks for weighing in! I am not 100% sure which design is best here, and appreciate the lively discussion so we can land on the best solution. (Here and in the comment thread above.)

> done on almost every recipe, but might get overlooked by a newer recipe developer.

This is a small point, but one that just came to mind: in my experience, for smaller zarr stores, consolidation doesn't really matter / have a performance impact. Therefore, in some sense do we care if newer recipe developers (which presumably implies smaller-scale as well?) overlook this?

We absolutely should have a design discussion on this, possibly before the next coordination meeting (to expedite the process). I will try to first outline what I see as the various options in a comment on this thread to solicit async feedback.

Contributor (Author):

Awesome! Happy to chat before. I won't be able to make it to the next coordination meeting, so this week or next before Friday would be ideal for me.

Comment on lines 609 to 610
consolidate_dimension_coordinates: bool = False
consolidated_metadata: Optional[bool] = True
Member:

Suggested change: remove these two fields.

See comment below re: boolean flags vs. composition. I feel like @sbquinlan's design suggestion is the right way to go here: modularity/composability over branching/boolean flag logic.

Note if we go this route we want to also make the following change in the body of StoreToZarr below (but GitHub won't let me make a comment on these lines because the PR does not already change them):

target_store = schema | PrepareZarrTarget(
    target=self.get_full_target(),
    target_chunks=target_chunks,
    attrs=self.attrs,
-   consolidated_metadata=self.consolidated_metadata,
+   # In StoreToZarr, always default to unconsolidated. This leaves it up to the
+   # user whether or not they want to consolidate with ConsolidateMetadata(). Also,
+   # it prevents a broken/inconsistent state that could arise from metadata being
+   # consolidated here, and then falling out of sync with coordinates if
+   # ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr().
+   consolidated_metadata=False,
    encoding=self.encoding,
)

Comment on lines 186 to 197
    (
        p
        | beam.Create(pattern.items())
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            target_root=tmp_target,
            store_name="subpath",
            combine_dims=pattern.combine_dim_keys,
            consolidate_dimension_coordinates=consolidate_dimension_coordinates,
        )
    )

Member:

Suggested change:

    unconsolidated_zarr = (
        p
        | beam.Create(pattern.items())
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            target_root=tmp_target,
            store_name="subpath",
            combine_dims=pattern.combine_dim_keys,
        )
    )
    if consolidate_dimension_coordinates:
        unconsolidated_zarr | ConsolidateCoordinateDimensions()

Assuming we go with the composable/modular design rather than the boolean flag design, this test would need to be updated to look something like the above.

I'm not sure if ConsolidateMetadata() would also need to be piped on the end here to get the asserts below to pass.

Comment on lines 313 to 314
consolidate_dimension_coordinates=False,
consolidated_metadata=True,
Member:

Suggested change: remove these two arguments.

Assuming we remove these flags as proposed above, they would need to be removed from this test as well.

@norlandrhagen (Contributor, Author):

Going to hold off on doing any more work on this PR until we all chat about the design.

@norlandrhagen added the test-integration label (Apply this label to run integration tests on a PR) on Feb 6, 2024
@norlandrhagen added the documentation label (Improvements or additions to documentation) on Feb 6, 2024
@norlandrhagen (Contributor, Author):

Tests are passing! @cisaacstern & @jbusecke

@norlandrhagen (Contributor, Author):

Update on all the design discussion above:

@cisaacstern, @jbusecke, and I met and decided to go the route of composability instead of adding extra kwargs. This adds a bit of work for the recipe writer, but gives them more flexibility and should reduce the maintenance burden on the already very complex StoreToZarr transform. Thanks to @sbquinlan for pushing us down this design path.

@norlandrhagen norlandrhagen merged commit 0d328ee into pangeo-forge:main Feb 8, 2024
9 checks passed