From 37510ed0b60675121038bf144ad0373b93ef0902 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Wed, 26 Jun 2024 14:07:18 -0400 Subject: [PATCH 01/12] Local section of tutorial --- examples/tutorial.py | 383 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 examples/tutorial.py diff --git a/examples/tutorial.py b/examples/tutorial.py new file mode 100644 index 00000000..57d3ece4 --- /dev/null +++ b/examples/tutorial.py @@ -0,0 +1,383 @@ +# --- +# jupyter: +# jupytext: +# formats: py:percent +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.16.2 +# kernelspec: +# display_name: Python 3 (ipykernel) +# language: python +# name: python3 +# --- + +# %% [markdown] +# # Daisy Tutorial +# Daisy is a library for processing large volumes in parallel. While other libraries (e.g. dask) can perform similar tasks, daisy is optimized for extremely large volumes, out-of-memory operations, and operations where neighboring blocks should not be run at the same time. +# +# In this tutorial, we will cover: +# - daisy terminology and concepts +# - running daisy locally with multiprocessing +# - running daisy with independent worker processes (e.g., on a compute cluster) + +# %% +# %pip install scikit-image +# %pip install zarr +# %pip install matplotlib +import multiprocessing +# multiprocessing.set_start_method("fork") + +# %% [markdown] +# ## Building Blocks + +# %% [markdown] +# Daisy is designed for processing volumetric data. Therefore, it has specific ways to describe locations in a volume. We will demonstrate the common terms and utilities using this image of astronaut Eileen Collins. + +# %% +from skimage import data +import numpy as np +import matplotlib.pyplot as plt + +raw_data = np.flip(data.astronaut(), 0)[0:512, 0:400] +axes_image = plt.imshow(raw_data, zorder=1, origin="lower") + +# %% [markdown] +# ### Coordinate +# - A daisy Coordinate is essentially a tuple with one value per spatial dimension. In our case, the Coordinates are two-dimensional. +# - Daisy coordinates can represent points in the volume, or distances in the volume. +# - Daisy mostly passes around abstract placeholders of the data. Therefore, a Coordinate does not contain data, it is simply a pointer to a location in a volume. +# - The main difference between a Coordinate and a tuple is that operations (e.g. addition) between Coordinates are performed pointwise, to match the spatial definition. +# +# `daisy.Coordinate` is an alias of [`funlib.geometry.Coordinate`](https://github.com/funkelab/funlib.geometry/blob/main/funlib/geometry/coordinate.py) +# +# Here are some example Coordinates, and a visualization of their location on the Eileen Collins volume. + +# %% +import daisy + +p1 = daisy.Coordinate(10,10) # white +p2 = daisy.Coordinate(452, 250) # yellow +p3 = p2 - p1 # orange + + +# %% +def display_coord(axes, coord, color): + x = coord[1] + y = coord[0] + axes.scatter(x, y, color=color, edgecolors="black", zorder=3) +figure = axes_image.figure +axes = figure.axes[0] +for point, color in zip([p1, p2, p3], [ "white", "yellow", "orange"]): + display_coord(axes, point, color=color) +figure + +# %% [markdown] +# ### Roi +# A Roi (Region of interest) is a bounding box in a volume. It is defined by two Coordinates: +# - offset: the starting corner of the bounding box relative to the origin +# - shape: The extent of the bounding box in each dimension +# +# `daisy.Roi` is an alias of [`funlib.geometry.Roi`](https://github.com/funkelab/funlib.geometry/blob/main/funlib/geometry/roi.py). Rois have operations like grow, shift, and intersect, that represent spatial manipulations +# +# Here are some example Rois and their visualization in our Eileen Collins volume. Remember, the Roi does not contain the data! It is simply a bounding box. + +# %% +head = daisy.Roi(offset=(320, 150), shape=(180, 150)) # purple +# to get the Roi of the nose, we will shrink the head Roi by a certain amount in x and y on each side of the Roi +# the first argument will move the corner closest to the origin, and the second argument will move the corner furthest from the origin +nose = head.grow(daisy.Coordinate(-60, -55), daisy.Coordinate(-90, -55)) # orange + +body = daisy.Roi(offset=p1, shape=(330, 350)) # grey +# to get the neck, we will intersect the body and the head rois +neck = head.intersect(body) # blue + +# %% +from matplotlib.patches import Rectangle + +def display_roi(axes, roi, color): + xy = (roi.offset[1], roi.offset[0]) + width = roi.shape[1] + height = roi.shape[0] + rect = Rectangle(xy, width, height, alpha=0.4, color=color, zorder=2) + axes.add_patch(rect) + +def fresh_image(): + plt.close() + axes_image = plt.imshow(raw_data, zorder=1, origin="lower") + figure = axes_image.figure + axes = figure.axes[0] + return figure + +figure = fresh_image() +for roi, color in zip([head, nose, body, neck], ["purple", "orange", "grey", "blue"]): + display_roi(figure.axes[0], roi, color=color) + +# %% [markdown] +# ### Array +# So far we have seen how to specify regions of the data with Rois and Coordinates, which do not contain any data. However, eventually you will need to access the actual data using your Rois! For this, we use [`funlib.persistence.arrays.Array`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/array.py). If you are familiar with dask, this is the daisy equivalent of dask arrays. +# +# The core information about the `funlib.persistence.arrays.Array` class is that you can slice them with Rois, along with normal numpy-like slicing. However, in order to support this type of slicing, we need to also know the Roi of the whole Array. Here we show you how to create an array from our raw data that is held in memory as a numpy array. However, we highly recommend using a zarr backend, and will show this in our simple example next! + +# %% +from funlib.persistence.arrays import Array + +# daisy Arrays expect the channel dimension first, but our sklearn loaded image has channels last - let's fix that +raw_data_reshaped = raw_data.transpose(2, 0, 1) +print("New data shape:", raw_data_reshaped.shape) + +# we need the spatial extend of the data +data_spatial_shape = raw_data_reshaped.shape[1:] +print("Spatial shape:", data_spatial_shape) + +# Roi of the whole volume +total_roi = daisy.Roi(offset=(0,0), shape=data_spatial_shape) +print("Total dataset roi:", total_roi) + +raw_array = Array( + data=raw_data_reshaped, + roi=total_roi, + voxel_size=daisy.Coordinate(1,1), +) + +# %% [markdown] +# Now we can demonstrate how to access data from an Array using a Roi + +# %% +# slicing an Array with a Roi gives you another Array +head_array = raw_array[head] +print("Head array type:", type(head_array)) + +# the to_ndarray() function gives you the actual numpy array with the data +head_data = head_array.to_ndarray() +plt.close() +plt.imshow(head_data.transpose(1, 2, 0), origin="lower") # need to transpose channels back to the end for matplotlib to work + +# %% +# you can also combine the two steps +body_data = raw_array.to_ndarray(body) +plt.close() +plt.imshow(body_data.transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# ### Block +# +# Daisy is a blockwise task scheduler. Therefore, the concept of a block is central to Daisy. To efficiently process large volumes, Daisy splits the whole volume into a set of adjacent blocks that cover the whole image. These blocks are what is passed between the scheduler and the workers. +# +# A Block is simply a (set of) Roi(s), and does not contain data. In practice, it has additional information that is useful to the daisy server and workers to help them perform their task, which we will decribe below: + +# %% +# let's define a Block of our Eileen Collins volume +# In practice, you will never need to make a block - the daisy scheduler will do this for you + +block_size = daisy.Coordinate(64, 64) +block_origin = daisy.Coordinate(128, 128) +block_roi = daisy.Roi(block_origin, block_size) + +block = daisy.Block( + total_roi = total_roi, + read_roi = block_roi, + write_roi = block_roi, +) + +# Here are all the attributes of the block +print("Block id:", block.block_id) # a unique ID for each block given the total roi of the volume +print("Block read roi:", block.read_roi) # the Roi which represents the location in the volume of the input data to the process +print("Block write roi:", block.write_roi) # the Roi which represents the location in the volume where the process should write the output data +print("Block status:", block.status) # The status of the block (e.g. created, in progress, succeeded, failed) + +# let's look at the read roi of our block on top of the original figure +figure = fresh_image() +display_roi(figure.axes[0], block.read_roi, color="white") + +# %% [markdown] +# You may be wondering why the block has a read roi and a write roi - this will be illustrated next in our simple daisy example! + +# %% [markdown] +# ## A Simple Example: Local Smoothing +# TODO: Intro + +# %% [markdown] +# ### Dataset Preparation +# As mentioned earlier, we highly recommend using a zarr backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a multiple of the write_roi. +# +# TODO: .... +# + +# %% +import zarr +# convert our data to float, because gaussian smoothing in scikit expects float input and output +# recall that we already reshaped it to put channel dimension first, as the funlib.persistence Array expects +raw_data_float = raw_data_reshaped.astype(np.float32)/255.0 + +# store image in zarr container +f = zarr.open('sample_data.zarr', 'w') +f['raw'] = raw_data_float +f['raw'].attrs['offset'] = daisy.Coordinate((0,0)) +f['raw'].attrs['resolution'] = daisy.Coordinate((1,1)) + +# %% +from funlib.persistence import prepare_ds +# prepare an output dataset with a chunk size that is a divisor of the block roi +n_channels = 3 +prepare_ds( + "sample_data.zarr", + "smoothed", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) +print("Shape of output dataset:", f["smoothed"].shape) +print("Chunk size in output dataset:",f['smoothed'].chunks) + + +# %% [markdown] +# ### Define our Process Function +# TODO: Describe + +# %% +def smooth_in_roi(block: daisy.Block): + from funlib.persistence.arrays import open_ds + from skimage import filters + sigma = 5.0 + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + output_ds = open_ds('sample_data.zarr', 'smoothed', 'a') + output_ds[block.write_roi] = smoothed + + +# %% +smooth_in_roi(block) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# ### Run daisy with local multiprocessing +# TODO: Describe + +# %% +daisy.run_blockwise([ + daisy.Task( + "Smoothing", + process_function=smooth_in_roi, + total_roi=total_roi, + read_roi=block_roi, + write_roi=block_roi, + fit="shrink", + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# #### Take 2: Add context! + +# %% +prepare_ds( + "sample_data.zarr", + "smoothed_with_context", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + + +# %% +def smooth_in_roi_with_context(block: daisy.Block): + from funlib.persistence.arrays import open_ds, Array + from skimage import filters + sigma = 5.0 + context = int(sigma) * 2 + grown_roi = block.read_roi.grow(context, context) + + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(grown_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds('sample_data.zarr', 'smoothed_with_context', 'a') + + smoothed = Array(smoothed, roi=grown_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +# %% +daisy.run_blockwise([ + daisy.Task( + "Smoothing with context", + process_function=smooth_in_roi_with_context, + total_roi=total_roi, + read_roi=block_roi, + write_roi=block_roi, + fit="shrink", + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_with_context'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# #### Take 3: The Daisy Way + +# %% +prepare_ds( + "sample_data.zarr", + "smoothed_blockwise", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + + +# %% +def smooth_in_block(block: daisy.Block): + from funlib.persistence.arrays import open_ds, Array + from skimage import filters + + sigma = 5.0 + + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds('sample_data.zarr', 'smoothed_blockwise', 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +# %% +sigma = 5 +context = int(sigma) * 2 +read_roi = block_roi.grow(context, context) + +daisy.run_blockwise([ + daisy.Task( + "Smoothing with context", + process_function=smooth_in_block, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# ## Distributing on the Cluster + +# %% From 3556bc32c6444906aa6022287a7a3d28b3f8865d Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Wed, 26 Jun 2024 14:50:20 -0400 Subject: [PATCH 02/12] Add subprocess/cluster example to tutorial --- examples/tutorial.py | 82 +++++++++++++++++++++++++++++++++++++ examples/tutorial_worker.py | 37 +++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 examples/tutorial_worker.py diff --git a/examples/tutorial.py b/examples/tutorial.py index 57d3ece4..c034bb57 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -377,7 +377,89 @@ def smooth_in_block(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower") +# %% [markdown] +# ### Conclusion: Dask and Daisy +# +# Phew! We managed to smooth the image, and along the way you learned the basics of Daisy. In this example, we only parallelized the processing using our local computer's resources, and our "volume" was very small. +# +# If your task is similar to this toy example, you can use dask to do the same task with many fewer lines of code: + +# %% +# %pip install dask + +# %% +import dask +import dask.array as da +from skimage import filters + + +f = zarr.open('sample_data.zarr', 'r') +raw = da.from_array(f['raw'], chunks=(3, 64, 64)) +print("Raw dask array:", raw) +sigma = 5.0 +context = int(sigma) * 2 + +def smooth_in_block(x): + return filters.gaussian(x, sigma=sigma, channel_axis=0) + +smoothed = raw.map_overlap(smooth_in_block, depth=(0, context, context)) +plt.imshow(smoothed.transpose((1, 2, 0)), origin="lower") + +# %% [markdown] +# Notice that there is no saving of the output in a zarr - we simply recieve the whole thing in memory. On this small example, this works very well, but when the whole volume is too large to fit in memory, we need to save the output to disk, as done in daisy. Additionally, dask generates more overhead than daisy, which can be a problem with larger and more complex problems. With daisy, the only object that is sent between the scheduler and the worker is a Block, which as we have learned is lightweight. + # %% [markdown] # ## Distributing on the Cluster +# TODO + +# %% +# tutorial worker code in tutorial_worker.py + + +# %% +prepare_ds( + "sample_data.zarr", + "smoothed_subprocess", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + + +# %% +# new process function to start the worker subprocess +def start_subprocess_worker(cluster="local"): + import subprocess + if cluster == "bsub": + num_cpus_per_worker = 4 + subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py"]) + elif cluster== "local": + subprocess.run(["python", "./tutorial_worker.py"]) + else: + raise ValueError("Only bsub and local currently supported for this tutorial") + + + +# %% +# scheduler +from functools import partial + +# note: Must be on submit node to run this +tutorial_task = daisy.Task( + "smoothing_subprocess", + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + process_function=partial(start_subprocess_worker, "local"), + num_workers=2, + fit="shrink", +) + +daisy.run_blockwise([tutorial_task]) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower") # %% diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py new file mode 100644 index 00000000..d0791a93 --- /dev/null +++ b/examples/tutorial_worker.py @@ -0,0 +1,37 @@ +import daisy +import logging +import time +from funlib.persistence.arrays import open_ds, Array +from skimage import filters + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def smooth_in_block(block: daisy.Block): + sigma = 5.0 + + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds('sample_data.zarr', 'smoothed_subprocess', 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +if __name__ == "__main__": + client = daisy.Client() + print("Client:", client) + + while True: + logger.info("getting block") + with client.acquire_block() as block: + + if block is None: + break + + logger.info(f"got block {block}") + smooth_in_block(block) + + logger.info(f"releasing block: {block}") From 1e22087fea2d383795d9a4f6ed05f846b0f4e059 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Wed, 26 Jun 2024 21:59:02 -0400 Subject: [PATCH 03/12] Add fault tolerance, task chaining, and read write conflict examples --- examples/tutorial.py | 428 +++++++++++++++++++++++++++-- examples/tutorial_config.json | 7 + examples/tutorial_worker.py | 2 + examples/tutorial_worker_config.py | 43 +++ 4 files changed, 464 insertions(+), 16 deletions(-) create mode 100644 examples/tutorial_config.json create mode 100644 examples/tutorial_worker_config.py diff --git a/examples/tutorial.py b/examples/tutorial.py index c034bb57..031ae9d6 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -30,7 +30,7 @@ # multiprocessing.set_start_method("fork") # %% [markdown] -# ## Building Blocks +# # Building Blocks # %% [markdown] # Daisy is designed for processing volumetric data. Therefore, it has specific ways to describe locations in a volume. We will demonstrate the common terms and utilities using this image of astronaut Eileen Collins. @@ -44,7 +44,7 @@ axes_image = plt.imshow(raw_data, zorder=1, origin="lower") # %% [markdown] -# ### Coordinate +# ## Coordinate # - A daisy Coordinate is essentially a tuple with one value per spatial dimension. In our case, the Coordinates are two-dimensional. # - Daisy coordinates can represent points in the volume, or distances in the volume. # - Daisy mostly passes around abstract placeholders of the data. Therefore, a Coordinate does not contain data, it is simply a pointer to a location in a volume. @@ -74,7 +74,7 @@ def display_coord(axes, coord, color): figure # %% [markdown] -# ### Roi +# ## Roi # A Roi (Region of interest) is a bounding box in a volume. It is defined by two Coordinates: # - offset: the starting corner of the bounding box relative to the origin # - shape: The extent of the bounding box in each dimension @@ -115,7 +115,7 @@ def fresh_image(): display_roi(figure.axes[0], roi, color=color) # %% [markdown] -# ### Array +# ## Array # So far we have seen how to specify regions of the data with Rois and Coordinates, which do not contain any data. However, eventually you will need to access the actual data using your Rois! For this, we use [`funlib.persistence.arrays.Array`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/array.py). If you are familiar with dask, this is the daisy equivalent of dask arrays. # # The core information about the `funlib.persistence.arrays.Array` class is that you can slice them with Rois, along with normal numpy-like slicing. However, in order to support this type of slicing, we need to also know the Roi of the whole Array. Here we show you how to create an array from our raw data that is held in memory as a numpy array. However, we highly recommend using a zarr backend, and will show this in our simple example next! @@ -161,7 +161,7 @@ def fresh_image(): plt.imshow(body_data.transpose(1, 2, 0), origin="lower") # %% [markdown] -# ### Block +# ## Block # # Daisy is a blockwise task scheduler. Therefore, the concept of a block is central to Daisy. To efficiently process large volumes, Daisy splits the whole volume into a set of adjacent blocks that cover the whole image. These blocks are what is passed between the scheduler and the workers. # @@ -195,11 +195,11 @@ def fresh_image(): # You may be wondering why the block has a read roi and a write roi - this will be illustrated next in our simple daisy example! # %% [markdown] -# ## A Simple Example: Local Smoothing +# # A Simple Example: Local Smoothing # TODO: Intro # %% [markdown] -# ### Dataset Preparation +# ## Dataset Preparation # As mentioned earlier, we highly recommend using a zarr backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a multiple of the write_roi. # # TODO: .... @@ -235,13 +235,14 @@ def fresh_image(): # %% [markdown] -# ### Define our Process Function +# ## Define our Process Function # TODO: Describe # %% def smooth_in_roi(block: daisy.Block): from funlib.persistence.arrays import open_ds from skimage import filters + sigma = 5.0 raw_ds = open_ds('sample_data.zarr', 'raw', "r",) data = raw_ds.to_ndarray(block.read_roi) @@ -257,7 +258,7 @@ def smooth_in_roi(block: daisy.Block): plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# ### Run daisy with local multiprocessing +# ## Run daisy with local multiprocessing # TODO: Describe # %% @@ -269,6 +270,7 @@ def smooth_in_roi(block: daisy.Block): read_roi=block_roi, write_roi=block_roi, fit="shrink", + num_workers=5, ) ] ) @@ -277,7 +279,7 @@ def smooth_in_roi(block: daisy.Block): plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# #### Take 2: Add context! +# ### Take 2: Add context! # %% prepare_ds( @@ -318,6 +320,7 @@ def smooth_in_roi_with_context(block: daisy.Block): read_roi=block_roi, write_roi=block_roi, fit="shrink", + num_workers=5, ) ] ) @@ -326,7 +329,7 @@ def smooth_in_roi_with_context(block: daisy.Block): plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_with_context'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# #### Take 3: The Daisy Way +# ### Take 3: The Daisy Way # %% prepare_ds( @@ -370,6 +373,7 @@ def smooth_in_block(block: daisy.Block): read_roi=read_roi, write_roi=block_roi, fit="shrink", + num_workers=5, ) ] ) @@ -378,7 +382,7 @@ def smooth_in_block(block: daisy.Block): plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# ### Conclusion: Dask and Daisy +# ## Conclusion: Dask and Daisy # # Phew! We managed to smooth the image, and along the way you learned the basics of Daisy. In this example, we only parallelized the processing using our local computer's resources, and our "volume" was very small. # @@ -399,17 +403,17 @@ def smooth_in_block(block: daisy.Block): sigma = 5.0 context = int(sigma) * 2 -def smooth_in_block(x): +def smooth_in_block_dask(x): return filters.gaussian(x, sigma=sigma, channel_axis=0) -smoothed = raw.map_overlap(smooth_in_block, depth=(0, context, context)) +smoothed = raw.map_overlap(smooth_in_block_dask, depth=(0, context, context)) plt.imshow(smoothed.transpose((1, 2, 0)), origin="lower") # %% [markdown] # Notice that there is no saving of the output in a zarr - we simply recieve the whole thing in memory. On this small example, this works very well, but when the whole volume is too large to fit in memory, we need to save the output to disk, as done in daisy. Additionally, dask generates more overhead than daisy, which can be a problem with larger and more complex problems. With daisy, the only object that is sent between the scheduler and the worker is a Block, which as we have learned is lightweight. # %% [markdown] -# ## Distributing on the Cluster +# # Distributing on the Cluster # TODO # %% @@ -446,7 +450,7 @@ def start_subprocess_worker(cluster="local"): # scheduler from functools import partial -# note: Must be on submit node to run this +# note: Must be on submit node to run this with bsub argument tutorial_task = daisy.Task( "smoothing_subprocess", total_roi=total_roi, @@ -462,4 +466,396 @@ def start_subprocess_worker(cluster="local"): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower") +# %% [markdown] +# ## Passing "arguments" to your subprocess + +# %% +prepare_ds( + "sample_data.zarr", + "smoothed_subprocess_config", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + + +# %% +def start_subprocess_worker_config(cluster="local"): + import subprocess + if cluster == "bsub": + num_cpus_per_worker = 4 + subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker_config.py", "tutorial_config.json"]) + elif cluster== "local": + subprocess.run(["python", "./tutorial_worker_config.py", "tutorial_config.json"]) + else: + raise ValueError("Only bsub and local currently supported for this tutorial") + + + +# %% +# note: Must be on submit node to run this with bsub argument +tutorial_task = daisy.Task( + "smoothing_subprocess_config", + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + process_function=partial(start_subprocess_worker_config, "local"), + num_workers=2, + fit="shrink", +) + +daisy.run_blockwise([tutorial_task]) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess_config'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# # Important Features + +# %% [markdown] +# ## Fault tolerance and the pre-check function + +# %% +prepare_ds( + "sample_data.zarr", + "fault_tolerance", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + + +# %% +# simulate failing 50% of the time +def smooth_in_block_with_failure(block: daisy.Block): + import random + + if random.random() < 0.5: + raise ValueError("Simulating random failure") + + from funlib.persistence.arrays import open_ds, Array + from skimage import filters + + sigma = 5.0 + + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds('sample_data.zarr', 'fault_tolerance', 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +# %% +sigma = 5 +context = int(sigma) * 2 +read_roi = block_roi.grow(context, context) + +daisy.run_blockwise([ + daisy.Task( + "fault tolerance test", + process_function=smooth_in_block_with_failure, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + read_write_conflict=False, + num_workers=5, + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + +# %% [markdown] +# Why is so much more than 60% done? Answer: "max retries" +# TODO: Also mention log files here + +# %% +root = zarr.open("sample_data.zarr", 'a') +del root['fault_tolerance'] +prepare_ds( + "sample_data.zarr", + "fault_tolerance", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + +daisy.run_blockwise([ + daisy.Task( + "fault tolerance test", + process_function=smooth_in_block_with_failure, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + read_write_conflict=False, + max_retries=1, + num_workers=3, + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + + +# %% [markdown] +# If we re-run enough times, eventually all the holes will fill. But, we can do something smarter! + +# %% +def check_complete(output_group, block): + from funlib.persistence.arrays import open_ds + import numpy as np + output_ds = open_ds('sample_data.zarr', output_group, 'r') + if np.max(output_ds.to_ndarray(block.write_roi)) > 0: + return True + else: + return False + + +daisy.run_blockwise([ + daisy.Task( + "fault tolerance test", + process_function=smooth_in_block_with_failure, + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + read_write_conflict=False, + max_retries=1, + check_function=partial(check_complete, "fault_tolerance") + ) + ] +) + +# %% +plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + + +# %% [markdown] +# Note: your pre-check function has to be faster than your actual function for this to be worth it. We recommend saving the block id as a file in a shared file system or database at the end of the worker function. + +# %% [markdown] +# ## Task chaining + +# %% [markdown] +# Say we have a function to segment out instances of blue objects in an image, and we want to apply it after smoothing. We can define two tasks and run them sequentially in the scheduler. + +# %% +def smooth_in_block(output_group, block: daisy.Block): + from funlib.persistence.arrays import open_ds, Array + from skimage import filters + + sigma = 5.0 + + raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds('sample_data.zarr', output_group, 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +# %% +# %pip install opencv-python + +# %% +def segment_blue_objects(input_group, output_group, block): + import cv2 + from funlib.persistence.arrays import open_ds, Array + import numpy as np + import skimage + + input_ds = open_ds('sample_data.zarr', input_group, "r",) + data = input_ds.to_ndarray(block.read_roi) + + back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8) + cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR) + hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV) + # Define the color range for detection + lower_blue = np.array([100,30,0]) + upper_blue = np.array([150,255,255]) + # Threshold the image to get only blue colors + mask = cv2.inRange(hsv_image, lower_blue, upper_blue) + mask = mask.astype(np.uint16) + + mask = mask // 255 + labels = skimage.measure.label(mask) + # get a unique ID for each element in the whole volume (avoid repeats between blocks) + block_id_mask = mask * (block.block_id[1]) + labels = labels + block_id_mask + + output_ds = open_ds('sample_data.zarr', output_group, 'a') + output_ds[block.write_roi] = labels + + +# %% + +prepare_ds( + "sample_data.zarr", + "smoothed_for_seg", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=n_channels, +) + +sigma = 5 +context = int(sigma) * 2 +read_roi = block_roi.grow(context, context) + +smoothing_task = daisy.Task( + "smooth_for_seg", + process_function=partial(smooth_in_block, "smoothed_for_seg"), + total_roi=total_roi, + read_roi=read_roi, + write_roi=block_roi, + fit="shrink", + num_workers=5, + read_write_conflict=False, + check_function=partial(check_complete, "smoothed_for_seg") +) + +# %% +# you can have different block sizes in different tasks +seg_block_roi = daisy.Roi((0,0), (128, 128)) + +prepare_ds( + "sample_data.zarr", + "blue_objects", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=np.uint8, + write_size=seg_block_roi.shape, +) + +seg_task = daisy.Task( + "segmentation", + process_function=partial(segment_blue_objects, "smoothed_for_seg", "blue_objects"), + total_roi=total_roi, + read_roi=seg_block_roi, + write_roi=seg_block_roi, + fit="shrink", + read_write_conflict=False, + num_workers=5, + upstream_tasks=[smoothing_task], +) + +# %% +daisy.run_blockwise([smoothing_task, seg_task]) + +# %% +from skimage.color import label2rgb +figure, axes = plt.subplots(1, 2) +axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") +axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") + +# %% [markdown] +# ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag) + +# %% [markdown] +# How can we resolve the labels of adjacent blocks? + +# %% +import logging + +def segment_blue_objects_with_context(input_group, output_group, block): + import cv2 + from funlib.persistence.arrays import open_ds, Array + import numpy as np + import skimage + import logging + import time + + def get_overlapping_labels(array1, array2): + array1 = array1.flatten() + array2 = array2.flatten() + # get indices where both are not zero (ignore background) + # this speeds up computation significantly + non_zero_indices = np.logical_and(array1, array2) + flattened_stacked = np.array([array1[non_zero_indices], array2[non_zero_indices]]) + intersections = np.unique(flattened_stacked, axis=1) + return intersections # a x 2 nparray + + input_ds = open_ds('sample_data.zarr', input_group, "r",) + output_ds = open_ds('sample_data.zarr', output_group, 'a') + data = input_ds.to_ndarray(block.read_roi) + existing_labels = output_ds.to_ndarray(block.read_roi) + + back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8) + cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR) + hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV) + # Define the color range for detection + lower_blue = np.array([100,30,0]) + upper_blue = np.array([150,255,255]) + # Threshold the image to get only blue colors + mask = cv2.inRange(hsv_image, lower_blue, upper_blue) + mask = mask.astype(np.uint16) + + mask = mask // 255 + labels = skimage.measure.label(mask) + # get a unique ID for each element in the whole volume (avoid repeats between blocks) + block_id_mask = mask * (block.block_id[1]) + labels = labels + block_id_mask + + # if there are existing labels, change the label to match + # note: This only works if objects never span multiple rows/columns. If you have long objects like neurons, you need to do true agglomeration + intersections = get_overlapping_labels(labels, existing_labels) + for index in range(intersections.shape[1]): + new_label, old_label = intersections[:, index] + labels[labels == new_label] = old_label + + time.sleep(0.5) + output_ds = open_ds('sample_data.zarr', output_group, 'a') + output_array = Array(labels, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi) + + +# %% +seg_block_roi = daisy.Roi((0,0), (128, 128)) +seg_block_read_roi = seg_block_roi.grow(10, 10) +root = zarr.open("sample_data.zarr", 'a') +del root['blue_objects_with_context'] +prepare_ds( + "sample_data.zarr", + "blue_objects_with_context", + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=np.uint8, + write_size=seg_block_roi.shape, +) + +seg_task = daisy.Task( + "segmentation_with_context", + process_function=partial(segment_blue_objects_with_context, "smoothed_for_seg", "blue_objects_with_context"), + total_roi=total_roi, + read_roi=seg_block_read_roi, + write_roi=seg_block_roi, + fit="shrink", + read_write_conflict=True, + num_workers=10, + upstream_tasks=[smoothing_task], +) +daisy.run_blockwise([seg_task]) + +# %% +from skimage.color import label2rgb +figure, axes = plt.subplots(1, 2) +axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") +axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects_with_context'][:]), origin="lower") + # %% diff --git a/examples/tutorial_config.json b/examples/tutorial_config.json new file mode 100644 index 00000000..ddd81faa --- /dev/null +++ b/examples/tutorial_config.json @@ -0,0 +1,7 @@ +{ + "input_zarr": "sample_data.zarr", + "output_zarr": "sample_data.zarr", + "input_group": "raw", + "output_group": "smoothed_subprocess_config", + "sigma": 5 +} \ No newline at end of file diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py index d0791a93..80f650c9 100644 --- a/examples/tutorial_worker.py +++ b/examples/tutorial_worker.py @@ -3,6 +3,7 @@ import time from funlib.persistence.arrays import open_ds, Array from skimage import filters +import time logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -32,6 +33,7 @@ def smooth_in_block(block: daisy.Block): break logger.info(f"got block {block}") + time.sleep(0.5) smooth_in_block(block) logger.info(f"releasing block: {block}") diff --git a/examples/tutorial_worker_config.py b/examples/tutorial_worker_config.py new file mode 100644 index 00000000..3c4ae65b --- /dev/null +++ b/examples/tutorial_worker_config.py @@ -0,0 +1,43 @@ +import daisy +import logging +import time +from funlib.persistence.arrays import open_ds, Array +from skimage import filters +import sys +import json + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def smooth_in_block(block: daisy.Block, config: dict): + sigma = config["sigma"] + + raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) + data = raw_ds.to_ndarray(block.read_roi) + smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + + output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') + + smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) + + +if __name__ == "__main__": + config_path = sys.argv[1] + with open(config_path) as f: + config = json.load(f) + + client = daisy.Client() + print("Client:", client) + + while True: + logger.info("getting block") + with client.acquire_block() as block: + + if block is None: + break + + logger.info(f"got block {block}") + smooth_in_block(block, config) + + logger.info(f"releasing block: {block}") From e8639b04d8d695fe04981688e9617bf0560ed298 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Thu, 27 Jun 2024 10:41:56 -0400 Subject: [PATCH 04/12] Add total read roi context to all examples --- .gitignore | 4 +- examples/tutorial.py | 70 +++++++++++++++++------------- examples/tutorial_worker.py | 6 ++- examples/tutorial_worker_config.py | 2 +- 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index 47b040d5..67e3c821 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,6 @@ dist daisy_logs/ .pytest_cache/ -.mypy_cache/ \ No newline at end of file +.mypy_cache/ +.ipynb* +*.zarr diff --git a/examples/tutorial.py b/examples/tutorial.py index 031ae9d6..006a9113 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -26,8 +26,7 @@ # %pip install scikit-image # %pip install zarr # %pip install matplotlib -import multiprocessing -# multiprocessing.set_start_method("fork") +# %pip install funlib.persistence # %% [markdown] # # Building Blocks @@ -331,11 +330,18 @@ def smooth_in_roi_with_context(block: daisy.Block): # %% [markdown] # ### Take 3: The Daisy Way +# %% +sigma = 5 +context = int(sigma) * 2 +read_roi = block_roi.grow(context, context) +total_read_roi = total_roi.grow(context, context) +total_write_roi = total_roi + # %% prepare_ds( "sample_data.zarr", "smoothed_blockwise", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -351,7 +357,7 @@ def smooth_in_block(block: daisy.Block): sigma = 5.0 raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi) + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) output_ds = open_ds('sample_data.zarr', 'smoothed_blockwise', 'a') @@ -369,7 +375,7 @@ def smooth_in_block(block: daisy.Block): daisy.Task( "Smoothing with context", process_function=smooth_in_block, - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, fit="shrink", @@ -381,6 +387,10 @@ def smooth_in_block(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower") +# %% [markdown] +# #### Passing arguments to your process function +# TODO + # %% [markdown] # ## Conclusion: Dask and Daisy # @@ -437,7 +447,7 @@ def smooth_in_block_dask(x): def start_subprocess_worker(cluster="local"): import subprocess if cluster == "bsub": - num_cpus_per_worker = 4 + num_cpus_per_worker = 1 subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py"]) elif cluster== "local": subprocess.run(["python", "./tutorial_worker.py"]) @@ -453,10 +463,10 @@ def start_subprocess_worker(cluster="local"): # note: Must be on submit node to run this with bsub argument tutorial_task = daisy.Task( "smoothing_subprocess", - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - process_function=partial(start_subprocess_worker, "local"), + process_function=partial(start_subprocess_worker, "bsub"), num_workers=2, fit="shrink", ) @@ -473,7 +483,7 @@ def start_subprocess_worker(cluster="local"): prepare_ds( "sample_data.zarr", "smoothed_subprocess_config", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -498,7 +508,7 @@ def start_subprocess_worker_config(cluster="local"): # note: Must be on submit node to run this with bsub argument tutorial_task = daisy.Task( "smoothing_subprocess_config", - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, process_function=partial(start_subprocess_worker_config, "local"), @@ -521,7 +531,7 @@ def start_subprocess_worker_config(cluster="local"): prepare_ds( "sample_data.zarr", "fault_tolerance", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -543,7 +553,7 @@ def smooth_in_block_with_failure(block: daisy.Block): sigma = 5.0 raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi) + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) output_ds = open_ds('sample_data.zarr', 'fault_tolerance', 'a') @@ -561,7 +571,7 @@ def smooth_in_block_with_failure(block: daisy.Block): daisy.Task( "fault tolerance test", process_function=smooth_in_block_with_failure, - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, fit="shrink", @@ -581,10 +591,12 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% root = zarr.open("sample_data.zarr", 'a') del root['fault_tolerance'] + +# %% prepare_ds( "sample_data.zarr", "fault_tolerance", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -595,7 +607,7 @@ def smooth_in_block_with_failure(block: daisy.Block): daisy.Task( "fault tolerance test", process_function=smooth_in_block_with_failure, - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, fit="shrink", @@ -628,7 +640,7 @@ def check_complete(output_group, block): daisy.Task( "fault tolerance test", process_function=smooth_in_block_with_failure, - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, fit="shrink", @@ -660,7 +672,7 @@ def smooth_in_block(output_group, block: daisy.Block): sigma = 5.0 raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi) + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) output_ds = open_ds('sample_data.zarr', output_group, 'a') @@ -707,7 +719,7 @@ def segment_blue_objects(input_group, output_group, block): prepare_ds( "sample_data.zarr", "smoothed_for_seg", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -721,7 +733,7 @@ def segment_blue_objects(input_group, output_group, block): smoothing_task = daisy.Task( "smooth_for_seg", process_function=partial(smooth_in_block, "smoothed_for_seg"), - total_roi=total_roi, + total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, fit="shrink", @@ -737,7 +749,7 @@ def segment_blue_objects(input_group, output_group, block): prepare_ds( "sample_data.zarr", "blue_objects", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=np.uint8, write_size=seg_block_roi.shape, @@ -746,7 +758,7 @@ def segment_blue_objects(input_group, output_group, block): seg_task = daisy.Task( "segmentation", process_function=partial(segment_blue_objects, "smoothed_for_seg", "blue_objects"), - total_roi=total_roi, + total_roi=total_write_roi, # Note: This task does not have context (yet...?) read_roi=seg_block_roi, write_roi=seg_block_roi, fit="shrink", @@ -764,6 +776,7 @@ def segment_blue_objects(input_group, output_group, block): axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") + # %% [markdown] # ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag) @@ -771,8 +784,6 @@ def segment_blue_objects(input_group, output_group, block): # How can we resolve the labels of adjacent blocks? # %% -import logging - def segment_blue_objects_with_context(input_group, output_group, block): import cv2 from funlib.persistence.arrays import open_ds, Array @@ -793,8 +804,8 @@ def get_overlapping_labels(array1, array2): input_ds = open_ds('sample_data.zarr', input_group, "r",) output_ds = open_ds('sample_data.zarr', output_group, 'a') - data = input_ds.to_ndarray(block.read_roi) - existing_labels = output_ds.to_ndarray(block.read_roi) + data = input_ds.to_ndarray(block.read_roi, fill_value=0) + existing_labels = output_ds.to_ndarray(block.read_roi, fill_value=0) back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8) cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR) @@ -827,13 +838,12 @@ def get_overlapping_labels(array1, array2): # %% seg_block_roi = daisy.Roi((0,0), (128, 128)) -seg_block_read_roi = seg_block_roi.grow(10, 10) -root = zarr.open("sample_data.zarr", 'a') -del root['blue_objects_with_context'] +seg_block_read_roi = seg_block_roi.grow(context, context) + prepare_ds( "sample_data.zarr", "blue_objects_with_context", - total_roi=total_roi, + total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), dtype=np.uint8, write_size=seg_block_roi.shape, @@ -842,7 +852,7 @@ def get_overlapping_labels(array1, array2): seg_task = daisy.Task( "segmentation_with_context", process_function=partial(segment_blue_objects_with_context, "smoothed_for_seg", "blue_objects_with_context"), - total_roi=total_roi, + total_roi=total_read_roi, read_roi=seg_block_read_roi, write_roi=seg_block_roi, fit="shrink", diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py index 80f650c9..b055ffd2 100644 --- a/examples/tutorial_worker.py +++ b/examples/tutorial_worker.py @@ -12,7 +12,7 @@ def smooth_in_block(block: daisy.Block): sigma = 5.0 raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi) + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) output_ds = open_ds('sample_data.zarr', 'smoothed_subprocess', 'a') @@ -25,6 +25,9 @@ def smooth_in_block(block: daisy.Block): client = daisy.Client() print("Client:", client) + # simlate long setup time (e.g. loading a model) + # time.sleep(50) + while True: logger.info("getting block") with client.acquire_block() as block: @@ -33,7 +36,6 @@ def smooth_in_block(block: daisy.Block): break logger.info(f"got block {block}") - time.sleep(0.5) smooth_in_block(block) logger.info(f"releasing block: {block}") diff --git a/examples/tutorial_worker_config.py b/examples/tutorial_worker_config.py index 3c4ae65b..9225db66 100644 --- a/examples/tutorial_worker_config.py +++ b/examples/tutorial_worker_config.py @@ -13,7 +13,7 @@ def smooth_in_block(block: daisy.Block, config: dict): sigma = config["sigma"] raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) - data = raw_ds.to_ndarray(block.read_roi) + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') From afcaae5cf986a1067df7d467a05f567641beb3c2 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Thu, 27 Jun 2024 15:57:22 -0400 Subject: [PATCH 05/12] Write text for local smoothing example --- examples/tutorial.py | 217 ++++++++++++++++++------------------ examples/tutorial_worker.py | 2 +- 2 files changed, 107 insertions(+), 112 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 006a9113..ff739abf 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -39,7 +39,7 @@ import numpy as np import matplotlib.pyplot as plt -raw_data = np.flip(data.astronaut(), 0)[0:512, 0:400] +raw_data = np.flip(data.astronaut(), 0) axes_image = plt.imshow(raw_data, zorder=1, origin="lower") # %% [markdown] @@ -186,23 +186,20 @@ def fresh_image(): print("Block write roi:", block.write_roi) # the Roi which represents the location in the volume where the process should write the output data print("Block status:", block.status) # The status of the block (e.g. created, in progress, succeeded, failed) -# let's look at the read roi of our block on top of the original figure +# let's look at the write roi of our block on top of the original figure figure = fresh_image() -display_roi(figure.axes[0], block.read_roi, color="white") +display_roi(figure.axes[0], block.write_roi, color="white") # %% [markdown] # You may be wondering why the block has a read roi and a write roi - this will be illustrated next in our simple daisy example! # %% [markdown] # # A Simple Example: Local Smoothing -# TODO: Intro +# In this next example, we will use gaussian smoothing to illustrate how to parallize a task on your local machine using daisy. # %% [markdown] # ## Dataset Preparation -# As mentioned earlier, we highly recommend using a zarr backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a multiple of the write_roi. -# -# TODO: .... -# +# As mentioned earlier, we highly recommend using a zarr/n5 backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a multiple of and aligned with the write_roi. The zarr dataset must exist before you start scheudling though - we recommend using [`funlib.persistence.prepare_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L423) function to prepare the dataset. Then later, you can use [`funlib.persistence.open_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L328) to open the dataset and it will automatically read the metadata and wrap it into a `funlib.persistence.Array`. # %% import zarr @@ -214,16 +211,17 @@ def fresh_image(): f = zarr.open('sample_data.zarr', 'w') f['raw'] = raw_data_float f['raw'].attrs['offset'] = daisy.Coordinate((0,0)) -f['raw'].attrs['resolution'] = daisy.Coordinate((1,1)) +f['raw'].attrs['resolution'] = daisy.Coordinate((1,1)) # this attribute holds the voxel size # %% from funlib.persistence import prepare_ds # prepare an output dataset with a chunk size that is a divisor of the block roi -n_channels = 3 + +n_channels = 3 # our output will be an RGB image as well prepare_ds( "sample_data.zarr", "smoothed", - total_roi=total_roi, + total_roi=total_roi, # if your output has a different total_roi than your input, you would need to change this voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, write_size=block_size, @@ -235,41 +233,58 @@ def fresh_image(): # %% [markdown] # ## Define our Process Function -# TODO: Describe +# When run locally, daisy process functions must take a block as the only argument. Depending on the multiprocessing spawn function settings on your computer, the function might not inherit the imports and variables of the scope where the scheudler is run, so it is always safer to import and define everything inside the function. +# +# Here is an example for smoothing. Generally, daisy process functions have the following three steps: +# 1. Load the data from disk +# 2. Process the data +# 3. Save the result to disk +# +# Note that for now, the worker has to know where to load the data from and save the result to. Later we will show you ways around the rule that the process function must only take the block as input, to allow you to pass that information in when you start the scheduler. # %% -def smooth_in_roi(block: daisy.Block): +def smooth(block: daisy.Block): + # imports and hyperaparmeters inside scope, to be safe from funlib.persistence.arrays import open_ds from skimage import filters - sigma = 5.0 + + # open the raw dataset as an Array raw_ds = open_ds('sample_data.zarr', 'raw', "r",) + # Read the data in the block read roi and turn it into a numpy array data = raw_ds.to_ndarray(block.read_roi) + # smooth the data using the gaussian filter from skimage smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) + # open the output smoothed dataset as an Array output_ds = open_ds('sample_data.zarr', 'smoothed', 'a') + # save the result in the output dataset using the block write roi output_ds[block.write_roi] = smoothed # %% -smooth_in_roi(block) - -# %% +# Let's test the data on our block that we defined earlier and visualize the result +smooth(block) plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] # ## Run daisy with local multiprocessing -# TODO: Describe +# We are about ready to run daisy! We need to tell the scheduler the following pieces of information: +# - The process function, which takes a block as an argument +# - The total roi to process (in our case, the whole image) +# - The read roi and write roi of each block (the shape and relative offset are what is important, since they will be shifted as a pair to tile the total_roi) +# - How many workers to spawn +# +# These pieces of information get wrapped into a [`daisy.Task`](https://github.com/funkelab/daisy/blob/master/daisy/task.py), along with a name for the task. Then the `daisy.run_blockwise` function starts the scheduler, which creates all the blocks that tile the total roi, spawns the workers, distributes the blocks to the workers, and reports if the blocks were successfully processed. # %% daisy.run_blockwise([ daisy.Task( - "Smoothing", - process_function=smooth_in_roi, - total_roi=total_roi, - read_roi=block_roi, - write_roi=block_roi, - fit="shrink", - num_workers=5, + "Smoothing", # task name + process_function=smooth, # a function that takes a block as argument + total_roi=total_roi, # The whole roi of the image + read_roi=block_roi, # The roi that the worker should read from + write_roi=block_roi, # the roi that the worker should write to + num_workers=5, ) ] ) @@ -279,124 +294,101 @@ def smooth_in_roi(block: daisy.Block): # %% [markdown] # ### Take 2: Add context! +# The task ran successfully, but you'll notice that there are edge artefacts where the blocks border each other. This is because each worker only sees the inside of the block, and it needs more context to smooth seamlessly between blocks. If we increase the size of the read_roi so that each block sees all pixels that contribute meaningfully to the smoothed values in the interior (write_roi) of the block, the edge artefacts should disappear. # %% -prepare_ds( - "sample_data.zarr", - "smoothed_with_context", - total_roi=total_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +context = 2*sigma # pixels beyond 2*sigma contribute almost nothing to the output +block_read_roi = block_roi.grow(context, context) +block_write_roi = block_roi +# we also grow the total roi by the context, so that the write_rois are still in the same place when we tile +total_read_roi = total_roi.grow(context, context) - -# %% -def smooth_in_roi_with_context(block: daisy.Block): - from funlib.persistence.arrays import open_ds, Array - from skimage import filters - sigma = 5.0 - context = int(sigma) * 2 - grown_roi = block.read_roi.grow(context, context) - - raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(grown_roi) - smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) - - output_ds = open_ds('sample_data.zarr', 'smoothed_with_context', 'a') - - smoothed = Array(smoothed, roi=grown_roi, voxel_size=(1, 1)) - output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) - - -# %% -daisy.run_blockwise([ - daisy.Task( - "Smoothing with context", - process_function=smooth_in_roi_with_context, - total_roi=total_roi, - read_roi=block_roi, - write_roi=block_roi, - fit="shrink", - num_workers=5, - ) - ] +block = daisy.Block( + total_roi = total_roi, + read_roi = block_read_roi, + write_roi = block_write_roi, ) -# %% -plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_with_context'][:].transpose(1, 2, 0), origin="lower") +# let's look at the new block rois +figure = fresh_image() +display_roi(figure.axes[0], block.read_roi, color="purple") +display_roi(figure.axes[0], block.write_roi, color="white") + # %% [markdown] -# ### Take 3: The Daisy Way +# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the sample_data.zarr. # %% -sigma = 5 -context = int(sigma) * 2 -read_roi = block_roi.grow(context, context) -total_read_roi = total_roi.grow(context, context) -total_write_roi = total_roi +def prepare_smoothing_ds(group): + prepare_ds( + "sample_data.zarr", + group, + total_roi=total_roi, + voxel_size=daisy.Coordinate((1,1)), + dtype=raw_data_float.dtype, + write_size=block_size, + num_channels=3, + ) +output_group = "smoothed_with_context" +prepare_smoothing_ds(output_group) -# %% -prepare_ds( - "sample_data.zarr", - "smoothed_blockwise", - total_roi=total_write_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +# %% [markdown] +# Now we have to adapt our process function to crop the output before saving. It would be nice to be able to pass the output group in as an argument, so we will show you a workaround using `functools.partial` to partially evaluate the function. To use this workaround, your process function must have the block as the last argument. # %% -def smooth_in_block(block: daisy.Block): +def smooth_in_block(output_group: str, block: daisy.Block): + # imports and hyperaparmeters inside scope, to be safe from funlib.persistence.arrays import open_ds, Array from skimage import filters - + import time sigma = 5.0 - + # open the raw dataset as an Array raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi, fill_value=0) + # Read the data in the block read roi and turn it into a numpy array + data = raw_ds.to_ndarray(block.read_roi, fill_value=0) # NOTE: this fill value allows you to read outside the total_roi without erroring + # smooth the data using the gaussian filter from skimage smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) - - output_ds = open_ds('sample_data.zarr', 'smoothed_blockwise', 'a') - + # open the output smoothed dataset as an Array + output_ds = open_ds('sample_data.zarr', output_group, 'a') + # turn the smoothed result into an Array so we can crop it with a Roi (you can also center crop it by the context manually, but this is easier!) smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) + # save the result in the output dataset using the block write roi output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) -# %% -sigma = 5 -context = int(sigma) * 2 -read_roi = block_roi.grow(context, context) +# %% [markdown] +# Now we can re-run daisy. Note these changes from the previous example: +# - using `functools.partial` to partially evaluate our `smooth_in_block` function , turning it into a function that only takes the block as an argument +# - the total_roi is now exapnded to include the context, as is the read_roi +# %% +from functools import partial daisy.run_blockwise([ daisy.Task( "Smoothing with context", - process_function=smooth_in_block, + process_function=partial(smooth_in_block, output_group), total_roi=total_read_roi, - read_roi=read_roi, - write_roi=block_roi, - fit="shrink", + read_roi=block_read_roi, + write_roi=block_write_roi, num_workers=5, ) ] ) # %% -plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_blockwise'][:].transpose(1, 2, 0), origin="lower") +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_with_context'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# #### Passing arguments to your process function -# TODO +# Success! Notice that there is a fade to black at the border, due to the `fill_value=0` argument used when reading the data from the input Array. +# Smoothing is poorly defined at the border of the volume - if you want different behavior, you can expand the input array to include extended data of your choice at the border, or shrink the total output roi by the context to only include the section of the output that depends on existing data. # %% [markdown] # ## Conclusion: Dask and Daisy # -# Phew! We managed to smooth the image, and along the way you learned the basics of Daisy. In this example, we only parallelized the processing using our local computer's resources, and our "volume" was very small. +# Congrats! You have learned the basics of Daisy. In this example, we only parallelized the processing using our local computer's resources, and our "volume" was very small. # -# If your task is similar to this toy example, you can use dask to do the same task with many fewer lines of code: +# If your task is similar to this example, you can use dask to do the same task with many fewer lines of code: # %% # %pip install dask @@ -420,7 +412,11 @@ def smooth_in_block_dask(x): plt.imshow(smoothed.transpose((1, 2, 0)), origin="lower") # %% [markdown] -# Notice that there is no saving of the output in a zarr - we simply recieve the whole thing in memory. On this small example, this works very well, but when the whole volume is too large to fit in memory, we need to save the output to disk, as done in daisy. Additionally, dask generates more overhead than daisy, which can be a problem with larger and more complex problems. With daisy, the only object that is sent between the scheduler and the worker is a Block, which as we have learned is lightweight. +# For some tasks, dask is much simpler and better suited. One key difference between dask and daisy is that in dask, functions are not supposed to have side effects. In daisy, functions are expected to have "side effects" - saving the output, rather than returning it. In general, daisy is designed for... +# - Cases where the output is too big to be kept in memory +# - Cases where you want to be able to pick up where you left off after an error, rather than starting the whole task over (because blocks that finished saved their results to disk) +# - Cases where blocks should be executed in a particular order, so that certain blocks see other blocks outputs (without passing the output through the scheduler) +# - Cases where the worker function needs setup and teardown that takes longer than processing a block (see our next example!) # %% [markdown] # # Distributing on the Cluster @@ -586,7 +582,6 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% [markdown] # Why is so much more than 60% done? Answer: "max retries" -# TODO: Also mention log files here # %% root = zarr.open("sample_data.zarr", 'a') @@ -613,7 +608,7 @@ def smooth_in_block_with_failure(block: daisy.Block): fit="shrink", read_write_conflict=False, max_retries=1, - num_workers=3, + num_workers=1, ) ] ) @@ -621,11 +616,11 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") - # %% [markdown] # If we re-run enough times, eventually all the holes will fill. But, we can do something smarter! # %% +from functools import partial def check_complete(output_group, block): from funlib.persistence.arrays import open_ds import numpy as np @@ -801,7 +796,7 @@ def get_overlapping_labels(array1, array2): flattened_stacked = np.array([array1[non_zero_indices], array2[non_zero_indices]]) intersections = np.unique(flattened_stacked, axis=1) return intersections # a x 2 nparray - + input_ds = open_ds('sample_data.zarr', input_group, "r",) output_ds = open_ds('sample_data.zarr', output_group, 'a') data = input_ds.to_ndarray(block.read_roi, fill_value=0) @@ -827,8 +822,8 @@ def get_overlapping_labels(array1, array2): # note: This only works if objects never span multiple rows/columns. If you have long objects like neurons, you need to do true agglomeration intersections = get_overlapping_labels(labels, existing_labels) for index in range(intersections.shape[1]): - new_label, old_label = intersections[:, index] - labels[labels == new_label] = old_label + new_label, existing_label = intersections[:, index] + labels[labels == new_label] = existing_label time.sleep(0.5) output_ds = open_ds('sample_data.zarr', output_group, 'a') @@ -856,7 +851,7 @@ def get_overlapping_labels(array1, array2): read_roi=seg_block_read_roi, write_roi=seg_block_roi, fit="shrink", - read_write_conflict=True, + read_write_conflict=False, num_workers=10, upstream_tasks=[smoothing_task], ) diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py index b055ffd2..502ae09c 100644 --- a/examples/tutorial_worker.py +++ b/examples/tutorial_worker.py @@ -26,7 +26,7 @@ def smooth_in_block(block: daisy.Block): print("Client:", client) # simlate long setup time (e.g. loading a model) - # time.sleep(50) + time.sleep(50) while True: logger.info("getting block") From ae31f4857357cb05828e958a662bccaca149e6d7 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Thu, 27 Jun 2024 16:36:10 -0400 Subject: [PATCH 06/12] Refine cluster example for tutorial --- examples/tutorial.py | 128 +++++++++++++++-------------- examples/tutorial_config.json | 2 +- examples/tutorial_worker.py | 43 +++++----- examples/tutorial_worker_config.py | 43 ---------- 4 files changed, 90 insertions(+), 126 deletions(-) delete mode 100644 examples/tutorial_worker_config.py diff --git a/examples/tutorial.py b/examples/tutorial.py index ff739abf..9bbd5c48 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -420,29 +420,24 @@ def smooth_in_block_dask(x): # %% [markdown] # # Distributing on the Cluster -# TODO +# While daisy can run locally, it is designed to shine in a cluster computing environment. The only information passed between scheduler and workers are Blocks, which are extremely lightweight and are communicated through TCP. Therefore, workers can be distributed on the cluster with minimal communication overhead. +# +# Let's re-do our smoothing, but this time run each worker as a completely separate subprocess, as would be needed on a cluster. First, we prepare the output dataset. # %% -# tutorial worker code in tutorial_worker.py - +# first, prepare the dataset +prepare_smoothing_ds("smoothed_subprocess") -# %% -prepare_ds( - "sample_data.zarr", - "smoothed_subprocess", - total_roi=total_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +# %% [markdown] +# Then, we prepare our process function. This time, it has two parts. The first part is the function defined in the cell below, and essentially just calls `subprocess.run` locally or with bsub, as an example compute environment. The second part is the external python script that is actually executed in the `subprocess.run` call. # %% # new process function to start the worker subprocess def start_subprocess_worker(cluster="local"): import subprocess if cluster == "bsub": + # this is where you define your cluster arguments specific to your task (gpus, cpus, etc) num_cpus_per_worker = 1 subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py"]) elif cluster== "local": @@ -450,72 +445,79 @@ def start_subprocess_worker(cluster="local"): else: raise ValueError("Only bsub and local currently supported for this tutorial") - - -# %% -# scheduler -from functools import partial - -# note: Must be on submit node to run this with bsub argument -tutorial_task = daisy.Task( - "smoothing_subprocess", - total_roi=total_read_roi, - read_roi=read_roi, - write_roi=block_roi, - process_function=partial(start_subprocess_worker, "bsub"), - num_workers=2, - fit="shrink", -) - -daisy.run_blockwise([tutorial_task]) - -# %% -plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower") - # %% [markdown] -# ## Passing "arguments" to your subprocess - -# %% -prepare_ds( - "sample_data.zarr", - "smoothed_subprocess_config", - total_roi=total_write_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) - - -# %% -def start_subprocess_worker_config(cluster="local"): - import subprocess - if cluster == "bsub": - num_cpus_per_worker = 4 - subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker_config.py", "tutorial_config.json"]) - elif cluster== "local": - subprocess.run(["python", "./tutorial_worker_config.py", "tutorial_config.json"]) - else: - raise ValueError("Only bsub and local currently supported for this tutorial") +# Code from tutorial_worker.py, copied here for convenience (Note: running this cell won't run the code, because it is a markdown cell) +# ``` python +# import daisy +# import logging +# import time +# from funlib.persistence.arrays import open_ds, Array +# from skimage import filters +# import sys +# import json +# +# +# # This function is the same as the local function, but we can pass as many different arguments as we want, and we don't need to import inside it +# def smooth_in_block(block: daisy.Block, config: dict): +# sigma = config["sigma"] +# raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) +# data = raw_ds.to_ndarray(block.read_roi, fill_value=0) +# smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) +# output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') +# smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) +# output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) +# +# +# if __name__ == "__main__": +# # load a config path or other parameters from the sysargs (recommended to use argparse argument parser for anything more complex) +# config_path = sys.argv[1] +# +# # load the config +# with open(config_path) as f: +# config = json.load(f) +# +# # simulate long setup time (e.g. loading a model) +# time.sleep(20) +# +# # set up the daisy client (this is done by daisy automatically in the local example) +# # it depends on environment variables to determine configuration +# client = daisy.Client() +# +# while True: +# # ask for a block from the scheduler +# with client.acquire_block() as block: +# +# # The scheduler will return None when there are no more blocks left +# if block is None: +# break +# +# # process your block! +# # Note: you can now define whatever function signature you want, rather than being limited to one block argument +# smooth_in_block(block, config) +# +# ``` +# %% [markdown] +# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. No longer does our process function accept a block as input - instead, it has no arguments, and is expected to specifically request a block. This means that rather than spawning one worker per block, the workers are persistent for the full time the task is running, and can request process and return many blocks. +# +# This is particularly helpful when worker startup is expensive - loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using time.sleep() in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up and then the blocks should process quickly after that. # %% # note: Must be on submit node to run this with bsub argument tutorial_task = daisy.Task( - "smoothing_subprocess_config", + "smoothing_subprocess", total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - process_function=partial(start_subprocess_worker_config, "local"), + process_function=partial(start_subprocess_worker, "local"), num_workers=2, - fit="shrink", ) daisy.run_blockwise([tutorial_task]) # %% -plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess_config'][:].transpose(1, 2, 0), origin="lower") +plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed_subprocess'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] # # Important Features diff --git a/examples/tutorial_config.json b/examples/tutorial_config.json index ddd81faa..8b905d53 100644 --- a/examples/tutorial_config.json +++ b/examples/tutorial_config.json @@ -2,6 +2,6 @@ "input_zarr": "sample_data.zarr", "output_zarr": "sample_data.zarr", "input_group": "raw", - "output_group": "smoothed_subprocess_config", + "output_group": "smoothed_subprocess", "sigma": 5 } \ No newline at end of file diff --git a/examples/tutorial_worker.py b/examples/tutorial_worker.py index 502ae09c..b1cd4690 100644 --- a/examples/tutorial_worker.py +++ b/examples/tutorial_worker.py @@ -3,39 +3,44 @@ import time from funlib.persistence.arrays import open_ds, Array from skimage import filters -import time +import sys +import json -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) -def smooth_in_block(block: daisy.Block): - sigma = 5.0 - - raw_ds = open_ds('sample_data.zarr', 'raw', "r",) +# This function is the same as the local function, but we can pass as many different arguments as we want, and we don't need to import inside it +def smooth_in_block(block: daisy.Block, config: dict): + sigma = config["sigma"] + raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) data = raw_ds.to_ndarray(block.read_roi, fill_value=0) smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) - - output_ds = open_ds('sample_data.zarr', 'smoothed_subprocess', 'a') - + output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) if __name__ == "__main__": + # load a config path or other parameters from the sysargs (recommended to use argparse argument parser for anything more complex) + config_path = sys.argv[1] + + # load the config + with open(config_path) as f: + config = json.load(f) + + # simulate long setup time (e.g. loading a model) + time.sleep(20) + + # set up the daisy client (this is done by daisy automatically in the local example) + # it depends on environment variables to determine configuration client = daisy.Client() - print("Client:", client) - - # simlate long setup time (e.g. loading a model) - time.sleep(50) while True: - logger.info("getting block") + # ask for a block from the scheduler with client.acquire_block() as block: + # The scheduler will return None when there are no more blocks left if block is None: break - logger.info(f"got block {block}") - smooth_in_block(block) - - logger.info(f"releasing block: {block}") + # process your block! + # Note: you can now define whatever function signature you want, rather than being limited to one block argument + smooth_in_block(block, config) diff --git a/examples/tutorial_worker_config.py b/examples/tutorial_worker_config.py deleted file mode 100644 index 9225db66..00000000 --- a/examples/tutorial_worker_config.py +++ /dev/null @@ -1,43 +0,0 @@ -import daisy -import logging -import time -from funlib.persistence.arrays import open_ds, Array -from skimage import filters -import sys -import json - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -def smooth_in_block(block: daisy.Block, config: dict): - sigma = config["sigma"] - - raw_ds = open_ds(config["input_zarr"], config["input_group"], "r",) - data = raw_ds.to_ndarray(block.read_roi, fill_value=0) - smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) - - output_ds = open_ds(config["output_zarr"], config["output_group"], 'a') - - smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) - output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) - - -if __name__ == "__main__": - config_path = sys.argv[1] - with open(config_path) as f: - config = json.load(f) - - client = daisy.Client() - print("Client:", client) - - while True: - logger.info("getting block") - with client.acquire_block() as block: - - if block is None: - break - - logger.info(f"got block {block}") - smooth_in_block(block, config) - - logger.info(f"releasing block: {block}") From 4a83e8932d363d8e63fa85559816f89e6919fffd Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Thu, 27 Jun 2024 16:38:46 -0400 Subject: [PATCH 07/12] Bugfix: forgot config in cluster example --- examples/tutorial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 9bbd5c48..61f10492 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -439,9 +439,9 @@ def start_subprocess_worker(cluster="local"): if cluster == "bsub": # this is where you define your cluster arguments specific to your task (gpus, cpus, etc) num_cpus_per_worker = 1 - subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py"]) + subprocess.run(["bsub", "-I", f"-n {num_cpus_per_worker}", "python", "./tutorial_worker.py", "tutorial_config.json"]) elif cluster== "local": - subprocess.run(["python", "./tutorial_worker.py"]) + subprocess.run(["python", "./tutorial_worker.py", "tutorial_config.json"]) else: raise ValueError("Only bsub and local currently supported for this tutorial") From b4a26ab0b393777930acd036c24c46cf9fe19156 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Thu, 27 Jun 2024 21:54:41 -0400 Subject: [PATCH 08/12] Add explanation for important features tasks --- examples/tutorial.py | 253 ++++++++++++++++++++++++++----------------- 1 file changed, 156 insertions(+), 97 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 61f10492..5d04d96b 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -293,7 +293,7 @@ def smooth(block: daisy.Block): plt.imshow(zarr.open('sample_data.zarr', 'r')['smoothed'][:].transpose(1, 2, 0), origin="lower") # %% [markdown] -# ### Take 2: Add context! +# ## Take 2: Add context! # The task ran successfully, but you'll notice that there are edge artefacts where the blocks border each other. This is because each worker only sees the inside of the block, and it needs more context to smooth seamlessly between blocks. If we increase the size of the read_roi so that each block sees all pixels that contribute meaningfully to the smoothed values in the interior (write_roi) of the block, the edge artefacts should disappear. # %% @@ -522,31 +522,29 @@ def start_subprocess_worker(cluster="local"): # %% [markdown] # # Important Features +# %% [markdown] +# There are a few more features that you should know about to take full advantage of daisy! + # %% [markdown] # ## Fault tolerance and the pre-check function +# %% [markdown] +# Even if your code is completely bug-free, things will always go wrong eventually when scaling up to millions of workers. Perhaps your node on the cluster was shared with another process that temporarily hogged too much memory, or you forgot to set a longer timeout and the process was killed after 8 hours. Let's see how daisy can help you handle these issues, by seeing what happens when we add random failures to our smoothing task. + # %% -prepare_ds( - "sample_data.zarr", - "fault_tolerance", - total_roi=total_write_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +# as always, prepare a new output smoothing dataset +prepare_smoothing_ds("fault_tolerance") # %% # simulate failing 50% of the time def smooth_in_block_with_failure(block: daisy.Block): import random + from funlib.persistence.arrays import open_ds, Array + from skimage import filters if random.random() < 0.5: raise ValueError("Simulating random failure") - - from funlib.persistence.arrays import open_ds, Array - from skimage import filters sigma = 5.0 @@ -572,7 +570,6 @@ def smooth_in_block_with_failure(block: daisy.Block): total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - fit="shrink", read_write_conflict=False, num_workers=5, ) @@ -582,24 +579,23 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + # %% [markdown] -# Why is so much more than 60% done? Answer: "max retries" +# Debugging multi-process code is inherently difficult, but daisy tries to provide as much information as possible. First, you see the progress bar, which also reports the number of blocks at each state, including failed blocks. Any worker error messages are also logged to the scheduler log, although not the full traceback. Upon completion, daisy provides an error summary, which informs you of the final status of all the blocks, and points you to the full output and error logs for each worker, which can be found in `daisy_logs/`. The worker error log will contain the full traceback for debugging the exact source of an error. -# %% -root = zarr.open("sample_data.zarr", 'a') -del root['fault_tolerance'] +# %% [markdown] +# You may have noticed that while we coded the task to fail 50% of the time, much more than 50% of the blocks succeeded. This is because daisy by default retries each block 3 times (on different workers) before marking it as failed, to deal gracefully with random error. We will re-run this example, but set max_retries=1 to see the effect of this parameter. # %% -prepare_ds( - "sample_data.zarr", - "fault_tolerance", - total_roi=total_write_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +# delete and re-create the dataset, so that we start from zeros again +def delete_ds(group): + root = zarr.open("sample_data.zarr", 'a') + if group in root: + del root[group] +delete_ds("fault_tolerance") +prepare_smoothing_ds("fault_tolerance") +# %% daisy.run_blockwise([ daisy.Task( "fault tolerance test", @@ -607,10 +603,9 @@ def smooth_in_block_with_failure(block: daisy.Block): total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - fit="shrink", read_write_conflict=False, max_retries=1, - num_workers=1, + num_workers=5, ) ] ) @@ -618,11 +613,15 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") + # %% [markdown] -# If we re-run enough times, eventually all the holes will fill. But, we can do something smarter! +# We still have greater than 50% success! This is because if a specific worker fails multiple times, daisy will assume something might have gone wrong with that worker. The scheduler will shut down and restart the worker, and retry the blocks that failed on that worker. So daisy is very robust to random error. +# +# But what about non-random error, like stopping after 8 hours? If you don't want to re-process all the already processed blocks from a prior run, you can write a function that takes a block and checks if it is complete, and pass it to the scheduler. The scheduler will run this check function on each block and skip the block if the check function returns true. +# +# Here is an example check function for our smoothing task, that checks if there are any non-zero values in the output array # %% -from functools import partial def check_complete(output_group, block): from funlib.persistence.arrays import open_ds import numpy as np @@ -631,8 +630,12 @@ def check_complete(output_group, block): return True else: return False - + +# %% [markdown] +# If we re-run the task, but with the check function provided, you should see in the execution summary that all the blocks that finished before are skipped. You can continue re-running until you reach 100% completion, without ever re-processing those same blocks + +# %% daisy.run_blockwise([ daisy.Task( "fault tolerance test", @@ -640,9 +643,9 @@ def check_complete(output_group, block): total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - fit="shrink", read_write_conflict=False, max_retries=1, + num_workers=5, check_function=partial(check_complete, "fault_tolerance") ) ] @@ -653,117 +656,110 @@ def check_complete(output_group, block): # %% [markdown] -# Note: your pre-check function has to be faster than your actual function for this to be worth it. We recommend saving the block id as a file in a shared file system or database at the end of the worker function. +# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed). If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data. # %% [markdown] # ## Task chaining # %% [markdown] -# Say we have a function to segment out instances of blue objects in an image, and we want to apply it after smoothing. We can define two tasks and run them sequentially in the scheduler. - -# %% -def smooth_in_block(output_group, block: daisy.Block): - from funlib.persistence.arrays import open_ds, Array - from skimage import filters - - sigma = 5.0 - - raw_ds = open_ds('sample_data.zarr', 'raw', "r",) - data = raw_ds.to_ndarray(block.read_roi, fill_value=0) - smoothed = filters.gaussian(data, sigma=sigma, channel_axis=0) - - output_ds = open_ds('sample_data.zarr', output_group, 'a') - - smoothed = Array(smoothed, roi=block.read_roi, voxel_size=(1, 1)) - output_ds[block.write_roi] = smoothed.to_ndarray(block.write_roi) - +# Frequently, image processing pipelines involve multiple tasks, where some tasks depend on the output of other tasks. For example, we have a function to segment out instances of blue objects in an image, and we want to apply it after smoothing. We can define two tasks and run them sequentially in the scheduler. Daisy will even begin the second task as soon as a single block can be run, rather than waiting for the first task to fully complete before starting the second task. # %% # %pip install opencv-python # %% +# here is our function to segment blue objects +# it is just thresholding in HSV space, so unlike smoothing, the neighboring pixels do not affect the outcome +# There is probably a nicer way to get the image into hsv space, but this gets the job done! def segment_blue_objects(input_group, output_group, block): import cv2 from funlib.persistence.arrays import open_ds, Array import numpy as np import skimage - + + # load the data as usual input_ds = open_ds('sample_data.zarr', input_group, "r",) data = input_ds.to_ndarray(block.read_roi) - + + # massage the data into open cv hsv format :/ back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8) cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR) hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV) + # Define the color range for detection lower_blue = np.array([100,30,0]) upper_blue = np.array([150,255,255]) + # Threshold the image to get only blue colors - mask = cv2.inRange(hsv_image, lower_blue, upper_blue) + mask = cv2.inRange(hsv_image, lower_blue, upper_blue) # this returns 0 and 255 mask = mask.astype(np.uint16) - - mask = mask // 255 + mask = mask // 255 # turn into 0/1 labels + + # give each connected component its own instance segmentation label labels = skimage.measure.label(mask) + # get a unique ID for each element in the whole volume (avoid repeats between blocks) block_id_mask = mask * (block.block_id[1]) labels = labels + block_id_mask + # save the output output_ds = open_ds('sample_data.zarr', output_group, 'a') output_ds[block.write_roi] = labels -# %% - -prepare_ds( - "sample_data.zarr", - "smoothed_for_seg", - total_roi=total_write_roi, - voxel_size=daisy.Coordinate((1,1)), - dtype=raw_data_float.dtype, - write_size=block_size, - num_channels=n_channels, -) +# %% [markdown] +# Previously, we always defined our `daisy.Task` inside the call to `daisy.run_blockwise`. Now, we need to save our smoothing task in a variable so we can reference it as a dependency our segmentation task. -sigma = 5 -context = int(sigma) * 2 -read_roi = block_roi.grow(context, context) +# %% +# as always, prepare the output dataset +delete_ds("smoothed_for_seg") +prepare_smoothing_ds("smoothed_for_seg") +# same smoothing task as usual, with unqiue output dataset smoothing_task = daisy.Task( "smooth_for_seg", process_function=partial(smooth_in_block, "smoothed_for_seg"), total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - fit="shrink", - num_workers=5, + num_workers=1, read_write_conflict=False, check_function=partial(check_complete, "smoothed_for_seg") ) +# %% [markdown] +# Next, we make our segmentatation task. Each task can have different hyperaparameters, including block sizes and total rois. For the segmentation, we will double the block size and not add context, since unlike smoothing, thresholding does not depend on neighboring pixels. +# +# Since our instance segmentation output has different output data type, no channels, and a different total output roi, we need to change our arguments to `prepare_ds` for this task. + # %% # you can have different block sizes in different tasks seg_block_roi = daisy.Roi((0,0), (128, 128)) +delete_ds("blue_objects") prepare_ds( "sample_data.zarr", "blue_objects", - total_roi=total_write_roi, + total_roi=total_roi, voxel_size=daisy.Coordinate((1,1)), - dtype=np.uint8, - write_size=seg_block_roi.shape, + dtype=np.uint16, # This is different! Our labels will be uint16 + write_size=seg_block_roi.shape, # use the new block roi to determine the chunk size ) seg_task = daisy.Task( "segmentation", process_function=partial(segment_blue_objects, "smoothed_for_seg", "blue_objects"), - total_roi=total_write_roi, # Note: This task does not have context (yet...?) - read_roi=seg_block_roi, - write_roi=seg_block_roi, - fit="shrink", + total_roi=total_roi, # Note: This task does not have context (yet...?) + read_roi=seg_block_roi, # again, no context + write_roi=seg_block_roi, # so read and write rois are the same read_write_conflict=False, num_workers=5, - upstream_tasks=[smoothing_task], + upstream_tasks=[smoothing_task], # Here is where we define that this task depends on the output of the smoothing task ) +# %% [markdown] +# Note the `upstream_tasks` argument - this is how you tell the scheduler that this task depends on the output of the smoothing task. Then, we can pass both tasks into `run_blockwise`. You should see that the process bar for the segmentation task starts before the smoothing progress bar completes. + # %% daisy.run_blockwise([smoothing_task, seg_task]) @@ -773,14 +769,55 @@ def segment_blue_objects(input_group, output_group, block): axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") +# %% [markdown] +# Success! On the task chaining, anyways. Now let's discuss how to fix the issue with blue objects that are split across blocks having different labels. # %% [markdown] # ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag) # %% [markdown] -# How can we resolve the labels of adjacent blocks? +# So far, we have always written process functions that read from one input array and write to one output array. However, we have much more flexibility than that - daisy just gives the worker the block, and the worker can do whatever it wants. +# +# Say we expand our read_roi as before. Then each block can check the existing output dataset to see if its neighbors assigned any labels, and extend them if they are there. Let's visualize an example: +# # %% +context = 10 # It could be as low as 1, but we use 10 for ease of visualization +seg_block_roi = daisy.Roi((128,128), (128, 128)) +seg_read_roi = seg_block_roi.grow(context, context) +seg_write_roi = seg_block_roi +seg_total_read_roi = total_roi.grow(context, context) + +seg_block = daisy.Block( + total_roi = seg_total_read_roi, + read_roi = seg_read_roi, + write_roi = seg_write_roi, +) + +# simulate this block not being completed yet +from funlib.persistence import open_ds +output_ds = open_ds("sample_data.zarr", "blue_objects", "a") +output_ds[seg_block.write_roi] = 0 + +from skimage.color import label2rgb +figure, axes = plt.subplots(1, 2) +axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") +axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") +display_roi(figure.axes[0], seg_block.read_roi, color="purple") +display_roi(figure.axes[0], seg_block.write_roi, color="white") +display_roi(figure.axes[1], seg_block.read_roi, color="purple") +display_roi(figure.axes[1], seg_block.write_roi, color="white") + + +# %% [markdown] +# Here the purple is the read_roi of our current block, and the white is the write_roi. As before, the process function will read the input image in the read_roi and segment it. From the previous result visualization, we can see that the function will detect the top half of the crest and assign it the pink label. +# +# Before we write out the pink object to the write_roi of the output dataset, however, we can adapt the process function to **also read in the existing results in the output dataset** in the read_roi. The existing blue label will then overlap with the pink label, and our process function can relabel the top half of the crest blue based on this information, before writing to the write_roi. +# +# This approach only works if the overlapping blocks are run sequentially. If they are run in parallel, it is possible that the blue label will not yet be there when our current block reads existing labels, but also that our pink label will not yet be there when the block containing the blue object reads existing labels. If `read_write_conflicts` argument is set to True in a task, the daisy scheduler will ensure that pairs of blocks with overlapping read/write rois will never be run at the same time, thus avoiding this race condition. + +# %% +# here is the new and improved segmentation function that reads in neighboring output context def segment_blue_objects_with_context(input_group, output_group, block): import cv2 from funlib.persistence.arrays import open_ds, Array @@ -790,6 +827,7 @@ def segment_blue_objects_with_context(input_group, output_group, block): import time def get_overlapping_labels(array1, array2): + """ A function to get all pairs of labels that intsersect between two arrays""" array1 = array1.flatten() array2 = array2.flatten() # get indices where both are not zero (ignore background) @@ -799,50 +837,64 @@ def get_overlapping_labels(array1, array2): intersections = np.unique(flattened_stacked, axis=1) return intersections # a x 2 nparray + # load the data as usual input_ds = open_ds('sample_data.zarr', input_group, "r",) - output_ds = open_ds('sample_data.zarr', output_group, 'a') - data = input_ds.to_ndarray(block.read_roi, fill_value=0) - existing_labels = output_ds.to_ndarray(block.read_roi, fill_value=0) - + data = input_ds.to_ndarray(block.read_roi, fill_value=0) # add the fill value because context + + # massage the data into open cv hsv format :/ back_to_skimage = (data.transpose(1,2,0) * 255).astype(np.uint8) cv2_image = cv2.cvtColor(skimage.util.img_as_ubyte(back_to_skimage), cv2.COLOR_RGB2BGR) hsv_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2HSV) + # Define the color range for detection lower_blue = np.array([100,30,0]) upper_blue = np.array([150,255,255]) + # Threshold the image to get only blue colors - mask = cv2.inRange(hsv_image, lower_blue, upper_blue) + mask = cv2.inRange(hsv_image, lower_blue, upper_blue) # this returns 0 and 255 mask = mask.astype(np.uint16) - - mask = mask // 255 + mask = mask // 255 # turn into 0/1 labels + + # give each connected component its own instance segmentation label labels = skimage.measure.label(mask) + # get a unique ID for each element in the whole volume (avoid repeats between blocks) block_id_mask = mask * (block.block_id[1]) labels = labels + block_id_mask + # load the existing labels in the output + output_ds = open_ds('sample_data.zarr', output_group, 'a') + existing_labels = output_ds.to_ndarray(block.read_roi, fill_value=0) + # if there are existing labels, change the label to match - # note: This only works if objects never span multiple rows/columns. If you have long objects like neurons, you need to do true agglomeration + # note: This only works if objects never span multiple rows/columns. + # If you have long objects like neurons, you need to do true agglomeration intersections = get_overlapping_labels(labels, existing_labels) for index in range(intersections.shape[1]): - new_label, existing_label = intersections[:, index] - labels[labels == new_label] = existing_label + label, existing_label = intersections[:, index] + # Change the label to the one that was already in the neighbor + labels[labels == label] = existing_label - time.sleep(0.5) - output_ds = open_ds('sample_data.zarr', output_group, 'a') + time.sleep(0.5) # included to show that read_write_conflicts=False leads to race conditions + + # center crop and save the output dataset output_array = Array(labels, roi=block.read_roi, voxel_size=(1, 1)) output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi) +# %% + # %% seg_block_roi = daisy.Roi((0,0), (128, 128)) seg_block_read_roi = seg_block_roi.grow(context, context) +delete_ds("blue_objects_with_context") prepare_ds( "sample_data.zarr", "blue_objects_with_context", total_roi=total_write_roi, voxel_size=daisy.Coordinate((1,1)), - dtype=np.uint8, + dtype=np.uint16, write_size=seg_block_roi.shape, ) @@ -852,8 +904,7 @@ def get_overlapping_labels(array1, array2): total_roi=total_read_roi, read_roi=seg_block_read_roi, write_roi=seg_block_roi, - fit="shrink", - read_write_conflict=False, + read_write_conflict=True, # this ensures neighboring blocks aren't run at the same time num_workers=10, upstream_tasks=[smoothing_task], ) @@ -865,4 +916,12 @@ def get_overlapping_labels(array1, array2): axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects_with_context'][:]), origin="lower") +# %% [markdown] +# All the labels are now consistent! If you re-run the previous cell with `read_write_conflict=False`, you should see an inconsistent result again due to the race conditions, even though the process function still reads the neighboring output. + +# %% [markdown] +# This pattern where one block wants to see the output of its neighbors so it can incorporate them into its processing comes up surprisingly often, from agglomeration to tracking. See [this blog post](https://localshapedescriptors.github.io/#throughput) for visualizations of this process in neuron segmentation. Setting `read_write_conflicts=True` allows you to solve many complex tasks in this manner. +# +# **IMPORTANT PERFORMANCE NOTE:** Be aware that `read_write_conflicts` is set to `True` by default and can lead to performance hits in cases where you don't need it, so be sure to turn it off if you want every block to be run in parallel! + # %% From bdf845b3ceba61d7723626bffbfeee41b52f66c1 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Fri, 28 Jun 2024 13:12:55 -0400 Subject: [PATCH 09/12] Add Will's tutorial feedback --- examples/tutorial.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 5d04d96b..6e938cad 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -199,7 +199,7 @@ def fresh_image(): # %% [markdown] # ## Dataset Preparation -# As mentioned earlier, we highly recommend using a zarr/n5 backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a multiple of and aligned with the write_roi. The zarr dataset must exist before you start scheudling though - we recommend using [`funlib.persistence.prepare_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L423) function to prepare the dataset. Then later, you can use [`funlib.persistence.open_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L328) to open the dataset and it will automatically read the metadata and wrap it into a `funlib.persistence.Array`. +# As mentioned earlier, we highly recommend using a zarr/n5 backend for your volume. Daisy is designed such that no data is transmitted between the worker and the scheduler, including the output of the processing. That means that each worker is responsible for saving the results in the given block write_roi. With a zarr backend, each worker can write to a specific region of the zarr in parallel, assuming that the chunk size is a divisor of and aligned with the write_roi. The zarr dataset must exist before you start scheduling though - we recommend using [`funlib.persistence.prepare_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L423) function to prepare the dataset. Then later, you can use [`funlib.persistence.open_ds`](https://github.com/funkelab/funlib.persistence/blob/f5310dddb346585a28f3cb44f577f77d4f5da07c/funlib/persistence/arrays/datasets.py#L328) to open the dataset and it will automatically read the metadata and wrap it into a `funlib.persistence.Array`. # %% import zarr @@ -297,6 +297,7 @@ def smooth(block: daisy.Block): # The task ran successfully, but you'll notice that there are edge artefacts where the blocks border each other. This is because each worker only sees the inside of the block, and it needs more context to smooth seamlessly between blocks. If we increase the size of the read_roi so that each block sees all pixels that contribute meaningfully to the smoothed values in the interior (write_roi) of the block, the edge artefacts should disappear. # %% +sigma = 5 context = 2*sigma # pixels beyond 2*sigma contribute almost nothing to the output block_read_roi = block_roi.grow(context, context) block_write_roi = block_roi @@ -360,7 +361,7 @@ def smooth_in_block(output_group: str, block: daisy.Block): # %% [markdown] # Now we can re-run daisy. Note these changes from the previous example: # - using `functools.partial` to partially evaluate our `smooth_in_block` function , turning it into a function that only takes the block as an argument -# - the total_roi is now exapnded to include the context, as is the read_roi +# - the total_roi is now expanded to include the context, as is the read_roi # %% from functools import partial @@ -499,17 +500,19 @@ def start_subprocess_worker(cluster="local"): # %% [markdown] -# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. No longer does our process function accept a block as input - instead, it has no arguments, and is expected to specifically request a block. This means that rather than spawning one worker per block, the workers are persistent for the full time the task is running, and can request process and return many blocks. +# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. No longer does our process function accept a block as input - instead, it has no arguments, and is expected to specifically request a block. If you provide a process function that takes a block as input, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you. # -# This is particularly helpful when worker startup is expensive - loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using time.sleep() in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up and then the blocks should process quickly after that. +# Doing the `daisy.Client` set up yourself is helpful when worker startup is expensive - loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using time.sleep() in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up and then the blocks should process quickly after that. # %% # note: Must be on submit node to run this with bsub argument +# For Janelians: Don't use the login node to run the scheduler! +# Instead, use the submit node, which can handle more computational load tutorial_task = daisy.Task( "smoothing_subprocess", total_roi=total_read_roi, - read_roi=read_roi, - write_roi=block_roi, + read_roi=block_read_roi, + write_roi=block_write_roi, process_function=partial(start_subprocess_worker, "local"), num_workers=2, ) From 4e253fd8447a5ab561c3d869fd66517787b50e96 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Fri, 28 Jun 2024 13:44:08 -0400 Subject: [PATCH 10/12] Last minute edits --- examples/tutorial.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 6e938cad..a295c999 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -21,6 +21,7 @@ # - daisy terminology and concepts # - running daisy locally with multiprocessing # - running daisy with independent worker processes (e.g., on a compute cluster) +# - key features of daisy that make it unique # %% # %pip install scikit-image @@ -224,7 +225,10 @@ def fresh_image(): total_roi=total_roi, # if your output has a different total_roi than your input, you would need to change this voxel_size=daisy.Coordinate((1,1)), dtype=raw_data_float.dtype, - write_size=block_size, + # The write size is important! If you don't set this correctly, your workers will have race conditions + # when writing to the same file, resulting in expected behavior or weird errors. + # The prepare_ds function takes care of making zarr chunk sizes that evenly divide your write size + write_size=block_size, num_channels=n_channels, ) print("Shape of output dataset:", f["smoothed"].shape) @@ -317,7 +321,7 @@ def smooth(block: daisy.Block): # %% [markdown] -# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the sample_data.zarr. +# Let's prepare another dataset to store our new and improved smoothing result in. We will be doing this repeatedly through the rest of the tutorial, so we define a helper function to prepare a smoothing result in a given group in the `sample_data.zarr`. We also define a helper function for deleting a dataset, in case you want to re-run a processing step and see a new result. # %% def prepare_smoothing_ds(group): @@ -330,6 +334,13 @@ def prepare_smoothing_ds(group): write_size=block_size, num_channels=3, ) + +def delete_ds(group): + root = zarr.open("sample_data.zarr", 'a') + if group in root: + del root[group] + + output_group = "smoothed_with_context" prepare_smoothing_ds(output_group) @@ -500,7 +511,7 @@ def start_subprocess_worker(cluster="local"): # %% [markdown] -# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. No longer does our process function accept a block as input - instead, it has no arguments, and is expected to specifically request a block. If you provide a process function that takes a block as input, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you. +# The most important thing to notice about the new worker script is the use of the `client.acquire_block()` function. If you provide a process function that takes a block as input, as we did previously, daisy will create the `daisy.Client`, `while` loop, and `client.acquire_block()` context for you. If you provide a process function with no arguments, the worker is expected to set up the client and request blocks. # # Doing the `daisy.Client` set up yourself is helpful when worker startup is expensive - loading saved network weights can be more expensive than actually predicting for one block, so you definitely would not want to load the model separately for each block. We have simulated this by using time.sleep() in the setup of the worker, so when you run the next cell, it should take 20 seconds to start up and then the blocks should process quickly after that. @@ -582,7 +593,6 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% plt.imshow(zarr.open('sample_data.zarr', 'r')['fault_tolerance'][:].transpose(1, 2, 0), origin="lower") - # %% [markdown] # Debugging multi-process code is inherently difficult, but daisy tries to provide as much information as possible. First, you see the progress bar, which also reports the number of blocks at each state, including failed blocks. Any worker error messages are also logged to the scheduler log, although not the full traceback. Upon completion, daisy provides an error summary, which informs you of the final status of all the blocks, and points you to the full output and error logs for each worker, which can be found in `daisy_logs/`. The worker error log will contain the full traceback for debugging the exact source of an error. @@ -591,10 +601,6 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% # delete and re-create the dataset, so that we start from zeros again -def delete_ds(group): - root = zarr.open("sample_data.zarr", 'a') - if group in root: - del root[group] delete_ds("fault_tolerance") prepare_smoothing_ds("fault_tolerance") @@ -659,7 +665,9 @@ def check_complete(output_group, block): # %% [markdown] -# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed). If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data. +# Unfortunately, this is a pretty inefficient pre-check function, because you have to actually read the output data to see if the block is completed. Since this will be run on the scheduler on every block before it is passed to a worker, it might not even be faster than just re-processing the blocks (which is at least distributed). +# +# If you plan to have extremely long running jobs that might get killed in the middle, we recommend including a step in your process function after you write out the result of a block, in which you write out the block id to a database or a file. Then, the pre-check function can just check if the block id is in the file system or database, which is much faster than reading the actual data. # %% [markdown] # ## Task chaining @@ -885,8 +893,6 @@ def get_overlapping_labels(array1, array2): output_ds[block.write_roi] = output_array.to_ndarray(block.write_roi) -# %% - # %% seg_block_roi = daisy.Roi((0,0), (128, 128)) seg_block_read_roi = seg_block_roi.grow(context, context) @@ -895,7 +901,7 @@ def get_overlapping_labels(array1, array2): prepare_ds( "sample_data.zarr", "blue_objects_with_context", - total_roi=total_write_roi, + total_roi=total_roi, voxel_size=daisy.Coordinate((1,1)), dtype=np.uint16, write_size=seg_block_roi.shape, From d57599af21d86b98c36d07a588df0ea8b5ccb127 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Fri, 28 Jun 2024 13:50:39 -0400 Subject: [PATCH 11/12] Edit transition between task chaining and read_write_conflict --- examples/tutorial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index a295c999..66c056a5 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -781,7 +781,7 @@ def segment_blue_objects(input_group, output_group, block): axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") # %% [markdown] -# Success! On the task chaining, anyways. Now let's discuss how to fix the issue with blue objects that are split across blocks having different labels. +# Now we know how to chain tasks together, but we've created another issue. If objects crossed block boundaries, they were assigned different IDs. We will address this problem in the next section. # %% [markdown] # ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag) @@ -789,7 +789,7 @@ def segment_blue_objects(input_group, output_group, block): # %% [markdown] # So far, we have always written process functions that read from one input array and write to one output array. However, we have much more flexibility than that - daisy just gives the worker the block, and the worker can do whatever it wants. # -# Say we expand our read_roi as before. Then each block can check the existing output dataset to see if its neighbors assigned any labels, and extend them if they are there. Let's visualize an example: +# Say we expand our read_roi as before. Reading more of the input context of a block does not help us resolve our conflicting labels, but looking at the block's neighbors' output can help! Let's visualize an example: # # %% From 357f616f2d960587d18c03a90bd085e7c82a02a1 Mon Sep 17 00:00:00 2001 From: Caroline Malin-Mayor Date: Sun, 30 Jun 2024 13:04:09 -0400 Subject: [PATCH 12/12] Fix unique IDs per block and label visualizaiton --- examples/tutorial.py | 63 ++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/examples/tutorial.py b/examples/tutorial.py index 66c056a5..5734ba9f 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -424,8 +424,9 @@ def smooth_in_block_dask(x): plt.imshow(smoothed.transpose((1, 2, 0)), origin="lower") # %% [markdown] -# For some tasks, dask is much simpler and better suited. One key difference between dask and daisy is that in dask, functions are not supposed to have side effects. In daisy, functions are expected to have "side effects" - saving the output, rather than returning it. In general, daisy is designed for... -# - Cases where the output is too big to be kept in memory +# For many situations, both dask and daisy can work well. Indeed, for some tasks, dask is simpler and better suited, as it does for you many things that daisy leaves to you to implement. One key difference between dask and daisy is that in dask, functions are not supposed to have side effects. In daisy, functions can have side effects, allowing blocks to depend on other blocks in the scheduling order (see the last two examples in this tutorial about task chaining and read-write conflicts). +# +# In general, daisy is designed for... # - Cases where you want to be able to pick up where you left off after an error, rather than starting the whole task over (because blocks that finished saved their results to disk) # - Cases where blocks should be executed in a particular order, so that certain blocks see other blocks outputs (without passing the output through the scheduler) # - Cases where the worker function needs setup and teardown that takes longer than processing a block (see our next example!) @@ -597,7 +598,7 @@ def smooth_in_block_with_failure(block: daisy.Block): # Debugging multi-process code is inherently difficult, but daisy tries to provide as much information as possible. First, you see the progress bar, which also reports the number of blocks at each state, including failed blocks. Any worker error messages are also logged to the scheduler log, although not the full traceback. Upon completion, daisy provides an error summary, which informs you of the final status of all the blocks, and points you to the full output and error logs for each worker, which can be found in `daisy_logs/`. The worker error log will contain the full traceback for debugging the exact source of an error. # %% [markdown] -# You may have noticed that while we coded the task to fail 50% of the time, much more than 50% of the blocks succeeded. This is because daisy by default retries each block 3 times (on different workers) before marking it as failed, to deal gracefully with random error. We will re-run this example, but set max_retries=1 to see the effect of this parameter. +# You may have noticed that while we coded the task to fail 50% of the time, much more than 50% of the blocks succeeded. This is because daisy by default retries each block 3 times (on different workers) before marking it as failed, to deal gracefully with random error. We will re-run this example, but set max_retries=0 to see the effect of this parameter. # %% # delete and re-create the dataset, so that we start from zeros again @@ -613,7 +614,7 @@ def smooth_in_block_with_failure(block: daisy.Block): read_roi=read_roi, write_roi=block_roi, read_write_conflict=False, - max_retries=1, + max_retries=0, num_workers=5, ) ] @@ -624,7 +625,7 @@ def smooth_in_block_with_failure(block: daisy.Block): # %% [markdown] -# We still have greater than 50% success! This is because if a specific worker fails multiple times, daisy will assume something might have gone wrong with that worker. The scheduler will shut down and restart the worker, and retry the blocks that failed on that worker. So daisy is very robust to random error. +# Now we should see a success rate closer to what we would expect. You might notice in the logs a message like ```Worker hostname=...:port=..:task_id=fault tolerance test:worker_id=... failed too many times, restarting this worker...```, which shows another way that daisy is robust to pseudo-random errors. If a specific worker fails multiple times, daisy will assume something might have gone wrong with that worker (e.g. GPU memory taken by another process that the cluster scheduler was not aware of). The scheduler will shut down and restart the worker, and retry the blocks that failed on that worker. So daisy is very robust to random error. # # But what about non-random error, like stopping after 8 hours? If you don't want to re-process all the already processed blocks from a prior run, you can write a function that takes a block and checks if it is complete, and pass it to the scheduler. The scheduler will run this check function on each block and skip the block if the check function returns true. # @@ -703,15 +704,16 @@ def segment_blue_objects(input_group, output_group, block): # Threshold the image to get only blue colors mask = cv2.inRange(hsv_image, lower_blue, upper_blue) # this returns 0 and 255 - mask = mask.astype(np.uint16) + mask = mask.astype(np.uint32) mask = mask // 255 # turn into 0/1 labels # give each connected component its own instance segmentation label labels = skimage.measure.label(mask) # get a unique ID for each element in the whole volume (avoid repeats between blocks) - block_id_mask = mask * (block.block_id[1]) - labels = labels + block_id_mask + max_number_obj = 128*128 # This number is an upper bound on the maximum number of objects in a block + block_id_mask = mask * (block.block_id[1] * max_number_obj) + labels = labels + block_id_mask # save the output output_ds = open_ds('sample_data.zarr', output_group, 'a') @@ -733,7 +735,7 @@ def segment_blue_objects(input_group, output_group, block): total_roi=total_read_roi, read_roi=read_roi, write_roi=block_roi, - num_workers=1, + num_workers=3, read_write_conflict=False, check_function=partial(check_complete, "smoothed_for_seg") ) @@ -753,7 +755,7 @@ def segment_blue_objects(input_group, output_group, block): "blue_objects", total_roi=total_roi, voxel_size=daisy.Coordinate((1,1)), - dtype=np.uint16, # This is different! Our labels will be uint16 + dtype=np.uint32, # This is different! Our labels will be uint32 write_size=seg_block_roi.shape, # use the new block roi to determine the chunk size ) @@ -775,10 +777,17 @@ def segment_blue_objects(input_group, output_group, block): daisy.run_blockwise([smoothing_task, seg_task]) # %% -from skimage.color import label2rgb +# make a colormap to display the labels as unique colors +# This doesn't actually guarantee uniqueness, since it wraps the list at some point +import matplotlib.colors as mcolors +colors_list = list(mcolors.XKCD_COLORS.keys()) +# colors_list.remove("black") +colormap = mcolors.ListedColormap(colors=["black"] + colors_list*101) + figure, axes = plt.subplots(1, 2) axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") -axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") +blue_objs = zarr.open('sample_data.zarr', 'r')['blue_objects'][:] +axes[1].imshow(blue_objs, origin="lower", cmap=colormap, interpolation='nearest') # %% [markdown] # Now we know how to chain tasks together, but we've created another issue. If objects crossed block boundaries, they were assigned different IDs. We will address this problem in the next section. @@ -787,10 +796,9 @@ def segment_blue_objects(input_group, output_group, block): # ## Process functions that need to read their neighbor's output (and the "read_write_conflict" flag) # %% [markdown] -# So far, we have always written process functions that read from one input array and write to one output array. However, we have much more flexibility than that - daisy just gives the worker the block, and the worker can do whatever it wants. -# -# Say we expand our read_roi as before. Reading more of the input context of a block does not help us resolve our conflicting labels, but looking at the block's neighbors' output can help! Let's visualize an example: +# There is a class of problem where it is useful for a block to see the output of its neighboring blocks. Usually, this need comes up when the task performs detection and linking in the same step. To detect an object in a block, it is useful to know if a neighbor has already detected an object that the object in the current block should link to. This is especially useful if there is some sort of continuity constraint, as in tracking objects over time. Even for tasks without a continuity constraint, like agglomeration of fragments for instance segmentation, performing the detection and linking at the same time can save you an extra pass over the dataset, which matters when the datasets are large. # +# The example in this tutorial is only to illustrate how daisy implements the ability to depend on neighboring blocks outputs, by showing how we can relabel the segmentation IDs to be consistent across blocks during the detection step. Let's visualize an example block as though it were not completed, but its neighbors are done, and the read_roi is expanded by a certain amount of context. # %% context = 10 # It could be as low as 1, but we use 10 for ease of visualization @@ -806,14 +814,13 @@ def segment_blue_objects(input_group, output_group, block): ) # simulate this block not being completed yet -from funlib.persistence import open_ds -output_ds = open_ds("sample_data.zarr", "blue_objects", "a") -output_ds[seg_block.write_roi] = 0 +blue_objs = zarr.open('sample_data.zarr', 'r')['blue_objects'][:] +blue_objs[128:356, 128:256] = 0 from skimage.color import label2rgb figure, axes = plt.subplots(1, 2) axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") -axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects'][:]), origin="lower") +axes[1].imshow(blue_objs, origin="lower", cmap=colormap, interpolation='nearest') display_roi(figure.axes[0], seg_block.read_roi, color="purple") display_roi(figure.axes[0], seg_block.write_roi, color="white") display_roi(figure.axes[1], seg_block.read_roi, color="purple") @@ -821,11 +828,11 @@ def segment_blue_objects(input_group, output_group, block): # %% [markdown] -# Here the purple is the read_roi of our current block, and the white is the write_roi. As before, the process function will read the input image in the read_roi and segment it. From the previous result visualization, we can see that the function will detect the top half of the crest and assign it the pink label. +# Here the purple is the read_roi of our current block, and the white is the write_roi. As before, the process function will read the input image in the read_roi and segment it. From the previous result visualization, we can see that the function will detect the top half of the crest and assign it the label id that is visualized as brown. # -# Before we write out the pink object to the write_roi of the output dataset, however, we can adapt the process function to **also read in the existing results in the output dataset** in the read_roi. The existing blue label will then overlap with the pink label, and our process function can relabel the top half of the crest blue based on this information, before writing to the write_roi. +# Before we write out the brown label to the write_roi of the output dataset, however, we can adapt the process function to **also read in the existing results in the output dataset** in the read_roi. The existing green label will then overlap with the brown label, and our process function can relabel the top half of the crest green based on this information, before writing to the write_roi. # -# This approach only works if the overlapping blocks are run sequentially. If they are run in parallel, it is possible that the blue label will not yet be there when our current block reads existing labels, but also that our pink label will not yet be there when the block containing the blue object reads existing labels. If `read_write_conflicts` argument is set to True in a task, the daisy scheduler will ensure that pairs of blocks with overlapping read/write rois will never be run at the same time, thus avoiding this race condition. +# This approach only works if the overlapping blocks are run sequentially (and if objects don't span across non-adjacent blocks - for large objects, you cannot relabel without a second pass). If the neighbors are run in parallel, it is possible that the green label will not yet be there when our current block reads existing labels, but also that our brown label will not yet be there when the block containing the green object reads existing labels. If `read_write_conflicts` argument is set to True in a task, the daisy scheduler will ensure that pairs of blocks with overlapping read/write rois will never be run at the same time, thus avoiding this race condition. # %% # here is the new and improved segmentation function that reads in neighboring output context @@ -863,14 +870,15 @@ def get_overlapping_labels(array1, array2): # Threshold the image to get only blue colors mask = cv2.inRange(hsv_image, lower_blue, upper_blue) # this returns 0 and 255 - mask = mask.astype(np.uint16) + mask = mask.astype(np.uint32) mask = mask // 255 # turn into 0/1 labels # give each connected component its own instance segmentation label labels = skimage.measure.label(mask) # get a unique ID for each element in the whole volume (avoid repeats between blocks) - block_id_mask = mask * (block.block_id[1]) + max_number_obj = 128*128 # This number is an upper bound on the maximum number of objects in a block + block_id_mask = mask * (block.block_id[1] * max_number_obj) labels = labels + block_id_mask # load the existing labels in the output @@ -903,7 +911,7 @@ def get_overlapping_labels(array1, array2): "blue_objects_with_context", total_roi=total_roi, voxel_size=daisy.Coordinate((1,1)), - dtype=np.uint16, + dtype=np.uint32, write_size=seg_block_roi.shape, ) @@ -920,17 +928,14 @@ def get_overlapping_labels(array1, array2): daisy.run_blockwise([seg_task]) # %% -from skimage.color import label2rgb figure, axes = plt.subplots(1, 2) axes[0].imshow(zarr.open('sample_data.zarr', 'r')['smoothed_for_seg'][:].transpose(1,2,0), origin="lower") -axes[1].imshow(label2rgb(zarr.open('sample_data.zarr', 'r')['blue_objects_with_context'][:]), origin="lower") +axes[1].imshow(zarr.open('sample_data.zarr', 'r')['blue_objects_with_context'][:], cmap=colormap, interpolation="nearest", origin="lower") # %% [markdown] # All the labels are now consistent! If you re-run the previous cell with `read_write_conflict=False`, you should see an inconsistent result again due to the race conditions, even though the process function still reads the neighboring output. # %% [markdown] -# This pattern where one block wants to see the output of its neighbors so it can incorporate them into its processing comes up surprisingly often, from agglomeration to tracking. See [this blog post](https://localshapedescriptors.github.io/#throughput) for visualizations of this process in neuron segmentation. Setting `read_write_conflicts=True` allows you to solve many complex tasks in this manner. -# # **IMPORTANT PERFORMANCE NOTE:** Be aware that `read_write_conflicts` is set to `True` by default and can lead to performance hits in cases where you don't need it, so be sure to turn it off if you want every block to be run in parallel! # %%