diff --git a/docs/getting-started/tutorials/season-2-scaling-out-and-up/episode05.md b/docs/getting-started/tutorials/season-2-scaling-out-and-up/episode05.md index 2a038a9e..f1e0005f 100644 --- a/docs/getting-started/tutorials/season-2-scaling-out-and-up/episode05.md +++ b/docs/getting-started/tutorials/season-2-scaling-out-and-up/episode05.md @@ -4,7 +4,7 @@ This flow is a simple linear workflow that verifies your cloud configuration. The `start` and `end` steps will run locally, while the `hello` step will [run -remotely](/scaling/remote-tasks/introduction). After [configuring +remotely](/scaling/remote-tasks/requesting-resources). After [configuring Metaflow](/getting-started/infrastructure) to run in the cloud, data and metadata about your runs will be stored remotely. This means you can use the client to access information about any flow from anywhere. diff --git a/docs/index.md b/docs/index.md index 8f6a03fa..a8bfd220 100644 --- a/docs/index.md +++ b/docs/index.md @@ -31,12 +31,12 @@ projects. - [Creating Flows](metaflow/basics) - [Inspecting Flows and Results](metaflow/client) - [Debugging Flows](metaflow/debugging) -- [Visualizing Results](metaflow/visualizing-results/) ✨*New Features*✨ +- [Visualizing Results](metaflow/visualizing-results/) ## II. Scalable Flows - [Introduction to Scalable Compute and Data](scaling/introduction) -- [Executing Tasks remotely](scaling/remote-tasks/introduction) +- [Computing at Scale](scaling/remote-tasks/introduction) ✨*Updated and Expanded*✨ - [Managing Dependencies](scaling/dependencies) - [Dealing with Failures](scaling/failures) - [Loading and Storing Data](scaling/data) diff --git a/docs/introduction/metaflow-resources.md b/docs/introduction/metaflow-resources.md index 28a20607..11afffb6 100644 --- a/docs/introduction/metaflow-resources.md +++ b/docs/introduction/metaflow-resources.md @@ -36,12 +36,17 @@ resource that should be listed here 🤗 ## Videos + - [Metaflow Office Hours](https://www.youtube.com/watch?v=76k5r6s6M1Q&list=PLUsOvkBBnJBcdF7SScDIndbnkMnFOoskA) - + cool stories from companies using Metaflow. - [Metaflow on YouTube](https://www.youtube.com/results?search_query=metaflow+ml). - You can start with [this recent overview](https://www.youtube.com/watch?v=gZnhSHvhuFQ). ## Blogs +Find [a more comprehensive list of Metaflow users and posts here](https://outerbounds.com/stories/). 
Here's +a small sample: + - 23andMe: [Developing safe and reliable ML products at 23andMe](https://medium.com/23andme-engineering/machine-learning-eeee69d40736) - AWS: [Getting started with the open source data science tool Metaflow on @@ -50,6 +55,7 @@ resource that should be listed here 🤗 CNN](https://medium.com/cnn-digital/accelerating-ml-within-cnn-983f6b7bd2eb) - Latana: [Brand Tracking with Bayesian Statistics and AWS Batch](https://aws.amazon.com/blogs/startups/brand-tracking-with-bayesian-statistics-and-aws-batch/) + - Netflix: [Supporting Diverse ML Systems at Netflix](https://netflixtechblog.com/supporting-diverse-ml-systems-at-netflix-2d2e6b6d205d) - Netflix: [Open-Sourcing Metaflow, a Human-Centric Framework for Data Science](https://netflixtechblog.com/open-sourcing-metaflow-a-human-centric-framework-for-data-science-fa72e04a5d9) - Netflix: [Unbundling Data Science Workflows with Metaflow and AWS Step diff --git a/docs/metaflow/visualizing-results/README.md b/docs/metaflow/visualizing-results/README.md index c8ae7af3..d6fa225b 100644 --- a/docs/metaflow/visualizing-results/README.md +++ b/docs/metaflow/visualizing-results/README.md @@ -39,7 +39,8 @@ every time a new model is trained. In contrast, cards are not meant for building complex, interactive dashboards or for ad-hoc exploration that is a spot-on use case for notebooks. If you are curious, you can read -more about the motivation for cards in [the original release blog post](https://outerbounds.com/blog/integrating-pythonic-visual-reports-into-ml-pipelines/) and +more about the motivation for cards in [the original release blog +post](https://outerbounds.com/blog/integrating-pythonic-visual-reports-into-ml-pipelines/) and [the announcement post for updating cards](https://outerbounds.com/blog/metaflow-dynamic-cards/). ## How to use cards? @@ -85,7 +86,7 @@ detail below: above. Also, crucially, cards work in any compute -environment such as laptops, [any remote tasks](/scaling/remote-tasks/introduction), or +environment such as laptops, [any remote tasks](/scaling/remote-tasks/requesting-resources), or when the flow is [scheduled to run automatically](/production/introduction). Hence, you can use cards to inspect and debug results during prototyping, as well as monitor the quality of production runs. diff --git a/docs/metaflow/visualizing-results/advanced-shareable-cards-with-card-templates.md b/docs/metaflow/visualizing-results/advanced-shareable-cards-with-card-templates.md index 711ca783..3ebd777e 100644 --- a/docs/metaflow/visualizing-results/advanced-shareable-cards-with-card-templates.md +++ b/docs/metaflow/visualizing-results/advanced-shareable-cards-with-card-templates.md @@ -80,7 +80,7 @@ You should see a blank page with a blue “Hello World!” text. ![]() A particularly useful feature of card templates is that they work in any compute -environment, even when [executing tasks remotely](/scaling/remote-tasks/introduction). +environment, even when [executing tasks remotely](/scaling/remote-tasks/requesting-resources). For instance, if you have AWS Batch set up, you can run the flow as follows: ``` diff --git a/docs/production/scheduling-metaflow-flows/scheduling-with-airflow.md b/docs/production/scheduling-metaflow-flows/scheduling-with-airflow.md index d581769a..817e2b9c 100644 --- a/docs/production/scheduling-metaflow-flows/scheduling-with-airflow.md +++ b/docs/production/scheduling-metaflow-flows/scheduling-with-airflow.md @@ -36,7 +36,7 @@ required in the code. 
All data artifacts produced by steps run on Airflow are av using the [Client API](../../metaflow/client.md). All tasks are run on Kubernetes respecting the `@resources` decorator, as if the `@kubernetes` decorator was added to all steps, as explained in [Executing Tasks -Remotely](/scaling/remote-tasks/introduction#safeguard-flags). +Remotely](/scaling/remote-tasks/requesting-resources). This document describes the basics of Airflow scheduling. If your project involves multiple people, multiple workflows, or it is becoming business-critical, check out the @@ -115,7 +115,7 @@ the default with the `--max-workers` option. For instance, `airflow create --max 500` allows 500 tasks to be executed concurrently for every foreach step. This option is similar to [`run ---max-workers`](/scaling/remote-tasks/introduction#safeguard-flags) that is used to +--max-workers`](/scaling/remote-tasks/controlling-parallelism) that is used to limit concurrency outside Airflow. ### Deploy-time parameters diff --git a/docs/production/scheduling-metaflow-flows/scheduling-with-argo-workflows.md b/docs/production/scheduling-metaflow-flows/scheduling-with-argo-workflows.md index de5459a1..ae52bca7 100644 --- a/docs/production/scheduling-metaflow-flows/scheduling-with-argo-workflows.md +++ b/docs/production/scheduling-metaflow-flows/scheduling-with-argo-workflows.md @@ -25,7 +25,7 @@ changes are required in the code. All data artifacts produced by steps run on Ar Workflows are available using the [Client API](../../metaflow/client.md). All tasks are run on Kubernetes respecting the `@resources` decorator, as if the `@kubernetes` decorator was added to all steps, as explained in [Executing Tasks -Remotely](/scaling/remote-tasks/introduction#safeguard-flags). +Remotely](/scaling/remote-tasks/requesting-resources). This document describes the basics of Argo Workflows scheduling. If your project involves multiple people, multiple workflows, or it is becoming business-critical, check @@ -129,7 +129,7 @@ the default with the `--max-workers` option. For instance, `argo-workflows creat --max-workers 500` allows 500 tasks to be executed concurrently for every foreach step. This option is similar to [`run ---max-workers`](/scaling/remote-tasks/introduction#safeguard-flags) that is used to +--max-workers`](/scaling/remote-tasks/controlling-parallelism) that is used to limit concurrency outside Argo Workflows. ### Deploy-time parameters diff --git a/docs/production/scheduling-metaflow-flows/scheduling-with-aws-step-functions.md b/docs/production/scheduling-metaflow-flows/scheduling-with-aws-step-functions.md index b8d952d7..77869009 100644 --- a/docs/production/scheduling-metaflow-flows/scheduling-with-aws-step-functions.md +++ b/docs/production/scheduling-metaflow-flows/scheduling-with-aws-step-functions.md @@ -23,7 +23,7 @@ changes are required in the code. All data artifacts produced by steps run on AW Functions are available using the [Client API](../../metaflow/client). All tasks are run on AWS Batch respecting the `@resources` decorator, as if the `@batch` decorator was added to all steps, as explained in [Executing Remote -Tasks](/scaling/remote-tasks/introduction). +Tasks](/scaling/remote-tasks/requesting-resources). This document describes the basics of AWS Step Functions scheduling. If your project involves multiple people, multiple workflows, or it is becoming business-critical, check @@ -146,7 +146,7 @@ the default with the `--max-workers` option. 
For instance, `step-functions creat --max-workers 500` allows 500 tasks to be executed concurrently for every foreach step. This option is similar to [`run ---max-workers`](/scaling/remote-tasks/introduction#safeguard-flags) that is used to +--max-workers`](/scaling/remote-tasks/controlling-parallelism) that is used to limit concurrency outside AWS Step Functions. ### Deploy-time parameters diff --git a/docs/scaling/data.md b/docs/scaling/data.md index 67e14019..2d186bdc 100644 --- a/docs/scaling/data.md +++ b/docs/scaling/data.md @@ -29,7 +29,7 @@ manipulation: are caused by out-of-sync features. * It is quicker to iterate on your model. Testing and debugging Python is easier than testing and debugging SQL. -* You can request [arbitrary amount of resources](/scaling/remote-tasks/introduction) +* You can request [arbitrary amount of resources](/scaling/remote-tasks/requesting-resources) for your data manipulation needs. * Instead of having data manipulation code in two places (SQL and Python), all code can be clearly laid out in a single place, in a single language, for maximum readability. @@ -65,7 +65,7 @@ of `metaflow.S3` to directly interface with data files on S3 backing your tables data is loaded directly from S3, there is no limitation to the number of parallel processes. The size of data is only limited by the size of your instance, which can be easily controlled with [the `@resources` -decorator](/scaling/remote-tasks/introduction#requesting-resources-with-resources-decorator). +decorator](/scaling/remote-tasks/requesting-resources). The best part is that this approach is blazingly fast compared to executing SQL. The main downside of this approach is that the table needs to have partitions that match @@ -549,7 +549,7 @@ Read more about [fast data processing with `metaflow.S3` in this blog post](http For maximum performance, ensure that [the `@resources(memory=)` -setting](/scaling/remote-tasks/introduction#requesting-resources-with-resources-decorator) +setting](/scaling/remote-tasks/requesting-resources) is higher than the amount of data you are downloading with `metaflow.S3`. If the amount of data is higher than the available disk space, you can use the diff --git a/docs/scaling/dependencies/README.md b/docs/scaling/dependencies/README.md index 7094bbce..952c57c4 100644 --- a/docs/scaling/dependencies/README.md +++ b/docs/scaling/dependencies/README.md @@ -30,7 +30,7 @@ others. to retrieve a flow, rerun it, and get similar results. To make this possible, a flow can't depend on libraries that are only available on your laptop. -3. **[Remote execution of tasks](/scaling/remote-tasks/introduction)** requires that +3. **[Remote execution of tasks](/scaling/remote-tasks/requesting-resources)** requires that all dependencies can be reinstantated on the fly in a remote environment. Again, this is not possible if the flow depends on libraries that are only available on your laptop. @@ -61,7 +61,7 @@ dependencies like this automatically. 3. Crucially, Metaflow packages Metaflow itself for remote execution so that you don't have to install it manually when -[using `@batch` and `@kubernetes`](/scaling/remote-tasks/introduction). Also, this +[using `@batch` and `@kubernetes`](/scaling/remote-tasks/requesting-resources). Also, this guarantees that all tasks use the same version of Metaflow consistently. 4. 
External libraries can be included via [the `@pypi` and `@conda` diff --git a/docs/scaling/dependencies/containers.md b/docs/scaling/dependencies/containers.md index deb2c1be..277a9e34 100644 --- a/docs/scaling/dependencies/containers.md +++ b/docs/scaling/dependencies/containers.md @@ -1,7 +1,7 @@ # Defining Custom Images -All [tasks executed remotely](/scaling/remote-tasks/introduction) run in a container, +All [tasks executed remotely](/scaling/remote-tasks/requesting-resources) run in a container, both on [Kubernetes](/scaling/remote-tasks/kubernetes) and in [AWS Batch](/scaling/remote-tasks/aws-batch). Hence, the default environment for remote tasks is defined by the container (Docker) image used. diff --git a/docs/scaling/dependencies/libraries.md b/docs/scaling/dependencies/libraries.md index 328a08dd..07838fd1 100644 --- a/docs/scaling/dependencies/libraries.md +++ b/docs/scaling/dependencies/libraries.md @@ -236,7 +236,7 @@ in the `end` step. ## Libraries in remote tasks A major benefit of `@pypi` and `@conda` is that they allow you to define libraries that will be -automatically made available when [you execute tasks remotely](/scaling/remote-tasks/introduction) +automatically made available when [you execute tasks remotely](/scaling/remote-tasks/requesting-resources) on `@kubernetes` or on `@batch`. You don't need to do anything special to make this happen. If you have `@kubernetes` or `@batch` [configured to work with diff --git a/docs/scaling/dependencies/project-structure.md b/docs/scaling/dependencies/project-structure.md index 35e22ae3..face1b26 100644 --- a/docs/scaling/dependencies/project-structure.md +++ b/docs/scaling/dependencies/project-structure.md @@ -3,7 +3,7 @@ This page describes how to arrange files in your projects to follow software development best practices, which also leads to -[easy remote execution](/scaling/remote-tasks/introduction). +[easy remote execution](/scaling/remote-tasks/requesting-resources). ## Separating code to modules @@ -55,7 +55,7 @@ can run the flow as usual: python teaflow.py run ``` The `teatime.py` module works out of the box. If you have -[remote execution](/scaling/remote-tasks/introduction) set up, +[remote execution](/scaling/remote-tasks/requesting-resources) set up, you can run the code `--with batch` or `--with kubernetes` and it works equally well! diff --git a/docs/scaling/failures.md b/docs/scaling/failures.md index 70dc5cb6..889cdd2a 100644 --- a/docs/scaling/failures.md +++ b/docs/scaling/failures.md @@ -52,7 +52,7 @@ sometimes the `start` step raises an exception and needs to be retried. By defau always succeed. It is recommended that you use `retry` every time you [run tasks -remotely](/scaling/remote-tasks/introduction). Instead of annotating every step with a +remotely](/scaling/remote-tasks/requesting-resources). Instead of annotating every step with a retry decorator, you can also automatically add a retry decorator to all steps that do not have one as follows: @@ -104,7 +104,7 @@ can be retried without concern. ### Maximizing Safety By default, `retry` will retry the step for three times before giving up. It waits for 2 -minutes between retries for [remote tasks](/scaling/remote-tasks/introduction). This +minutes between retries for [remote tasks](/scaling/remote-tasks/requesting-resources). This means that if your code fails fast, any transient platform issues need to get resolved in less than 10 minutes or the whole run will fail. 
10 minutes is typically more than enough, but sometimes you want both a belt and suspenders.
diff --git a/docs/scaling/introduction.md b/docs/scaling/introduction.md
index 146cf6e8..d62d702c 100644
--- a/docs/scaling/introduction.md
+++ b/docs/scaling/introduction.md
@@ -9,7 +9,7 @@ optimization](https://xkcd.com/1691/) is probably not the best use of your time. Instead, you can leverage the cloud to get a bigger laptop or more laptops (virtually, not literally). This is the Stage II in Metaflow development: Scaling flows with the
-cloud. Luckily Metaflow makes this trivially easy - no changes in the code required -
+cloud. Metaflow makes this easy - no changes in the code required -
after you have done the initial legwork to [configure infrastructure for Metaflow](/getting-started/infrastructure).

@@ -17,9 +17,10 @@ Metaflow](/getting-started/infrastructure).

## Supersizing Flows

-Here's how Metaflow can help make your project more scalable:
+Here's how Metaflow can help make your project more scalable, both technically and
+organizationally:

-1. You can make your existing flows more scalable just by adding a line of code,
+1. You can make your existing flow scalable just by adding a line of code,
`@resources`. This way you can request more CPUs, memory, or GPUs in your flows. Or, you can parallelize processing over multiple instances, even thousands of them.

@@ -28,18 +29,18 @@ attracting interest from colleagues too. Metaflow contains a number of features, [namespaces](/scaling/tagging), which make collaboration smoother by allowing many people contribute without interfering with each other's work accidentally.

-### Toolbox of Scalability
+### Easy patterns for scalable, high-performance code

There is no single magic formula for scalability. Instead of proposing a novel paradigm
-to make your Python code faster, Metaflow provides a set of pragmatic tools, leveraging
-the best off-the-shelf components and services, which help you make code more scalable
-and performant depending on your specific needs.
+to make your Python code faster, [Metaflow provides a set of
+practical, commonly used patterns](/scaling/remote-tasks/introduction), which help you
+make code more scalable and performant depending on your specific needs.

-The scalability tools fall into three categories:
+The patterns fall into three categories:

- **Performance Optimization**: You can improve performance of your code by utilizing off-the-shelf, high-performance libraries such as
-  [XGboost](https://github.com/dmlc/xgboost) or [Tensorflow](https://tensorflow.org).
+  [XGboost](https://github.com/dmlc/xgboost) or [PyTorch](https://pytorch.org/).
  Or, if you need something more custom, you can leverage the vast landscape of data tools for Python, including compilers like [Numba](https://numba.pydata.org) to speed up your code.

@@ -53,7 +54,9 @@ care of provisioning such machines on demand.

- **Scaling Out**: Besides executing code on a single instance, Metaflow makes it easy to parallelize steps over an arbitrarily large number of instances, leveraging Kubernetes and AWS Batch, giving you access to virtually unlimited amount of computing
-  power.
+  power. Besides running many independent tasks, you can also [create large compute
+  clusters](/scaling/remote-tasks/distributed-computing) on the fly, e.g.
+  to train large (language) models.
Often an effective recipe for scalability is a combination of these three techniques: Start with high-performance Python libraries, run them on large instances, and if
@@ -73,8 +76,7 @@ In this section, you will learn how to make your flows capable of handling more
execute faster. You will also learn how to scale projects over multiple people by organizing results better. We cover five topics:

-1. [Executing tasks remotely with Kubernetes or AWS
-   Batch](/scaling/remote-tasks/introduction)
+1. [Scaling compute in the cloud](/scaling/remote-tasks/introduction)
2. [Dealing with failures](/scaling/failures)
3. [Managing execution environments](/scaling/dependencies)
4. [Loading and storing data efficiently](/scaling/data)
diff --git a/docs/scaling/remote-tasks/aws-batch.md b/docs/scaling/remote-tasks/aws-batch.md
index c3c7c358..888c5775 100644
--- a/docs/scaling/remote-tasks/aws-batch.md
+++ b/docs/scaling/remote-tasks/aws-batch.md
@@ -35,11 +35,49 @@ You can also specify the resource requirements on command line as well:
$ python BigSum.py run --with batch:cpu=4,memory=10000,queue=default,image=ubuntu:latest
```

-## My job is stuck in `RUNNABLE` state. What do I do?
+
+## Using GPUs and Trainium instances with AWS Batch
+
+To use GPUs in Metaflow tasks that run on AWS Batch, you need to run the flow in a
+[Job Queue](https://docs.aws.amazon.com/batch/latest/userguide/job_queues.html) that
+is attached to a [Compute
+Environment](https://docs.aws.amazon.com/batch/latest/userguide/compute_environments.html)
+with GPU/Trainium instances.
+
+To set this up, you can either modify the allowable instances in a [Metaflow AWS deployment
+template](https://github.com/outerbounds/metaflow-tools/tree/master/aws) or manually add such a
+compute environment from the AWS console. The steps are:
+
+1. Create a compute environment with GPU-enabled EC2 instances or Trainium instances.
+2. Attach the compute environment to a new Job Queue - for example named `my-gpu-queue`.
+3. Run a flow with a GPU task in the `my-gpu-queue` job queue by
+   - setting the `METAFLOW_BATCH_JOB_QUEUE` environment variable, or
+   - setting the `METAFLOW_BATCH_JOB_QUEUE` value in your Metaflow config, or
+   - (most explicit) setting the `queue` parameter in the `@batch` decorator.
+
+It is a good practice to separate the job queues that you run GPU tasks on from those that do not
+require GPUs (or Trainium instances). This makes it easier to track hardware-accelerated workflows,
+which can be costly, independently of other workflows. Just add a line like
+```python
+@batch(gpu=1, queue='my-gpu-queue')
+```
+in steps that require GPUs.
+
+## My job is stuck in `RUNNABLE` state. What should I do?
+
+Does the AWS Batch job queue that you are running the Metaflow task in have a compute environment
+with EC2 instances that provide the requested resources? For example, if your job queue is connected to
+a single compute environment that only has `p3.2xlarge` as a GPU instance, and a user requests 2
+GPUs, that job will never get scheduled because `p3.2xlarge` only has 1 GPU per instance.

-Consult [this
+For more information, [see this
article](https://docs.aws.amazon.com/batch/latest/userguide/troubleshooting.html#job_stuck_in_runnable).
+## My job is stuck in `STARTING` state. What should I do?
+
+Are the resources requested in your Metaflow code/command sufficient? Especially when using
+custom GPU images, you might need to increase the requested memory to pull the container image
+into your compute environment.
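+
+For example, a hypothetical step that runs in a dedicated GPU queue with a large custom image
+might request extra memory up front - the image name, queue name, and resource amounts below are
+illustrative, not defaults:
+
+```python
+from metaflow import FlowSpec, step, batch
+
+class CustomImageFlow(FlowSpec):
+
+    # hypothetical image and queue names; adjust to your own AWS Batch setup
+    @batch(gpu=1, memory=32000, queue='my-gpu-queue',
+           image='my-registry/my-gpu-image:latest')
+    @step
+    def start(self):
+        print('running inside the custom image')
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    CustomImageFlow()
+```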
+
## Listing and killing AWS Batch tasks

If you interrupt a Metaflow run, any AWS Batch tasks launched by the run get killed by
@@ -101,8 +139,15 @@ As a convenience feature, you can also see the logs of any past step as follows:
$ python bigsum.py logs 15/end
```
-
## Disk space

You can request higher disk space on AWS Batch instances by using an unmanaged Compute
-Environment with a custom AMI.
\ No newline at end of file
+Environment with a custom AMI.
+
+## How to configure AWS Batch for distributed computing?
+
+[See these instructions](https://outerbounds.com/engineering/operations/distributed-computing/)
+if you want to use AWS Batch for [distributed computing](/scaling/remote-tasks/distributed-computing).
+
+
+
diff --git a/docs/scaling/remote-tasks/controlling-parallelism.md b/docs/scaling/remote-tasks/controlling-parallelism.md
new file mode 100644
index 00000000..962509f2
--- /dev/null
+++ b/docs/scaling/remote-tasks/controlling-parallelism.md
@@ -0,0 +1,31 @@
+
+# Controlling Parallelism
+
+It is almost too easy to execute tasks remotely using Metaflow. Consider a foreach loop
+defined as follows:
+
+```python
+self.params = range(1000)
+self.next(self.fanned_out, foreach='params')
+```
+
+When run with `--with batch` or `--with kubernetes`, this code would launch up to 1000
+parallel instances which may turn out to be quite expensive.
+
+To safeguard against inadvertent launching of many parallel jobs, the `run` and `resume`
+commands have a flag `--max-num-splits` which fails the task if it attempts to launch
+more than 100 splits by default. Use the flag to increase the limit if you actually need
+more tasks.
+
+```bash
+$ python myflow.py run --max-num-splits 200
+```
+
+Another flag, `--max-workers`, limits the number of tasks run in parallel. Even if a
+foreach launched 100 splits, `--max-workers` would make only 16 \(by default\) of them
+run in parallel at any point in time. If you want more parallelism, increase the value
+of `--max-workers`.
+
+```bash
+$ python myflow.py run --max-workers 32
+```
\ No newline at end of file
diff --git a/docs/scaling/remote-tasks/distributed-computing.md b/docs/scaling/remote-tasks/distributed-computing.md
new file mode 100644
index 00000000..0da5ae76
--- /dev/null
+++ b/docs/scaling/remote-tasks/distributed-computing.md
@@ -0,0 +1,143 @@
+
+# Distributed Computing
+
+Metaflow's [`foreach` construct](/metaflow/basics#foreach) allows you to run tasks concurrently.
+In the case of `foreach`, tasks execute independently. This pattern works well when the workload
+is [*embarrassingly parallel*](https://en.wikipedia.org/wiki/Embarrassingly_parallel), that is,
+tasks don't communicate with each other and they don't have to execute simultaneously.
+
+There are other workloads, such as distributed training of large models, which require
+tasks to interact with each other. Metaflow provides another mechanism, the `@parallel` decorator,
+which orchestrates such inter-dependent tasks. In effect, the decorator launches
+an ephemeral compute cluster on the fly, as a part of a Metaflow flow, benefiting from
+Metaflow features like [dependency management](/scaling/dependencies),
+[versioning](/scaling/tagging), and [production deployments](/production/introduction).
+
+Typically, this pattern is used through one of the framework-specific decorators like `@torchrun`
+or `@deepspeed`, described below, which make it easy to use a particular framework for distributed
+training. If you need low-level access to the cluster, e.g.
to use it with a framework that doesn't +have a corresponding high-level decorator yet, see documentation for low-level access at the +end of this page. + +:::info +To use distributed computing, follow [set up instructions +here](https://outerbounds.com/engineering/operations/distributed-computing/). If you need +help getting started, contact [Metaflow Slack](http://slack.metaflow.org). +::: + +## High-level decorators + +The easiest way to get started is to use one of the high-level decorators - [see an overview +in this blog post](https://outerbounds.com/blog/distributed-training-with-metaflow/): + +| Decorator Implementation | UX | Description | PyPi Release | Example | +| :---: | --- | --- | :---: | :---: | +| [`@torchrun`](https://github.com/outerbounds/metaflow-torchrun) | Use `current.torch.run` to submit your `torch.distributed` program. No need to log into each node, call the code once in `@step`. | A [`torchrun`](https://pytorch.org/docs/stable/elastic/run.html) command that runs `@step` function code on each node. [Torch distributed](https://pytorch.org/tutorials/beginner/dist_overview.html) is used under the hood to handle communication between nodes. | [`metaflow-torchrun`](https://pypi.org/project/metaflow-torchrun/) | [MinGPT](https://github.com/outerbounds/metaflow-torchrun/blob/main/examples/min-gpt/flow.py) | +| [`@deepspeed`](https://github.com/outerbounds/metaflow-deepspeed) | Exposes `current.deepspeed.run`
Requires OpenSSH and OpenMPI installed in the Metaflow task container. | Form MPI cluster with passwordless SSH configured at task runtime (to reduce the risk of leaking private keys). Submit the Deepspeed program and run. | [`metaflow-deepspeed`](https://pypi.org/project/metaflow-deepspeed/) | [Bert](https://github.com/outerbounds/metaflow-deepspeed/tree/main/examples/bert) & [Dolly](https://github.com/outerbounds/metaflow-deepspeed/tree/main/examples/dolly) | +| [`@metaflow_ray`](https://github.com/outerbounds/metaflow-ray/tree/main) | Write a Ray program locally or call script from `@step` function, `@metaflow_ray` takes care of forming the Ray cluster. | Forms a [Ray cluster](https://docs.ray.io/en/latest/cluster/getting-started.html) dynamically. Runs the `@step` function code on the control task as Ray’s “head node”. | [`metaflow-ray`](https://pypi.org/project/metaflow-ray/) | [GPT-J](https://github.com/outerbounds/metaflow-ray/tree/main/examples/ray-fine-tuning-gpt-j) & [Distributed XGBoost](https://github.com/outerbounds/metaflow-ray/tree/main/examples/train) | +| [`@tensorflow`](https://github.com/outerbounds/metaflow-tensorflow/tree/main) | Put TensorFlow code in a distributed strategy scope, and call it from step function. | Run the `@step` function code on each node. This means the user picks the appropriate [strategy](https://www.tensorflow.org/guide/distributed_training#types_of_strategies) in their code. | [`metaflow-tensorflow`](https://pypi.org/project/metaflow-tensorflow/) | [Keras Distributed](https://github.com/outerbounds/metaflow-tensorflow/tree/main/examples/multi-node) | +| [`@mpi`](https://github.com/outerbounds/metaflow-mpi) | Exposes `current.mpi.cc`, `current.mpi.broadcast_file`, `current.mpi.run`, `current.mpi.exec`. Cluster SSH config is handled automatically inside the decorator. Requires OpenSSH and an MPI implementation are installed in the Metaflow task container. It was tested against OpenMPI, which you can find a sample Dockerfile for [here](https://github.com/outerbounds/metaflow-mpi/blob/main/examples/Dockerfile). | Forms an MPI cluster with passwordless SSH configured at task runtime. Users can submit a `mpi4py` program or compile, broadcast, and submit a C program. | [`metaflow-mpi`](https://pypi.org/project/metaflow-mpi/) | [Libgrape](https://github.com/outerbounds/metaflow-mpi/tree/main/examples/libgrape-ldbc-graph-benchmark) | + +:::info +Note that these decorators are not included in the `metaflow` package but they are implemented as Metaflow +Extensions. You need to install them separately in your development environment, but they will get +packaged automatically by Metaflow, so you don't need to include them in Docker images +or `@conda`/`@pypi`. Also note that the extensions are not part of [the stable Metaflow API](/api), so +they are subject to change. +::: + +## Low-level access + +Under the hood, Metaflow guarantees that you get a desired kind and number of compute nodes running +simultaneously, so that they are able to communicate and coordinate amongst each other. + +You can use this compute cluster to implement any distributed computing algorithms of your own. +To illustrate this, consider a simple example that sets up a cluster of tasks that communicate +with each other over [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface). +Technically, MPI is not required - you could communicate with any protocol you want - but MPI is +a popular choice. 
+ +### MPI example + +Let’s create a simple Hello World MPI program based on +[this example](https://github.com/outerbounds/metaflow-mpi/tree/main/examples/python-hello). +The program identifies the main node (`rank == 0`) that sends a message to +all workers nodes which they receive and print out. We use +[mpi4py](https://mpi4py.readthedocs.io/en/stable/) as a Python wrapper for the MPI protocol. + +First, let's create an MPI script, `mpi_hello.py`: +```python +import mpi4py +from mpi4py import MPI + +if __name__ == "__main__": + + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + + if rank == 0: + print(f"Cluster has {size} processes") + for i in range(1, size): + msg = "Main node says hi! 👋" + comm.send(msg, dest=i) + else: + msg = comm.recv() + print(f"👷 Worker node {rank} received message: {msg}") +``` + +Next, let's create a flow that launches a cluster of four nodes, thanks +to `num_parallel=4`, and runs the MPI script we defined above in the cluster, +launching two worker processes on each node. + +```python +from metaflow import FlowSpec, step, batch, mpi, current + +N_CPU = 2 +N_NODES = 4 + +class MPI4PyFlow(FlowSpec): + + @step + def start(self): + self.next(self.multinode, num_parallel=N_NODES) + + @batch(image="eddieob/mpi-base:1", cpu=N_CPU) + @mpi + @step + def multinode(self): + current.mpi.exec( + args=["-n", str(N_CPU * N_NODES), "--allow-run-as-root"], + program="python mpi_hello.py", + ) + self.next(self.join) + + @step + def join(self, inputs): + self.next(self.end) + + @step + def end(self): + pass + +if __name__ == "__main__": + MPI4PyFlow() +``` + +To run the flow, make sure your AWS Batch environment is +[configured to support multinode +jobs](https://outerbounds.com/engineering/operations/distributed-computing/). Then, install +the `MPI` extension for Metaflow +``` +pip install metaflow-mpi +``` +and run the flow with +``` +python mpiflow.py run +``` + +The example uses an image, `eddieob/mpi-base`, defined in +[this Dockerfile](https://github.com/outerbounds/metaflow-mpi/blob/main/examples/Dockerfile). The image +includes MPI and `ssh` for communication. Note that Metaflow packages `mpi_hello.py` automatically, +so it doesn't have to be included in the image. + diff --git a/docs/scaling/remote-tasks/gpu-compute.md b/docs/scaling/remote-tasks/gpu-compute.md new file mode 100644 index 00000000..7fc518c7 --- /dev/null +++ b/docs/scaling/remote-tasks/gpu-compute.md @@ -0,0 +1,102 @@ + +# Using GPUs and Other Accelerators + +Metaflow enables access to hardware-accelerated computing, GPUs in particular, when +using [AWS Batch](aws-batch) or [Kubernetes](kubernetes). +You can leverage + +- Single accelerators - e.g. `@resources(gpu=1)` +- Single instances with multiple accelerators - e.g. `@resources(gpu=4)` +- [Multiple instances with multiple accelerators](distributed-computing) + +You can find many examples of how to use Metaflow to fine-tune LLMs and +other generative AI models, as well as how to train computer +vision and other deep learning models [in these articles](https://outerbounds.com/blog/?category=Foundation%20Models). + +## Using accelerators + +Before you can start taking advantage of hardware-accelerated steps, you need +to take care of two prerequisites: + +1. Add hardware-accelerated instances in [your Metaflow stack](/getting-started/infrastructure). +Take a look specific tips for [AWS Batch](aws-batch) and [Kubernetes](kubernetes). + +2. 
Configure your flow to [include necessary drivers and frameworks](installing-drivers-and-frameworks).
+
+After this, using the accelerators is straightforward, as explained below.
+
+:::tip
+Don't hesitate to reach out to [Metaflow Slack](http://chat.metaflow.org) if you need
+help getting started!
+:::
+
+### GPUs
+
+To use GPUs in your compute environment, use [the
+`@resources` decorator](requesting-resources) to get quick access to one or more GPUs
+like in this example:
+
+```python
+from metaflow import FlowSpec, step, resources
+
+class GPUFlow(FlowSpec):
+
+    @resources(memory=32000, cpu=4, gpu=1)
+    @step
+    def start(self):
+        from my_script import my_gpu_routine
+        my_gpu_routine()
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    GPUFlow()
+```
+
+As usual with `@resources`, the decorator is ignored for local runs. This allows you to
+develop the code locally, e.g. using GPU resources on your local workstation. To get access
+to the requested resources in the cloud, run the flow `--with kubernetes` or `--with batch`.
+
+If you need more fine-grained control over which GPUs get used, use the decorators
+specific to the compute environment: for instance, [`@kubernetes` allows you to
+specify a `gpu_vendor`](/api/step-decorators/kubernetes) and [`@batch` allows you to
+specify a `queue`](/api/step-decorators/batch) targeting a compute environment containing
+specific GPUs. For more information, see guidance for [AWS Batch](aws-batch) and [Kubernetes](kubernetes).
+
+
+### Using AWS Trainium and Inferentia
+
+On AWS, you can use AWS-specific hardware accelerators, Trainium and Inferentia.
+For more details, see [a blog post outlining them in the context of Metaflow](https://aws.amazon.com/blogs/machine-learning/develop-and-train-large-models-cost-efficiently-with-metaflow-and-aws-trainium/).
+
+When using AWS Batch, you can request the accelerators simply by defining the number
+of Trainium or Inferentia cores in `@batch`:
+
+* `@batch(trainium=16)`
+* `@batch(inferentia=16)`
+
+Note that Metaflow supports [distributed training](distributed-computing) over multiple
+Trainium instances. For detailed instructions, visit
+the [`metaflow-trainium` repository](https://github.com/outerbounds/metaflow-trainium/tree/main).
+
+:::note
+Contact [Metaflow Slack](http://chat.metaflow.org) if you are interested in using Trainium
+or Inferentia with `@kubernetes`.
+:::
+
+
+### Using Google's Tensor Processing Units (TPUs)
+
+Contact [Metaflow Slack](http://chat.metaflow.org) if you are interested in using TPUs with
+Metaflow on Google Cloud.
+
+## Monitoring GPU utilization
+
+To monitor GPU devices and their utilization, add [a custom card
+`@gpu_profile`](https://github.com/outerbounds/metaflow-gpu-profile) in your GPU steps.
+
+![](/assets/gpu_profile.png)
+
diff --git a/docs/scaling/remote-tasks/installing-drivers-and-frameworks.md b/docs/scaling/remote-tasks/installing-drivers-and-frameworks.md
new file mode 100644
index 00000000..7c4f74e6
--- /dev/null
+++ b/docs/scaling/remote-tasks/installing-drivers-and-frameworks.md
@@ -0,0 +1,125 @@
+
+# Installing Drivers and Frameworks
+
+Paradoxically, often the hardest part of using a hardware accelerator is to get all the
+necessary software installed, such as CUDA drivers and platform-specific ML/AI frameworks.
+
+Metaflow allows you to [specify software dependencies as a part of the flow](/scaling/dependencies).
You can either use a Docker image with necessary dependencies included, or layer them on top of
+a generic image on the fly using the `@conda` or `@pypi` decorators. We cover both approaches below.
+
+## Using a GPU-ready Docker image
+
+You can use the `image` argument in `@batch` and `@kubernetes` decorators to choose a suitable
+image on the fly, like [an official `pytorch` image](https://hub.docker.com/r/pytorch/pytorch)
+we use below:
+
+```python
+from metaflow import FlowSpec, step, kubernetes
+
+class GPUImageFlow(FlowSpec):
+
+    @kubernetes(
+        gpu=1,
+        image='pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime'
+    )
+    @step
+    def start(self):
+        import torch  # pylint: disable=import-error
+        if torch.cuda.is_available():
+            print('Cuda found 🙌')
+            for d in range(torch.cuda.device_count()):
+                print(f"GPU device {d}:", torch.cuda.get_device_name(d))
+        else:
+            print('No CUDA 😭')
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    GPUImageFlow()
+```
+
+If you want to avoid specifying an image in the code, you can configure a default image in your [Metaflow
+configuration file](https://outerbounds.com/engineering/operations/configure-metaflow/) through the
+`METAFLOW_KUBERNETES_CONTAINER_IMAGE` and `METAFLOW_BATCH_CONTAINER_IMAGE` settings.
+
+Many GPU-ready images are available online, e.g. at:
+
+- [Nvidia's NVCR catalogs](https://catalog.ngc.nvidia.com/containers).
+- [PyTorch DockerHub Registry](https://hub.docker.com/r/pytorch/pytorch).
+- [TensorFlow DockerHub Registry](https://hub.docker.com/r/tensorflow/tensorflow).
+- [AWS' registry of Docker images for deep
+  learning](https://github.com/aws/deep-learning-containers/blob/master/available_images.md)
+
+You can also [build a Docker image of your own](https://outerbounds.com/docs/build-custom-image/),
+using a GPU-ready image as a base image.
+
+## Installing libraries with `@conda` and `@pypi`
+
+[The `@conda` and `@pypi` decorators](/scaling/dependencies/libraries) allow you to install
+packages on the fly on top of a default image. This makes it easy to test different libraries
+quickly without having to build custom images.
+
+The CUDA drivers are hosted at [NVIDIA's official Conda
+channel](https://docs.nvidia.com/cuda/cuda-installation-guide-linux/index.html#conda-installation).
+Run this command once to include the channel in your environment:
+```bash
+conda config --add channels nvidia
+```
+
+After this, you can install PyTorch and other CUDA-enabled libraries with `@conda` and
+`@conda_base` as usual. Try this:
+
+```python
+from metaflow import FlowSpec, step, resources, conda_base
+
+@conda_base(
+    libraries={
+        "pytorch::pytorch": "2.0.1",
+        "pytorch::pytorch-cuda": "11.8"
+    },
+    python="3.9"
+)
+class GPUCondaFlow(FlowSpec):
+
+    @resources(gpu=1)
+    @step
+    def start(self):
+        import torch  # pylint: disable=import-error
+        if torch.cuda.is_available():
+            print('Cuda found 🙌')
+            for d in range(torch.cuda.device_count()):
+                print(f"GPU device {d}:", torch.cuda.get_device_name(d))
+        else:
+            print('No CUDA 😭')
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    GPUCondaFlow()
+```
+Run the flow as
+```
+python gpuconda.py run --with batch
+```
+or `--with kubernetes`. When you run the flow for the first time, it will create an
+execution environment and cache it, which will take a few minutes. Subsequent runs will
+start faster.
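+
+If you prefer `pip` packages, a similar sketch works with `@pypi_base` - the version pins below
+are illustrative, and whether a GPU-enabled wheel is available depends on your platform:
+
+```python
+from metaflow import FlowSpec, step, resources, pypi_base
+
+@pypi_base(packages={"torch": "2.0.1"}, python="3.10")
+class GPUPipFlow(FlowSpec):
+
+    @resources(gpu=1)
+    @step
+    def start(self):
+        import torch  # pylint: disable=import-error
+        print("CUDA available:", torch.cuda.is_available())
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    GPUPipFlow()
+```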
+ +:::note +If you run workflows from a machine with a different operating system +than where remote tasks run, for example launching Metaflow runs that have remote +`@kubernetes` tasks from a Mac, some available dependencies and versions may not be +the same for each operating system. In this case, you can go to +the [conda-forge website](https://conda-forge.org/feedstock-outputs/) and find +which package versions are available across each platform. +::: + + + diff --git a/docs/scaling/remote-tasks/introduction.md b/docs/scaling/remote-tasks/introduction.md index f7c9c280..40c4377d 100644 --- a/docs/scaling/remote-tasks/introduction.md +++ b/docs/scaling/remote-tasks/introduction.md @@ -1,252 +1,136 @@ -# Executing Tasks Remotely - -There are two ways to handle larger amounts of data and compute: - -1. _Scale up_ by running your code on a larger machine with more memory, CPU cores, and - GPUs, or -2. _Scale out_ by using more machines in parallel. - -As described below, Metaflow supports the former through the `@resources` decorator and -the latter through [foreach](/metaflow/basics#foreach) when flows are run on Kubernetes -or AWS Batch. - -Everything described on this page applies to all compute platforms supported by -Metaflow. The data scientist can write their flows using foreaches and the `@resource` -decorator knowing that the code will execute on any supported platforms. For additional -tips and tricks related to specific systems, see [Using AWS Batch](aws-batch) and [Using -Kubernetes](kubernetes). - -## Requesting resources with `resources` decorator - -Consider the following example: - -```python -from metaflow import FlowSpec, step, resources - -class BigSum(FlowSpec): - - @resources(memory=60000, cpu=1) - @step - def start(self): - import numpy - import time - big_matrix = numpy.random.ranf((80000, 80000)) - t = time.time() - self.sum = numpy.sum(big_matrix) - self.took = time.time() - t - self.next(self.end) - - @step - def end(self): - print("The sum is %f." % self.sum) - print("Computing it took %dms." % (self.took * 1000)) - -if __name__ == '__main__': - BigSum() +# Computing at Scale + +Metaflow makes it easy to run compute in the cloud. Instead of prescribing one paradigm for all +compute needs, Metaflow allows you to mix and match various patterns of scalable compute, +keeping simple things simple while making advanced use cases possible. + +When your needs are modest, you can +run Metaflow as any Python code, such as a notebook or a local script. When you need more +compute power, say to train a model on GPUs or to handle a large dataframe, you can get the +job done by adding a line code. Or, you can execute even thousands of such tasks in parallel +or train a large model, such as an LLM, over many GPUs. + +Below, we provide an overview of the patterns of compute that Metaflow supports with pointers +for more details. Importantly, you can mix and match these patterns freely, even in a single +flow. + +:::note +To enable the cloud computing capabilities of Metaflow - `@batch` and `@kubernetes` - you need to +[deploy a Metaflow stack](/getting-started/infrastructure) first. To test these concepts +before deploying, [try the Metaflow Sandbox](https://outerbounds.com/sandbox/). +::: + +```mdx-code-block +import ReactPlayer from 'react-player'; ``` -This example creates a huge 80000x80000 random matrix, `big_matrix`. The matrix requires -about 80000^2 \* 8 bytes = 48GB of memory. 
- -If you attempt to run this on your local machine, it is likely that the following will -happen: - -```bash -$ python BigSum.py run - -2019-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "BugSum.py", line 11, in start -2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] big_matrix = numpy.random.ranf((80000, 80000)) -2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 856, in mtrand.RandomState.random_sample -2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 167, in mtrand.cont0_array -2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] MemoryError -2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] -2018-11-29 02:43:39.844 [5/start/21975 (pid 83812)] Task failed. -2018-11-29 02:43:39.844 Workflow failed. - Step failure: - Step start (task-id 21975) failed. -``` +## Rapid development with local execution -This fails quickly due to a `MemoryError` on most laptops as we are unable to allocate -48GB of memory. +When you run a flow without special decorators, e.g. +[run `LinearFlow` by typing `python linear.py run`](/metaflow/basics#linear), +the flow runs locally on your computer like any Python script or a notebook. -The `@resources` decorator suggests resource requirements for a step. The `memory` -argument specifies the amount of RAM in megabytes and `cpu` the number of CPU cores -requested. It does not produce the resources magically, which is why the run above -failed. The `@resources` decorator takes effect only when combined with another -decorator that describes what compute platform, like Kubernetes or AWS Batch, to use. + -Let's use the `--with` option to attach a desired decorator to all steps on the command -line. Choose one of the commands in the tabs below corresponding to whichever you use- -Kubernetes or AWS Batch. This assumes that you have [configured one of these systems -work with Metaflow](/getting-started/infrastructure). +This allows you to develop and test code rapidly without having to rely on any infrastructure +outside your workstation. -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; +Running your code as a flow can provide an immediate performance benefit: If your flow has +[branches](/metaflow/basics#branch) or [foreaches](/metaflow/basics#foreach), +Metaflow leverages multiple CPU cores to speed up compute by running parallel tasks as separate +processes. In addition to Metaflow parallelizing tasks, you can speed up compute by using +optimized Python libraries such as [PyTorch](https://pytorch.org/) to leverage GPUs or a library +like +[XGBoost](https://xgboost.readthedocs.io/en/stable/) that can utilize multiple CPU cores. - - +### Parallelizing Python over multiple CPU cores -```batch -$ python BigSum.py run --with kubernetes -``` +If you need to execute a medium amount of compute - too much to handle in sequential Python +code but not enough to warrant parallel tasks using `foreach` - [Metaflow provides a helper function, +`parallel_map`](multicore) that parallelizes execution of a Python function over multiple CPU cores. +For instance, you can use `parallel_map` to process a list of 10M items in batches of 1M items +in parallel. - +## Requesting compute `@resources` - +If your job requires more resources than what is available on your workstation, e.g. more +memory or more GPUs, Metaflow makes it easy to run the task remotely on a cloud instance: +[Simply annotate the step with the `@resources` decorator](requesting-resources). 
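+
+For instance, a minimal sketch - the resource amounts here are illustrative:
+
+```python
+from metaflow import FlowSpec, step, resources
+
+class TrainFlow(FlowSpec):
+
+    @resources(memory=32000, cpu=4, gpu=1)
+    @step
+    def start(self):
+        # memory- or GPU-hungry work goes here, e.g. loading a big
+        # dataframe or training a model
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    TrainFlow()
+```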
-```k8s -$ python BigSum.py run --with batch -``` + - - +In this case, Metaflow executes the task remotely in the cloud using [one of the supported compute +backends](/getting-started/infrastructure), AWS Batch or Kubernetes. -The `--with batch` or `--with kubernetes` option instructs Metaflow to run all tasks as -separate jobs on the chosen compute platform, instead of using a local process for each -task. It has the same effect as adding the decorator above all steps in the source code. +This is often the easiest way +to scale up compute to handle larger datasets or models. It is like getting a bigger computer with +a line of code. While larger cloud instances cost more, they are only needed for as long as a +`@step` executes, so this approach can be cost-effective as well. This manner of scaling is called +[*vertical scaling*](https://en.wikipedia.org/wiki/Scalability#Vertical_or_scale_up). -This time the run should succeed thanks to the large enough instance, assuming a large -enough instance is available in your compute environment. In this case the `resources` -decorator is used as a prescription for the size of the instance that the job should run -on. Make sure that this resource requirement can be met. If a large enough instance is -not available, the task won't start executing. +### Requesting GPUs and other hardware accelerators -You should see an output like this: +ML/AI workloads often require hardware acceleration such as GPUs. Learn more on [a dedicated +page about hardware-accelerated compute](gpu-compute). -```bash -The sum is 3200003911.795288. -Computing it took 4497ms. -``` +## Executing steps in parallel -In addition to `cpu` and `memory` you can specify `gpu=N` to request N GPUs for the -instance. +If you want to execute two or more `@step`s in parallel, [make them `branch`](/metaflow/basics#branch). -### Running only specific steps remotely + -The `resources` decorator is an annotation that signals how much resources are required -by a step. By itself, it does not force the step to be executed on any particular -platform. This is convenient as you can make the choice later, executing the same flow -on different environments without changes. +When you run a flow with branches locally, each `@step` is run in a process of its own, taking +advantage of multiple CPU cores in your workstation to speed up processing. When you [execute the +flow (or some of its steps) remotely](requesting-resources), each `@step` is +run in a separate container, allowing you to run even thousands of steps in parallel. -Sometimes it is useful to make sure that a step always executes on a certain compute -platform, maybe using a platform-specific configuration. You can achieve this by adding -either `@batch` or `@kubernetes` above steps that should be executed remotely. The -decorators accept the same keyword arguments as `@resources` as well as -platform-specific arguments that you can find listed in [the API -reference](/api/step-decorators). +Branches come in handy in two scenarios: -For instance, in the example above, replace `@resources` with `@batch` or `@kubernetes` -and run it as follows: +1. You have separate operations that can be executed independently. -```bash -$ python BigSum.py run -``` +2. You want to allocate separate `@resources` (or other decorators) for different sets of data, e.g. + to build a small model with CPUs and a large one with GPUs. Just create branches, each with their + own set of decorators. 
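+
+For instance, the second scenario above could look like the following sketch, with illustrative
+step names and resource amounts:
+
+```python
+from metaflow import FlowSpec, step, resources
+
+class TwoModelsFlow(FlowSpec):
+
+    @step
+    def start(self):
+        # fan out into two independent branches
+        self.next(self.small_model, self.large_model)
+
+    @resources(cpu=4, memory=8000)
+    @step
+    def small_model(self):
+        self.next(self.join)
+
+    @resources(gpu=1, memory=32000)
+    @step
+    def large_model(self):
+        self.next(self.join)
+
+    @step
+    def join(self, inputs):
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    TwoModelsFlow()
+```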
-You will see that the `start` step gets executed on a remote instance but the `end` -step, which does not need special resources, is executed locally. You could even mix -decorators so that some steps execute on `@kubernetes`, some on `@batch`, and some -locally. - -### Parallelization over multiple cores - -When running locally, tasks are executed as separate processes. The operating system -takes care of allocating them to separate CPU cores, so they will actually execute in -parallel assuming that enough CPU cores are available. Hence, your flow can utilize -multiple cores without you having to do anything special besides defining branches in -the flow. - -When running remotely on `@batch` or `@kubernetes`, branches are mapped to separate jobs -that are executed in parallel, allowing you to *scale horizontally* to any number of -parallel tasks. In addition, you may take advantage of multiple CPU cores inside a task. -This may happen automatically if you use a modern ML library like PyTorch or Scikit -Learn, or you may parallelize functions explicitly, as explained below. - -#### Parallel map - -Metaflow provides a utility function called `parallel_map` that helps take advantage of -multiple CPU cores. This function is almost equivalent to `Pool().map` in the Python's -built-in -[multiprocessing](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map) -library. The main differences are the following: - -* `parallel_map` supports lambdas and any other callables of Python. -* `parallel_map` does not suffer from bugs present in `multiprocessing`. -* `parallel_map` can handle larger amounts of data. - -You may also use `parallel_map` to parallelize simple operations that might be too -cumbersome to implement as separate steps. - -Here is an extension of our previous example that implements a multicore `sum()` by -partitioning the matrix by row: - -```python -from metaflow import FlowSpec, step, batch, parallel_map - -class BigSum(FlowSpec): - - @resources(memory=60000, cpu=8) - @step - def start(self): - import numpy - import time - big_matrix = numpy.random.ranf((80000, 80000)) - t = time.time() - parts = parallel_map(lambda i: big_matrix[i:i+10000].sum(), - range(0, 80000, 10000)) - self.sum = sum(parts) - self.took = time.time() - t - self.next(self.end) - - @step - def end(self): - print("The sum is %f." % self.sum) - print("Computing it took %dms." % (self.took * 1000)) - -if __name__ == '__main__': - BigSum() -``` +## Running many tasks in parallel with `foreach` -Note that we use `cpu=8` to request enough CPU cores, so our `parallel_map` can benefit -from optimal parallelism. Disappointingly, in this case the parallel `sum` is not faster -than the original simple implementation due to the overhead of launching separate -processes in `parallel_map`. A less trivial operation might see a much larger -performance boost. +A very common scenario in ML, AI, and data processing is to run the same operation, e.g. data +transformation or model training, for each shard of data or a set of parameters. -## **Safeguard flags** + -It is almost too easy to execute tasks remotely using Metaflow. Consider a foreach loop -defined as follows: +Metaflow's [`foreach`](/metaflow/basics#foreach) is similar to +[Python's built-in `map` function](https://realpython.com/python-map-function/) which allows +you to apply a function - or in the case of Metaflow, a `@step` - to all elements in a list. 
-```python -self.params = range(1000) -self.next(self.fanned_out, foreach='params') -``` +The difference to `branch` is that `foreach` applies **the same operation** to all elements, +utilizing [*data parallelism*](https://en.wikipedia.org/wiki/Data_parallelism), whereas `branch` +applies **a different operation** to each, utilizing +[*task parallelism*](https://en.wikipedia.org/wiki/Task_parallelism). -When run with `--with batch` or `--with kubernetes`, this code would launch up to 1000 -parallel instances which may turn out to be quite expensive. +The superpower of Metaflow is that you can [run these tasks in parallel](requesting-resources), +processing even thousands of items concurrently in the cloud. Hence you can use foreaches to +process large datasets, train many models, or run hyperparameter searches in parallel, that is, +execute any [*embarrassingly parallel*](https://en.wikipedia.org/wiki/Embarrassingly_parallel) +operations that can benefit from +[*horizontal scaling*](https://en.wikipedia.org/wiki/Scalability#Horizontal_or_scale_out). -To safeguard against inadvertent launching of many parallel jobs, the `run` and `resume` -commands have a flag `--max-num-splits` which fails the task if it attempts to launch -more than 100 splits by default. Use the flag to increase the limit if you actually need -more tasks. +### Options for controlling parallelism -```bash -$ python myflow.py run --max-num-splits 200 -``` +Note that regardless of the size of your list to `foreach` over, you can control the number +of tasks actually run in parallel with [the `--max-workers` flag](#). Also you will want to +[increase `--max-num-splits` when you list is long](#). -Another flag, `--max-workers`, limits the number of tasks run in parallel. Even if a -foreach launched 100 splits, `--max-workers` would make only 16 \(by default\) of them -run in parallel at any point in time. If you want more parallelism, increase the value -of `--max-workers`. +## Distributed computing with ephemeral compute clusters -```bash -$ python myflow.py run --max-workers 32 -``` +The most advanced pattern of compute that Metaflow supports is distributed computing. In +this case, Metaflow sets up a cluster of instances on the fly which can communicate with +each other, e.g. to train a Large Language Model (LLM) over many GPU instances. -## Big Data + -Thus far, we have focused on CPU and memory-bound steps. Loading and processing big data -is often an IO-bound operation which requires a different approach. Read [Loading and -Storing Data](/scaling/data) for more details about how to build efficient data -pipelines in Metaflow. \ No newline at end of file +While there are many other ways to set up such clusters, a major benefit of Metaflow is +that you can *embed an ephemeral cluster* as a part of a larger workflow, instead of having +to maintain the cluster separately. Learn more on [a dedicated page about distributed +computing](distributed-computing). diff --git a/docs/scaling/remote-tasks/kubernetes.md b/docs/scaling/remote-tasks/kubernetes.md index 9f69cd94..e2132e86 100644 --- a/docs/scaling/remote-tasks/kubernetes.md +++ b/docs/scaling/remote-tasks/kubernetes.md @@ -35,6 +35,47 @@ You can also specify the resource requirements on command line as well: $ python BigSum.py run --with kubernetes:cpu=4,memory=10000,namespace=foo,image=ubuntu:latest ``` +## How to configure GPUs for Kubernetes? + +Metaflow compute tasks can run on any Kubernetes cluster. 
+For starters, take a
+look at the [Kubernetes documentation on Scheduling
+GPUs](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/). The guide explains how to
+install [Kubernetes Device
+Plugins](https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/)
+so your cluster exposes a custom schedulable resource such as `amd.com/gpu` or `nvidia.com/gpu`,
+which Metaflow’s Kubernetes integration is already set up to request when a user
+specifies a decorator like `@kubernetes(gpu=1)`.
+
+For additional information, take a look at the cloud-specific documentation:
+
+- **Amazon Web Services EKS**
+Amazon has prepared the [EKS-optimized accelerated Amazon Linux
+AMIs](https://docs.aws.amazon.com/eks/latest/userguide/eks-optimized-ami.html#gpu-ami). Read the
+linked guide to install the hardware dependencies and choose the AMI you want to run on your GPU
+node group. You will need to make the suggested modifications to the [Kubernetes cluster deployed
+as part of your Metaflow AWS
+deployment](https://github.com/outerbounds/terraform-aws-metaflow/blob/master/examples/eks_argo/eks.tf).
+
+- **Google Cloud Platform GKE**
+Read GCP’s guide about [GPUs on GKE](https://cloud.google.com/kubernetes-engine/docs/concepts/gpus).
+You will need to make the suggested modifications to the [Kubernetes cluster deployed as part of
+your Metaflow GCP
+deployment](https://github.com/outerbounds/metaflow-tools/blob/master/gcp/terraform/infra/kubernetes.tf).
+
+- **Microsoft Azure AKS**
+Read Azure’s guide about [GPUs on AKS](https://learn.microsoft.com/en-us/azure/aks/gpu-cluster).
+You will need to make the suggested modifications to the [Kubernetes cluster deployed as part of your
+Metaflow Azure
+deployment](https://github.com/outerbounds/metaflow-tools/blob/master/azure/terraform/infra/kubernetes.tf).
+
+Reach out to the [Metaflow Slack channel](http://chat.metaflow.org) if you need help setting up a cluster.
+
+## A `@kubernetes` task has been stuck in `PENDING` forever. What should I do?
+
+Are the resources requested in your Metaflow code/command sufficient? Especially when
+using custom GPU images, you might need to increase the requested memory to pull the
+container image into your compute environment.
+
 ## Accessing Kubernetes logs
 
 As a convenience feature, you can also see the logs of any past step as follows:
diff --git a/docs/scaling/remote-tasks/multicore.md b/docs/scaling/remote-tasks/multicore.md
new file mode 100644
index 00000000..2ff9f1dc
--- /dev/null
+++ b/docs/scaling/remote-tasks/multicore.md
@@ -0,0 +1,66 @@
+
+
+# Using Multiple CPU Cores
+
+When running locally, tasks are executed as separate processes. The operating system
+takes care of allocating them to separate CPU cores, so they will actually execute in
+parallel assuming that enough CPU cores are available. Hence, your flow can utilize
+multiple cores without you having to do anything special besides defining branches in
+the flow.
+
+When running remotely on `@batch` or `@kubernetes`, branches are mapped to separate jobs
+that are executed in parallel, allowing you to *scale horizontally* to any number of
+parallel tasks. In addition, you may take advantage of multiple CPU cores inside a task.
+This may happen automatically if you use a modern ML library like PyTorch or
+scikit-learn, or you may parallelize functions explicitly, as explained below.
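+Before turning to explicit parallelization within a task, here is a minimal, illustrative
+sketch of the branch-based parallelism described above; when run locally, the two branches
+execute as separate processes and can therefore occupy separate CPU cores (the flow and
+step names are made up for this sketch):
+
+```python
+from metaflow import FlowSpec, step
+
+class TwoBranchFlow(FlowSpec):
+
+    @step
+    def start(self):
+        # Fan out into two branches that run concurrently.
+        self.next(self.sum_of_squares, self.sum_of_cubes)
+
+    @step
+    def sum_of_squares(self):
+        self.result = sum(i * i for i in range(10_000_000))
+        self.next(self.join)
+
+    @step
+    def sum_of_cubes(self):
+        self.result = sum(i ** 3 for i in range(10_000_000))
+        self.next(self.join)
+
+    @step
+    def join(self, inputs):
+        # Combine the results computed by the two branches.
+        self.total = sum(inp.result for inp in inputs)
+        self.next(self.end)
+
+    @step
+    def end(self):
+        print("total", self.total)
+
+if __name__ == '__main__':
+    TwoBranchFlow()
+```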
+
+## Mapping items in parallel
+
+Metaflow provides a utility function called `parallel_map` that helps take advantage of
+multiple CPU cores. This function is almost equivalent to `Pool().map` in Python's
+built-in
+[multiprocessing](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.map)
+library. The main differences are the following:
+
+* `parallel_map` supports lambdas and any other Python callables.
+* `parallel_map` does not suffer from bugs present in `multiprocessing`.
+* `parallel_map` can handle larger amounts of data.
+
+You may also use `parallel_map` to parallelize simple operations that might be too
+cumbersome to implement as separate steps.
+
+Here is an extension of our previous example that implements a multicore `sum()` by
+partitioning the matrix by row:
+
+```python
+from metaflow import FlowSpec, step, resources, parallel_map
+
+class BigSum(FlowSpec):
+
+    @resources(memory=60000, cpu=8)
+    @step
+    def start(self):
+        import numpy
+        import time
+        big_matrix = numpy.random.ranf((80000, 80000))
+        t = time.time()
+        parts = parallel_map(lambda i: big_matrix[i:i+10000].sum(),
+                             range(0, 80000, 10000))
+        self.sum = sum(parts)
+        self.took = time.time() - t
+        self.next(self.end)
+
+    @step
+    def end(self):
+        print("The sum is %f." % self.sum)
+        print("Computing it took %dms." % (self.took * 1000))
+
+if __name__ == '__main__':
+    BigSum()
+```
+
+Note that we use `cpu=8` to request enough CPU cores, so our `parallel_map` can benefit
+from optimal parallelism. Disappointingly, in this case the parallel `sum` is not faster
+than the original simple implementation due to the overhead of launching separate
+processes in `parallel_map`. A less trivial operation might see a much larger
+performance boost.
\ No newline at end of file
diff --git a/docs/scaling/remote-tasks/requesting-resources.md b/docs/scaling/remote-tasks/requesting-resources.md
new file mode 100644
index 00000000..079bacd1
--- /dev/null
+++ b/docs/scaling/remote-tasks/requesting-resources.md
@@ -0,0 +1,174 @@
+
+# Requesting Compute Resources
+
+You can run any Metaflow flow in the cloud simply by adding an option on the command line:
+
+
+
+```k8s
+$ python hello.py run --with kubernetes
+```
+
+
+
+```batch
+$ python hello.py run --with batch
+```
+
+
+
+When you add `--with kubernetes` (for Kubernetes) or `--with batch` (for AWS Batch) on the
+command line ([depending on your deployment](/getting-started/infrastructure)), Metaflow
+runs the flow on the chosen compute backend.
+
+Every step gets allocated a modest amount of resources by default - around 1 CPU core and 4GB of
+RAM. If your step needs more CPU cores, memory, disk, or [more GPUs (or other hardware
+accelerators)](gpu-compute), annotate your resource requirements with the
+[`@resources`](/api/step-decorators/resources) decorator.
+
+Another benefit of `@resources` is that it allows you to move smoothly between local
+development and the cloud. The decorator has no effect on local runs, but when
+combined with `--with kubernetes` or `--with batch`, you can use the flow to handle bigger
+models or more data without changing anything in the code. Note that
+[production deployments](/production/introduction) always run in the cloud, respecting
+`@resources` requirements.
+
+:::note
+`@kubernetes` can target any Kubernetes cluster, including on-premise clusters.
+For brevity, we use the term *the cloud* to refer to all compute backends.
+:::
+
+## Example
+
+Consider the following example:
+
+```python
+from metaflow import FlowSpec, step, resources
+
+class BigSum(FlowSpec):
+
+    @resources(memory=60000, cpu=1)
+    @step
+    def start(self):
+        import numpy
+        import time
+        big_matrix = numpy.random.ranf((80000, 80000))
+        t = time.time()
+        self.sum = numpy.sum(big_matrix)
+        self.took = time.time() - t
+        self.next(self.end)
+
+    @step
+    def end(self):
+        print("The sum is %f." % self.sum)
+        print("Computing it took %dms." % (self.took * 1000))
+
+if __name__ == '__main__':
+    BigSum()
+```
+
+This example creates a huge 80000x80000 random matrix, `big_matrix`. The matrix requires
+about 80000^2 \* 8 bytes = 48GB of memory.
+
+If you attempt to run this on your local machine, it is likely that the following will
+happen:
+
+```bash
+$ python BigSum.py run
+
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "BigSum.py", line 11, in start
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] big_matrix = numpy.random.ranf((80000, 80000))
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 856, in mtrand.RandomState.random_sample
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 167, in mtrand.cont0_array
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] MemoryError
+2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)]
+2018-11-29 02:43:39.844 [5/start/21975 (pid 83812)] Task failed.
+2018-11-29 02:43:39.844 Workflow failed.
+    Step failure:
+    Step start (task-id 21975) failed.
+```
+
+This fails quickly with a `MemoryError` on most laptops, as we are unable to allocate
+48GB of memory.
+
+The `@resources` decorator suggests resource requirements for a step. The `memory`
+argument specifies the amount of RAM in megabytes and `cpu` the number of CPU cores
+requested. It does not produce the resources magically, which is why the run above
+failed. The `@resources` decorator takes effect only when combined with another
+decorator that specifies which compute platform, like Kubernetes or AWS Batch, to use.
+
+Let's use the `--with` option to attach a desired decorator to all steps on the command
+line. Choose one of the commands in the tabs below, corresponding to whichever you use:
+Kubernetes or AWS Batch. This assumes that you have [configured one of these systems to
+work with Metaflow](/getting-started/infrastructure).
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+
+```k8s
+$ python BigSum.py run --with kubernetes
+```
+
+
+
+```batch
+$ python BigSum.py run --with batch
+```
+
+
+
+The `--with batch` or `--with kubernetes` option instructs Metaflow to run all tasks as
+separate jobs on the chosen compute platform, instead of using a local process for each
+task. It has the same effect as adding the decorator above all steps in the source code.
+
+This time the run should succeed, assuming a large enough instance is available in your
+compute environment. In this case the `@resources` decorator acts as a prescription for
+the size of the instance that the job should run on. Make sure that this resource
+requirement can be met; if a large enough instance is not available, the task won't
+start executing.
+
+You should see output like this:
+
+```bash
+The sum is 3200003911.795288.
+Computing it took 4497ms.
+```
+
+In addition to `cpu` and `memory`, you can specify `gpu=N` to request N GPUs for the
+instance.
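+
+For example, a training step that needs an accelerator could be annotated along these
+lines; the flow name and the exact numbers are illustrative, and the GPU is only available
+if your compute environment provides one:
+
+```python
+from metaflow import FlowSpec, step, resources
+
+class GPUSketchFlow(FlowSpec):
+
+    # Request one GPU alongside CPU cores and memory (in megabytes).
+    @resources(memory=32000, cpu=8, gpu=1)
+    @step
+    def start(self):
+        # Placeholder for GPU-accelerated work, e.g. model training.
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+if __name__ == '__main__':
+    GPUSketchFlow()
+```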
+
+### Running only specific steps remotely
+
+The `@resources` decorator is an annotation that declares the resources required
+by a step. By itself, it does not force the step to be executed on any particular
+platform. This is convenient as you can make the choice later, executing the same flow
+in different environments without changes.
+
+Sometimes it is useful to make sure that a step always executes on a certain compute
+platform, maybe using a platform-specific configuration. You can achieve this by adding
+either `@batch` or `@kubernetes` above steps that should be executed remotely. The
+decorators accept the same keyword arguments as `@resources` as well as
+platform-specific arguments that you can find listed in [the API
+reference](/api/step-decorators).
+
+For instance, in the example above, replace `@resources` with `@batch` or `@kubernetes`
+and run it as follows:
+
+```bash
+$ python BigSum.py run
+```
+
+You will see that the `start` step gets executed on a remote instance but the `end`
+step, which does not need special resources, is executed locally. You could even mix
+decorators so that some steps execute on `@kubernetes`, some on `@batch`, and some
+locally.
diff --git a/sidebars.js b/sidebars.js
index 7c1f9347..fef7e7da 100644
--- a/sidebars.js
+++ b/sidebars.js
@@ -108,12 +108,18 @@ const sidebars = {
       items: [
         {
           type: "category",
-          label: "Executing Tasks Remotely",
+          label: "Computing at Scale",
           link: {
             type: "doc",
             id: "scaling/remote-tasks/introduction",
           },
           items: [
+            "scaling/remote-tasks/requesting-resources",
+            "scaling/remote-tasks/multicore",
+            "scaling/remote-tasks/controlling-parallelism",
+            "scaling/remote-tasks/gpu-compute",
+            "scaling/remote-tasks/installing-drivers-and-frameworks",
+            "scaling/remote-tasks/distributed-computing",
             "scaling/remote-tasks/kubernetes",
             "scaling/remote-tasks/aws-batch",
           ],
diff --git a/static/assets/compute1.mp4 b/static/assets/compute1.mp4
new file mode 100644
index 00000000..626f464d
Binary files /dev/null and b/static/assets/compute1.mp4 differ
diff --git a/static/assets/compute2.mp4 b/static/assets/compute2.mp4
new file mode 100644
index 00000000..f3372825
Binary files /dev/null and b/static/assets/compute2.mp4 differ
diff --git a/static/assets/compute3.mp4 b/static/assets/compute3.mp4
new file mode 100644
index 00000000..1b90080d
Binary files /dev/null and b/static/assets/compute3.mp4 differ
diff --git a/static/assets/compute4.mp4 b/static/assets/compute4.mp4
new file mode 100644
index 00000000..ca60d37c
Binary files /dev/null and b/static/assets/compute4.mp4 differ
diff --git a/static/assets/compute5.mp4 b/static/assets/compute5.mp4
new file mode 100644
index 00000000..6f00d94b
Binary files /dev/null and b/static/assets/compute5.mp4 differ
diff --git a/static/assets/foreach-job.mp4 b/static/assets/foreach-job.mp4
new file mode 100644
index 00000000..ca60d37c
Binary files /dev/null and b/static/assets/foreach-job.mp4 differ
diff --git a/static/assets/gpu_profile.png b/static/assets/gpu_profile.png
new file mode 100644
index 00000000..fc8d2ecd
Binary files /dev/null and b/static/assets/gpu_profile.png differ
diff --git a/static/assets/parallel-job.mp4 b/static/assets/parallel-job.mp4
new file mode 100644
index 00000000..6f00d94b
Binary files /dev/null and b/static/assets/parallel-job.mp4 differ