Commit 4e253fd: Last minute edits
cmalinmayor committed Jun 28, 2024 (1 parent: bdf845b)
Showing 1 changed file with 18 additions and 12 deletions: examples/tutorial.py

@@ -21,6 +21,7 @@
# - daisy terminology and concepts
# - running daisy locally with multiprocessing
# - running daisy with independent worker processes (e.g., on a compute cluster)
# - key features of daisy that make it unique

# %%
# %pip install scikit-image
@@ -224,7 +225,10 @@ def fresh_image():
    total_roi=total_roi,  # if your output has a different total_roi than your input, you would need to change this
    voxel_size=daisy.Coordinate((1,1)),
    dtype=raw_data_float.dtype,
    # The write size is important! If you don't set this correctly, your workers will have race conditions
    # when writing to the same file, resulting in unexpected behavior or weird errors.
    # The prepare_ds function takes care of making zarr chunk sizes that evenly divide your write size.
    write_size=block_size,
    num_channels=n_channels,
)
print("Shape of output dataset:", f["smoothed"].shape)
@@ -317,7 +321,7 @@ def smooth(block: daisy.Block):


# %% [markdown]
# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the `sample_data.zarr`. We also define a helper function for deleting a dataset, in case you want to re-run a processing step and see a new result.

# %%
def prepare_smoothing_ds(group):
@@ -330,6 +334,13 @@ def prepare_smoothing_ds(group):
        write_size=block_size,
        num_channels=3,
    )

def delete_ds(group):
    root = zarr.open("sample_data.zarr", 'a')
    if group in root:
        del root[group]


output_group = "smoothed_with_context"
prepare_smoothing_ds(output_group)

@@ -500,7 +511,7 @@ def start_subprocess_worker(cluster="local"):


# %% [markdown]
# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. If you provide a process function that takes a block as input, as we did previously, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you. If you provide a process function with no arguments, the worker is expected to set up the client and request blocks.
#
# Doing the `daisy.Client` setup yourself is helpful when worker startup is expensive: loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using `time.sleep()` in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up, and then the blocks should process quickly after that.
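
# %% [markdown]
# To make that pattern concrete, here is a minimal sketch of the worker-side loop described above
# (an added illustration, not the tutorial's full worker script; `example_worker_loop` is a made-up
# name, and we assume a block-wise smoothing function from earlier in the tutorial, here called
# `smooth_in_block` -- substitute whichever process function you are using): the expensive setup
# happens once, and then the worker keeps asking the scheduler for blocks until there are none left.

# %%
def example_worker_loop():
    # expensive one-time setup (e.g. loading network weights) would go here, before any blocks arrive
    client = daisy.Client()  # connects to the scheduler via the context daisy passes to the worker
    while True:
        with client.acquire_block() as block:
            if block is None:  # no more blocks to process: the task is done
                break
            smooth_in_block(block)  # process the acquired block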

@@ -582,7 +593,6 @@ def smooth_in_block_with_failure(block: daisy.Block):
# %%
plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower")


# %% [markdown]
# Debugging multi-process code is inherently difficult, but daisy tries to provide as much information as possible. First, you see the progress bar, which also reports the number of blocks at each state, including failed blocks. Any worker error messages are also logged to the scheduler log, although not the full traceback. Upon completion, daisy provides an error summary, which informs you of the final status of all the blocks, and points you to the full output and error logs for each worker, which can be found in `daisy_logs/<task_name>`. The worker error log will contain the full traceback for debugging the exact source of an error.
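
# %% [markdown]
# For example, after a run with failures you can list the files under `daisy_logs/` to locate the
# worker logs that contain the full tracebacks (a small added illustration; the exact subdirectory
# and file names depend on your task name and daisy version).

# %%
from pathlib import Path

for log_file in sorted(Path("daisy_logs").rglob("*")):
    if log_file.is_file():
        print(log_file)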

@@ -591,10 +601,6 @@ def smooth_in_block_with_failure(block: daisy.Block):

# %%
# delete and re-create the dataset, so that we start from zeros again
delete_ds("fault_tolerance")
prepare_smoothing_ds("fault_tolerance")

@@ -659,7 +665,9 @@ def check_complete(output_group, block):


# %% [markdown]
# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed).
#
# If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data.
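
# %% [markdown]
# Here is a sketch of that idea (an added illustration with hypothetical names, not the tutorial's
# own code): after a block's output is written, drop a tiny marker file named after the block id,
# and make the pre-check a simple file-existence test instead of a read of the output data.

# %%
import os

done_dir = "fault_tolerance_done_markers"  # hypothetical directory for the marker files
os.makedirs(done_dir, exist_ok=True)

def smooth_and_mark_done(block: daisy.Block):
    smooth_in_block(block)  # assuming a block-wise smoothing function like the ones defined earlier
    # record completion only after the output has actually been written
    # (block.block_id is a (task name, block index) tuple in recent daisy versions)
    open(os.path.join(done_dir, f"{block.block_id[1]}.done"), "w").close()

def fast_check_complete(block: daisy.Block):
    # much cheaper than reading the output data: just a file-existence check
    return os.path.exists(os.path.join(done_dir, f"{block.block_id[1]}.done"))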

# %% [markdown]
# ## Task chaining
@@ -885,8 +893,6 @@ def get_overlapping_labels(array1, array2):
output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi)


# %%
seg_block_roi = daisy.Roi((0,0), (128, 128))
seg_block_read_roi = seg_block_roi.grow(context, context)
@@ -895,7 +901,7 @@ def get_overlapping_labels(array1, array2):
prepare_ds(
"sample_data.zarr",
"blue_objects_with_context",
total_roi=total_write_roi,
total_roi=total_roi,
voxel_size=daisy.Coordinate((1,1)),
dtype=np.uint16,
write_size=seg_block_roi.shape,