Skip to content

Commit

Permalink
Add Router nodes to support single input, multi output routing. (#502)
Browse files Browse the repository at this point in the history
Overview of new classes added in this PR:

- `DynamicRouterBase`
  - Sink
    - `WritableProvider`, `ReadableAcceptor`, `SinkChannelOwner`
  - Source
    - `MultiWritableAcceptor`, `MultiReadableProvider`, `MultiSourceChannelOwner`
  - Runnable with ability to change number of outputs at runtime
  - Cannot be added to a Segment
  - Good backpressure support. Can still process requests if downstream blocks
  - `LambdaDynamicRouter`
    - Provide key function via lambda
- `DynamicRouterComponentBase`
  - Sink
    - `ForwardingWritableProvider`
  - Source
    - `MultiWritableAcceptor`, `MultiReadableProvider`, `MultiSourceChannelOwner`
  - Non-Runnable with ability to change number of outputs at runtime
  - Limited backpressure support. Will block if downstream blocks
  - `LambdaDynamicRouterComponent`
    - Provide key function via lambda
- `StaticRouterRunnableBase`
  - Sink
    - `WritableProvider`, `ReadableAcceptor`, `SinkChannelOwner`
  - Source
    - `MultiWritableAcceptor`, `MultiReadableProvider`, `MultiSourceChannelOwner`
  - Runnable with fixed number of outputs
  - Can be added to a Segment since it supports child outputs
  - Good backpressure support. Can still process requests if downstream blocks
  - `LambdaStaticRouterRunnable`
    - Provide key function via lambda
- `StaticRouterComponentBase`
  - Sink
    - `ForwardingWritableProvider`
  - Source
    - `MultiWritableAcceptor`, `MultiReadableProvider`, `MultiSourceChannelOwner`
  - Non-Runnable with fixed number of outputs
  - Can be added to a Segment since it supports child outputs
  - Limited backpressure support. Will block if downstream blocks
  - `LambdaStaticRouterComponent`
    - Provide key function via lambda

Closes #491

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: #502
  • Loading branch information
mdemoret-nv authored Oct 22, 2024
1 parent 61862b5 commit a4b2ca1
Show file tree
Hide file tree
Showing 72 changed files with 5,127 additions and 1,047 deletions.
43 changes: 41 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ project(mrc

morpheus_utils_initialize_install_prefix(MRC_USE_CONDA)

rapids_cmake_write_version_file(${CMAKE_BINARY_DIR}/autogenerated/include/mrc/version.hpp)

# Delay enabling CUDA until after we have determined our CXX compiler
if(NOT DEFINED CMAKE_CUDA_HOST_COMPILER)
message(STATUS "Setting CUDA host compiler to match CXX compiler: ${CMAKE_CXX_COMPILER}")
Expand Down Expand Up @@ -180,6 +178,47 @@ if(MRC_BUILD_DOCS)
add_subdirectory(docs)
endif()

# ##################################################################################################
# - install export ---------------------------------------------------------------------------------

set(doc_string
[=[
Provide targets for mrc.
]=])

set(code_string "")

set(rapids_project_version_compat SameMinorVersion)

# Need to explicitly set VERSION ${PROJECT_VERSION} here since rapids_cmake gets
# confused with the `RAPIDS_VERSION` variable we use
rapids_export(INSTALL ${PROJECT_NAME}
EXPORT_SET ${PROJECT_NAME}-exports
GLOBAL_TARGETS libmrc pymrc
COMPONENTS python
COMPONENTS_EXPORT_SET ${PROJECT_NAME}-python-exports
VERSION ${PROJECT_VERSION}
NAMESPACE mrc::
DOCUMENTATION doc_string
FINAL_CODE_BLOCK code_string
)

# ##################################################################################################
# - build export -----------------------------------------------------------------------------------
rapids_export(BUILD ${PROJECT_NAME}
EXPORT_SET ${PROJECT_NAME}-exports
GLOBAL_TARGETS libmrc pymrc
COMPONENTS python
COMPONENTS_EXPORT_SET ${PROJECT_NAME}-python-exports
VERSION ${PROJECT_VERSION}
LANGUAGES C CXX CUDA
NAMESPACE mrc::
DOCUMENTATION doc_string
FINAL_CODE_BLOCK code_string
)

# ##################################################################################################
# - debug info -------------------------------------------------------------------------------------
if (MRC_ENABLE_DEBUG_INFO)
morpheus_utils_print_all_targets()

Expand Down
1 change: 1 addition & 0 deletions ci/iwyu/mappings.imp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Include mappings

# stdlib
{ "include": [ "<bits/chrono.h>", private, "<chrono>", "public" ] },
{ "include": [ "<bits/cxxabi_forced.h>", private, "<mutex>", "public" ] },
{ "include": [ "<bits/cxxabi_forced.h>", private, "<vector>", "public" ] },
{ "include": [ "<bits/this_thread_sleep.h>", private, "<thread>", "public" ] },
Expand Down
3 changes: 1 addition & 2 deletions ci/scripts/cpp_checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ if [[ -n "${MRC_MODIFIED_FILES}" ]]; then

# Include What You Use
if [[ "${SKIP_IWYU}" == "" ]]; then
# Remove .h, .hpp, and .cu files from the modified list
shopt -s extglob
IWYU_MODIFIED_FILES=( "${MRC_MODIFIED_FILES[@]/*.@(h|hpp|cu)/}" )
IWYU_MODIFIED_FILES=( "${MRC_MODIFIED_FILES[@]}" )

if [[ -n "${IWYU_MODIFIED_FILES}" ]]; then
# Get the list of compiled files relative to this directory
Expand Down
88 changes: 48 additions & 40 deletions cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
# ##################################################################################################
# - libmrc -----------------------------------------------------------------------------------------

include(GenerateExportHeader)

# Generate the version header file
rapids_cmake_write_version_file(${CMAKE_CURRENT_BINARY_DIR}/autogenerated/include/mrc/version.hpp)

# Keep all source files sorted!!!
add_library(libmrc
src/internal/codable/codable_storage.cpp
Expand Down Expand Up @@ -125,6 +130,7 @@ add_library(libmrc
src/public/cuda/sync.cpp
src/public/edge/edge_adapter_registry.cpp
src/public/edge/edge_builder.cpp
src/public/exceptions/checks.cpp
src/public/exceptions/exception_catcher.cpp
src/public/manifold/manifold.cpp
src/public/memory/buffer_view.cpp
Expand Down Expand Up @@ -204,6 +210,47 @@ target_compile_features(libmrc PUBLIC cxx_std_20)

set_target_properties(libmrc PROPERTIES OUTPUT_NAME ${PROJECT_NAME})

# Generates an include file for specifying external linkage since everything is hidden by default
generate_export_header(libmrc
NO_EXPORT_MACRO_NAME
MRC_LOCAL
EXPORT_FILE_NAME
"${CMAKE_CURRENT_BINARY_DIR}/autogenerated/include/mrc/export.h"
)

# ##################################################################################################
# - source information -----------------------------------------------------------------------------

# Ideally, we dont use glob here. But there is no good way to guarantee you dont miss anything like *.cpp
file(GLOB_RECURSE libmrc_public_headers
LIST_DIRECTORIES FALSE
CONFIGURE_DEPENDS
"${CMAKE_CURRENT_SOURCE_DIR}/include/mrc/*"
)

# Add headers to target sources file_set so they can be installed
# https://discourse.cmake.org/t/installing-headers-the-modern-way-regurgitated-and-revisited/3238/3
target_sources(libmrc
PUBLIC
FILE_SET public_headers
TYPE HEADERS
BASE_DIRS "${CMAKE_CURRENT_SOURCE_DIR}/include"
FILES
${libmrc_public_headers}
)

# Add generated headers to fileset
target_sources(libmrc
PUBLIC
FILE_SET public_headers
TYPE HEADERS
BASE_DIRS
"${CMAKE_CURRENT_BINARY_DIR}/autogenerated/include"
FILES
"${CMAKE_CURRENT_BINARY_DIR}/autogenerated/include/mrc/version.hpp"
"${CMAKE_CURRENT_BINARY_DIR}/autogenerated/include/mrc/export.h"
)

# ##################################################################################################
# - install targets --------------------------------------------------------------------------------
rapids_cmake_install_lib_dir(lib_dir)
Expand All @@ -215,12 +262,7 @@ install(
DESTINATION ${lib_dir}
EXPORT ${PROJECT_NAME}-exports
COMPONENT Core
)

install(
DIRECTORY include/
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
COMPONENT Core
FILE_SET public_headers
)

# ##################################################################################################
Expand All @@ -234,37 +276,3 @@ endif()
if(MRC_BUILD_BENCHMARKS)
add_subdirectory(benchmarks)
endif()

# ##################################################################################################
# - install export ---------------------------------------------------------------------------------
set(doc_string
[=[
Provide targets for mrc.
]=])

set(code_string "")

set(rapids_project_version_compat SameMinorVersion)

# Need to explicitly set VERSION ${PROJECT_VERSION} here since rapids_cmake gets
# confused with the `RAPIDS_VERSION` variable we use
rapids_export(INSTALL ${PROJECT_NAME}
EXPORT_SET ${PROJECT_NAME}-exports
GLOBAL_TARGETS libmrc
VERSION ${PROJECT_VERSION}
NAMESPACE mrc::
DOCUMENTATION doc_string
FINAL_CODE_BLOCK code_string
)

# ##################################################################################################
# - build export ----------------------------------------------------------------------------------
rapids_export(BUILD ${PROJECT_NAME}
EXPORT_SET ${PROJECT_NAME}-exports
GLOBAL_TARGETS libmrc
VERSION ${PROJECT_VERSION}
LANGUAGES C CXX CUDA
NAMESPACE mrc::
DOCUMENTATION doc_string
FINAL_CODE_BLOCK code_string
)
77 changes: 0 additions & 77 deletions cpp/mrc/include/mrc/api.hpp

This file was deleted.

25 changes: 24 additions & 1 deletion cpp/mrc/include/mrc/channel/status.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,6 +17,8 @@

#pragma once

#include <ostream>

namespace mrc::channel {

enum class Status
Expand All @@ -29,4 +31,25 @@ enum class Status
error
};

static inline std::ostream& operator<<(std::ostream& os, const Status& s)
{
switch (s)

Check warning on line 36 in cpp/mrc/include/mrc/channel/status.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/channel/status.hpp#L36

Added line #L36 was not covered by tests
{
case Status::success:
return os << "success";
case Status::empty:
return os << "empty";
case Status::full:
return os << "full";
case Status::closed:
return os << "closed";
case Status::timeout:
return os << "timeout";
case Status::error:
return os << "error";
default:
throw std::logic_error("Unsupported channel::Status enum. Was a new value added recently?");

Check warning on line 51 in cpp/mrc/include/mrc/channel/status.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/channel/status.hpp#L38-L51

Added lines #L38 - L51 were not covered by tests
}
}

} // namespace mrc::channel
6 changes: 3 additions & 3 deletions cpp/mrc/include/mrc/core/fiber_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -43,14 +43,14 @@ class FiberPool
[[nodiscard]] virtual std::size_t thread_count() const = 0;

template <class F, class... ArgsT>
auto enqueue(std::uint32_t index, F&& f, ArgsT&&... args) -> Future<typename std::result_of<F(ArgsT...)>::type>
auto enqueue(std::uint32_t index, F&& f, ArgsT&&... args) -> Future<typename std::invoke_result_t<F, ArgsT...>>
{
return task_queue(index).enqueue(f, std::forward<ArgsT>(args)...);
}

template <class MetaDataT, class F, class... ArgsT>
auto enqueue(std::uint32_t index, MetaDataT&& md, F&& f, ArgsT&&... args)
-> Future<typename std::result_of<F(ArgsT...)>::type>
-> Future<typename std::invoke_result_t<F, ArgsT...>>
{
return task_queue(index).enqueue(std::forward<MetaDataT>(md), std::forward<F>(f), std::forward<ArgsT>(args)...);
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/mrc/include/mrc/core/task_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -40,31 +40,31 @@ class FiberTaskQueue
virtual ~FiberTaskQueue() = default;

template <class F, class... ArgsT>
auto enqueue(F&& f, ArgsT&&... args) -> Future<typename std::result_of<F(ArgsT...)>::type>
auto enqueue(F&& f, ArgsT&&... args) -> Future<typename std::invoke_result_t<F, ArgsT...>>
{
FiberMetaData meta_data;
return enqueue(meta_data, std::forward<F>(f), std::forward<ArgsT>(args)...);
}

template <class F, class... ArgsT>
auto enqueue(const FiberMetaData& meta_data, F&& f, ArgsT&&... args)
-> Future<typename std::result_of<F(ArgsT...)>::type>
-> Future<typename std::invoke_result_t<F, ArgsT...>>
{
FiberMetaData copy = meta_data;
return enqueue(std::move(copy), std::forward<F>(f), std::forward<ArgsT>(args)...);
}

template <class F, class... ArgsT>
auto enqueue(FiberMetaData&& meta_data, F&& f, ArgsT&&... args)
-> Future<typename std::result_of<F(ArgsT...)>::type>
-> Future<typename std::invoke_result_t<F, ArgsT...>>
{
if (task_queue().is_closed())
{
throw std::runtime_error("enqueue on stopped ws fiber pool");
}

using namespace boost::fibers;
using return_type_t = typename std::result_of<F(ArgsT...)>::type;
using return_type_t = typename std::invoke_result_t<F, ArgsT...>;

packaged_task<return_type_t()> task(std::bind(std::forward<F>(f), std::forward<ArgsT>(args)...));
future<return_type_t> future = task.get_future();
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/include/mrc/edge/deferred_edge.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -30,7 +30,7 @@

namespace mrc::edge {

class DeferredWritableMultiEdgeBase : public IMultiWritableAcceptorBase<std::size_t>,
class DeferredWritableMultiEdgeBase : public virtual IMultiWritableAcceptorBase<std::size_t>,
public virtual IEdgeWritableBase,
public virtual EdgeBase
{
Expand Down
Loading

0 comments on commit a4b2ca1

Please sign in to comment.