diff --git a/examples/tutorial.py b/examples/tutorial.py
index 6e938cad..a295c999 100644
--- a/examples/tutorial.py
+++ b/examples/tutorial.py
@@ -21,6 +21,7 @@
 # - daisy terminology and concepts
 # - running daisy locally with multiprocessing
 # - running daisy with independent worker processes (e.g., on a compute cluster)
+# - key features of daisy that make it unique
 
 # %%
 # %pip install scikit-image
@@ -224,7 +225,10 @@ def fresh_image():
     total_roi=total_roi, # if your output has a different total_roi than your input, you would need to change this
     voxel_size=daisy.Coordinate((1,1)),
     dtype=raw_data_float.dtype,
-    write_size=block_size,
+    # The write size is important! If you don't set this correctly, your workers will have race conditions
+    # when writing to the same file, resulting in unexpected behavior or weird errors.
+    # The prepare_ds function takes care of making zarr chunk sizes that evenly divide your write size.
+    write_size=block_size,
     num_channels=n_channels,
 )
 print("Shape of output dataset:", f["smoothed"].shape)
@@ -317,7 +321,7 @@ def smooth(block: daisy.Block):
 # %% [markdown]
-# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the sample_data.zarr.
+# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the `sample_data.zarr`. We also define a helper function for deleting a dataset, in case you want to re-run a processing step and see a new result.
 
 # %%
 def prepare_smoothing_ds(group):
@@ -330,6 +334,13 @@ def prepare_smoothing_ds(group):
         write_size=block_size,
         num_channels=3,
     )
+
+def delete_ds(group):
+    root = zarr.open("sample_data.zarr", 'a')
+    if group in root:
+        del root[group]
+
+
 output_group = "smoothed_with_context"
 prepare_smoothing_ds(output_group)
@@ -500,7 +511,7 @@ def start_subprocess_worker(cluster="local"):
 # %% [markdown]
-# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. No longer does our process function accept a block as input - instead, it has no arguments, and is expected to specifically request a block. If you provide a process function that takes a block as input, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you.
+# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. If you provide a process function that takes a block as input, as we did previously, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you. If you provide a process function with no arguments, the worker is expected to set up the client and request blocks.
 #
 # Doing the `daisy.Client` set up yourself is helpful when worker startup is expensive - loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using time.sleep() in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up and then the blocks should process quickly after that.
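# (Illustrative sketch; not part of the diff above.) The rewritten markdown cell
# explains that a process function taking a block gets the `daisy.Client`, the
# `while` loop, and the `client.acquire_block()` context created for it, while a
# no-argument process function means the worker script does that setup itself so
# expensive startup happens once per worker. A minimal sketch of that worker shape
# follows; `expensive_setup` and `smooth_in_block` are hypothetical stand-ins for
# the tutorial's own setup (simulated with time.sleep) and per-block smoothing code.

import time

import daisy


def expensive_setup():
    # stand-in for costly one-time work, e.g. loading network weights;
    # the tutorial simulates this with time.sleep() in the worker setup
    time.sleep(20)


def smooth_in_block(block: daisy.Block):
    # placeholder for the tutorial's per-block smoothing logic
    pass


if __name__ == "__main__":
    expensive_setup()        # runs once per worker, not once per block
    client = daisy.Client()  # connects back to the scheduler that launched this worker
    while True:
        with client.acquire_block() as block:
            if block is None:  # the scheduler has no more blocks for us
                break
            smooth_in_block(block)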
@@ -582,7 +593,6 @@ def smooth_in_block_with_failure(block: daisy.Block):
 # %%
 plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower")
-
 # %% [markdown]
 # Debugging multi-process code is inherently difficult, but daisy tries to provide as much information as possible. First, you see the progress bar, which also reports the number of blocks at each state, including failed blocks. Any worker error messages are also logged to the scheduler log, although not the full traceback. Upon completion, daisy provides an error summary, which informs you of the final status of all the blocks, and points you to the full output and error logs for each worker, which can be found in `daisy_logs/`. The worker error log will contain the full traceback for debugging the exact source of an error.
@@ -591,10 +601,6 @@ def smooth_in_block_with_failure(block: daisy.Block):
 # %%
 # delete and re-create the dataset, so that we start from zeros again
-def delete_ds(group):
-    root = zarr.open("sample_data.zarr", 'a')
-    if group in root:
-        del root[group]
 delete_ds("fault_tolerance")
 prepare_smoothing_ds("fault_tolerance")
@@ -659,7 +665,9 @@ def check_complete(output_group, block):
 # %% [markdown]
-# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed). If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data.
+# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed).
+#
+# If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data.
 
 # %% [markdown]
 # ## Task chaining
@@ -885,8 +893,6 @@ def get_overlapping_labels(array1, array2):
 output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi)
 
-# %%
-
 # %%
 seg_block_roi = daisy.Roi((0,0), (128, 128))
 seg_block_read_roi = seg_block_roi.grow(context, context)
@@ -895,7 +901,7 @@ def get_overlapping_labels(array1, array2):
 prepare_ds(
     "sample_data.zarr",
     "blue_objects_with_context",
-    total_roi=total_write_roi,
+    total_roi=total_roi,
     voxel_size=daisy.Coordinate((1,1)),
     dtype=np.uint16,
     write_size=seg_block_roi.shape,
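# (Illustrative sketch; not part of the diff above.) The reworked markdown cell
# recommends recording each finished block in a file or database so the pre-check
# function can skip completed blocks without reading the output data. A minimal
# file-based version of that idea follows; the marker directory, `done_marker`,
# `process_block_and_record`, and `precheck` names are hypothetical, and only
# daisy's `block.block_id` is relied on as a unique per-block identifier.

import os

import daisy


def done_marker(block: daisy.Block) -> str:
    # one marker file per block, named after its unique block id
    # (the id may be a tuple, so it is stringified first)
    return os.path.join("completed_blocks", f"{block.block_id}.done")


def process_block_and_record(block: daisy.Block):
    # (the block's actual processing and result writing would go here, as before)
    os.makedirs("completed_blocks", exist_ok=True)
    with open(done_marker(block), "w"):
        pass  # an empty file is enough; only its existence matters


def precheck(block: daisy.Block) -> bool:
    # cheap pre-check: a block is done if its marker file already exists,
    # so the scheduler never has to read the output data itself
    return os.path.exists(done_marker(block))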