Skip to content

Commit

Permalink
Add AsyncioRunnable (#411)
Browse files Browse the repository at this point in the history
Moves the CoroutineRunnable from Morpheus' Sherlock feature branch to MRC and renames it to AsyncioRunnable as it is heavily dependent on asyncio. Adjustments were made such that the Scheduler would no longer own a task container and/or tasks, leaving the scheduler interface simpler. Instead, the runnable is responsible for the lifetime of the tasks it creates. This leaves the scheduler with a single responsibility.

Much of the code could be moved to MRC proper from PyMRC, but it's not immediately obvious where the code should live or whether it would be reused, so keeping it colocated with the AsyncioRunnable makes the most sense for now, imo.

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Devin Robison (https://github.com/drobison00)

URL: #411
  • Loading branch information
cwharris authored Nov 2, 2023
1 parent 1ebd4e2 commit 62e1834
Show file tree
Hide file tree
Showing 10 changed files with 929 additions and 174 deletions.
2 changes: 1 addition & 1 deletion cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ add_library(libmrc
src/public/core/logging.cpp
src/public/core/thread.cpp
src/public/coroutines/event.cpp
src/public/coroutines/scheduler.cpp
src/public/coroutines/sync_wait.cpp
src/public/coroutines/task_container.cpp
src/public/coroutines/thread_local_context.cpp
Expand All @@ -124,6 +123,7 @@ add_library(libmrc
src/public/cuda/sync.cpp
src/public/edge/edge_adapter_registry.cpp
src/public/edge/edge_builder.cpp
src/public/exceptions/exception_catcher.cpp
src/public/manifold/manifold.cpp
src/public/memory/buffer_view.cpp
src/public/memory/codable/buffer.cpp
Expand Down
91 changes: 6 additions & 85 deletions cpp/mrc/include/mrc/coroutines/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,109 +25,30 @@
#include <mutex>
#include <string>

// IWYU thinks this is needed, but it's not
// IWYU pragma: no_include "mrc/coroutines/task_container.hpp"

namespace mrc::coroutines {

class TaskContainer; // IWYU pragma: keep

/**
* @brief Scheduler base class
*
* Allows all schedulers to be discovered via the mrc::this_thread::current_scheduler()
*/
class Scheduler : public std::enable_shared_from_this<Scheduler>
{
public:
struct Operation
{
Operation(Scheduler& scheduler);

constexpr static auto await_ready() noexcept -> bool
{
return false;
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept;

constexpr static auto await_resume() noexcept -> void {}

Scheduler& m_scheduler;
std::coroutine_handle<> m_awaiting_coroutine;
Operation* m_next{nullptr};
};

Scheduler();
virtual ~Scheduler() = default;

/**
* @brief Description of Scheduler
*/
virtual std::string description() const = 0;

/**
* Schedules the currently executing coroutine to be run on this thread pool. This must be
* called from within the coroutines function body to schedule the coroutine on the thread pool.
* @throw std::runtime_error If the thread pool is `shutdown()` scheduling new tasks is not permitted.
* @return The operation to switch from the calling scheduling thread to the executor thread
* pool thread.
*/
[[nodiscard]] virtual auto schedule() -> Operation;

// Enqueues a message without waiting for it. Must return void since the caller will not get the return value
virtual void schedule(Task<void>&& task);

/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
virtual auto resume(std::coroutine_handle<> coroutine) -> void = 0;

/**
* Yields the current task to the end of the queue of waiting tasks.
*/
[[nodiscard]] auto yield() -> Operation;

/**
* If the calling thread controlled by a Scheduler, return a pointer to the Scheduler
* @brief Resumes a coroutine according to the scheduler's implementation.
*/
static auto from_current_thread() noexcept -> Scheduler*;
virtual void resume(std::coroutine_handle<> handle) noexcept = 0;

/**
* If the calling thread is owned by a thread_pool, return the thread index (rank) of the current thread with
* respect the threads in the pool; otherwise, return the std::hash of std::this_thread::get_id
* @brief Suspends the current function and resumes it according to the scheduler's implementation.
*/
static auto get_thread_id() noexcept -> std::size_t;
[[nodiscard]] virtual Task<> schedule() = 0;

protected:
virtual auto on_thread_start(std::size_t) -> void;

/**
* @brief Get the task container object
*
* @return TaskContainer&
*/
TaskContainer& get_task_container() const;

private:
/**
* @brief When co_await schedule() is called, this function will be executed by the awaiter. Each scheduler
* implementation should determine how and when to execute the operation.
*
* @param operation The schedule() awaitable pointer
* @return std::coroutine_handle<> Return a coroutine handle to which will be
* used as the return value for await_suspend().
* @brief Suspends the current function and resumes it according to the scheduler's implementation.
*/
virtual std::coroutine_handle<> schedule_operation(Operation* operation) = 0;

mutable std::mutex m_mutex;

// Maintains the lifetime of fire-and-forget tasks scheduled with schedule(Task<void>&& task)
std::unique_ptr<TaskContainer> m_task_container;

thread_local static Scheduler* m_thread_local_scheduler;
thread_local static std::size_t m_thread_id;
[[nodiscard]] virtual Task<> yield() = 0;
};

} // namespace mrc::coroutines
53 changes: 53 additions & 0 deletions cpp/mrc/include/mrc/exceptions/exception_catcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <exception>
#include <mutex>
#include <queue>

#pragma once

namespace mrc {

/**
* @brief A utility for catching out-of-stack exceptions in a thread-safe manner such that they
* can be checked and throw from a parent thread.
*/
class ExceptionCatcher
{
public:
/**
* @brief "catches" an exception to the catcher
*/
void push_exception(std::exception_ptr ex);

/**
* @brief checks to see if any exceptions have been "caught" by the catcher.
*/
bool has_exception();

/**
* @brief rethrows the next exception (in the order in which it was "caught").
*/
void rethrow_next_exception();

private:
std::mutex m_mutex{};
std::queue<std::exception_ptr> m_exceptions{};
};

} // namespace mrc
85 changes: 0 additions & 85 deletions cpp/mrc/src/public/coroutines/scheduler.cpp

This file was deleted.

50 changes: 50 additions & 0 deletions cpp/mrc/src/public/exceptions/exception_catcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <mrc/exceptions/exception_catcher.hpp>

namespace mrc {

void ExceptionCatcher::push_exception(std::exception_ptr ex)
{
auto lock = std::lock_guard(m_mutex);
m_exceptions.push(ex);
}

bool ExceptionCatcher::has_exception()
{
auto lock = std::lock_guard(m_mutex);
return not m_exceptions.empty();
}

void ExceptionCatcher::rethrow_next_exception()
{
auto lock = std::lock_guard(m_mutex);

if (m_exceptions.empty())
{
return;
}

auto ex = m_exceptions.front();

m_exceptions.pop();

std::rethrow_exception(ex);
}

} // namespace mrc
Loading

0 comments on commit 62e1834

Please sign in to comment.