
WIP: Dynamic rechunking option for StoreToZarr #546

Status: Closed · 63 commits

Conversation

@jbusecke (Contributor) commented on Jul 16, 2023

This PR came out of the Saturday Sprint at Scipy (together with @amsnyder, @thodson-usgs, @alaws-USGS, @kjdoore).

The proposed mechanism here is a generalization of my prototype CMIP6 pangeo-forge feedstock. Over there I implemented dynamic rechunking along a single dimension according to a desired chunksize in memory (EDIT: I now realized that I could have probably achieved this with dask auto chunking for this specific case).

The generalization was prompted by @rsignell-usgs. Rich wanted an option to specify the size of chunks and then have a fixed ratio of total chunk counts between different dimensions.

I think I have put together a solution that should work well in this case. The outline of the algorithm is as follows (see the sketch below):

  • Find the even divisors of the dimension length for each individual dimension
  • Iterate over each possible combination of chunks along the given dimensions, and compute the chunk size of a single variable for each combination
  • Select viable combinations whose sizes fall within the range [size - tolerance * size, size + tolerance * size]
  • From these candidates, find the closest fit to the desired chunking ratio
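For illustration, a minimal runnable sketch of this algorithm (hypothetical names and simplifications, not the exact code in this PR):

    import itertools
    from typing import Dict, List

    def even_divisors(n: int) -> List[int]:
        # All chunk lengths that divide the dimension length evenly
        return [d for d in range(1, n + 1) if n % d == 0]

    def dynamic_chunks_sketch(
        dim_sizes: Dict[str, int],   # e.g. {"time": 1980, "x": 360, "y": 180}
        itemsize: int,               # bytes per array element
        target_nbytes: int,          # desired chunk size in bytes
        ratio: Dict[str, int],       # desired ratio of chunk *counts* per dim
        tolerance: float = 0.2,
    ) -> Dict[str, int]:
        dims = list(dim_sizes)
        # 1. Even divisors per dimension
        divisors = [even_divisors(dim_sizes[d]) for d in dims]
        candidates = []
        # 2. Iterate over all combinations and compute the chunk size in bytes
        for combo in itertools.product(*divisors):
            nbytes = itemsize
            for chunk_len in combo:
                nbytes *= chunk_len
            # 3. Keep combinations within [size - tol*size, size + tol*size]
            if abs(nbytes - target_nbytes) <= tolerance * target_nbytes:
                candidates.append(combo)

        # 4. Among the candidates, pick the closest fit to the desired ratio
        #    of total numbers of chunks along each dimension
        def ratio_error(combo):
            nchunks = [dim_sizes[d] // c for d, c in zip(dims, combo)]
            normed = [n / ratio[d] for d, n in zip(dims, nchunks)]
            mean = sum(normed) / len(normed)
            return sum((v - mean) ** 2 for v in normed)

        best = min(candidates, key=ratio_error)  # raises if no candidate fits
        return dict(zip(dims, best))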

@alaws-USGS This is a different implementation than we originally came up with at the sprint, but I think the even division of chunks makes this quite attractive.

I put some usage examples together here.

TODO:

  • Decide on how to expose this in the PGF-recipe API

I propose to integrate this functionality into StoreToZarr. The user could do something like this:

...
StoreToZarr(..., target_chunk_size, target_chunk_ratio)
  • Enable string size input. Currently this only takes the size argument as an integer (the number of bytes we want our chunks to have). I think it would be more intuitive to be able to specify a size like "100MiB".

cc @cisaacstern @rabernat

@TomNicholas: I looked into the dask auto chunking, and I think it covers the case where you strictly want to chunk only one dimension, but it does not let the user specify the ratio described above (dask seems to aim for evenly sized chunks). But I might be overlooking something. See the snippet below for reference.
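A short example of dask-style auto chunking via xarray (illustrative only):

    import xarray as xr

    ds = xr.tutorial.open_dataset("air_temperature")  # example dataset
    # "auto" lets dask pick the chunk length along time based on its
    # configured target chunk size (the "array.chunk-size" config value);
    # it aims for roughly uniform chunk sizes, not a ratio between dims.
    ds = ds.chunk({"time": "auto"})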

  • Enable a fallback algorithm that loosens the even-divisor constraint, to avoid raising an error for certain CMIP6 models

@thodson-usgs (Contributor) commented:

Thanks, Julius,
BTW, @kjdoore was the one working on the implementation with you.

@jbusecke (Author) commented:

Oh, my bad for the confusion. My name buffer is super small and always overflows at conferences 😂

@kjdoore commented on Jul 17, 2023

@jbusecke This looks great! It is exactly what I was thinking as the next version of the simple algorithm we originally developed. Thanks for taking this to the next level from what we developed during the sprint.

@cisaacstern (Member) commented:

Nice work, @jbusecke, and everyone else named here who contributed.

> I propose to integrate this functionality into StoreToZarr

That seems reasonable to insert right above here:

rechunked_datasets = indexed_datasets | Rechunk(
target_chunks=self.target_chunks, schema=schema
)

as an alternate path for deriving target_chunks.

> Enable string size input.

At the risk of stating the obvious, would be great to reference whatever established parsing strategies / conventions exist in other packages for this, rather than inventing our own convention.

Review comment on pangeo_forge_recipes/dynamic_target_chunks.py, lines 38 to 43:
def dynamic_target_chunks_from_schema(
schema: XarraySchema,
target_chunk_nbytes: int, # TODO: Accept a str like `100MB`
target_chunk_ratio: Dict[str, int],
nbytes_tolerance: float = 0.2,
) -> dict[str, int]:
Contributor:

I attempted to review this PR but realized I was missing some key context. Could we provide a docstring for this function which explains what this function does and what these parameters are? In particular, I don't understand target_chunk_ratio.

Member:

FWIW, I was also confused by that term. So +1 on additional context. 🙏

@TomNicholas:

Would it be slightly clearer to call it target_chunk_aspect_ratio?

@jbusecke (Author):

I added a docstring explaining the inputs.

Review comment on the tests:

from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema


class TestDynamicTargetChunks:

Reviewer:

This is potentially a good use case for hypothesis - you could parameterize a unit test with a hypothesis strategy that generates arbitrary (regular) chunks, then assert the property that the returned target chunk size is within the specified tolerance.
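A minimal sketch of what such a property test could look like (make_schema is a hypothetical helper; assumes the function signature shown above):

    import hypothesis.strategies as st
    from hypothesis import given

    @given(
        nx=st.integers(min_value=10, max_value=500),
        ny=st.integers(min_value=10, max_value=500),
        target_nbytes=st.integers(min_value=8_000, max_value=1_000_000),
    )
    def test_chunk_nbytes_within_tolerance(nx, ny, target_nbytes):
        # make_schema is a hypothetical helper building an XarraySchema for
        # a single float64 variable with dims x (length nx) and y (length ny)
        schema = make_schema(x=nx, y=ny)
        tolerance = 0.2
        target_chunks = dynamic_target_chunks_from_schema(
            schema,
            target_chunk_nbytes=target_nbytes,
            target_chunk_ratio={"x": 1, "y": 1},
            nbytes_tolerance=tolerance,
        )
        chunk_nbytes = 8 * target_chunks["x"] * target_chunks["y"]  # float64
        # The property under test: realized chunk size is within tolerance.
        # (In practice, inputs where no even-divisor combination fits would
        # need to be filtered out or expected to raise.)
        assert abs(chunk_nbytes - target_nbytes) <= tolerance * target_nbytes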

@jbusecke (Author):

Yeah that is a good idea. I will have to dig into hypothesis a bit to understand how to encode the logic.

Reviewer:

This PR to dask might give you what you need (I can also finish it if that would help)

dask/dask#9374

@TomNicholas commented:

> established parsing strategies / conventions exist in other packages

A function for parsing sizes expressed as strings with SI units into integers exists in both cubed and dask, and dask I think has a strategy that will try to generate chunks of a given size.

@jbusecke (Author) commented on Jul 18, 2023

Hey everyone, thanks for the great comments. Sorry for the patchwork PR (working in between flights), I should have marked this as draft in the meantime.

To clarify the purpose (and naming) of target_chunk_ratio a bit, and maybe crowdsource a better name:
The idea here is to give the user some control over the ratio of the total number of chunks along each dimension. For example, if I specify target_chunks_ratio={'x': 1, 'y': 10}, the algorithm should aim to get len(ds_rechunked.chunks['x']) / len(ds_rechunked.chunks['y']) as close to 1:10 as possible (within the given constraints). This now works (see the most recent commit) independently of dataset size. I agree that the term target_chunk_ratio is confusing.

How about something like nchunks_aspect_ratio? I am trying to reflect here that we are optimizing the ratio between the total numbers of chunks (EDIT: now also explained in the docstring), not the aspect ratio of each individual chunk (which I believe is closer to what dask does).
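A small worked example with made-up numbers:

    # Illustrative only: dims {"x": 100, "y": 1000} and
    # target_chunks_ratio={"x": 1, "y": 10}
    chunks = {"x": 50, "y": 50}
    nchunks_x = 100 // chunks["x"]   # 2 chunks along x
    nchunks_y = 1000 // chunks["y"]  # 20 chunks along y
    assert nchunks_x / nchunks_y == 1 / 10  # matches the 1:10 target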

@jbusecke (Author) commented:

> A function for parsing sizes expressed as strings with SI units into integers exists in both cubed and dask, and dask I think has a strategy that will try to generate chunks of a given size.

I just implemented dask.utils.parse_bytes (with test). I chose the dask implementation since we already depend on dask here.
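For reference, dask.utils.parse_bytes converts human-readable size strings into byte counts:

    from dask.utils import parse_bytes

    parse_bytes("100MB")   # 100000000
    parse_bytes("100MiB")  # 104857600
    parse_bytes("5kB")     # 5000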

@jbusecke marked this pull request as draft on July 18, 2023.
@rsignell-usgs commented on Jul 18, 2023

> How about something like nchunks_aspect_ratio?

I think this conveys the concept nicely!

And thanks for working on this -- we intend to use this for the HyTEST project as well!

@cisaacstern (Member) commented on Jul 18, 2023

> How about something like nchunks_aspect_ratio?

I favor target_chunk_aspect_ratio as proposed by @TomNicholas in #546 (comment). Or maybe target_chunks_aspect_ratio (plural "chunks")?

In either case, I like the consistency with the target_chunk_nbytes kw on this function, and the target_chunks kw on the StoreToZarr transform, which feels in keeping with the spirit of "call things the same thing".

Thanks for all the work on this, Julius!

@cisaacstern (Member) commented:

> Or maybe target_chunks_aspect_ratio (plural "chunks")?

I think this is preferable, because the aspect ratio is between plural chunks. (As opposed to target_chunk_nbytes which does refer to the desired size of a single chunk.)

@rsignell-usgs commented:

@jbusecke and @cisaacstern , where do we stand with this?

@jbusecke (Author) commented:

I have just added some logic to implement a fallback algorithm. This one is a lot more naive: it determines the biggest chunk possible by dividing the length of each dimension by the corresponding value of target_chunks_aspect_ratio, then scales this maximum chunk down by dividing it by increasing integers until we find the closest match to the desired size (see the sketch below).
This can end up with really weird chunking schemes when applied across multiple dimensions, but should work fine for cases where we only chunk along a single dimension (leap-stc/cmip6-leap-feedstock#9).
I'll give that a go over there and report back.
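A rough sketch of that fallback (assumed names and simplifications, not the exact implementation):

    import math
    from typing import Dict

    def fallback_chunks_sketch(
        dim_sizes: Dict[str, int],
        itemsize: int,
        target_nbytes: int,
        target_chunks_aspect_ratio: Dict[str, int],
    ) -> Dict[str, int]:
        # Biggest possible chunk: divide each dimension length by its
        # aspect-ratio value (no even-divisor constraint here)
        max_chunks = {
            dim: math.ceil(length / target_chunks_aspect_ratio[dim])
            for dim, length in dim_sizes.items()
        }
        # Scale the maximum chunk down by increasing integer divisors and
        # keep the candidate whose size in bytes is closest to the target
        best, best_error = None, float("inf")
        for divisor in range(1, max(dim_sizes.values()) + 1):
            candidate = {
                dim: max(1, math.ceil(c / divisor))
                for dim, c in max_chunks.items()
            }
            nbytes = itemsize * math.prod(candidate.values())
            error = abs(nbytes - target_nbytes)
            if error < best_error:
                best, best_error = candidate, error
        return best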

We still need to think about a way to check this functionality with an end-to-end test. @cisaacstern do you have any suggestions for this in general?

@cisaacstern (Member) commented on Aug 22, 2023

> We still need to think about a way to check this functionality with an end-to-end test. @cisaacstern do you have any suggestions for this in general?

@jbusecke can you remind me what about this is non-unit-testable? (Or rather, what the unit tests don't/can't capture?)

@jbusecke (Author) commented:

Basically all the tests I have written so far are for the logic in pangeo_forge_recipes/dynamic_target_chunks.py; nothing touches the dynamic chunking logic in transforms.py/StoreToZarr!
I am not suggesting that there isn't a way to unit-test this, I merely do not know how one would go about it. I did not find any example of using StoreToZarr in the tests (outside of the end-to-end tests); the current transforms.py tests do not even import StoreToZarr.
I 100% understand the desire not to add more parametrizations to the end-to-end tests, but do you maybe have some guidance on how to write a 'synthetic' test that actually writes out a chunked store?

@jbusecke (Author) commented:

> @jbusecke and @cisaacstern, where do we stand with this?

@rsignell-usgs, this PR is usable (I am running CMIP6 rechunking jobs with it right now), but I believe it needs some more testing and docs before it can get merged.
I would love for you to give this a try on your end and see if the results are satisfying. You can see an implementation here. The only specific thing needed to run from this PR is a requirements.txt entry. NOTE: It was vital to include the #egg=pangeo_forge_recipes suffix to make this work on Dataflow! See the example below.

@cisaacstern (Member) commented on Aug 22, 2023

Per conversation with @jbusecke, we believe the best path to releasing this work is:

🤔 Hmm, actually I think we can move forward with the items below without the dynamic injections work being a blocker, because this interface does not require dynamic deploy-time injections.

  • Define dynamic target chunks interface in StoreToZarr #572, something like:
    class StoreToZarr(beam.PTransform):
        """

        :param dynamic_chunking_fn: A callable which _must_ take `schema` as first arg.
        """
        ...
        dynamic_chunking_fn: Optional[Callable] = None
        dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)

        def expand(...):
            schema = ...
            target_chunks = (
                self.dynamic_chunking_fn(schema, **self.dynamic_chunking_fn_kwargs)
                if self.dynamic_chunking_fn
                else self.target_chunks
            )
            ...
  • Move the work in this PR into a separate plugin package

This design allows users to bring their own dynamic chunking algorithm, and also keeps the rather large amount of code in this implementation out of the core pangeo-forge-recipes maintenance sphere.
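A hypothetical user-side sketch of that interface (the transform names come from pangeo-forge-recipes; the kwargs follow this PR's function and are assumptions, not a settled API):

    recipe = (
        beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithXarray()
        | StoreToZarr(
            store_name="output.zarr",
            combine_dims=pattern.combine_dim_keys,
            # bring-your-own dynamic chunking algorithm
            dynamic_chunking_fn=dynamic_target_chunks_from_schema,
            dynamic_chunking_fn_kwargs={
                "target_chunk_nbytes": 100_000_000,
                "target_chunk_ratio": {"time": 1, "x": 10, "y": 10},
            },
        )
    )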

Julius notes that those wanting to use this work before above action items are complete can install pangeo-forge-recipes from this branch. This comment is intended not to dissuade use of this work now (please, go for it!), but rather to chart a path for how it will be released eventually.

@yuvipanda (Contributor) commented:

Just wanted to confirm that this does not depend on my dynamic injections work :)

@jbusecke (Author) commented:

Closing this in favor of #595. I have refactored the logic implemented here into dynamic-chunks and am working on an implementation of the CMIP workflow here.
