Add DaggerMPI subpackage for MPI integrations #356

Draft
jpsamaroo wants to merge 3 commits into master

Conversation

jpsamaroo
Member

Building on Dagger's unified hashing framework, DaggerMPI.jl allows DAGs to execute efficiently on an MPI cluster. Per-task hashes are used to "color" the DAG, disabling execution of each task on all but one MPI worker. Data movement is typically peer-to-peer via MPI Send and Recv, coordinated by tags computed from the same coloring scheme. This lets Dagger's scheduler remain unmodified and unaware of the MPI cluster's existence, while still providing "exactly once" execution semantics for each task in the DAG.
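
For intuition, here is a minimal sketch of the coloring and tagging scheme, assuming MPI.jl's v0.20-style `send`/`recv` API; `owner_rank`, `mpi_tag`, and `move_result` are illustrative names, not the DaggerMPI.jl implementation:

```julia
# Minimal sketch of the coloring/tagging idea -- not the DaggerMPI.jl source.
# Assumes MPI.jl and a per-task hash `h` from Dagger's unified hashing framework.
using MPI

MPI.Init()
const comm   = MPI.COMM_WORLD
const nranks = MPI.Comm_size(comm)
const myrank = MPI.Comm_rank(comm)

# "Color" the DAG: the task's hash deterministically picks the one rank that
# actually executes it; every other rank skips the work.
owner_rank(h::UInt) = Int(h % UInt(nranks))
should_execute(h::UInt) = owner_rank(h) == myrank

# Derive a point-to-point tag from the same hash, so sender and receiver agree
# on it without any coordination from the scheduler.
mpi_tag(h::UInt) = Int(h % UInt(32768))  # stay within the guaranteed MPI tag range

# Peer-to-peer data movement: the owner sends the result to a consumer rank,
# which posts the matching receive using the identical tag.
function move_result(h::UInt, result, consumer_rank::Int)
    if should_execute(h) && consumer_rank != myrank
        MPI.send(result, comm; dest=consumer_rank, tag=mpi_tag(h))
        return result
    elseif consumer_rank == myrank && !should_execute(h)
        return MPI.recv(comm; source=owner_rank(h), tag=mpi_tag(h))
    end
    return result
end
```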

Implements a "semantic" hashing algorithm which hashes Thunks based on the functional behavior of the code being executed. The intention is for a task's hash to have an identical value across different Julia sessions whenever the task computes the same value. This is important for implementing a "headless" worker-worker cluster, where there is no coordinating head worker and all workers can see the entire computational program.
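
As a toy illustration of what "identical across sessions" means (this is not the algorithm implemented here), a session-stable hash can be built from inputs that do not depend on object identity, such as a function's qualified name and the hashes of its arguments:

```julia
# Toy illustration of a session-stable ("semantic") task hash -- not the PR's algorithm.
# Identity-based hashes (e.g. objectid) differ between Julia sessions; hashing the
# function's name together with the hashes of its inputs does not (for a given Julia version).
semantic_hash(x) = hash(x)  # plain values hash deterministically within a Julia version

struct ToyTask
    f::Function
    args::Tuple
end

function semantic_hash(t::ToyTask)
    h = hash(string(parentmodule(t.f), '.', nameof(t.f)))
    for a in t.args
        h = hash(semantic_hash(a), h)  # fold in each input's semantic hash
    end
    return h
end

# Two separate sessions building the "same" task compute the same hash:
semantic_hash(ToyTask(+, (1, 2)))
```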

Hashes are computed automatically and can be queried with `get_task_hash()` while running in a task context, or directly as `get_task_hash(task)` for any Dagger task type. Hashes are also provided within `Dagger.move` calls, where the input task's hash is available as well.
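
A brief usage sketch of this API, assuming the functions are reachable as `Dagger.get_task_hash`; `report_hash` is an illustrative helper, not part of the PR:

```julia
using Dagger

# A task that reports its own hash from inside the running task context:
function report_hash(x)
    h = Dagger.get_task_hash()      # hash of the currently executing task
    println("executing with hash ", h)
    return x + 1
end

t = Dagger.@spawn report_hash(41)
fetch(t)

# Query the hash of any Dagger task type directly:
h = Dagger.get_task_hash(t)

# Within `Dagger.move`, the input task's hash is likewise available; DaggerMPI.jl
# uses it to compute matching MPI tags on both the sending and receiving ranks.
```
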
@jpsamaroo
Member Author

This PR is mostly good to go; however, there is one aspect I'm not happy about: all nodes must spawn the same tasks in the same order, or else we get a hang. This is currently necessary because we do one-sided, "blind" sends and receives, assuming that the counterpart will be posted.
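
As a minimal illustration of the hazard (not the package code), consider two ranks where only one of them decides to spawn a task:

```julia
# Illustration of the hang: both ranks must walk the DAG identically. Here rank 1's
# rank-local logic skips a task whose result rank 0 blindly waits to receive,
# so rank 0 blocks forever.
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
tag  = 7  # would normally be derived from the task's hash

if rank == 1
    spawn_it = false          # imagine conditional logic that only rank 1 evaluates
    if spawn_it
        MPI.send(42, comm; dest=0, tag=tag)
    end
elseif rank == 0
    x = MPI.recv(comm; source=1, tag=tag)  # hangs: the matching send is never posted
end
```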

A partial fix for this problem might involve asynchronously exchanging task hashes between nodes; when we find a task hash that hasn't been registered on a given node, we "vote" to assign it to a node which does have it (and make sure all nodes are aware of that decision for downstream tasks which depend on that task's result). We can initiate this vote from any node whose tasks are stalled waiting on data (send or receive); it's essentially a more active way to guarantee that the data becomes available, since some node will eventually post or consume the result.

That fix is really only a workaround; a more complete solution might involve providing a way to inform the cluster that there is conditional logic around a task spawn point, using that as a cue to initiate an early vote, or letting the user explicitly select which node(s) to consider.

jpsamaroo marked this pull request as a draft on July 30, 2024, 19:05.