diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 6f6d95fb6..225d95f33 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -19,9 +19,6 @@ run-name: CI Pipeline on: workflow_call: inputs: - aws_region: - default: 'us-west-2' - type: string run_check: required: true type: boolean @@ -39,17 +36,10 @@ on: required: true CONDA_TOKEN: required: true - GHA_AWS_ACCESS_KEY_ID: - required: true - GHA_AWS_SECRET_ACCESS_KEY: - required: true NGC_API_KEY: required: true env: - AWS_DEFAULT_REGION: ${{ inputs.aws_region }} - AWS_ACCESS_KEY_ID: "${{ secrets.GHA_AWS_ACCESS_KEY_ID }}" - AWS_SECRET_ACCESS_KEY: "${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }}" CHANGE_TARGET: "${{ github.base_ref }}" GH_TOKEN: "${{ github.token }}" GIT_COMMIT: "${{ github.sha }}" @@ -57,6 +47,20 @@ env: WORKSPACE: "${{ github.workspace }}/mrc" WORKSPACE_TMP: "${{ github.workspace }}/tmp" +permissions: + actions: none + checks: none + contents: read + deployments: none + discussions: none + id-token: write + issues: none + packages: read + pages: none + pull-requests: read + repository-projects: none + security-events: none + statuses: none jobs: check: @@ -80,6 +84,13 @@ jobs: path: 'mrc' fetch-depth: 0 + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: Check shell: bash run: ./mrc/ci/scripts/github/checks.sh @@ -105,6 +116,13 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: Build:linux:x86_64 shell: bash env: @@ -114,7 +132,7 @@ jobs: test: name: Test needs: [build] - runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-525-1] timeout-minutes: 60 container: credentials: @@ -137,6 +155,13 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: Test:linux:x86_64 shell: bash env: @@ -146,7 +171,7 @@ jobs: codecov: name: Code Coverage - runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-525-1] timeout-minutes: 60 container: credentials: @@ -167,6 +192,13 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: Build shell: bash run: ./mrc/ci/scripts/github/build.sh @@ -197,6 +229,13 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: build_docs shell: bash run: ./mrc/ci/scripts/github/docs.sh @@ -222,6 +261,13 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: pre_benchmark shell: bash run: ./mrc/ci/scripts/github/pre_benchmark.sh @@ -255,6 +301,13 @@ jobs: path: 'mrc' fetch-depth: 0 + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h + - name: conda shell: bash env: diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 6cd91d66e..e310aa2d1 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -27,17 +27,30 @@ concurrency: group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}' cancel-in-progress: true +permissions: + actions: none + checks: none + contents: read + deployments: none + discussions: none + id-token: write + issues: none + packages: read + pages: none + pull-requests: read + repository-projects: none + security-events: none + statuses: none + jobs: ci_pipe: uses: ./.github/workflows/ci_pipe.yml with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-221130 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-base-221130 + container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230315 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230315 secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} CONDA_TOKEN: ${{ secrets.CONDA_TOKEN }} - GHA_AWS_ACCESS_KEY_ID: ${{ secrets.GHA_AWS_ACCESS_KEY_ID }} - GHA_AWS_SECRET_ACCESS_KEY: ${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }} NGC_API_KEY: ${{ secrets.NGC_API_KEY }} diff --git a/.gitmodules b/.gitmodules index 10b4ee8b5..cdeafc917 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "morpheus_utils"] path = external/utilities url = https://github.com/nv-morpheus/utilities.git - branch = branch-23.01 + branch = branch-23.03 diff --git a/CHANGELOG.md b/CHANGELOG.md index c245fda00..9f19e9aef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,26 @@ +# MRC 23.03.00 (29 Mar 2023) + +## 🚨 Breaking Changes + +- Cleanup top-level forward.hpp and types.hpp ([#292](https://github.com/nv-morpheus/MRC/pull/292)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 🐛 Bug Fixes + +- Cleanup top-level forward.hpp and types.hpp ([#292](https://github.com/nv-morpheus/MRC/pull/292)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 🛠️ Improvements + +- Updating to driver 525 ([#299](https://github.com/nv-morpheus/MRC/pull/299)) [@jjacobelli](https://github.com/jjacobelli) +- Improvements to the python module generation CMake code ([#298](https://github.com/nv-morpheus/MRC/pull/298)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Update workflow `permissions` block ([#296](https://github.com/nv-morpheus/MRC/pull/296)) [@ajschmidt8](https://github.com/ajschmidt8) +- Set AWS credentials lifetime to 12h ([#295](https://github.com/nv-morpheus/MRC/pull/295)) [@jjacobelli](https://github.com/jjacobelli) +- Use AWS OIDC to get AWS creds ([#294](https://github.com/nv-morpheus/MRC/pull/294)) [@jjacobelli](https://github.com/jjacobelli) +- Pointer cast macro ([#293](https://github.com/nv-morpheus/MRC/pull/293)) [@dagardner-nv](https://github.com/dagardner-nv) +- Update `sccache` bucket ([#289](https://github.com/nv-morpheus/MRC/pull/289)) [@ajschmidt8](https://github.com/ajschmidt8) +- Update CMake to only add fcoroutines flag if clang version is less than 16 + bump to latest utils ([#288](https://github.com/nv-morpheus/MRC/pull/288)) [@drobison00](https://github.com/drobison00) +- Mirror module / buffer + python bindings. ([#286](https://github.com/nv-morpheus/MRC/pull/286)) [@drobison00](https://github.com/drobison00) +- Updating to use driver 520 ([#282](https://github.com/nv-morpheus/MRC/pull/282)) [@mdemoret-nv](https://github.com/mdemoret-nv) + # MRC 23.01.00 (30 Jan 2023) ## 🚨 Breaking Changes diff --git a/CMakeLists.txt b/CMakeLists.txt index 207f6e4ef..e19c3a8ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,7 +78,7 @@ morpheus_utils_initialize_package_manager( morpheus_utils_initialize_cuda_arch(mrc) project(mrc - VERSION 23.01.00 + VERSION 23.03.00 LANGUAGES C CXX ) @@ -92,7 +92,6 @@ if(NOT DEFINED CMAKE_CUDA_HOST_COMPILER) # incompatible with CUDA 11.4/11.5/11.6. See Issue #102 if(NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") set(CMAKE_CUDA_HOST_COMPILER ${CMAKE_CXX_COMPILER}) - endif() endif() @@ -100,7 +99,18 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") # Our version of NVCC officially only supports clang versions 3.2 - 13, we are now using 14 set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -allow-unsupported-compiler") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts") + # Check if the major version of Clang is greater than 15 + execute_process(COMMAND "${CMAKE_CXX_COMPILER}" "--version" + OUTPUT_VARIABLE clang_version_info + ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE) + + string(REGEX MATCH "version [0-9]+" clang_version_value ${clang_version_info}) + string(REGEX REPLACE "version ([0-9]+)" "\\1" clang_version ${clang_version_value}) + + if(${clang_version} VERSION_LESS_EQUAL "15") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts") + endif() + endif() # Now enable CUDA diff --git a/Dockerfile b/Dockerfile index d9c0a6944..266d514c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,10 +62,16 @@ RUN --mount=type=cache,target=/var/cache/apt \ apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ - libnvidia-compute-495 \ + libnvidia-compute-525 \ && \ rm -rf /var/lib/apt/lists/* +# ============ test ================== +FROM base as test + +# Add any test only dependencies here. For now there is none but we need the +# target to get the CI runner build scripts to work + # ========= development ================ FROM base as development diff --git a/ci/scripts/github/build.sh b/ci/scripts/github/build.sh index 7e6db5e6a..c1eae4d58 100755 --- a/ci/scripts/github/build.sh +++ b/ci/scripts/github/build.sh @@ -26,6 +26,7 @@ rapids-logger "Check versions" python3 --version cmake --version ninja --version +sccache --version if [[ "${BUILD_CC}" == "gcc" ]]; then rapids-logger "Building with GCC" diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index ccd36ab71..8765d51e6 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -59,8 +59,8 @@ export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}" # Set sccache env vars export SCCACHE_S3_KEY_PREFIX=mrc-${NVARCH}-${BUILD_CC} -export SCCACHE_BUCKET=rapids-sccache -export SCCACHE_REGION="${AWS_DEFAULT_REGION}" +export SCCACHE_BUCKET=rapids-sccache-east +export SCCACHE_REGION="us-east-2" export SCCACHE_IDLE_TIMEOUT=32768 #export SCCACHE_LOG=debug diff --git a/cpp/mrc/CMakeLists.txt b/cpp/mrc/CMakeLists.txt index 079277129..04424e300 100644 --- a/cpp/mrc/CMakeLists.txt +++ b/cpp/mrc/CMakeLists.txt @@ -21,14 +21,14 @@ add_library(libmrc src/internal/codable/codable_storage.cpp src/internal/codable/decodable_storage_view.cpp src/internal/codable/storage_view.cpp - src/internal/control_plane/client.cpp src/internal/control_plane/client/connections_manager.cpp + src/internal/control_plane/client.cpp src/internal/control_plane/client/instance.cpp src/internal/control_plane/client/state_manager.cpp src/internal/control_plane/client/subscription_service.cpp src/internal/control_plane/resources.cpp - src/internal/control_plane/server.cpp src/internal/control_plane/server/connection_manager.cpp + src/internal/control_plane/server.cpp src/internal/control_plane/server/subscription_manager.cpp src/internal/control_plane/server/tagged_issuer.cpp src/internal/data_plane/callbacks.cpp @@ -61,8 +61,8 @@ add_library(libmrc src/internal/resources/manager.cpp src/internal/resources/partition_resources_base.cpp src/internal/resources/partition_resources.cpp - src/internal/runnable/engine_factory.cpp src/internal/runnable/engine.cpp + src/internal/runnable/engine_factory.cpp src/internal/runnable/engines.cpp src/internal/runnable/fiber_engine.cpp src/internal/runnable/fiber_engines.cpp @@ -84,18 +84,18 @@ add_library(libmrc src/internal/system/fiber_pool.cpp src/internal/system/fiber_task_queue.cpp src/internal/system/gpu_info.cpp - src/internal/system/host_partition_provider.cpp src/internal/system/host_partition.cpp + src/internal/system/host_partition_provider.cpp src/internal/system/iresources.cpp src/internal/system/isystem.cpp - src/internal/system/partition_provider.cpp src/internal/system/partition.cpp + src/internal/system/partition_provider.cpp src/internal/system/partitions.cpp src/internal/system/resources.cpp - src/internal/system/system_provider.cpp src/internal/system/system.cpp - src/internal/system/thread_pool.cpp + src/internal/system/system_provider.cpp src/internal/system/thread.cpp + src/internal/system/thread_pool.cpp src/internal/system/topology.cpp src/internal/ucx/context.cpp src/internal/ucx/endpoint.cpp @@ -108,8 +108,8 @@ add_library(libmrc src/internal/utils/parse_config.cpp src/internal/utils/parse_ints.cpp src/internal/utils/shared_resource_bit_map.cpp - src/public/benchmarking/trace_statistics.cpp src/public/benchmarking/tracer.cpp + src/public/benchmarking/trace_statistics.cpp src/public/benchmarking/util.cpp src/public/channel/channel.cpp src/public/codable/encoded_object.cpp diff --git a/cpp/mrc/benchmarks/bench_segment.cpp b/cpp/mrc/benchmarks/bench_segment.cpp index c0aef1729..6960ebe18 100644 --- a/cpp/mrc/benchmarks/bench_segment.cpp +++ b/cpp/mrc/benchmarks/bench_segment.cpp @@ -272,7 +272,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture }), m_watcher->create_tracer_emit_tap(int_name)); - segment.make_dynamic_edge(last_node, internal); + segment.make_edge(last_node, internal); last_node = internal; } @@ -281,7 +281,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture sink_name, m_watcher->create_tracer_sink_lambda(sink_name, [](tracer_type_t& data) {})); - segment.make_dynamic_edge(last_node, sink); + segment.make_edge(last_node, sink); }; auto pipeline = pipeline::make_pipeline(); diff --git a/cpp/mrc/include/mrc/edge/edge_builder.hpp b/cpp/mrc/include/mrc/edge/edge_builder.hpp index 71b53688a..581b7da41 100644 --- a/cpp/mrc/include/mrc/edge/edge_builder.hpp +++ b/cpp/mrc/include/mrc/edge/edge_builder.hpp @@ -176,6 +176,101 @@ struct EdgeBuilder final sink.set_readable_edge_handle(edge); } + template + static void splice_edge(SourceT& source, SinkT& sink, SpliceInputT& splice_input, SpliceOutputT& splice_output) + { + using source_full_t = SourceT; + using sink_full_t = SinkT; + + // Have to jump through some hoops here, mimics what 'writable_acceptor_typed' does, so we get everything + // aligned correctly. + if constexpr (is_base_of_template::value && + is_base_of_template::value) + { + /* + * In this case, the source object has accepted a writable edge from the sink. + * + * Given: [source [edge_handle]] -> [sink] + * + * We will: + * - Get a reference to the edge_handle the source is holding + * - Reset the Source edge connection + * - Create a new edge from the source to our WritableProvider splice node + * - Set the edge_handle for the WritableAcceptor splice node to the edge_handle from the source + * + * This will result in the following: + * + * [source[new_edge_handle]] -> [splice_node[old_edge_handle]] -> [sink] + * + */ + // We don't need to know the data type of the sink, the source node will have the same data type as the + // splice node, and we already know the sink can provide an edge for the source's data type. + // [source] -> [sink] => [[source] -> [splice_node]] -> [sink] + auto* splice_writable_provider = dynamic_cast*>(&splice_input); + CHECK(splice_writable_provider != nullptr) << "Splice input is not a writable provider"; + + auto* splice_writable_acceptor = dynamic_cast*>(&splice_output); + CHECK(splice_writable_acceptor != nullptr) << "Splice output is not a writable acceptor"; + + auto* writable_acceptor = dynamic_cast*>(&source); + CHECK(writable_acceptor != nullptr) << "Source is not a writable acceptor"; + + auto* edge_holder_ptr = dynamic_cast*>(writable_acceptor); + if (edge_holder_ptr == nullptr) + { + LOG(FATAL) << "Writable acceptor failed to cast to EdgeHolder"; + } + + auto& edge_holder = *edge_holder_ptr; + CHECK(edge_holder.check_active_connection(false)) << "No active connection to splice into"; + + auto edge_handle = edge_holder.get_connected_edge(); + edge_holder.release_edge_connection(); + + make_edge_writable(*writable_acceptor, *splice_writable_provider); + make_edge_writable(*splice_writable_acceptor, sink); + } + else if constexpr (is_base_of_template::value && + is_base_of_template::value) + { + // We don't need to know the data type of the source, the sink node will have the same data type as the + // splice node, and we already know the source can provide an edge for the sink's data type. + // [source] -> [sink] => [source] -> [[splice_node] -> [sink]] + auto* splice_readable_provider = dynamic_cast*>(&splice_input); + CHECK(splice_readable_provider != nullptr) << "Splice input is not a writable provider"; + + auto* splice_readable_acceptor = dynamic_cast*>(&splice_output); + CHECK(splice_readable_acceptor != nullptr) << "Splice output is not a writable acceptor"; + + auto* readable_acceptor = dynamic_cast*>(&sink); + CHECK(readable_acceptor != nullptr) << "Sink is not a writable provider"; + + auto* edge_holder_ptr = dynamic_cast*>(readable_acceptor); + if (edge_holder_ptr == nullptr) + { + LOG(FATAL) << "Readable acceptor failed to cast to EdgeHolder"; + } + + auto& edge_holder = *edge_holder_ptr; + CHECK(edge_holder.check_active_connection(false)) << "No active connection to splice into"; + + // Grab the Acceptor's edge handle and release it from the Acceptor + // Make sure we hold the edge handle until the new edge to the splice has been formed. + // TODO(Devin): Can we double check that the edge handle from the source matches the one from the sink? + auto edge_handle = edge_holder.get_connected_edge(); + edge_holder.release_edge_connection(); + + make_edge_readable(source, *splice_readable_acceptor); + make_edge_readable(*splice_readable_provider, *readable_acceptor); + } + else + { + static_assert(!sizeof(source_full_t), + "Arguments to splice_edge were incorrect. Ensure you are providing either " + "WritableAcceptor->WritableProvider or ReadableProvider->ReadableAcceptor"); + } + } + private: static std::shared_ptr do_adapt_ingress(const EdgeTypeInfo& target_type, std::shared_ptr ingress); diff --git a/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp b/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp index 42f0b4476..7c9347a52 100644 --- a/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp +++ b/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp @@ -29,12 +29,13 @@ class Launchable; } // namespace mrc::runnable // todo(ryan) - most base classes that will be owned by the engine will need to be moved to engine api/lib +namespace mrc::modules { +class SegmentModule; +} namespace mrc::segment { - class ObjectProperties; class EgressPortBase; class IngressPortBase; - } // namespace mrc::segment namespace mrc::internal::segment { @@ -54,6 +55,7 @@ class IBuilder final bool has_object(const std::string& name) const; ::mrc::segment::ObjectProperties& find_object(const std::string& name); void add_object(const std::string& name, std::shared_ptr<::mrc::segment::ObjectProperties> object); + void add_module(const std::string& name, std::shared_ptr module); void add_runnable(const std::string& name, std::shared_ptr runnable); std::shared_ptr<::mrc::segment::IngressPortBase> get_ingress_base(const std::string& name); std::shared_ptr<::mrc::segment::EgressPortBase> get_egress_base(const std::string& name); diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp new file mode 100644 index 000000000..fa3ac3c11 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapModule : public SegmentModule, public PersistentModule +{ + using type_t = MirrorTapModule; + + public: + MirrorTapModule(std::string module_name); + + MirrorTapModule(std::string module_name, nlohmann::json config); + + std::string tap_egress_port_name() const; + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + [[maybe_unused]] static std::atomic s_tap_id; + + std::string m_egress_name; +}; + +template +std::atomic MirrorTapModule::s_tap_id{0}; + +template +MirrorTapModule::MirrorTapModule(std::string module_name) : + SegmentModule(std::move(module_name)), + m_egress_name("mirror_tap_source_" + std::to_string(s_tap_id++)) +{} + +template +MirrorTapModule::MirrorTapModule(std::string module_name, nlohmann::json _config) : + SegmentModule(std::move(module_name), std::move(_config)), + m_egress_name("mirror_tap_source_" + std::to_string(s_tap_id++)) +{ + if (config().contains("tap_id_override")) + { + m_egress_name = config()["tap_id_override"]; + } +} + +template +std::string MirrorTapModule::tap_egress_port_name() const +{ + return m_egress_name; +} + +template +void MirrorTapModule::initialize(segment::Builder& builder) +{ + // ********** Implementation ************ // + auto bcast = builder.construct_object>("broadcast"); + + builder.make_edge(bcast, builder.get_egress(m_egress_name)); // to mirror tap + + // Register the submodules output as one of this module's outputs + register_input_port("input", bcast); + register_output_port("output", bcast); +} + +template +std::string MirrorTapModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp new file mode 100644 index 000000000..a78a465a3 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp @@ -0,0 +1,136 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp" +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/segment/egress_ports.hpp" +#include "mrc/segment/ingress_ports.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapOrchestrator +{ + using initializer_t = std::function; + using type_t = MirrorTapOrchestrator; + + public: + MirrorTapOrchestrator(std::string tap_name); + + MirrorTapOrchestrator(std::string tap_name, nlohmann::json config); + + initializer_t tap(initializer_t initializer, const std::string tap_from, const std::string tap_to) + { + using namespace modules; + return [this, initializer, tap_from, tap_to](segment::Builder& builder) { + initializer(builder); + builder.init_module(m_tap); + + builder.splice_edge(tap_from, tap_to, m_tap->input_port("input"), m_tap->output_port("output")); + }; + } + + initializer_t stream_to(initializer_t initializer, const std::string entry_point) + { + using namespace modules; + return [this, initializer, entry_point](segment::Builder& builder) { + initializer(builder); + + builder.init_module(m_stream); + builder.make_edge(m_stream->output_port("output"), entry_point); + }; + } + + template + segment::IngressPorts create_or_extend_ingress_ports( + segment::IngressPorts& ingress_ports) const + { + auto names(ingress_ports.names()); + names.push_back(get_ingress_tap_name()); + + return segment::IngressPorts(std::move(names)); + } + + segment::IngressPorts create_or_extend_ingress_ports() const + { + return segment::IngressPorts({get_ingress_tap_name()}); + } + + template + segment::EgressPorts create_or_extend_egress_ports( + segment::EgressPorts& egress_ports) const + { + auto names(egress_ports.names()); + names.push_back(get_ingress_tap_name()); + + return segment::EgressPorts(std::move(names)); + } + + segment::EgressPorts create_or_extend_egress_ports() const + { + return segment::EgressPorts({get_egress_tap_name()}); + } + + std::string get_egress_tap_name() const; + + std::string get_ingress_tap_name() const; + + private: + std::shared_ptr> m_tap; + std::shared_ptr> m_stream; +}; + +template +MirrorTapOrchestrator::MirrorTapOrchestrator(std::string tap_name) : + m_tap(std::make_shared>(std::move(tap_name))), + m_stream(std::make_shared>(std::move(tap_name))) +{ + m_stream->tap_ingress_port_name(m_tap->tap_egress_port_name()); +} + +template +MirrorTapOrchestrator::MirrorTapOrchestrator(std::string tap_name, nlohmann::json config) : + m_tap(std::make_shared>(tap_name, config)), + m_stream(std::make_shared>(tap_name, config)) +{ + m_stream->tap_ingress_port_name(m_tap->tap_egress_port_name()); +} + +template +[[maybe_unused]] std::string MirrorTapOrchestrator::get_egress_tap_name() const +{ + return m_tap->tap_egress_port_name(); +} + +template +[[maybe_unused]] std::string MirrorTapOrchestrator::get_ingress_tap_name() const +{ + return m_stream->tap_ingress_port_name(); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp new file mode 100644 index 000000000..843a8b8c4 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp" +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapStreamModule : public SegmentModule, public PersistentModule +{ + using type_t = MirrorTapStreamModule; + + public: + MirrorTapStreamModule(std::string module_name); + + MirrorTapStreamModule(std::string module_name, nlohmann::json _config); + + std::string tap_ingress_port_name() const; + void tap_ingress_port_name(std::string name); + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + std::shared_ptr> m_stream_buffer; + + std::string m_ingress_name; +}; + +template +MirrorTapStreamModule::MirrorTapStreamModule(std::string module_name) : SegmentModule(std::move(module_name)) +{} + +template +MirrorTapStreamModule::MirrorTapStreamModule(std::string module_name, nlohmann::json _config) : + SegmentModule(std::move(module_name), std::move(_config)) +{ + if (config().contains("tap_id_override")) + { + m_ingress_name = config()["tap_id_override"]; + } +} + +template +std::string MirrorTapStreamModule::tap_ingress_port_name() const +{ + return m_ingress_name; +} + +template +void MirrorTapStreamModule::tap_ingress_port_name(std::string ingress_port_name) +{ + m_ingress_name = std::move(ingress_port_name); +} + +template +void MirrorTapStreamModule::initialize(segment::Builder& builder) +{ + auto mirror_ingress = builder.get_ingress(m_ingress_name); + m_stream_buffer = builder.make_module>("test", config()); + + builder.make_edge(mirror_ingress, m_stream_buffer->input_port("input")); + + register_output_port("output", m_stream_buffer->output_port("output")); +} + +template +std::string MirrorTapStreamModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp new file mode 100644 index 000000000..93f036f21 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp @@ -0,0 +1,45 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace mrc::modules::stream_buffers { + +template +class StreamBufferBase +{ + public: + virtual ~StreamBufferBase() = default; + + virtual std::size_t buffer_size() = 0; + + virtual void buffer_size(std::size_t size) = 0; + + virtual bool empty() = 0; + + virtual void push_back(DataTypeT&& data) = 0; + + virtual void flush_next(rxcpp::subscriber& subscriber) = 0; + + virtual void flush_all(rxcpp::subscriber& subscriber) = 0; +}; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp new file mode 100644 index 000000000..06e74f334 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mrc::modules::stream_buffers { + +template +class StreamBufferImmediate : public StreamBufferBase +{ + public: + StreamBufferImmediate() : m_ring_buffer_write(128), m_ring_buffer_read(128) {} + + StreamBufferImmediate(std::size_t buffer_size) : m_ring_buffer_write(buffer_size), m_ring_buffer_read(buffer_size) + {} + + std::size_t buffer_size() override + { + return m_ring_buffer_write.capacity(); + } + + void buffer_size(std::size_t size) override + { + m_ring_buffer_write.set_capacity(size); + m_ring_buffer_read.set_capacity(size); + } + + bool empty() override + { + return m_ring_buffer_write.empty(); + } + + void push_back(DataTypeT&& data) override + { + std::lock_guard lock(m_write_mutex); + m_ring_buffer_write.push_back(std::move(data)); + } + + // Not thread safe for multiple readers + void flush_next(rxcpp::subscriber& subscriber) override + { + std::lock_guard wlock(m_write_mutex); + + subscriber.on_next(m_ring_buffer_write.front()); + m_ring_buffer_write.pop_front(); + } + + // Not thread safe for multiple readers + void flush_all(rxcpp::subscriber& subscriber) override + { + { + std::lock_guard wlock(m_write_mutex); + // O(1), based on the size of the circular buffer. + m_ring_buffer_write.swap(m_ring_buffer_read); + } + + while (!m_ring_buffer_read.empty()) + { + subscriber.on_next(m_ring_buffer_read.front()); + m_ring_buffer_read.pop_front(); + } + } + + private: + std::mutex m_write_mutex; + + boost::circular_buffer m_ring_buffer_write; + boost::circular_buffer m_ring_buffer_read; +}; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp new file mode 100644 index 000000000..34f3bc9a0 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp @@ -0,0 +1,150 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/segment/builder.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mrc::modules { + +/** + * @brief Buffers a stream of data; avoiding any stalls by over-writing older data when the buffer is full. + * We guarantee not to block the data stream; but, may drop data if the buffer is full. + * @tparam DataTypeT The type of data to buffer + */ +template class StreamBufferTypeT = mrc::modules::stream_buffers::StreamBufferImmediate> +class StreamBufferModule : public SegmentModule, public PersistentModule +{ + static_assert(stream_buffers::IsStreamBuffer, + "StreamBufferTypeT must be derived from StreamBufferBase"); + using type_t = StreamBufferModule; + + public: + StreamBufferModule(std::string module_name); + + StreamBufferModule(std::string module_name, nlohmann::json config); + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + StreamBufferTypeT m_stream_buffer; + rxcpp::subjects::subject m_subject{}; +}; + +template class StreamBufferTypeT> +StreamBufferModule::StreamBufferModule(std::string module_name) : + SegmentModule(std::move(module_name)), + PersistentModule(), + m_stream_buffer() +{} + +template class StreamBufferTypeT> +StreamBufferModule::StreamBufferModule(std::string module_name, nlohmann::json config) : + SegmentModule(std::move(module_name), std::move(config)), + PersistentModule(), + m_stream_buffer() +{ + if (this->config().contains("buffer_size")) + { + m_stream_buffer.buffer_size(this->config()["buffer_size"]); + } +} + +template class StreamBufferTypeT> +void StreamBufferModule::initialize(segment::Builder& builder) +{ + auto buffer_sink = builder.template make_sink("buffer_sink_new", m_subject.get_subscriber()); + + // This is a hack, because we don't correctly support passing observables to RxSource creation yet + // Consume values from subject and push them to ring buffer + m_subject.get_observable().subscribe( + [this](DataTypeT data) { + m_stream_buffer.push_back(std::move(data)); + VLOG(10) << "Subscriber 1: OnNext -> push to ring buffer " << std::endl; + }, + [this](std::exception_ptr ep) { + VLOG(10) << "Subscriber 1: OnError" << std::endl; + }, + [this]() { + VLOG(10) << "Subscriber 1: OnCompleted" << std::endl; + }); + + // Example of adding a second subscriber + /* + m_subject.get_observable().subscribe( + [this](DataTypeT data) { + VLOG(10) << "Subscriber 2: OnNext -> " << data << std::endl; + }, + [this](std::exception_ptr ep) { + VLOG(10) << "Subscriber 2: OnError" << std::endl; + }, + [this]() { + VLOG(10) << "Subscriber 2: OnCompleted" << std::endl; + }); + */ + + // Create our source that reads from the buffer as long as it has a subscriber and + // our subject hasn't called on_complete() + auto buffer_source = builder.template make_source( + "buffer_source", + [this](rxcpp::subscriber& subscriber) { + // TODO(Devin): not currently supported + // m_subject.get_observable().subscribe(subscriber); + while (subscriber.is_subscribed() && m_subject.has_observers()) + { + if (!m_stream_buffer.empty()) + { + m_stream_buffer.flush_all(subscriber); + } + else + { + boost::this_fiber::yield(); + } + } + }); + + register_input_port("input", buffer_sink); + register_output_port("output", buffer_source); +} + +template class StreamBufferTypeT> +std::string StreamBufferModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp new file mode 100644 index 000000000..8433e2636 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp @@ -0,0 +1,32 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp" + +#include +#include + +namespace mrc::modules::stream_buffers { + +template class StreamBufferTypeT> +concept IsStreamBuffer = requires { + typename StreamBufferTypeT; + std::is_base_of_v, StreamBufferTypeT>; + }; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/forward.hpp b/cpp/mrc/include/mrc/forward.hpp index 1ac39f275..4eb296c25 100644 --- a/cpp/mrc/include/mrc/forward.hpp +++ b/cpp/mrc/include/mrc/forward.hpp @@ -20,36 +20,15 @@ namespace mrc { class Executor; -class ExecutorBase; -template -class NetworkDeserializer; - -class Options; - -class Runtime; - -class Placement; -class PlacementGroup; +struct PlacementGroup; namespace pipeline { class Pipeline; } // namespace pipeline -class PipelineConfiguration; - -class SegmentResources; - -class PipelineInstanceResources; - -class PipelineConfiguration; -class SegmentConfiguration; - -class PipelineAssignment; -class SegmentAssignment; - -class Cpuset; -class NumaSet; +struct CpuSet; +struct NumaSet; class Options; class FiberPoolOptions; diff --git a/cpp/mrc/include/mrc/modules/properties/persistent.hpp b/cpp/mrc/include/mrc/modules/properties/persistent.hpp new file mode 100644 index 000000000..c62932ee7 --- /dev/null +++ b/cpp/mrc/include/mrc/modules/properties/persistent.hpp @@ -0,0 +1,28 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace mrc::modules { +/** + * @brief A module that is persistent across segment restarts. + * @note This module is not intended to be used directly. It is used as a property indicator for other segment modules. + * It is intentionally empty. + */ +class PersistentModule +{}; +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/modules/segment_modules.hpp b/cpp/mrc/include/mrc/modules/segment_modules.hpp index b6324f79c..31bc93be5 100644 --- a/cpp/mrc/include/mrc/modules/segment_modules.hpp +++ b/cpp/mrc/include/mrc/modules/segment_modules.hpp @@ -27,6 +27,9 @@ namespace mrc::segment { class Builder; +} // namespace mrc::segment + +namespace mrc::segment { struct ObjectProperties; } // namespace mrc::segment @@ -147,11 +150,32 @@ class SegmentModule /** * Register an output port that should be exposed for the module * @param input_name Port name - * @param object ObjectProperties object assocaited with the port + * @param object ObjectProperties object associated with the port */ void register_output_port(std::string output_name, std::shared_ptr object); private: + /** + * Register an input port that should be exposed for the module, with explicit type index. This is + * necessary for Objects that aren't explicit Source or Sink types (e.g. a custom object type) + * @param input_name Port name + * @param object ObjectProperties object associated with the port + * @param tidx Type index of the object's payload data type + */ + void register_typed_input_port(std::string input_name, + std::shared_ptr object, + std::type_index tidx); + /** + * Register an output port that should be exposed for the module, with explicit type index. This is + * necessary for Objects that aren't explicit Source or Sink types (e.g. a custom object type) + * @param output_name Port name + * @param object ObjectProperties object associated with the port + * @param tidx Type index of the object's payload data type + */ + void register_typed_output_port(std::string output_name, + std::shared_ptr object, + std::type_index tidx); + const std::string m_module_instance_name; std::string m_module_instance_registered_namespace{}; diff --git a/cpp/mrc/include/mrc/node/operators/broadcast.hpp b/cpp/mrc/include/mrc/node/operators/broadcast.hpp index bd9071bb2..c3bfde7ae 100644 --- a/cpp/mrc/include/mrc/node/operators/broadcast.hpp +++ b/cpp/mrc/include/mrc/node/operators/broadcast.hpp @@ -194,6 +194,9 @@ class Broadcast : public WritableProvider, public edge::IWritableAcceptor }; public: + using source_type_t = T; + using sink_type_t = T; + Broadcast(bool deep_copy = false) { auto edge = std::make_shared(*this, deep_copy); diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index 8550c43f8..0962f5c38 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -19,26 +19,26 @@ #include "mrc/benchmarking/trace_statistics.hpp" #include "mrc/edge/edge_builder.hpp" -#include "mrc/edge/edge_readable.hpp" -#include "mrc/edge/edge_writable.hpp" -#include "mrc/engine/segment/ibuilder.hpp" // IWYU pragma: export +#include "mrc/engine/segment/ibuilder.hpp" #include "mrc/exceptions/runtime_error.hpp" #include "mrc/node/rx_node.hpp" #include "mrc/node/rx_sink.hpp" #include "mrc/node/rx_source.hpp" -#include "mrc/node/sink_properties.hpp" // IWYU pragma: export -#include "mrc/node/source_properties.hpp" // IWYU pragma: export +#include "mrc/node/sink_properties.hpp" // IWYU pragma: keep +#include "mrc/node/source_properties.hpp" // IWYU pragma: keep #include "mrc/runnable/context.hpp" -#include "mrc/runnable/runnable.hpp" // IWYU pragma: export -#include "mrc/segment/component.hpp" // IWYU pragma: export -#include "mrc/segment/object.hpp" // IWYU pragma: export -#include "mrc/segment/runnable.hpp" // IWYU pragma: export +#include "mrc/runnable/runnable.hpp" // IWYU pragma: keep +#include "mrc/segment/component.hpp" // IWYU pragma: keep +#include "mrc/segment/concepts/object_traits.hpp" +#include "mrc/segment/object.hpp" // IWYU pragma: keep +#include "mrc/segment/runnable.hpp" // IWYU pragma: keep #include "mrc/type_traits.hpp" #include "mrc/utils/macros.hpp" +#include "mrc/utils/type_utils.hpp" -#include // IWYU pragma: export -#include // IWYU pragma: export -#include // IWYU pragma: export +#include +#include +#include #include #include #include @@ -47,25 +47,31 @@ #include #include #include +#include #include #include #include #include #include +namespace mrc { +struct WatcherInterface; +} // namespace mrc + +namespace mrc::modules { +class SegmentModule; +} // namespace mrc::modules + namespace mrc::node { template class RxSinkBase; +} // namespace mrc::node + +namespace mrc::node { template class RxSourceBase; } // namespace mrc::node -namespace mrc { -struct WatcherInterface; -} // namespace mrc -namespace mrc::modules { -class SegmentModule; -} // namespace mrc::modules namespace mrc::segment { class Definition; } // namespace mrc::segment @@ -119,6 +125,7 @@ class Builder final DELETE_MOVEABILITY(Builder); std::shared_ptr get_ingress(std::string name, std::type_index type_index); + std::shared_ptr get_egress(std::string name, std::type_index type_index); template @@ -131,68 +138,105 @@ class Builder final std::shared_ptr> make_object(std::string name, std::unique_ptr node); template - std::shared_ptr> construct_object(std::string name, ArgsT&&... args) - { - auto ns_name = m_namespace_prefix.empty() ? name : m_namespace_prefix + "/" + name; - auto uptr = std::make_unique(std::forward(args)...); - - ::add_stats_watcher_if_rx_source(*uptr, ns_name); - ::add_stats_watcher_if_rx_sink(*uptr, ns_name); - - return make_object(std::move(ns_name), std::move(uptr)); - } + std::shared_ptr> construct_object(std::string name, ArgsT&&... args); + /** + * Create a source node using the provided name and function, the function is lifted to an observable + * @tparam SourceTypeT the type of data produced by the source + * @tparam NodeTypeT the type of node to use as the source + * @tparam CreateFnT the type of function used to create the source + * @param name the name of the source + * @param create_fn the function that will be lifted into a Observable and used to create the source + * @return the created source + */ template class NodeTypeT = node::RxSource, typename CreateFnT> - auto make_source(std::string name, CreateFnT&& create_fn) - { - return construct_object>( - name, - rxcpp::observable<>::create(std::forward(create_fn))); - } + auto make_source(std::string name, CreateFnT&& create_fn); + + /** + * Create a source node using the provided name and observable + * @tparam SourceTypeT The type of elements emitted by the source node + * @tparam NodeTypeT The type of node to be created, with default set to node::RxSource + * @param name The name of the source node + * @param obs The observable emitting elements of type SourceTypeT + * @return An object of type NodeTypeT + */ + template class NodeTypeT = node::RxSource> + auto make_source(std::string name, rxcpp::observable obs); + /** + * Creates a new instance of the sink component of type `NodeTypeT` with name `name` and observer `ops`. + * @tparam SinkTypeT Type of the data received by the sink component. + * @tparam NodeTypeT Type of the node to be constructed. + * @tparam ArgsT Types of arguments passed to the observer. + * @param name The name of the sink component. + * @param ops The observer for the sink component. + * @return The constructed sink as an Object. + */ template class NodeTypeT = node::RxSink, typename... ArgsT> - auto make_sink(std::string name, ArgsT&&... ops) - { - return construct_object>(name, - rxcpp::make_observer(std::forward(ops)...)); - } + auto make_sink(std::string name, ArgsT&&... ops); + /** + * Creates a sink component and returns it as an object + * @tparam SinkTypeT the type of data that the sink component will receive + * @tparam NodeTypeT the type of the node component to be constructed + * @tparam ArgsT the type of the arguments for constructing the node component + * @param name the name of the sink component + * @param ops the arguments for constructing the node component + * @return The constructed sink component as an Object + */ template class NodeTypeT = node::RxSinkComponent, typename... ArgsT> - auto make_sink_component(std::string name, ArgsT&&... ops) - { - return construct_object>(name, - rxcpp::make_observer(std::forward(ops)...)); - } + auto make_sink_component(std::string name, ArgsT&&... ops); + /** + * Creates a new instance of the specified node type, with the given name and arguments. + * @tparam SinkTypeT The type of the sink to be created. + * @tparam SourceTypeT The type of the source to be created. + * @tparam NodeTypeT The type of the node to be created. + * @tparam ArgsT Variadic parameter pack, representing the arguments to be passed to the constructor of the node. + * @param name The name of the node to be created. + * @param ops The arguments to be passed to the constructor of the node. + * @return The newly created node. + */ template class NodeTypeT = node::RxNode, typename... ArgsT> - auto make_node(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node(std::string name, ArgsT&&... ops); + /** + * Constructs a node object of type NodeTypeT, initialized with arguments ops + * @tparam SinkTypeT The type of the sink + * @tparam SourceTypeT The type of the source + * @tparam NodeTypeT The type of the node, defaults to mrc::node::RxNode + * @tparam ArgsT The types of the arguments passed to the node constructor + * @param name The name of the node + * @param ops The arguments passed to the node constructor + * @return An object of type NodeTypeT constructed with the provided arguments + */ template class NodeTypeT = node::RxNode, typename... ArgsT> - auto make_node(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node(std::string name, ArgsT&&... ops); + /** + * Creates and returns an instance of a node component with the specified type, name and arguments. + * @tparam SinkTypeT The sink type of the node component to be created. + * @tparam SourceTypeT The source type of the node component to be created. + * @tparam NodeTypeT The type of the node component to be created. + * @tparam ArgsT Variadic template argument for the node component's constructor. + * @param name The name of the node component to be created. + * @param ops Variadic argument for the node component's constructor. + * @return An instance of the created node component. + */ template class NodeTypeT = node::RxNodeComponent, typename... ArgsT> - auto make_node_component(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node_component(std::string name, ArgsT&&... ops); /** * Instantiate a segment module of `ModuleTypeT`, intialize it, and return it to the caller @@ -202,21 +246,13 @@ class Builder final * @return Return a shared pointer to the new module, which is a derived class of SegmentModule */ template - std::shared_ptr make_module(std::string module_name, nlohmann::json config = {}) - { - static_assert(std::is_base_of_v); - - auto module = std::make_shared(std::move(module_name), std::move(config)); - init_module(module); - - return std::move(module); - } + std::shared_ptr make_module(std::string module_name, nlohmann::json config = {}); /** * Initialize a SegmentModule that was instantiated outside of the builder. * @param module Module to initialize */ - void init_module(std::shared_ptr module); + void init_module(std::shared_ptr smodule); /** * Register an input port on the given module -- note: this in generally only necessary for dynamically @@ -242,167 +278,266 @@ class Builder final */ void register_module_output(std::string output_name, std::shared_ptr object); + /** + * Load an existing, registered module, initialize it, and return it to the caller + * @param module_id Unique ID of the module to load + * @param registry_namespace Namespace where the module id is registered + * @param module_name Unique name of this instance of the module + * @param config Configuration to pass to the module + * @return Return a shared pointer to the new module, which is a derived class of SegmentModule + */ std::shared_ptr load_module_from_registry(const std::string& module_id, const std::string& registry_namespace, std::string module_name, nlohmann::json config = {}); - template - void make_edge(std::shared_ptr> source, std::shared_ptr> sink) - { - DVLOG(10) << "forming segment edge between two segment objects"; - mrc::make_edge(source->object(), sink->object()); - } + /** + * Create an edge between two things that are convertible to ObjectProperties + * @tparam SourceNodeTypeT Type hint for the source node -- optional -- this will be used if the type of the source + * object cannot be directly determined. + * @tparam SinkNodeTypeT Type hint for the sink node -- optional -- this will be used if the type of the sink object + * cannot be directly determined. + * @tparam SourceObjectT Concept conforming type of the source object + * @tparam SinkObjectT Concept conforming type of the sink object + * @param source Edge source + * @param sink Edge Sink + */ + template + void make_edge(SourceObjectT source, SinkObjectT sink); /** - * Partial dynamic edge construction: * - * Create edge using a fully constructed Object and a type erased Object - * We extract the underlying node object (Likely an RxNode) and call make_edge with it and the type erased - * object. This works via a cascaded type extraction process. - * @tparam SourceNodeTypeT - * @param source Fully typed, wrapped, object - * @param sink Type erased object -- assumed to be convertible to source type + * @tparam EdgeDataTypeT + * @tparam SourceObjectT Data type that can be resolved to an ObjectProperties, representing the source + * @tparam SinkObjectT Data type that can be resolved to an ObjectProperties, representing the sink + * @tparam SpliceInputObjectT Data type that can be resolved to an ObjectProperties, representing the splice's input + * @tparam SpliceOutputObjectT Data type that can be resolved to an ObjectProperties, representing the splice's + * output + * @param source Existing, connected, edge source + * @param sink Existing, connected, edge sink + * @param splice_input Existing, unconnected, splice input + * @param splice_output Existing, unconnected, splice output */ - template - void make_edge(std::shared_ptr>& source, std::shared_ptr sink) - { - if constexpr (is_base_of_template::value) - { - if (sink->is_writable_provider()) - { - mrc::make_edge(source->object(), - sink->template writable_provider_typed()); - return; - } - } + template + void splice_edge(SourceObjectT source, + SinkObjectT sink, + SpliceInputObjectT splice_input, + SpliceOutputObjectT splice_output); - if constexpr (is_base_of_template::value) - { - if (sink->is_readable_acceptor()) - { - mrc::make_edge(source->object(), - sink->template readable_acceptor_typed()); - return; - } - } + template + void add_throughput_counter(std::shared_ptr> segment_object); - LOG(ERROR) << "Incorrect node types"; - } + template + void add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable); - /** - * Partial dynamic edge construction: - * - * Create edge using a fully constructed Object and a type erased Object - * We extract the underlying node object (Likely an RxNode) and call make_edge with it and the type erased - * object. This works via a cascaded type extraction process. - * @tparam SinkNodeTypeT - * @param source Fully typed, wrapped, object - * @param sink Fully typed, wrapped, object - */ - template - void make_edge(std::shared_ptr source, std::shared_ptr>& sink) - { - if constexpr (is_base_of_template::value) - { - if (source->is_writable_acceptor()) - { - mrc::make_edge(source->template writable_acceptor_typed(), - sink->object()); - return; - } - } + private: + using sp_segment_module_t = std::shared_ptr; - if constexpr (is_base_of_template::value) - { - if (source->is_readable_provider()) - { - mrc::make_edge(source->template readable_provider_typed(), - sink->object()); - return; - } - } + std::string m_namespace_prefix; + std::vector m_namespace_stack{}; + std::vector m_module_stack{}; - LOG(ERROR) << "Incorrect node types"; - } + internal::segment::IBuilder& m_backend; + + void ns_push(sp_segment_module_t smodule); + + void ns_pop(); + + template + ObjectProperties& to_object_properties(ObjectReprT& repr); + + friend Definition; +}; + +template +std::shared_ptr> Builder::construct_object(std::string name, ArgsT&&... args) +{ + auto ns_name = m_namespace_prefix.empty() ? name : m_namespace_prefix + "/" + name; + auto uptr = std::make_unique(std::forward(args)...); - template - void make_dynamic_edge(const std::string& source_name, const std::string& sink_name) + ::add_stats_watcher_if_rx_source(*uptr, ns_name); + ::add_stats_watcher_if_rx_sink(*uptr, ns_name); + + return make_object(std::move(ns_name), std::move(uptr)); +} + +template class NodeTypeT, typename CreateFnT> +auto Builder::make_source(std::string name, CreateFnT&& create_fn) +{ + return construct_object>( + name, + rxcpp::observable<>::create(std::forward(create_fn))); +} + +template class NodeTypeT> +auto Builder::make_source(std::string name, rxcpp::observable obs) +{ + return construct_object>(name, obs); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_sink(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, rxcpp::make_observer(std::forward(ops)...)); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_sink_component(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, rxcpp::make_observer(std::forward(ops)...)); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_node(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template + class NodeTypeT, + typename... ArgsT> +auto Builder::make_node(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_node_component(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template +std::shared_ptr Builder::make_module(std::string module_name, nlohmann::json config) +{ + static_assert(std::is_base_of_v); + + auto smodule = std::make_shared(std::move(module_name), std::move(config)); + init_module(smodule); + + return std::move(smodule); +} + +template +void Builder::make_edge(SourceObjectT source, SinkObjectT sink) + +{ + DVLOG(10) << "forming edge between two segment objects"; + using source_sp_type_t = typename mrc_object_sptr_type_t::source_type_t; // Might be void + using sink_sp_type_t = typename mrc_object_sptr_type_t::sink_type_t; // Might be void + + auto& source_object = to_object_properties(source); + auto& sink_object = to_object_properties(sink); + + // If we can determine the type from the actual object, use that, then fall back to hints or defaults. + using deduced_source_type_t = first_non_void_type_t; // Fallback to Sink explicit hint + using deduced_sink_type_t = first_non_void_type_t; // Fallback to Source explicit hint + + VLOG(2) << "Deduced source type: " << mrc::type_name() << std::endl; + VLOG(2) << "Deduced sink type: " << mrc::type_name() << std::endl; + + if (source_object.is_writable_acceptor() && sink_object.is_writable_provider()) { - auto& source_obj = m_backend.find_object(source_name); - auto& sink_obj = m_backend.find_object(sink_name); - this->make_dynamic_edge(source_obj, sink_obj); + mrc::make_edge(source_object.template writable_acceptor_typed(), + sink_object.template writable_provider_typed()); + return; } - template - void make_dynamic_edge(std::shared_ptr source, - std::shared_ptr sink) + if (source_object.is_readable_provider() && sink_object.is_readable_acceptor()) { - this->make_dynamic_edge(*source, *sink); + mrc::make_edge(source_object.template readable_provider_typed(), + sink_object.template readable_acceptor_typed()); + return; } - template - void make_dynamic_edge(segment::ObjectProperties& source, segment::ObjectProperties& sink) + LOG(ERROR) << "Incompatible node types"; +} + +template +void Builder::splice_edge(SourceObjectT source, + SinkObjectT sink, + SpliceInputObjectT splice_input, + SpliceOutputObjectT splice_output) + +{ + auto& source_object = to_object_properties(source); + auto& sink_object = to_object_properties(sink); + + auto& splice_input_object = to_object_properties(splice_input); + auto& splice_output_object = to_object_properties(splice_output); + + CHECK(source_object.is_source()) << "Source object is not a source"; + CHECK(sink_object.is_sink()) << "Sink object is not a sink"; + + // TODO(Devin): this is slightly more constrained that it needs to be. We don't actually need to know the + // type of a provider, but because of the way type testing is done on the edg ebuilder, its a bit of a pain + // to pass in an untyped Provider. We can fix this later. + if (source_object.is_writable_acceptor()) { - if (source.is_writable_acceptor() && sink.is_writable_provider()) + if (sink_object.is_writable_provider()) { - mrc::make_edge(source.template writable_acceptor_typed(), - sink.template writable_provider_typed()); - return; - } + CHECK(splice_input_object.is_writable_provider()) << "Splice input must be of type WritableProvider"; + CHECK(splice_output_object.is_writable_acceptor()) << "Splice output must be WritableAcceptor"; - if (source.is_readable_provider() && sink.is_readable_acceptor()) - { - mrc::make_edge(source.template readable_provider_typed(), - sink.template readable_acceptor_typed()); - return; - } + // Cast our object into something we can insert as a splice. + auto& splice_writable_provider = splice_input_object.template writable_provider_typed(); + auto& splice_writable_acceptor = splice_output_object.template writable_acceptor_typed(); - LOG(ERROR) << "Incorrect node types"; - } + auto& writable_acceptor = source_object.template writable_acceptor_typed(); + auto& writable_provider = sink_object.template writable_provider_typed(); - template - void add_throughput_counter(std::shared_ptr> segment_object) - { - auto runnable = std::dynamic_pointer_cast>(segment_object); - CHECK(runnable); - CHECK(segment_object->is_source()); - using source_type_t = typename ObjectT::source_type_t; - auto counter = m_backend.make_throughput_counter(runnable->name()); - runnable->object().add_epilogue_tap([counter](const source_type_t& data) { - counter(1); - }); - } + edge::EdgeBuilder::splice_edge(writable_acceptor, + writable_provider, + splice_writable_provider, + splice_writable_acceptor); - template - void add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable) - { - auto runnable = std::dynamic_pointer_cast>(segment_object); - CHECK(runnable); - CHECK(segment_object->is_source()); - using source_type_t = typename ObjectT::source_type_t; - using tick_fn_t = std::function; - tick_fn_t tick_fn = callable; - auto counter = m_backend.make_throughput_counter(runnable->name()); - runnable->object().add_epilogue_tap([counter, tick_fn](const source_type_t& data) { - counter(tick_fn(data)); - }); + return; + } } + else if (source_object.is_readable_provider()) + { + if (sink_object.is_readable_acceptor()) + { + CHECK(splice_input_object.is_readable_acceptor()) << "Splice input must be of type ReadableAcceptor"; + CHECK(splice_output_object.is_readable_provider()) << "Splice output must be ReadableProvider"; - private: - using sp_segment_module_t = std::shared_ptr; + // Cast our object into something we can insert as a splice. + auto& splice_readable_acceptor = splice_input_object.template readable_acceptor_typed(); + auto& splice_readable_provider = splice_output_object.template readable_provider_typed(); - std::string m_namespace_prefix; - std::vector m_namespace_stack{}; - std::vector m_module_stack{}; + auto& readable_provider = source_object.template readable_provider_typed(); + auto& readable_acceptor = sink_object.template readable_acceptor_typed(); - internal::segment::IBuilder& m_backend; + edge::EdgeBuilder::splice_edge(readable_provider, + readable_acceptor, + splice_readable_acceptor, + splice_readable_provider); - void ns_push(sp_segment_module_t module); - void ns_pop(); + return; + } + } - friend Definition; -}; + throw std::runtime_error("Attempt to splice unsupported edge types"); +} template std::shared_ptr> Builder::make_object(std::string name, std::unique_ptr node) @@ -472,4 +607,75 @@ std::shared_ptr>> Builder::get_ingress(std::string return port; } -} // namespace mrc::segment +template +void Builder::add_throughput_counter(std::shared_ptr> segment_object) +{ + auto runnable = std::dynamic_pointer_cast>(segment_object); + CHECK(runnable); + CHECK(segment_object->is_source()); + using source_type_t = typename ObjectT::source_type_t; + auto counter = m_backend.make_throughput_counter(runnable->name()); + runnable->object().add_epilogue_tap([counter](const source_type_t& data) { + counter(1); + }); +} + +template +void Builder::add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable) +{ + auto runnable = std::dynamic_pointer_cast>(segment_object); + CHECK(runnable); + CHECK(segment_object->is_source()); + using source_type_t = typename ObjectT::source_type_t; + using tick_fn_t = std::function; + tick_fn_t tick_fn = callable; + auto counter = m_backend.make_throughput_counter(runnable->name()); + runnable->object().add_epilogue_tap([counter, tick_fn](const source_type_t& data) { + counter(tick_fn(data)); + }); +} + +/* Private Member Functions */ +template +ObjectProperties& Builder::to_object_properties(ObjectReprT& repr) +{ + ObjectProperties* object_properties_ptr{nullptr}; + if constexpr (is_shared_ptr_v) + { + // SP to Object + if constexpr (MRCObjectSharedPtr) + { + auto object_properties_ptr_props_ptr = std::dynamic_pointer_cast(repr); + object_properties_ptr = std::addressof(*object_properties_ptr_props_ptr); + } + // SP to ObjectProperties + else if constexpr (MRCObjPropSharedPtr) + { + object_properties_ptr = std::addressof(*repr); + } + { + object_properties_ptr = std::addressof(*repr); + } + } + // Object + else if constexpr (MRCObject) + { + object_properties_ptr = std::addressof(dynamic_cast(repr)); + } + // ObjectProperties + else if constexpr (MRCObjProp) + { + object_properties_ptr = std::addressof(repr); + } + // String-like lookup + else + { + object_properties_ptr = std::addressof(m_backend.find_object(repr)); + } + + CHECK(object_properties_ptr != nullptr) << "If this fails, something is wrong with the concept definition"; + + return *object_properties_ptr; +} + +} // namespace mrc::segment \ No newline at end of file diff --git a/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp b/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp new file mode 100644 index 000000000..862b3f0ba --- /dev/null +++ b/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp @@ -0,0 +1,90 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// Created by drobison on 1/20/23. +// + +#pragma once + +#include "mrc/segment/object.hpp" +#include "mrc/type_traits.hpp" + +#include + +namespace mrc { + +template +struct is_mrc_object_type : public std::false_type +{}; + +template +struct is_mrc_object_type> : public std::true_type +{}; + +template +inline constexpr bool is_mrc_object_v = is_mrc_object_type::value; // NOLINT + +template +struct is_mrc_object_shared_pointer : public std::false_type +{}; + +template +struct is_mrc_object_shared_pointer>> : public std::true_type +{}; + +template +inline constexpr bool is_mrc_object_shared_ptr_v = is_mrc_object_shared_pointer::value; // NOLINT + +struct mrc_object_null_type // NOLINT +{ + using source_type_t = void; + using sink_type_t = void; +}; + +template +struct mrc_object_sptr_type // NOLINT +{ + using type_t = mrc_object_null_type; +}; + +template +struct mrc_object_sptr_type>> +{ + using type_t = T; +}; + +template +using mrc_object_sptr_type_t = typename mrc_object_sptr_type::type_t; + +template +concept MRCObject = is_mrc_object_v; + +template +concept MRCObjectSharedPtr = is_mrc_object_shared_ptr_v; + +template +concept MRCObjProp = std::is_same_v, mrc::segment::ObjectProperties>; + +template +concept MRCObjPropSharedPtr = std::is_same_v, std::shared_ptr>; + +template +concept MRCObjectProxy = MRCObject || MRCObjectSharedPtr || MRCObjProp || + MRCObjPropSharedPtr || std::is_convertible_v; + +} // namespace mrc diff --git a/cpp/mrc/include/mrc/segment/object.hpp b/cpp/mrc/include/mrc/segment/object.hpp index 1b7a7dd50..9d389a88a 100644 --- a/cpp/mrc/include/mrc/segment/object.hpp +++ b/cpp/mrc/include/mrc/segment/object.hpp @@ -353,5 +353,4 @@ edge::IReadableProviderBase& Object::readable_provider_base() CHECK(base); return *base; } - } // namespace mrc::segment diff --git a/cpp/mrc/include/mrc/type_traits.hpp b/cpp/mrc/include/mrc/type_traits.hpp index 19f235fd5..d8bb459ad 100644 --- a/cpp/mrc/include/mrc/type_traits.hpp +++ b/cpp/mrc/include/mrc/type_traits.hpp @@ -115,4 +115,26 @@ template