
Dynamically determine subset_inputs and target_chunks from cached files. #355

Closed

@jbusecke (Contributor) opened this issue May 3, 2022 · 3 comments

Over in pangeo-forge/cmip6-feedstock#2 we are planning to convert CMIP6 data (which is very heterogeneous in chunking, file size, etc.) to ARCO data using Pangeo Forge. We currently have to specify the input parameters subset_inputs and target_chunks for each converted dataset.
If we scale up our efforts there, we may have to do this for hundreds of thousands of datasets, which is obviously not sustainable.

While working on that effort, I was wondering whether there might be a way to determine these parameters dynamically once the files are cached (this would not work in a recipe where caching is not enabled).

In particular, the subset_inputs parameter does not seem well aligned with the proposed separation of recipe and execution logic, since its appropriate value is ultimately dictated by the size of the workers in the execution environment.

I was able to draft some fairly lightweight code that successfully infers these parameters from a feedstock recipe.

Assume that the files have already been cached.

Dynamically determine subset_inputs

The subset_inputs value could then be dynamically inferred like so:

```python
# Iterate through the cached files and check each one's size,
# using os.path.getsize from the os.path module.
import os
import pathlib

cache_path = pathlib.Path(recipe.storage_config.cache.root_path)

subset_inputs_list = []
for file in cache_path.iterdir():
    file_size = os.path.getsize(file)
    # Target ~500 MB per subset. Do we have to check that this is not more
    # than the number of elements along the concat_dim, or is that done
    # elsewhere in the code?
    subset_inputs = int(file_size // 5e8) + 1
    subset_inputs_list.append(subset_inputs)
    print(file_size / 1e6)
    print(subset_inputs)
```

which gives reasonable values

```
801.356821
2
1249.376851
3
```

and can then be set internally on the recipe object:

```python
# Now set a global parameter on the recipe object.
recipe.subset_inputs = max(subset_inputs_list)
recipe.subset_inputs
```

```
3
```
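The per-file logic above could be consolidated into a small helper. This is just a hypothetical sketch (the function name `infer_subset_inputs` is not part of pangeo-forge-recipes), keeping the same ~500 MB-per-subset target as the loop above:

```python
# Hypothetical helper consolidating the inference above: given the sizes
# of the cached input files in bytes, return a single subset_inputs value
# that keeps every subset of every file under max_subset_bytes.
def infer_subset_inputs(file_sizes, max_subset_bytes=5e8):
    """Return the number of subsets needed so that no subset of any
    input file exceeds max_subset_bytes (default ~500 MB)."""
    return max(int(size // max_subset_bytes) + 1 for size in file_sizes)

# Example with the two file sizes printed above:
print(infer_subset_inputs([801_356_821, 1_249_376_851]))  # → 3
```

Taking the max across files keeps the behavior of the loop above: the largest input dictates the subsetting for all inputs.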

Dynamically determine target_chunks

A similar block of logic could let the user specify a size range for the target chunks (which would nicely accommodate the varying dimensionality of our inputs, for instance):

```python
import numpy as np
import xarray as xr

# Target chunk size range in bytes. (Maybe this could ultimately accept
# strings like "nnnMB"/"nnGB".)
target_chunk_size_range = (100e6, 200e6)

# Open one cached file to estimate bytes per element along the concat dim.
# (Perhaps there is a way to get this info without using xarray?)
example_file = list(cache_path.iterdir())[0]
ds = xr.open_dataset(example_file)
bytes_per_dim_element = ds.nbytes / len(ds[recipe.concat_dim])
# Aim for the middle of the target range. (I guess this could be more
# sophisticated, but for now it should be good enough.)
chunksize = np.round(np.mean(np.array(target_chunk_size_range) / bytes_per_dim_element))

# Set the target_chunks on the recipe.
target_chunks = {recipe.concat_dim: int(chunksize)}
recipe.target_chunks = target_chunks
recipe.target_chunks
```

```
{'time': 24}
```
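The "nnnMB"/"nnGB" idea floated in the comment above could be handled by a small parser. A minimal sketch, assuming a hypothetical `parse_size` helper (not part of pangeo-forge-recipes or xarray):

```python
import re

# Decimal (SI) multipliers; a variant could use 1024-based "KiB"/"MiB" units.
_UNITS = {"B": 1, "KB": 1e3, "MB": 1e6, "GB": 1e9, "TB": 1e12}

def parse_size(size_str):
    """Parse a human-readable size string like '100MB' or '1.5GB' into bytes."""
    match = re.fullmatch(r"\s*([\d.]+)\s*([KMGT]?B)\s*", size_str.upper())
    if match is None:
        raise ValueError(f"Could not parse size string: {size_str!r}")
    value, unit = match.groups()
    return float(value) * _UNITS[unit]

# The chunk-size range could then be written in human-readable form:
target_chunk_size_range = tuple(parse_size(s) for s in ("100MB", "200MB"))
print(target_chunk_size_range)  # → (100000000.0, 200000000.0)
```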

I was discussing this with @cisaacstern earlier, and we were thinking this could be integrated as part of a more granular stage structure, as mentioned in #224.

See here for the full notebook.

Happy to help wherever I can.

@cisaacstern (Member) commented

> In particular the subset_inputs parameter actually does not seem quite aligned with the proposed separation of recipe and execution logic, since that ultimately is dictated by the size of the workers of the execution environment?

This is a very compelling argument.

Also, a small point: the pathlib .iterdir() method is elegant! I hadn't seen that before.

Let's coordinate on this, perhaps next week, once some of the other features we've been discussing have moved forward.

@rabernat (Contributor) commented May 5, 2022

This is a good idea.

Making it work would require significant refactoring of the "pipelines" execution model. We would need the ability for the sequence consumed by a map stage to be generated dynamically by an earlier stage. This should be possible, but someone will need to refactor the executors, so it touches #256.
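To make the idea concrete, here is a generic illustration (plain Python, not the pangeo-forge executor API) of a "dynamic map stage": the sequence consumed by a map stage is produced by an earlier stage at runtime rather than being fixed when the pipeline is built. The stage names here are hypothetical.

```python
def cache_stage():
    # Stand-in for the caching stage: pretend it caches the input files
    # and reports their sizes in bytes.
    return [801_356_821, 1_249_376_851]

def plan_stage(file_sizes, max_subset_bytes=5e8):
    # Dynamically generate the sequence for the next map stage
    # (one subset count per input file, ~500 MB per subset).
    return [int(size // max_subset_bytes) + 1 for size in file_sizes]

def process_stage(n_subsets):
    # Stand-in for the per-input processing work.
    return f"processing input split into {n_subsets} subsets"

sizes = cache_stage()
for n in plan_stage(sizes):  # map sequence known only at runtime
    print(process_stage(n))
```

The refactoring challenge is exactly this dependency: today's executors expect the map sequence to be known up front, whereas here `plan_stage` cannot run until `cache_stage` has finished.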

@cisaacstern (Member) commented Aug 25, 2023

cisaacstern commented Aug 25, 2023

Superseded by #546, so closing. We've made so much headway on this since this issue was first opened!
