Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mthrok authored and ci committed Sep 19, 2024
0 parents commit 1d67164
Show file tree
Hide file tree
Showing 450 changed files with 119,461 additions and 0 deletions.
Empty file added .nojekyll
Empty file.
11 changes: 11 additions & 0 deletions index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!DOCTYPE html>
<html>
<head>
<title>SPDL</title>
</head>
<body>
<p>Redirecting to the main document.</p>
<script>
window.location.href= "./main/index.html";
</script>
</html>
12 changes: 12 additions & 0 deletions main/_sources/api.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
API Reference
=============

.. autosummary::
:toctree: generated
:template: _custom_autosummary_module.rst
:caption: API Reference
:recursive:

spdl.io
spdl.dataloader
spdl.utils
133 changes: 133 additions & 0 deletions main/_sources/best_practice.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
Best Practices
==============

Avoid creating intermediate tensors
-----------------------------------

For efficient and performant data processing, it is advised to not create
an intermediate Tensor for each individual media object (such as single image),
instead create a batch Tensor directly.

We recommend decoding individual frames, then using :py:func:`spdl.io.convert_frames`
to create a batch Tensor directly without creating an intermediate Tensors.

If you are decoding a batch of images, and you have pre-determined set of images
that should go together into a same batch, use
:py:func:`spdl.io.load_image_batch` (or its async variant
:py:func:`spdl.io.async_load_image_batch`).

Otherwise, demux, decode and preprocess multiple media, then combine them with
:py:func:`spdl.io.convert_frames` (or :py:func:`spdl.io.async_convert_frames`).
For example, the following functions implement decoding and tensor creation
separately.

.. code-block::
import spdl.io
from spdl.io import ImageFrames
def decode_image(src: str) -> ImageFrames:
packets = spdl.io.async_demux_image(src)
return spdl.io.async_decode_packets(packets)
def batchify(frames: list[ImageFrames]) -> ImageFrames:
buffer = spdl.io.convert_frames(frames)
return spdl.io.to_torch(buffer)
They can be combined in :py:class:`~spdl.dataloader.Pipeline`, which automatically
discards the items failed to process (for example due to invalid data), and
keep the batch size consistent by using other items successefully processed.

.. code-block::
from spdl.dataloader import PipelineBuilder
pipeline = (
PipelineBuilder()
.add_source(...)
.pipe(decode_image, concurrency=...)
.aggregate(32)
.pipe(batchify)
.add_sink(3)
.build(num_threads=...)
)
.. seealso::

:py:mod:`multi_thread_preprocessing`

Make Dataset class composable
-----------------------------

If you are publishing a dataset and providing an implementation of
`Dataset` class, we recommend to make it composable.

That is, in addition to the conventional ``Dataset`` class that
returns Tensors, make the components of the ``Dataset``
implementation available by breaking down the implementation into

* Iterator (or map) interface that returns paths instead of Tensors.
* A helper function that loads the soure path into Tensor.

For example, the interface of a ``Dataset`` for image classification
might look like the following.

.. code-block::
class Dataset:
def __getitem__(self, key: int) -> tuple[Tensor, int]:
...
We recommend to separate the source and process and make them additional
public interface.
(Also, as descibed above, we recommend to not convert each item into
``Tensor`` for the performance reasons.)

.. code-block::
class Source:
def __getitem__(self, key: int) -> tuple[str, int]:
...
def load(data: tuple[str, int]) -> tuple[ImageFrames, int]:
...
and if the processing is composed of stages with different bounding
factor, then split them further into primitive functions.

.. code-block::
def download(src: tuple[str, int]) -> tuple[bytes, int]:
...
def decode_and_preprocess(data: tuple[bytes, int]) -> tuple[ImageFrames, int]:
...
then the original ``Dataset`` can be implemented as a composition

.. code-block::
class Dataset:
def __init__(self, ...):
self._src = Source(...)
def __getitem__(self, key:int) -> tuple[str, int]:
metadata = self._src[key]
item = download(metadata)
frames, cls = decode_and_preprocess(item)
tensor = spdl.io.to_torch(frames)
return tensor, cls
Such decomposition makes the dataset compatible with SPDL's Pipeline,
and allows users to run them more efficiently

.. code-block::
pipeline = (
PipelineBuilder()
.add_source(Source(...))
.pipe(download, concurrency=8)
.pipe(decode_and_preprocess, concurrency=4)
...
.build(...)
)
9 changes: 9 additions & 0 deletions main/_sources/cpp.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
API References (C++)
====================

.. toctree::
:caption: API References (C++)

Class List <generated/libspdl/libspdl_class>
File List <generated/libspdl/libspdl_file>
API <generated/libspdl/libspdl_api>
13 changes: 13 additions & 0 deletions main/_sources/examples.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Examples
========

.. autosummary::
:toctree: generated
:template: _custom_autosummary_example.rst
:caption: Examples
:recursive:

image_dataloading
video_dataloading
imagenet_classification
multi_thread_preprocessing
106 changes: 106 additions & 0 deletions main/_sources/faq.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
Frequently Asked Questions
==========================

How to work around GIL?
-----------------------

In Python, GIL (Global Interpreter Lock) practically prevents the use of multi-threading, however extension modules that are written in low-level languages, such as C, C++ and Rust, can release GIL when executing operations that do not interact with Python interpreter.

Many libraries used for dataloading release the GIL. To name a few;

- Pillow
- OpenCV
- Decord
- tiktoken

Typically, the bottleneck of model training in loading and preprocessing the media data.
So even though there are still parts of pipelines that are constrained by GIL,
by taking advantage of preprocessing functions that release GIL,
we can achieve high throughput.

What if a function does not release GIL?
----------------------------------------

In case you need to use a function that takes long time to execute (e.g. network utilities)
but it does not release GIL, you can delegate the stage to subprocess.

:py:meth:`spdl.dataloader.PipelineBuilder.pipe` method takes an optional ``executor`` argument.
The default behavior of the ``Pipeline`` is to use the thread pool shared among all stages.
You can pass an instance of :py:class:`concurrent.futures.ProcessPoolExecutor`,
and that stage will execute the function in the subprocess.

.. code-block::
executor = ProcessPoolExecutor(max_workers=num_processes)
pipeline = (
PipelineBuilder()
.add_source(src)
.pipe(stage1, executor=executor, concurrency=num_processes)
.pipe(stage2, ...)
.pipe(stage3, ...)
.add_sink(1)
.build()
)
This will build pipeline like the following.

.. include:: ./plots/faq_subprocess_chart.txt

.. note::

Along with the function arguments and return values, the function itself is also
serialized and passed to the subprocess. Therefore, the function to be executed
must be a plain function. Closures and class methods cannot be passed.

.. tip::

If you need to perform one-time initialization in subporcess, you can use
``initializer`` and ``initargs`` arguments.

The values passed as ``initializer`` and ``initargs`` must be picklable.
If constructing an object in a process that does not support picke, then
you can pass constructor arguments instead and store the resulting object
in global scope. See also https://stackoverflow.com/a/68783184/3670924.

Example

.. code-block::
def init_resource(*args):
global rsc
rsc = ResourceClass(*args)
def process_with_resource(item):
global rsc
return rsc.process(item)
executor = ProcessPoolExecutor(
max_workers=4,
mp_context=None,
initializer=init_resource,
initargs=(...),
)
pipeline = (
PipelineBuilder()
.add_source()
.pipe(
process_with_resource,
executor=executor,
concurrency=4,
)
.add_sink(3)
.build()
)
Why Async IO?
-------------

When training a model with large amount of data, the data are retrieved from remote locations. Network utilities often provide APIs based on Async I/O.

The Async I/O allows to easily build complex data preprocessing pipeline and execute them while automatically parallelizing parts of the pipline, achiving high throughput.

Synchronous operations that release GIL can be converted to async operations easily by running them in a thread pool. So by converting the synchronous preprocessing functions that release GIL into asynchronous operations, the entire data preprocessing pipeline can be executed in async event loop. The event loop handles the scheduling of data processing functions, and execute them concurrently.
Loading

0 comments on commit 1d67164

Please sign in to comment.