From 4bb794272583043dcd0c93f14752982224ec1a94 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Mon, 30 Jan 2023 18:08:25 -0700 Subject: [PATCH 01/14] Creating branch for v23.03 From 4f977a84ba968a4735573ee4a00d42b7b23171e5 Mon Sep 17 00:00:00 2001 From: Michael Demoret <42954918+mdemoret-nv@users.noreply.github.com> Date: Tue, 31 Jan 2023 11:17:57 -0700 Subject: [PATCH 02/14] Updating to use driver 520 (#282) Authors: - Michael Demoret (https://github.com/mdemoret-nv) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) - Devin Robison (https://github.com/drobison00) - Ryan Olson (https://github.com/ryanolson) URL: https://github.com/nv-morpheus/MRC/pull/282 --- .github/workflows/ci_pipe.yml | 4 ++-- .github/workflows/pull_request.yml | 4 ++-- Dockerfile | 8 +++++++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 6f6d95fb6..9b8c55e5e 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -114,7 +114,7 @@ jobs: test: name: Test needs: [build] - runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-520-1] timeout-minutes: 60 container: credentials: @@ -146,7 +146,7 @@ jobs: codecov: name: Code Coverage - runs-on: [self-hosted, linux, amd64, gpu-v100-495-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-520-1] timeout-minutes: 60 container: credentials: diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 6cd91d66e..1178a10d1 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -33,8 +33,8 @@ jobs: with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-221130 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-base-221130 + container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230131 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230131 secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} CONDA_TOKEN: ${{ secrets.CONDA_TOKEN }} diff --git a/Dockerfile b/Dockerfile index d9c0a6944..e78d512b1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,10 +62,16 @@ RUN --mount=type=cache,target=/var/cache/apt \ apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ - libnvidia-compute-495 \ + libnvidia-compute-520 \ && \ rm -rf /var/lib/apt/lists/* +# ============ test ================== +FROM base as test + +# Add any test only dependencies here. For now there is none but we need the +# target to get the CI runner build scripts to work + # ========= development ================ FROM base as development From e170431fa4df0e62fc0027fa89f99d58a524c4d1 Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Wed, 8 Feb 2023 15:41:03 -0700 Subject: [PATCH 03/14] Update CMake to only add fcoroutines flag if clang version is less than 16 + bump to latest utils (#288) Clang will complain about deprecated flags if you pass fcoroutines to 16 or higher; this handles both cases and avoids warning spam. Authors: - Devin Robison (https://github.com/drobison00) Approvers: - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/MRC/pull/288 --- CMakeLists.txt | 13 ++++++++++++- external/utilities | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 207f6e4ef..bcbe858af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,7 +100,18 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") # Our version of NVCC officially only supports clang versions 3.2 - 13, we are now using 14 set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -allow-unsupported-compiler") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts") + # Check if the major version of Clang is greater than 15 + execute_process(COMMAND "${CMAKE_CXX_COMPILER}" "--version" + OUTPUT_VARIABLE clang_version_info + ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE) + + string(REGEX MATCH "version [0-9]+" clang_version_value ${clang_version_info}) + string(REGEX REPLACE "version ([0-9]+)" "\\1" clang_version ${clang_version_value}) + + if(${clang_version} VERSION_LESS_EQUAL "15") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts") + endif() + endif() # Now enable CUDA diff --git a/external/utilities b/external/utilities index 6e1d4e62e..41136ca75 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 6e1d4e62e8ad36a3ff45652f4d1aa03810de3751 +Subproject commit 41136ca75c895da56bb973b539c1b18f2f5a5d24 From 7a6ea6a1d025a7f17aad79fa70a7aa0abf7307f2 Mon Sep 17 00:00:00 2001 From: AJ Schmidt Date: Tue, 14 Feb 2023 10:55:54 -0500 Subject: [PATCH 04/14] Update `sccache` bucket (#289) This PR updates the `sccache` configuration settings to use a new bucket, `rapids-sccache-east`. Unlike the previous `rapids-sccache` bucket, `rapids-sccache-east` resides in the same AWS region as the rest of our CI infrastructure (`us-east-2`). This should result in faster, more reliable `sccache` connections and will also help keep our data transfer costs down. **Important Note**: The changes from [this `sccache` PR](https://github.com/mozilla/sccache/pull/1403) are required to use a bucket in `us-east-2`. These changes were incorporated in `sccache` `v0.3.2`, so you'll need that version or later (preferably the latest, `v0.3.3`). Please ensure that your CI images satisfy this constraint before merging this PR. Additionally, this PR updates the `SCCACHE_REGION` to the region of the bucket. This value is not meant to be a variable. Authors: - AJ Schmidt (https://github.com/ajschmidt8) - David Gardner (https://github.com/dagardner-nv) Approvers: - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/MRC/pull/289 --- .github/workflows/ci_pipe.yml | 4 ---- .github/workflows/pull_request.yml | 4 ++-- ci/scripts/github/build.sh | 1 + ci/scripts/github/common.sh | 4 ++-- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 9b8c55e5e..08ff218cf 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -19,9 +19,6 @@ run-name: CI Pipeline on: workflow_call: inputs: - aws_region: - default: 'us-west-2' - type: string run_check: required: true type: boolean @@ -47,7 +44,6 @@ on: required: true env: - AWS_DEFAULT_REGION: ${{ inputs.aws_region }} AWS_ACCESS_KEY_ID: "${{ secrets.GHA_AWS_ACCESS_KEY_ID }}" AWS_SECRET_ACCESS_KEY: "${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }}" CHANGE_TARGET: "${{ github.base_ref }}" diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 1178a10d1..9b456e4ef 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -33,8 +33,8 @@ jobs: with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230131 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230131 + container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230213 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230213 secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} CONDA_TOKEN: ${{ secrets.CONDA_TOKEN }} diff --git a/ci/scripts/github/build.sh b/ci/scripts/github/build.sh index 7e6db5e6a..c1eae4d58 100755 --- a/ci/scripts/github/build.sh +++ b/ci/scripts/github/build.sh @@ -26,6 +26,7 @@ rapids-logger "Check versions" python3 --version cmake --version ninja --version +sccache --version if [[ "${BUILD_CC}" == "gcc" ]]; then rapids-logger "Building with GCC" diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index ccd36ab71..8765d51e6 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -59,8 +59,8 @@ export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}" # Set sccache env vars export SCCACHE_S3_KEY_PREFIX=mrc-${NVARCH}-${BUILD_CC} -export SCCACHE_BUCKET=rapids-sccache -export SCCACHE_REGION="${AWS_DEFAULT_REGION}" +export SCCACHE_BUCKET=rapids-sccache-east +export SCCACHE_REGION="us-east-2" export SCCACHE_IDLE_TIMEOUT=32768 #export SCCACHE_LOG=debug From 588a956abd98fae5bd03ec09836d67162db24672 Mon Sep 17 00:00:00 2001 From: Jordan Jacobelli Date: Tue, 7 Mar 2023 18:30:12 +0100 Subject: [PATCH 05/14] Use AWS OIDC to get AWS creds (#294) This PR is using the AWS OIDC to get AWS credentials ([doc](https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-amazon-web-services)). Currently we are using permanent tokens that we need to manually rotate every 90 days. This PR is removing this requirement. The `AWS_ROLE_ARN` and `AWS_REGION` are orgs variables defined here: https://github.com/organizations/nv-morpheus/settings/variables/actions. Authors: - Jordan Jacobelli (https://github.com/jjacobelli) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/MRC/pull/294 --- .github/workflows/ci_pipe.yml | 62 +++++++++++++++++++++++++++--- .github/workflows/pull_request.yml | 4 +- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 08ff218cf..b87f5995d 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -36,16 +36,10 @@ on: required: true CONDA_TOKEN: required: true - GHA_AWS_ACCESS_KEY_ID: - required: true - GHA_AWS_SECRET_ACCESS_KEY: - required: true NGC_API_KEY: required: true env: - AWS_ACCESS_KEY_ID: "${{ secrets.GHA_AWS_ACCESS_KEY_ID }}" - AWS_SECRET_ACCESS_KEY: "${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }}" CHANGE_TARGET: "${{ github.base_ref }}" GH_TOKEN: "${{ github.token }}" GIT_COMMIT: "${{ github.sha }}" @@ -67,6 +61,8 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true + permissions: + id-token: write steps: - name: Checkout @@ -76,6 +72,12 @@ jobs: path: 'mrc' fetch-depth: 0 + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: Check shell: bash run: ./mrc/ci/scripts/github/checks.sh @@ -93,6 +95,8 @@ jobs: fail-fast: true matrix: build_cc: ["gcc", "clang"] + permissions: + id-token: write steps: - name: Checkout @@ -101,6 +105,12 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: Build:linux:x86_64 shell: bash env: @@ -125,6 +135,8 @@ jobs: fail-fast: true matrix: build_cc: ["gcc", "clang"] + permissions: + id-token: write steps: - name: Checkout @@ -133,6 +145,12 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: Test:linux:x86_64 shell: bash env: @@ -155,6 +173,8 @@ jobs: image: ${{ inputs.test_container }} strategy: fail-fast: true + permissions: + id-token: write steps: - name: Checkout @@ -163,6 +183,12 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: Build shell: bash run: ./mrc/ci/scripts/github/build.sh @@ -185,6 +211,8 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true + permissions: + id-token: write steps: - name: Checkout @@ -193,6 +221,12 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: build_docs shell: bash run: ./mrc/ci/scripts/github/docs.sh @@ -210,6 +244,8 @@ jobs: options: --cap-add=sys_nice strategy: fail-fast: true + permissions: + id-token: write steps: - name: Checkout @@ -218,6 +254,12 @@ jobs: lfs: false path: 'mrc' + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: pre_benchmark shell: bash run: ./mrc/ci/scripts/github/pre_benchmark.sh @@ -242,6 +284,8 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true + permissions: + id-token: write steps: - name: Checkout @@ -251,6 +295,12 @@ jobs: path: 'mrc' fetch-depth: 0 + - name: Get AWS credentials using OIDC + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: ${{ vars.AWS_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + - name: conda shell: bash env: diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 9b456e4ef..ca1dc7a72 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -30,6 +30,8 @@ concurrency: jobs: ci_pipe: uses: ./.github/workflows/ci_pipe.yml + permissions: + id-token: write with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} @@ -38,6 +40,4 @@ jobs: secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} CONDA_TOKEN: ${{ secrets.CONDA_TOKEN }} - GHA_AWS_ACCESS_KEY_ID: ${{ secrets.GHA_AWS_ACCESS_KEY_ID }} - GHA_AWS_SECRET_ACCESS_KEY: ${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }} NGC_API_KEY: ${{ secrets.NGC_API_KEY }} From 8747af78a0b5394bb08a28c6efbc2e400fa9c587 Mon Sep 17 00:00:00 2001 From: Jordan Jacobelli Date: Tue, 7 Mar 2023 21:18:07 +0100 Subject: [PATCH 06/14] Set AWS credentials lifetime to 12h (#295) Increasing the AWS credentials lifetime from 1h to 12h Authors: - Jordan Jacobelli (https://github.com/jjacobelli) Approvers: - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/MRC/pull/295 --- .github/workflows/ci_pipe.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index b87f5995d..e5ad03748 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -77,6 +77,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: Check shell: bash @@ -110,6 +111,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: Build:linux:x86_64 shell: bash @@ -150,6 +152,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: Test:linux:x86_64 shell: bash @@ -188,6 +191,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: Build shell: bash @@ -226,6 +230,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: build_docs shell: bash @@ -259,6 +264,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: pre_benchmark shell: bash @@ -300,6 +306,7 @@ jobs: with: role-to-assume: ${{ vars.AWS_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} + role-duration-seconds: 43200 # 12h - name: conda shell: bash From d3f9d4c26356212c9093237216925b095311d2d8 Mon Sep 17 00:00:00 2001 From: AJ Schmidt Date: Wed, 8 Mar 2023 12:18:11 -0500 Subject: [PATCH 07/14] Update workflow `permissions` block (#296) This PR is a continuation of #294. Omitting the other keys in the `permissions` block sets them to `none` ([src](https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs#overview)). This is probably not a concern for `nv-morpheus`, but it caused issues for some private RAPIDS repositories so I wanted to update the `nv-morpheus` repos for consistency. To remedy this, this PR includes the following changes: - Keeps the `id-token` permission as `write`, but sets the remaining permissions as described in the `restricted` column [here](https://docs.github.com/en/actions/security-guides/automatic-token-authentication#permissions-for-the-github_token) - Also sets `pull_request` to `read` so that `fetch_base_branch` in `common.sh` can use it - Moves the `permissions` blocks to the top of the workflows to DRY them up Authors: - AJ Schmidt (https://github.com/ajschmidt8) Approvers: - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/MRC/pull/296 --- .github/workflows/ci_pipe.yml | 28 ++++++++++++++-------------- .github/workflows/pull_request.yml | 17 +++++++++++++++-- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index e5ad03748..a2d7b0533 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -47,6 +47,20 @@ env: WORKSPACE: "${{ github.workspace }}/mrc" WORKSPACE_TMP: "${{ github.workspace }}/tmp" +permissions: + actions: none + checks: none + contents: read + deployments: none + discussions: none + id-token: write + issues: none + packages: read + pages: none + pull-requests: read + repository-projects: none + security-events: none + statuses: none jobs: check: @@ -61,8 +75,6 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true - permissions: - id-token: write steps: - name: Checkout @@ -96,8 +108,6 @@ jobs: fail-fast: true matrix: build_cc: ["gcc", "clang"] - permissions: - id-token: write steps: - name: Checkout @@ -137,8 +147,6 @@ jobs: fail-fast: true matrix: build_cc: ["gcc", "clang"] - permissions: - id-token: write steps: - name: Checkout @@ -176,8 +184,6 @@ jobs: image: ${{ inputs.test_container }} strategy: fail-fast: true - permissions: - id-token: write steps: - name: Checkout @@ -215,8 +221,6 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true - permissions: - id-token: write steps: - name: Checkout @@ -249,8 +253,6 @@ jobs: options: --cap-add=sys_nice strategy: fail-fast: true - permissions: - id-token: write steps: - name: Checkout @@ -290,8 +292,6 @@ jobs: image: ${{ inputs.container }} strategy: fail-fast: true - permissions: - id-token: write steps: - name: Checkout diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index ca1dc7a72..5bfdf0948 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -27,11 +27,24 @@ concurrency: group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}' cancel-in-progress: true +permissions: + actions: none + checks: none + contents: read + deployments: none + discussions: none + id-token: write + issues: none + packages: read + pages: none + pull-requests: read + repository-projects: none + security-events: none + statuses: none + jobs: ci_pipe: uses: ./.github/workflows/ci_pipe.yml - permissions: - id-token: write with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} From a89b849e227d54f03425c8fae6069f35b679f46f Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Wed, 8 Mar 2023 12:05:52 -0800 Subject: [PATCH 08/14] Pointer cast macro (#293) In debug it uses a dynamic cast and asserts not null, in release it uses a static cast. In Morpheus I have a bunch of code that looks like: ``` DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); return std::static_pointer_cast(self.memory); ``` This would allow for: ``` return MRC_PTR_CAST(morpheus::InferenceMemory, self.memory); ``` Related to: https://github.com/nv-morpheus/Morpheus/issues/735 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Ryan Olson (https://github.com/ryanolson) URL: https://github.com/nv-morpheus/MRC/pull/293 --- cpp/mrc/include/mrc/utils/macros.hpp | 6 ++++ cpp/mrc/tests/CMakeLists.txt | 1 + cpp/mrc/tests/test_macros.cpp | 53 ++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 cpp/mrc/tests/test_macros.cpp diff --git a/cpp/mrc/include/mrc/utils/macros.hpp b/cpp/mrc/include/mrc/utils/macros.hpp index 8edc6c7be..20ab0dabd 100644 --- a/cpp/mrc/include/mrc/utils/macros.hpp +++ b/cpp/mrc/include/mrc/utils/macros.hpp @@ -23,6 +23,12 @@ // __COUNTER__ isnt standard but is supported by msvc, gcc and clang #define MRC_UNIQUE_VAR_NAME(prefix) MRC_CONCAT_EVAL(prefix, __COUNTER__) +#if defined(NDEBUG) + #define MRC_PTR_CAST(PTR_T, ptr) std::static_pointer_cast(ptr) +#else + #define MRC_PTR_CAST(PTR_T, ptr) DCHECK_NOTNULL(std::dynamic_pointer_cast(ptr)) +#endif + #ifndef DELETE_COPYABILITY #define DELETE_COPYABILITY(foo) \ foo(const foo&) = delete; \ diff --git a/cpp/mrc/tests/CMakeLists.txt b/cpp/mrc/tests/CMakeLists.txt index 39724b0ca..b0b35991c 100644 --- a/cpp/mrc/tests/CMakeLists.txt +++ b/cpp/mrc/tests/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable(test_mrc test_channel.cpp test_edges.cpp test_executor.cpp + test_macros.cpp test_main.cpp test_metrics.cpp test_mrc.cpp diff --git a/cpp/mrc/tests/test_macros.cpp b/cpp/mrc/tests/test_macros.cpp new file mode 100644 index 000000000..3ae21b7e8 --- /dev/null +++ b/cpp/mrc/tests/test_macros.cpp @@ -0,0 +1,53 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "test_mrc.hpp" // IWYU pragma: associated + +#include "mrc/utils/macros.hpp" + +#include // IWYU pragma: keep +#include + +#include + +namespace { +class A +{ + public: + A(int val) : val(val) {} + int val; +}; + +class B : public A +{ + public: + B(int val) : A(val) {} +}; +} // namespace + +TEST_CLASS(Macros); + +TEST_F(TestMacros, MRC_PTR_CAST) +{ + // We can't test the fail case as that terminates, + // in addition to that we run tests in CI against release not debug builds + auto b_ptr = std::make_shared(5); + auto a_ptr = MRC_PTR_CAST(A, b_ptr); + + EXPECT_NE(a_ptr, nullptr); + EXPECT_EQ(a_ptr->val, 5); +} From c509b1c0bcab0c715a406a604a059c7756ce8ee1 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Wed, 8 Mar 2023 13:46:08 -0800 Subject: [PATCH 09/14] Cleanup top-level forward.hpp and types.hpp (#292) * Remove forward declares for classes that no longer exist, or no longer exist in the top-level `mrc` namespace. * Remove duplicate entries. * Forward-declare structs as structs. * Fix casing of `CpuSet` & `NumaSet` structs. * Remove unused type-aliases from `types.hpp`, this could be a potentially breaking change if any users were using these types. fixes #291 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Ryan Olson (https://github.com/ryanolson) URL: https://github.com/nv-morpheus/MRC/pull/292 --- cpp/mrc/include/mrc/forward.hpp | 27 +++------------------------ cpp/mrc/include/mrc/types.hpp | 15 --------------- 2 files changed, 3 insertions(+), 39 deletions(-) diff --git a/cpp/mrc/include/mrc/forward.hpp b/cpp/mrc/include/mrc/forward.hpp index 1ac39f275..4eb296c25 100644 --- a/cpp/mrc/include/mrc/forward.hpp +++ b/cpp/mrc/include/mrc/forward.hpp @@ -20,36 +20,15 @@ namespace mrc { class Executor; -class ExecutorBase; -template -class NetworkDeserializer; - -class Options; - -class Runtime; - -class Placement; -class PlacementGroup; +struct PlacementGroup; namespace pipeline { class Pipeline; } // namespace pipeline -class PipelineConfiguration; - -class SegmentResources; - -class PipelineInstanceResources; - -class PipelineConfiguration; -class SegmentConfiguration; - -class PipelineAssignment; -class SegmentAssignment; - -class Cpuset; -class NumaSet; +struct CpuSet; +struct NumaSet; class Options; class FiberPoolOptions; diff --git a/cpp/mrc/include/mrc/types.hpp b/cpp/mrc/include/mrc/types.hpp index 8fec05be2..fe1babbbc 100644 --- a/cpp/mrc/include/mrc/types.hpp +++ b/cpp/mrc/include/mrc/types.hpp @@ -24,9 +24,6 @@ namespace mrc { -// template -// using blocking_queue = boost::fibers::buffered_channel; - // Typedefs template using Promise = userspace_threads::promise; // NOLINT(readability-identifier-naming) @@ -45,27 +42,15 @@ using MachineID = std::uint64_t; // NOLINT(readability-identifier-naming) using InstanceID = std::uint64_t; // NOLINT(readability-identifier-naming) using TagID = std::uint64_t; // NOLINT(readability-identifier-naming) -using NodeID = std::uint32_t; // NOLINT(readability-identifier-naming) -using ObjectID = std::uint32_t; // NOLINT(readability-identifier-naming) - template using Handle = std::shared_ptr; // NOLINT(readability-identifier-naming) -using SegmentName = std::string; // NOLINT(readability-identifier-naming) using SegmentID = std::uint16_t; // NOLINT(readability-identifier-naming) using SegmentRank = std::uint16_t; // NOLINT(readability-identifier-naming) using SegmentAddress = std::uint32_t; // NOLINT(readability-identifier-naming) // id + rank using PortName = std::string; // NOLINT(readability-identifier-naming) using PortID = std::uint16_t; // NOLINT(readability-identifier-naming) -using PortGroup = std::uint32_t; // NOLINT(readability-identifier-naming) // port + group_id using PortAddress = std::uint64_t; // NOLINT(readability-identifier-naming) // id + rank + port -using CpuID = std::uint32_t; // NOLINT(readability-identifier-naming) -using GpuID = std::uint32_t; // NOLINT(readability-identifier-naming) - -using ResourceGroupID = std::size_t; // NOLINT(readability-identifier-naming) - -using Tags = std::vector; // NOLINT - } // namespace mrc From a5be44140f76b10802c61674ee0ac2694de4b00c Mon Sep 17 00:00:00 2001 From: Jordan Jacobelli Date: Wed, 15 Mar 2023 22:11:41 +0100 Subject: [PATCH 10/14] Updating to driver 525 (#299) Updating to use driver 520 Authors: - Jordan Jacobelli (https://github.com/jjacobelli) - Michael Demoret (https://github.com/mdemoret-nv) - David Gardner (https://github.com/dagardner-nv) Approvers: - David Gardner (https://github.com/dagardner-nv) - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/MRC/pull/299 --- .github/workflows/ci_pipe.yml | 4 ++-- .github/workflows/pull_request.yml | 4 ++-- Dockerfile | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index a2d7b0533..225d95f33 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -132,7 +132,7 @@ jobs: test: name: Test needs: [build] - runs-on: [self-hosted, linux, amd64, gpu-v100-520-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-525-1] timeout-minutes: 60 container: credentials: @@ -171,7 +171,7 @@ jobs: codecov: name: Code Coverage - runs-on: [self-hosted, linux, amd64, gpu-v100-520-1] + runs-on: [self-hosted, linux, amd64, gpu-v100-525-1] timeout-minutes: 60 container: credentials: diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 5bfdf0948..e310aa2d1 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -48,8 +48,8 @@ jobs: with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} run_package_conda: ${{ !startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230213 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230213 + container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-driver-230315 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:mrc-ci-test-230315 secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} CONDA_TOKEN: ${{ secrets.CONDA_TOKEN }} diff --git a/Dockerfile b/Dockerfile index e78d512b1..266d514c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,7 +62,7 @@ RUN --mount=type=cache,target=/var/cache/apt \ apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ - libnvidia-compute-520 \ + libnvidia-compute-525 \ && \ rm -rf /var/lib/apt/lists/* From d0c9ae69333e95bbe87e665c7cb0de4a0ab18998 Mon Sep 17 00:00:00 2001 From: Michael Demoret <42954918+mdemoret-nv@users.noreply.github.com> Date: Wed, 15 Mar 2023 15:20:52 -0600 Subject: [PATCH 11/14] Improvements to the python module generation CMake code (#298) This is the corresponding update to incorporate changes from https://github.com/nv-morpheus/utilities/pull/17. Mainly this reorganizes the python CMake checks so they only get executed once. Significantly speeding up the configure step. Authors: - Michael Demoret (https://github.com/mdemoret-nv) Approvers: - David Gardner (https://github.com/dagardner-nv) - Devin Robison (https://github.com/drobison00) - Christopher Harris (https://github.com/cwharris) URL: https://github.com/nv-morpheus/MRC/pull/298 --- .gitmodules | 2 +- CMakeLists.txt | 2 +- docs/quickstart/CMakeLists.txt | 12 ++++-- docs/quickstart/compile.sh | 4 +- docs/quickstart/environment_cpp.yml | 2 +- .../mrc_qs_hybrid/common/CMakeLists.txt | 6 --- .../ex00_wrap_data_objects/CMakeLists.txt | 4 +- .../ex01_wrap_nodes/CMakeLists.txt | 4 +- external/utilities | 2 +- python/CMakeLists.txt | 43 +++++-------------- 10 files changed, 28 insertions(+), 53 deletions(-) diff --git a/.gitmodules b/.gitmodules index 10b4ee8b5..cdeafc917 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "morpheus_utils"] path = external/utilities url = https://github.com/nv-morpheus/utilities.git - branch = branch-23.01 + branch = branch-23.03 diff --git a/CMakeLists.txt b/CMakeLists.txt index bcbe858af..1ec9924d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,7 +78,7 @@ morpheus_utils_initialize_package_manager( morpheus_utils_initialize_cuda_arch(mrc) project(mrc - VERSION 23.01.00 + VERSION 23.03.00 LANGUAGES C CXX ) diff --git a/docs/quickstart/CMakeLists.txt b/docs/quickstart/CMakeLists.txt index 216955826..98271b9d6 100644 --- a/docs/quickstart/CMakeLists.txt +++ b/docs/quickstart/CMakeLists.txt @@ -17,23 +17,29 @@ list(APPEND CMAKE_MESSAGE_CONTEXT "quickstart") cmake_minimum_required(VERSION 3.24 FATAL_ERROR) +set(MRC_CACHE_DIR "${CMAKE_SOURCE_DIR}/.cache" CACHE PATH "Directory to contain all CPM and CCache data") +mark_as_advanced(MRC_CACHE_DIR) + # Add the Conda environment to the prefix path and add the CMake files list(PREPEND CMAKE_PREFIX_PATH "$ENV{CONDA_PREFIX}") list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../../cmake") +list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../../external/utilities/cmake") -morpheus_utils_python_modules_ensure_python3() +include(morpheus_utils/load) project(mrc-quickstart - VERSION 23.01 + VERSION 23.03 LANGUAGES C CXX ) +morpheus_utils_initialize_cpm(MRC_CACHE_DIR) + # Ensure CPM is initialized rapids_cpm_init() # Set the option prefix to match the outer project before including. Must be before find_package(mrc) set(OPTION_PREFIX "MRC") -include(morpheus_utils/python/register_api) +morpheus_utils_python_ensure_loaded() rapids_find_package(mrc REQUIRED) rapids_find_package(CUDAToolkit REQUIRED) diff --git a/docs/quickstart/compile.sh b/docs/quickstart/compile.sh index aa896a2b6..3895e8950 100755 --- a/docs/quickstart/compile.sh +++ b/docs/quickstart/compile.sh @@ -24,8 +24,8 @@ echo "Runing CMake configure..." cmake -B ${BUILD_DIR} -GNinja \ -DCMAKE_MESSAGE_CONTEXT_SHOW=ON \ -DMRC_PYTHON_INPLACE_BUILD:BOOL=ON \ - -DMRC_PYTHON_PERFORM_INSTALL:BOOL=ON `# Ensure all of the libraries are installed` \ - ${CMAKE_CONFIGURE_EXTRA_ARGS:-""} . + -DMRC_PYTHON_PERFORM_INSTALL:BOOL=ON # Ensure all of the libraries are installed` \ + ${CMAKE_CONFIGURE_EXTRA_ARGS:+CMAKE_CONFIGURE_EXTRA_ARGS} . echo "Running CMake build..." cmake --build ${BUILD_DIR} -j "$@" diff --git a/docs/quickstart/environment_cpp.yml b/docs/quickstart/environment_cpp.yml index 3ee4bbe0a..bb5408d10 100644 --- a/docs/quickstart/environment_cpp.yml +++ b/docs/quickstart/environment_cpp.yml @@ -31,7 +31,7 @@ dependencies: - python=3.8 - scikit-build>=0.12 - spdlog=1.8.5 - - mrc=23.01 + - mrc=23.03 - sysroot_linux-64=2.17 - pip: - cython diff --git a/docs/quickstart/hybrid/mrc_qs_hybrid/common/CMakeLists.txt b/docs/quickstart/hybrid/mrc_qs_hybrid/common/CMakeLists.txt index a53462b00..add9eec15 100644 --- a/docs/quickstart/hybrid/mrc_qs_hybrid/common/CMakeLists.txt +++ b/docs/quickstart/hybrid/mrc_qs_hybrid/common/CMakeLists.txt @@ -15,8 +15,6 @@ morpheus_utils_add_pybind11_library( data - # MODULE_ROOT - # ${QUICKSTART_HYBRID_HOME} SOURCE_FILES data.cpp LINK_TARGETS @@ -30,8 +28,6 @@ target_include_directories(${common_data_target} ./include ) -morpheus_utils_inplace_build_copy(${common_data_target} ${CMAKE_CURRENT_SOURCE_DIR}) - morpheus_utils_add_pybind11_library( nodes SOURCE_FILES @@ -42,8 +38,6 @@ morpheus_utils_add_pybind11_library( nodes_data_target ) -morpheus_utils_inplace_build_copy(${nodes_data_target} ${CMAKE_CURRENT_SOURCE_DIR}) - # Set this variable in the parent scope so other examples can link to it set(common_data_target ${common_data_target} PARENT_SCOPE) set(nodes_data_target ${nodes_data_target} PARENT_SCOPE) diff --git a/docs/quickstart/hybrid/mrc_qs_hybrid/ex00_wrap_data_objects/CMakeLists.txt b/docs/quickstart/hybrid/mrc_qs_hybrid/ex00_wrap_data_objects/CMakeLists.txt index 7cd4e0ce8..c46b9b0bd 100644 --- a/docs/quickstart/hybrid/mrc_qs_hybrid/ex00_wrap_data_objects/CMakeLists.txt +++ b/docs/quickstart/hybrid/mrc_qs_hybrid/ex00_wrap_data_objects/CMakeLists.txt @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -morpheus_utils_add_pybind11_module( +mrc_quickstart_add_pybind11_module( data MODULE_ROOT ${QUICKSTART_HYBRID_HOME} @@ -24,5 +24,3 @@ morpheus_utils_add_pybind11_module( OUTPUT_TARGET data_target ) - -morpheus_utils_inplace_build_copy(${data_target} ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/docs/quickstart/hybrid/mrc_qs_hybrid/ex01_wrap_nodes/CMakeLists.txt b/docs/quickstart/hybrid/mrc_qs_hybrid/ex01_wrap_nodes/CMakeLists.txt index 6b3b429a2..60ede0c59 100644 --- a/docs/quickstart/hybrid/mrc_qs_hybrid/ex01_wrap_nodes/CMakeLists.txt +++ b/docs/quickstart/hybrid/mrc_qs_hybrid/ex01_wrap_nodes/CMakeLists.txt @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -morpheus_utils_add_pybind11_module( +mrc_quickstart_add_pybind11_module( nodes MODULE_ROOT ${QUICKSTART_HYBRID_HOME} @@ -24,5 +24,3 @@ morpheus_utils_add_pybind11_module( OUTPUT_TARGET nodes_target ) - -morpheus_utils_inplace_build_copy(${nodes_target} ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/external/utilities b/external/utilities index 41136ca75..3fde91526 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 41136ca75c895da56bb973b539c1b18f2f5a5d24 +Subproject commit 3fde91526855f6f3d01b3e58f904647a97257f4b diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 79676a206..03aa35d73 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -17,17 +17,10 @@ list(APPEND CMAKE_MESSAGE_CONTEXT "python") find_package(CUDAToolkit REQUIRED) -# Get the project name in uppercase if OPTION_PREFIX is not defined -if(NOT DEFINED OPTION_PREFIX) - string(TOUPPER "${PROJECT_NAME}" OPTION_PREFIX) -endif() -option(${OPTION_PREFIX}_PYTHON_INPLACE_BUILD "Whether or not to copy built python modules back to the source tree for debug purposes." OFF) -option(${OPTION_PREFIX}_PYTHON_PERFORM_INSTALL "Whether or not to automatically `pip install` any built python library. WARNING: This may overwrite any existing installation of the same name." OFF) -option(${OPTION_PREFIX}_PYTHON_BUILD_STUBS "Whether or not to generated .pyi stub files for C++ Python modules. Disable to avoid requiring loading the NVIDIA GPU Driver during build" ON) +# Ensure python is configured +morpheus_utils_python_ensure_loaded() -set(Python3_FIND_VIRTUALENV "FIRST") -set(Python3_FIND_STRATEGY "LOCATION") morpheus_utils_print_python_info() @@ -42,29 +35,13 @@ morpheus_utils_add_python_sources( ${pymrc_test_files} ) +# Build the pymrc library +add_subdirectory(mrc/_pymrc) -# Save the root of the python for relative paths -set(MRC_PY_ROOT ${CMAKE_CURRENT_SOURCE_DIR}) - -# A common macro for adding some default arguments to add_pybind11_module -macro(mrc_add_pybind11_module) - # Build up the common arguments for add_pybind11_module - set(_common_args) - list(APPEND _common_args "LINK_TARGETS" "pymrc") - - if(MRC_PYTHON_INPLACE_BUILD) - list(APPEND _common_args "COPY_INPLACE") - endif() - - if(MRC_PYTHON_BUILD_STUBS) - list(APPEND _common_args "BUILD_STUBS") - endif() - - # Forward all common arguments plus any arguments passed in - morpheus_utils_add_pybind11_module(${ARGN} ${_common_args}) -endmacro() +# Set the default link targets to avoid repeating this +morpheus_utils_python_package_set_default_link_targets(pymrc) -add_subdirectory(mrc/_pymrc) +# Now add the python bindings add_subdirectory(mrc/core) # ################################################################################################## @@ -78,12 +55,14 @@ if(MRC_BUILD_TESTS) endif() # Complete the python package -set(extra_args "") - if(MRC_PYTHON_INPLACE_BUILD) list(APPEND extra_args "IS_INPLACE") endif() +if(MRC_PYTHON_BUILD_WHEEL) + list(APPEND extra_args "BUILD_WHEEL") +endif() + if(MRC_PYTHON_PERFORM_INSTALL) list(APPEND extra_args "INSTALL_WHEEL") endif() From a7a2d1d788868849f62001b58392e4a570736f40 Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Fri, 24 Mar 2023 09:49:49 -0600 Subject: [PATCH 12/14] Mirror module / buffer + python bindings. (#286) Closes #126 Authors: - Devin Robison (https://github.com/drobison00) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/MRC/pull/286 --- CMakeLists.txt | 1 - cpp/mrc/CMakeLists.txt | 16 +- cpp/mrc/benchmarks/bench_segment.cpp | 4 +- cpp/mrc/include/mrc/edge/edge_builder.hpp | 95 +++ .../include/mrc/engine/segment/ibuilder.hpp | 6 +- .../modules/mirror_tap/mirror_tap.hpp | 100 +++ .../mirror_tap/mirror_tap_orchestrator.hpp | 136 ++++ .../modules/mirror_tap/mirror_tap_stream.hpp | 100 +++ .../stream_buffer/stream_buffer_base.hpp | 45 ++ .../stream_buffer/stream_buffer_immediate.hpp | 100 +++ .../stream_buffer/stream_buffer_module.hpp | 150 +++++ .../stream_buffer/stream_buffer_traits.hpp | 32 + .../mrc/modules/properties/persistent.hpp | 28 + .../include/mrc/modules/segment_modules.hpp | 26 +- .../include/mrc/node/operators/broadcast.hpp | 3 + cpp/mrc/include/mrc/segment/builder.hpp | 596 ++++++++++++------ .../mrc/segment/concepts/object_traits.hpp | 90 +++ cpp/mrc/include/mrc/segment/object.hpp | 1 - cpp/mrc/include/mrc/type_traits.hpp | 22 + cpp/mrc/include/mrc/utils/type_utils.hpp | 58 +- cpp/mrc/src/internal/segment/builder.cpp | 10 + cpp/mrc/src/internal/segment/builder.hpp | 11 + cpp/mrc/src/internal/segment/ibuilder.cpp | 6 + cpp/mrc/src/public/modules/sample_modules.cpp | 5 +- .../src/public/modules/segment_modules.cpp | 77 ++- cpp/mrc/src/public/segment/builder.cpp | 44 +- cpp/mrc/src/public/segment/definition.cpp | 13 +- cpp/mrc/src/tests/pipelines/multi_segment.cpp | 3 +- .../src/tests/segments/common_segments.cpp | 2 + cpp/mrc/src/tests/test_next.cpp | 2 + cpp/mrc/src/tests/test_pipeline.cpp | 4 +- cpp/mrc/tests/CMakeLists.txt | 3 + .../tests/benchmarking/test_benchmarking.hpp | 1 + .../tests/benchmarking/test_stat_gather.hpp | 5 +- cpp/mrc/tests/modules/dynamic_module.cpp | 1 - .../tests/modules/test_mirror_tap_module.cpp | 383 +++++++++++ .../modules/test_mirror_tap_orchestrator.cpp | 218 +++++++ .../tests/modules/test_module_registry.cpp | 2 + cpp/mrc/tests/modules/test_module_util.cpp | 2 +- cpp/mrc/tests/modules/test_modules.hpp | 118 +++- .../tests/modules/test_segment_modules.cpp | 6 +- .../modules/test_stream_buffer_modules.cpp | 306 +++++++++ cpp/mrc/tests/test_edges.cpp | 85 +++ cpp/mrc/tests/test_executor.cpp | 8 +- cpp/mrc/tests/test_node.cpp | 5 +- cpp/mrc/tests/test_pipeline.cpp | 5 +- cpp/mrc/tests/test_segment.cpp | 13 +- external/utilities | 2 +- python/mrc/__init__.py | 1 + python/mrc/_pymrc/include/pymrc/segment.hpp | 20 +- python/mrc/_pymrc/src/module_registry.cpp | 6 +- python/mrc/_pymrc/src/py_segment_module.cpp | 1 - python/mrc/_pymrc/src/segment.cpp | 6 +- python/mrc/_pymrc/tests/test_executor.cpp | 3 +- python/mrc/_pymrc/tests/test_pipeline.cpp | 1 + python/mrc/core/CMakeLists.txt | 7 +- python/mrc/core/common.cpp | 7 +- python/mrc/core/executor.cpp | 18 +- python/mrc/core/logging.cpp | 16 +- python/mrc/core/node.cpp | 13 +- python/mrc/core/operators.cpp | 26 +- python/mrc/core/options.cpp | 24 +- python/mrc/core/pipeline.cpp | 12 +- python/mrc/core/plugins.cpp | 12 +- python/mrc/core/segment.cpp | 120 +--- .../mirror_tap_orchestrator.cpp | 126 ++++ .../mirror_tap_orchestrator.hpp | 27 + .../segment_module_registry.cpp | 91 +++ .../segment_module_registry.hpp | 26 + .../module_definitions/segment_modules.cpp | 77 +++ .../module_definitions/segment_modules.hpp | 45 ++ python/mrc/core/subscriber.cpp | 16 +- python/mrc/tests/sample_modules.cpp | 8 +- python/mrc/tests/test_edges.cpp | 20 +- python/mrc/tests/utils.cpp | 10 +- python/tests/test_mirror_tap.py | 274 ++++++++ python/tests/test_module_registry.py | 2 +- 77 files changed, 3480 insertions(+), 484 deletions(-) create mode 100644 cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp create mode 100644 cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp create mode 100644 cpp/mrc/include/mrc/modules/properties/persistent.hpp create mode 100644 cpp/mrc/include/mrc/segment/concepts/object_traits.hpp create mode 100644 cpp/mrc/tests/modules/test_mirror_tap_module.cpp create mode 100644 cpp/mrc/tests/modules/test_mirror_tap_orchestrator.cpp create mode 100644 cpp/mrc/tests/modules/test_stream_buffer_modules.cpp create mode 100644 python/mrc/core/segment/module_definitions/mirror_tap_orchestrator.cpp create mode 100644 python/mrc/core/segment/module_definitions/mirror_tap_orchestrator.hpp create mode 100644 python/mrc/core/segment/module_definitions/segment_module_registry.cpp create mode 100644 python/mrc/core/segment/module_definitions/segment_module_registry.hpp create mode 100644 python/mrc/core/segment/module_definitions/segment_modules.cpp create mode 100644 python/mrc/core/segment/module_definitions/segment_modules.hpp create mode 100644 python/tests/test_mirror_tap.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ec9924d5..e19c3a8ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,7 +92,6 @@ if(NOT DEFINED CMAKE_CUDA_HOST_COMPILER) # incompatible with CUDA 11.4/11.5/11.6. See Issue #102 if(NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") set(CMAKE_CUDA_HOST_COMPILER ${CMAKE_CXX_COMPILER}) - endif() endif() diff --git a/cpp/mrc/CMakeLists.txt b/cpp/mrc/CMakeLists.txt index 079277129..04424e300 100644 --- a/cpp/mrc/CMakeLists.txt +++ b/cpp/mrc/CMakeLists.txt @@ -21,14 +21,14 @@ add_library(libmrc src/internal/codable/codable_storage.cpp src/internal/codable/decodable_storage_view.cpp src/internal/codable/storage_view.cpp - src/internal/control_plane/client.cpp src/internal/control_plane/client/connections_manager.cpp + src/internal/control_plane/client.cpp src/internal/control_plane/client/instance.cpp src/internal/control_plane/client/state_manager.cpp src/internal/control_plane/client/subscription_service.cpp src/internal/control_plane/resources.cpp - src/internal/control_plane/server.cpp src/internal/control_plane/server/connection_manager.cpp + src/internal/control_plane/server.cpp src/internal/control_plane/server/subscription_manager.cpp src/internal/control_plane/server/tagged_issuer.cpp src/internal/data_plane/callbacks.cpp @@ -61,8 +61,8 @@ add_library(libmrc src/internal/resources/manager.cpp src/internal/resources/partition_resources_base.cpp src/internal/resources/partition_resources.cpp - src/internal/runnable/engine_factory.cpp src/internal/runnable/engine.cpp + src/internal/runnable/engine_factory.cpp src/internal/runnable/engines.cpp src/internal/runnable/fiber_engine.cpp src/internal/runnable/fiber_engines.cpp @@ -84,18 +84,18 @@ add_library(libmrc src/internal/system/fiber_pool.cpp src/internal/system/fiber_task_queue.cpp src/internal/system/gpu_info.cpp - src/internal/system/host_partition_provider.cpp src/internal/system/host_partition.cpp + src/internal/system/host_partition_provider.cpp src/internal/system/iresources.cpp src/internal/system/isystem.cpp - src/internal/system/partition_provider.cpp src/internal/system/partition.cpp + src/internal/system/partition_provider.cpp src/internal/system/partitions.cpp src/internal/system/resources.cpp - src/internal/system/system_provider.cpp src/internal/system/system.cpp - src/internal/system/thread_pool.cpp + src/internal/system/system_provider.cpp src/internal/system/thread.cpp + src/internal/system/thread_pool.cpp src/internal/system/topology.cpp src/internal/ucx/context.cpp src/internal/ucx/endpoint.cpp @@ -108,8 +108,8 @@ add_library(libmrc src/internal/utils/parse_config.cpp src/internal/utils/parse_ints.cpp src/internal/utils/shared_resource_bit_map.cpp - src/public/benchmarking/trace_statistics.cpp src/public/benchmarking/tracer.cpp + src/public/benchmarking/trace_statistics.cpp src/public/benchmarking/util.cpp src/public/channel/channel.cpp src/public/codable/encoded_object.cpp diff --git a/cpp/mrc/benchmarks/bench_segment.cpp b/cpp/mrc/benchmarks/bench_segment.cpp index c0aef1729..6960ebe18 100644 --- a/cpp/mrc/benchmarks/bench_segment.cpp +++ b/cpp/mrc/benchmarks/bench_segment.cpp @@ -272,7 +272,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture }), m_watcher->create_tracer_emit_tap(int_name)); - segment.make_dynamic_edge(last_node, internal); + segment.make_edge(last_node, internal); last_node = internal; } @@ -281,7 +281,7 @@ class LongEmitReceiveFixture : public benchmark::Fixture sink_name, m_watcher->create_tracer_sink_lambda(sink_name, [](tracer_type_t& data) {})); - segment.make_dynamic_edge(last_node, sink); + segment.make_edge(last_node, sink); }; auto pipeline = pipeline::make_pipeline(); diff --git a/cpp/mrc/include/mrc/edge/edge_builder.hpp b/cpp/mrc/include/mrc/edge/edge_builder.hpp index 71b53688a..581b7da41 100644 --- a/cpp/mrc/include/mrc/edge/edge_builder.hpp +++ b/cpp/mrc/include/mrc/edge/edge_builder.hpp @@ -176,6 +176,101 @@ struct EdgeBuilder final sink.set_readable_edge_handle(edge); } + template + static void splice_edge(SourceT& source, SinkT& sink, SpliceInputT& splice_input, SpliceOutputT& splice_output) + { + using source_full_t = SourceT; + using sink_full_t = SinkT; + + // Have to jump through some hoops here, mimics what 'writable_acceptor_typed' does, so we get everything + // aligned correctly. + if constexpr (is_base_of_template::value && + is_base_of_template::value) + { + /* + * In this case, the source object has accepted a writable edge from the sink. + * + * Given: [source [edge_handle]] -> [sink] + * + * We will: + * - Get a reference to the edge_handle the source is holding + * - Reset the Source edge connection + * - Create a new edge from the source to our WritableProvider splice node + * - Set the edge_handle for the WritableAcceptor splice node to the edge_handle from the source + * + * This will result in the following: + * + * [source[new_edge_handle]] -> [splice_node[old_edge_handle]] -> [sink] + * + */ + // We don't need to know the data type of the sink, the source node will have the same data type as the + // splice node, and we already know the sink can provide an edge for the source's data type. + // [source] -> [sink] => [[source] -> [splice_node]] -> [sink] + auto* splice_writable_provider = dynamic_cast*>(&splice_input); + CHECK(splice_writable_provider != nullptr) << "Splice input is not a writable provider"; + + auto* splice_writable_acceptor = dynamic_cast*>(&splice_output); + CHECK(splice_writable_acceptor != nullptr) << "Splice output is not a writable acceptor"; + + auto* writable_acceptor = dynamic_cast*>(&source); + CHECK(writable_acceptor != nullptr) << "Source is not a writable acceptor"; + + auto* edge_holder_ptr = dynamic_cast*>(writable_acceptor); + if (edge_holder_ptr == nullptr) + { + LOG(FATAL) << "Writable acceptor failed to cast to EdgeHolder"; + } + + auto& edge_holder = *edge_holder_ptr; + CHECK(edge_holder.check_active_connection(false)) << "No active connection to splice into"; + + auto edge_handle = edge_holder.get_connected_edge(); + edge_holder.release_edge_connection(); + + make_edge_writable(*writable_acceptor, *splice_writable_provider); + make_edge_writable(*splice_writable_acceptor, sink); + } + else if constexpr (is_base_of_template::value && + is_base_of_template::value) + { + // We don't need to know the data type of the source, the sink node will have the same data type as the + // splice node, and we already know the source can provide an edge for the sink's data type. + // [source] -> [sink] => [source] -> [[splice_node] -> [sink]] + auto* splice_readable_provider = dynamic_cast*>(&splice_input); + CHECK(splice_readable_provider != nullptr) << "Splice input is not a writable provider"; + + auto* splice_readable_acceptor = dynamic_cast*>(&splice_output); + CHECK(splice_readable_acceptor != nullptr) << "Splice output is not a writable acceptor"; + + auto* readable_acceptor = dynamic_cast*>(&sink); + CHECK(readable_acceptor != nullptr) << "Sink is not a writable provider"; + + auto* edge_holder_ptr = dynamic_cast*>(readable_acceptor); + if (edge_holder_ptr == nullptr) + { + LOG(FATAL) << "Readable acceptor failed to cast to EdgeHolder"; + } + + auto& edge_holder = *edge_holder_ptr; + CHECK(edge_holder.check_active_connection(false)) << "No active connection to splice into"; + + // Grab the Acceptor's edge handle and release it from the Acceptor + // Make sure we hold the edge handle until the new edge to the splice has been formed. + // TODO(Devin): Can we double check that the edge handle from the source matches the one from the sink? + auto edge_handle = edge_holder.get_connected_edge(); + edge_holder.release_edge_connection(); + + make_edge_readable(source, *splice_readable_acceptor); + make_edge_readable(*splice_readable_provider, *readable_acceptor); + } + else + { + static_assert(!sizeof(source_full_t), + "Arguments to splice_edge were incorrect. Ensure you are providing either " + "WritableAcceptor->WritableProvider or ReadableProvider->ReadableAcceptor"); + } + } + private: static std::shared_ptr do_adapt_ingress(const EdgeTypeInfo& target_type, std::shared_ptr ingress); diff --git a/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp b/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp index 42f0b4476..7c9347a52 100644 --- a/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp +++ b/cpp/mrc/include/mrc/engine/segment/ibuilder.hpp @@ -29,12 +29,13 @@ class Launchable; } // namespace mrc::runnable // todo(ryan) - most base classes that will be owned by the engine will need to be moved to engine api/lib +namespace mrc::modules { +class SegmentModule; +} namespace mrc::segment { - class ObjectProperties; class EgressPortBase; class IngressPortBase; - } // namespace mrc::segment namespace mrc::internal::segment { @@ -54,6 +55,7 @@ class IBuilder final bool has_object(const std::string& name) const; ::mrc::segment::ObjectProperties& find_object(const std::string& name); void add_object(const std::string& name, std::shared_ptr<::mrc::segment::ObjectProperties> object); + void add_module(const std::string& name, std::shared_ptr module); void add_runnable(const std::string& name, std::shared_ptr runnable); std::shared_ptr<::mrc::segment::IngressPortBase> get_ingress_base(const std::string& name); std::shared_ptr<::mrc::segment::EgressPortBase> get_egress_base(const std::string& name); diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp new file mode 100644 index 000000000..fa3ac3c11 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapModule : public SegmentModule, public PersistentModule +{ + using type_t = MirrorTapModule; + + public: + MirrorTapModule(std::string module_name); + + MirrorTapModule(std::string module_name, nlohmann::json config); + + std::string tap_egress_port_name() const; + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + [[maybe_unused]] static std::atomic s_tap_id; + + std::string m_egress_name; +}; + +template +std::atomic MirrorTapModule::s_tap_id{0}; + +template +MirrorTapModule::MirrorTapModule(std::string module_name) : + SegmentModule(std::move(module_name)), + m_egress_name("mirror_tap_source_" + std::to_string(s_tap_id++)) +{} + +template +MirrorTapModule::MirrorTapModule(std::string module_name, nlohmann::json _config) : + SegmentModule(std::move(module_name), std::move(_config)), + m_egress_name("mirror_tap_source_" + std::to_string(s_tap_id++)) +{ + if (config().contains("tap_id_override")) + { + m_egress_name = config()["tap_id_override"]; + } +} + +template +std::string MirrorTapModule::tap_egress_port_name() const +{ + return m_egress_name; +} + +template +void MirrorTapModule::initialize(segment::Builder& builder) +{ + // ********** Implementation ************ // + auto bcast = builder.construct_object>("broadcast"); + + builder.make_edge(bcast, builder.get_egress(m_egress_name)); // to mirror tap + + // Register the submodules output as one of this module's outputs + register_input_port("input", bcast); + register_output_port("output", bcast); +} + +template +std::string MirrorTapModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp new file mode 100644 index 000000000..a78a465a3 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_orchestrator.hpp @@ -0,0 +1,136 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp" +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/segment/egress_ports.hpp" +#include "mrc/segment/ingress_ports.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapOrchestrator +{ + using initializer_t = std::function; + using type_t = MirrorTapOrchestrator; + + public: + MirrorTapOrchestrator(std::string tap_name); + + MirrorTapOrchestrator(std::string tap_name, nlohmann::json config); + + initializer_t tap(initializer_t initializer, const std::string tap_from, const std::string tap_to) + { + using namespace modules; + return [this, initializer, tap_from, tap_to](segment::Builder& builder) { + initializer(builder); + builder.init_module(m_tap); + + builder.splice_edge(tap_from, tap_to, m_tap->input_port("input"), m_tap->output_port("output")); + }; + } + + initializer_t stream_to(initializer_t initializer, const std::string entry_point) + { + using namespace modules; + return [this, initializer, entry_point](segment::Builder& builder) { + initializer(builder); + + builder.init_module(m_stream); + builder.make_edge(m_stream->output_port("output"), entry_point); + }; + } + + template + segment::IngressPorts create_or_extend_ingress_ports( + segment::IngressPorts& ingress_ports) const + { + auto names(ingress_ports.names()); + names.push_back(get_ingress_tap_name()); + + return segment::IngressPorts(std::move(names)); + } + + segment::IngressPorts create_or_extend_ingress_ports() const + { + return segment::IngressPorts({get_ingress_tap_name()}); + } + + template + segment::EgressPorts create_or_extend_egress_ports( + segment::EgressPorts& egress_ports) const + { + auto names(egress_ports.names()); + names.push_back(get_ingress_tap_name()); + + return segment::EgressPorts(std::move(names)); + } + + segment::EgressPorts create_or_extend_egress_ports() const + { + return segment::EgressPorts({get_egress_tap_name()}); + } + + std::string get_egress_tap_name() const; + + std::string get_ingress_tap_name() const; + + private: + std::shared_ptr> m_tap; + std::shared_ptr> m_stream; +}; + +template +MirrorTapOrchestrator::MirrorTapOrchestrator(std::string tap_name) : + m_tap(std::make_shared>(std::move(tap_name))), + m_stream(std::make_shared>(std::move(tap_name))) +{ + m_stream->tap_ingress_port_name(m_tap->tap_egress_port_name()); +} + +template +MirrorTapOrchestrator::MirrorTapOrchestrator(std::string tap_name, nlohmann::json config) : + m_tap(std::make_shared>(tap_name, config)), + m_stream(std::make_shared>(tap_name, config)) +{ + m_stream->tap_ingress_port_name(m_tap->tap_egress_port_name()); +} + +template +[[maybe_unused]] std::string MirrorTapOrchestrator::get_egress_tap_name() const +{ + return m_tap->tap_egress_port_name(); +} + +template +[[maybe_unused]] std::string MirrorTapOrchestrator::get_ingress_tap_name() const +{ + return m_stream->tap_ingress_port_name(); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp new file mode 100644 index 000000000..843a8b8c4 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/mirror_tap/mirror_tap_stream.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp" +#include "mrc/modules/module_registry_util.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/node/operators/broadcast.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/version.hpp" + +#include +#include + +#include + +namespace mrc::modules { +template +class MirrorTapStreamModule : public SegmentModule, public PersistentModule +{ + using type_t = MirrorTapStreamModule; + + public: + MirrorTapStreamModule(std::string module_name); + + MirrorTapStreamModule(std::string module_name, nlohmann::json _config); + + std::string tap_ingress_port_name() const; + void tap_ingress_port_name(std::string name); + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + std::shared_ptr> m_stream_buffer; + + std::string m_ingress_name; +}; + +template +MirrorTapStreamModule::MirrorTapStreamModule(std::string module_name) : SegmentModule(std::move(module_name)) +{} + +template +MirrorTapStreamModule::MirrorTapStreamModule(std::string module_name, nlohmann::json _config) : + SegmentModule(std::move(module_name), std::move(_config)) +{ + if (config().contains("tap_id_override")) + { + m_ingress_name = config()["tap_id_override"]; + } +} + +template +std::string MirrorTapStreamModule::tap_ingress_port_name() const +{ + return m_ingress_name; +} + +template +void MirrorTapStreamModule::tap_ingress_port_name(std::string ingress_port_name) +{ + m_ingress_name = std::move(ingress_port_name); +} + +template +void MirrorTapStreamModule::initialize(segment::Builder& builder) +{ + auto mirror_ingress = builder.get_ingress(m_ingress_name); + m_stream_buffer = builder.make_module>("test", config()); + + builder.make_edge(mirror_ingress, m_stream_buffer->input_port("input")); + + register_output_port("output", m_stream_buffer->output_port("output")); +} + +template +std::string MirrorTapStreamModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp new file mode 100644 index 000000000..93f036f21 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp @@ -0,0 +1,45 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace mrc::modules::stream_buffers { + +template +class StreamBufferBase +{ + public: + virtual ~StreamBufferBase() = default; + + virtual std::size_t buffer_size() = 0; + + virtual void buffer_size(std::size_t size) = 0; + + virtual bool empty() = 0; + + virtual void push_back(DataTypeT&& data) = 0; + + virtual void flush_next(rxcpp::subscriber& subscriber) = 0; + + virtual void flush_all(rxcpp::subscriber& subscriber) = 0; +}; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp new file mode 100644 index 000000000..06e74f334 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp @@ -0,0 +1,100 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mrc::modules::stream_buffers { + +template +class StreamBufferImmediate : public StreamBufferBase +{ + public: + StreamBufferImmediate() : m_ring_buffer_write(128), m_ring_buffer_read(128) {} + + StreamBufferImmediate(std::size_t buffer_size) : m_ring_buffer_write(buffer_size), m_ring_buffer_read(buffer_size) + {} + + std::size_t buffer_size() override + { + return m_ring_buffer_write.capacity(); + } + + void buffer_size(std::size_t size) override + { + m_ring_buffer_write.set_capacity(size); + m_ring_buffer_read.set_capacity(size); + } + + bool empty() override + { + return m_ring_buffer_write.empty(); + } + + void push_back(DataTypeT&& data) override + { + std::lock_guard lock(m_write_mutex); + m_ring_buffer_write.push_back(std::move(data)); + } + + // Not thread safe for multiple readers + void flush_next(rxcpp::subscriber& subscriber) override + { + std::lock_guard wlock(m_write_mutex); + + subscriber.on_next(m_ring_buffer_write.front()); + m_ring_buffer_write.pop_front(); + } + + // Not thread safe for multiple readers + void flush_all(rxcpp::subscriber& subscriber) override + { + { + std::lock_guard wlock(m_write_mutex); + // O(1), based on the size of the circular buffer. + m_ring_buffer_write.swap(m_ring_buffer_read); + } + + while (!m_ring_buffer_read.empty()) + { + subscriber.on_next(m_ring_buffer_read.front()); + m_ring_buffer_read.pop_front(); + } + } + + private: + std::mutex m_write_mutex; + + boost::circular_buffer m_ring_buffer_write; + boost::circular_buffer m_ring_buffer_read; +}; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp new file mode 100644 index 000000000..34f3bc9a0 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_module.hpp @@ -0,0 +1,150 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/mirror_tap/mirror_tap.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_immediate.hpp" +#include "mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp" +#include "mrc/modules/properties/persistent.hpp" +#include "mrc/modules/segment_modules.hpp" +#include "mrc/segment/builder.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace mrc::modules { + +/** + * @brief Buffers a stream of data; avoiding any stalls by over-writing older data when the buffer is full. + * We guarantee not to block the data stream; but, may drop data if the buffer is full. + * @tparam DataTypeT The type of data to buffer + */ +template class StreamBufferTypeT = mrc::modules::stream_buffers::StreamBufferImmediate> +class StreamBufferModule : public SegmentModule, public PersistentModule +{ + static_assert(stream_buffers::IsStreamBuffer, + "StreamBufferTypeT must be derived from StreamBufferBase"); + using type_t = StreamBufferModule; + + public: + StreamBufferModule(std::string module_name); + + StreamBufferModule(std::string module_name, nlohmann::json config); + + protected: + void initialize(segment::Builder& builder) override; + + std::string module_type_name() const override; + + private: + StreamBufferTypeT m_stream_buffer; + rxcpp::subjects::subject m_subject{}; +}; + +template class StreamBufferTypeT> +StreamBufferModule::StreamBufferModule(std::string module_name) : + SegmentModule(std::move(module_name)), + PersistentModule(), + m_stream_buffer() +{} + +template class StreamBufferTypeT> +StreamBufferModule::StreamBufferModule(std::string module_name, nlohmann::json config) : + SegmentModule(std::move(module_name), std::move(config)), + PersistentModule(), + m_stream_buffer() +{ + if (this->config().contains("buffer_size")) + { + m_stream_buffer.buffer_size(this->config()["buffer_size"]); + } +} + +template class StreamBufferTypeT> +void StreamBufferModule::initialize(segment::Builder& builder) +{ + auto buffer_sink = builder.template make_sink("buffer_sink_new", m_subject.get_subscriber()); + + // This is a hack, because we don't correctly support passing observables to RxSource creation yet + // Consume values from subject and push them to ring buffer + m_subject.get_observable().subscribe( + [this](DataTypeT data) { + m_stream_buffer.push_back(std::move(data)); + VLOG(10) << "Subscriber 1: OnNext -> push to ring buffer " << std::endl; + }, + [this](std::exception_ptr ep) { + VLOG(10) << "Subscriber 1: OnError" << std::endl; + }, + [this]() { + VLOG(10) << "Subscriber 1: OnCompleted" << std::endl; + }); + + // Example of adding a second subscriber + /* + m_subject.get_observable().subscribe( + [this](DataTypeT data) { + VLOG(10) << "Subscriber 2: OnNext -> " << data << std::endl; + }, + [this](std::exception_ptr ep) { + VLOG(10) << "Subscriber 2: OnError" << std::endl; + }, + [this]() { + VLOG(10) << "Subscriber 2: OnCompleted" << std::endl; + }); + */ + + // Create our source that reads from the buffer as long as it has a subscriber and + // our subject hasn't called on_complete() + auto buffer_source = builder.template make_source( + "buffer_source", + [this](rxcpp::subscriber& subscriber) { + // TODO(Devin): not currently supported + // m_subject.get_observable().subscribe(subscriber); + while (subscriber.is_subscribed() && m_subject.has_observers()) + { + if (!m_stream_buffer.empty()) + { + m_stream_buffer.flush_all(subscriber); + } + else + { + boost::this_fiber::yield(); + } + } + }); + + register_input_port("input", buffer_sink); + register_output_port("output", buffer_source); +} + +template class StreamBufferTypeT> +std::string StreamBufferModule::module_type_name() const +{ + return std::string(::mrc::type_name()); +} +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp new file mode 100644 index 000000000..8433e2636 --- /dev/null +++ b/cpp/mrc/include/mrc/experimental/modules/stream_buffer/stream_buffer_traits.hpp @@ -0,0 +1,32 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/experimental/modules/stream_buffer/stream_buffer_base.hpp" + +#include +#include + +namespace mrc::modules::stream_buffers { + +template class StreamBufferTypeT> +concept IsStreamBuffer = requires { + typename StreamBufferTypeT; + std::is_base_of_v, StreamBufferTypeT>; + }; +} // namespace mrc::modules::stream_buffers diff --git a/cpp/mrc/include/mrc/modules/properties/persistent.hpp b/cpp/mrc/include/mrc/modules/properties/persistent.hpp new file mode 100644 index 000000000..c62932ee7 --- /dev/null +++ b/cpp/mrc/include/mrc/modules/properties/persistent.hpp @@ -0,0 +1,28 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace mrc::modules { +/** + * @brief A module that is persistent across segment restarts. + * @note This module is not intended to be used directly. It is used as a property indicator for other segment modules. + * It is intentionally empty. + */ +class PersistentModule +{}; +} // namespace mrc::modules diff --git a/cpp/mrc/include/mrc/modules/segment_modules.hpp b/cpp/mrc/include/mrc/modules/segment_modules.hpp index b6324f79c..31bc93be5 100644 --- a/cpp/mrc/include/mrc/modules/segment_modules.hpp +++ b/cpp/mrc/include/mrc/modules/segment_modules.hpp @@ -27,6 +27,9 @@ namespace mrc::segment { class Builder; +} // namespace mrc::segment + +namespace mrc::segment { struct ObjectProperties; } // namespace mrc::segment @@ -147,11 +150,32 @@ class SegmentModule /** * Register an output port that should be exposed for the module * @param input_name Port name - * @param object ObjectProperties object assocaited with the port + * @param object ObjectProperties object associated with the port */ void register_output_port(std::string output_name, std::shared_ptr object); private: + /** + * Register an input port that should be exposed for the module, with explicit type index. This is + * necessary for Objects that aren't explicit Source or Sink types (e.g. a custom object type) + * @param input_name Port name + * @param object ObjectProperties object associated with the port + * @param tidx Type index of the object's payload data type + */ + void register_typed_input_port(std::string input_name, + std::shared_ptr object, + std::type_index tidx); + /** + * Register an output port that should be exposed for the module, with explicit type index. This is + * necessary for Objects that aren't explicit Source or Sink types (e.g. a custom object type) + * @param output_name Port name + * @param object ObjectProperties object associated with the port + * @param tidx Type index of the object's payload data type + */ + void register_typed_output_port(std::string output_name, + std::shared_ptr object, + std::type_index tidx); + const std::string m_module_instance_name; std::string m_module_instance_registered_namespace{}; diff --git a/cpp/mrc/include/mrc/node/operators/broadcast.hpp b/cpp/mrc/include/mrc/node/operators/broadcast.hpp index bd9071bb2..c3bfde7ae 100644 --- a/cpp/mrc/include/mrc/node/operators/broadcast.hpp +++ b/cpp/mrc/include/mrc/node/operators/broadcast.hpp @@ -194,6 +194,9 @@ class Broadcast : public WritableProvider, public edge::IWritableAcceptor }; public: + using source_type_t = T; + using sink_type_t = T; + Broadcast(bool deep_copy = false) { auto edge = std::make_shared(*this, deep_copy); diff --git a/cpp/mrc/include/mrc/segment/builder.hpp b/cpp/mrc/include/mrc/segment/builder.hpp index 8550c43f8..0962f5c38 100644 --- a/cpp/mrc/include/mrc/segment/builder.hpp +++ b/cpp/mrc/include/mrc/segment/builder.hpp @@ -19,26 +19,26 @@ #include "mrc/benchmarking/trace_statistics.hpp" #include "mrc/edge/edge_builder.hpp" -#include "mrc/edge/edge_readable.hpp" -#include "mrc/edge/edge_writable.hpp" -#include "mrc/engine/segment/ibuilder.hpp" // IWYU pragma: export +#include "mrc/engine/segment/ibuilder.hpp" #include "mrc/exceptions/runtime_error.hpp" #include "mrc/node/rx_node.hpp" #include "mrc/node/rx_sink.hpp" #include "mrc/node/rx_source.hpp" -#include "mrc/node/sink_properties.hpp" // IWYU pragma: export -#include "mrc/node/source_properties.hpp" // IWYU pragma: export +#include "mrc/node/sink_properties.hpp" // IWYU pragma: keep +#include "mrc/node/source_properties.hpp" // IWYU pragma: keep #include "mrc/runnable/context.hpp" -#include "mrc/runnable/runnable.hpp" // IWYU pragma: export -#include "mrc/segment/component.hpp" // IWYU pragma: export -#include "mrc/segment/object.hpp" // IWYU pragma: export -#include "mrc/segment/runnable.hpp" // IWYU pragma: export +#include "mrc/runnable/runnable.hpp" // IWYU pragma: keep +#include "mrc/segment/component.hpp" // IWYU pragma: keep +#include "mrc/segment/concepts/object_traits.hpp" +#include "mrc/segment/object.hpp" // IWYU pragma: keep +#include "mrc/segment/runnable.hpp" // IWYU pragma: keep #include "mrc/type_traits.hpp" #include "mrc/utils/macros.hpp" +#include "mrc/utils/type_utils.hpp" -#include // IWYU pragma: export -#include // IWYU pragma: export -#include // IWYU pragma: export +#include +#include +#include #include #include #include @@ -47,25 +47,31 @@ #include #include #include +#include #include #include #include #include #include +namespace mrc { +struct WatcherInterface; +} // namespace mrc + +namespace mrc::modules { +class SegmentModule; +} // namespace mrc::modules + namespace mrc::node { template class RxSinkBase; +} // namespace mrc::node + +namespace mrc::node { template class RxSourceBase; } // namespace mrc::node -namespace mrc { -struct WatcherInterface; -} // namespace mrc -namespace mrc::modules { -class SegmentModule; -} // namespace mrc::modules namespace mrc::segment { class Definition; } // namespace mrc::segment @@ -119,6 +125,7 @@ class Builder final DELETE_MOVEABILITY(Builder); std::shared_ptr get_ingress(std::string name, std::type_index type_index); + std::shared_ptr get_egress(std::string name, std::type_index type_index); template @@ -131,68 +138,105 @@ class Builder final std::shared_ptr> make_object(std::string name, std::unique_ptr node); template - std::shared_ptr> construct_object(std::string name, ArgsT&&... args) - { - auto ns_name = m_namespace_prefix.empty() ? name : m_namespace_prefix + "/" + name; - auto uptr = std::make_unique(std::forward(args)...); - - ::add_stats_watcher_if_rx_source(*uptr, ns_name); - ::add_stats_watcher_if_rx_sink(*uptr, ns_name); - - return make_object(std::move(ns_name), std::move(uptr)); - } + std::shared_ptr> construct_object(std::string name, ArgsT&&... args); + /** + * Create a source node using the provided name and function, the function is lifted to an observable + * @tparam SourceTypeT the type of data produced by the source + * @tparam NodeTypeT the type of node to use as the source + * @tparam CreateFnT the type of function used to create the source + * @param name the name of the source + * @param create_fn the function that will be lifted into a Observable and used to create the source + * @return the created source + */ template class NodeTypeT = node::RxSource, typename CreateFnT> - auto make_source(std::string name, CreateFnT&& create_fn) - { - return construct_object>( - name, - rxcpp::observable<>::create(std::forward(create_fn))); - } + auto make_source(std::string name, CreateFnT&& create_fn); + + /** + * Create a source node using the provided name and observable + * @tparam SourceTypeT The type of elements emitted by the source node + * @tparam NodeTypeT The type of node to be created, with default set to node::RxSource + * @param name The name of the source node + * @param obs The observable emitting elements of type SourceTypeT + * @return An object of type NodeTypeT + */ + template class NodeTypeT = node::RxSource> + auto make_source(std::string name, rxcpp::observable obs); + /** + * Creates a new instance of the sink component of type `NodeTypeT` with name `name` and observer `ops`. + * @tparam SinkTypeT Type of the data received by the sink component. + * @tparam NodeTypeT Type of the node to be constructed. + * @tparam ArgsT Types of arguments passed to the observer. + * @param name The name of the sink component. + * @param ops The observer for the sink component. + * @return The constructed sink as an Object. + */ template class NodeTypeT = node::RxSink, typename... ArgsT> - auto make_sink(std::string name, ArgsT&&... ops) - { - return construct_object>(name, - rxcpp::make_observer(std::forward(ops)...)); - } + auto make_sink(std::string name, ArgsT&&... ops); + /** + * Creates a sink component and returns it as an object + * @tparam SinkTypeT the type of data that the sink component will receive + * @tparam NodeTypeT the type of the node component to be constructed + * @tparam ArgsT the type of the arguments for constructing the node component + * @param name the name of the sink component + * @param ops the arguments for constructing the node component + * @return The constructed sink component as an Object + */ template class NodeTypeT = node::RxSinkComponent, typename... ArgsT> - auto make_sink_component(std::string name, ArgsT&&... ops) - { - return construct_object>(name, - rxcpp::make_observer(std::forward(ops)...)); - } + auto make_sink_component(std::string name, ArgsT&&... ops); + /** + * Creates a new instance of the specified node type, with the given name and arguments. + * @tparam SinkTypeT The type of the sink to be created. + * @tparam SourceTypeT The type of the source to be created. + * @tparam NodeTypeT The type of the node to be created. + * @tparam ArgsT Variadic parameter pack, representing the arguments to be passed to the constructor of the node. + * @param name The name of the node to be created. + * @param ops The arguments to be passed to the constructor of the node. + * @return The newly created node. + */ template class NodeTypeT = node::RxNode, typename... ArgsT> - auto make_node(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node(std::string name, ArgsT&&... ops); + /** + * Constructs a node object of type NodeTypeT, initialized with arguments ops + * @tparam SinkTypeT The type of the sink + * @tparam SourceTypeT The type of the source + * @tparam NodeTypeT The type of the node, defaults to mrc::node::RxNode + * @tparam ArgsT The types of the arguments passed to the node constructor + * @param name The name of the node + * @param ops The arguments passed to the node constructor + * @return An object of type NodeTypeT constructed with the provided arguments + */ template class NodeTypeT = node::RxNode, typename... ArgsT> - auto make_node(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node(std::string name, ArgsT&&... ops); + /** + * Creates and returns an instance of a node component with the specified type, name and arguments. + * @tparam SinkTypeT The sink type of the node component to be created. + * @tparam SourceTypeT The source type of the node component to be created. + * @tparam NodeTypeT The type of the node component to be created. + * @tparam ArgsT Variadic template argument for the node component's constructor. + * @param name The name of the node component to be created. + * @param ops Variadic argument for the node component's constructor. + * @return An instance of the created node component. + */ template class NodeTypeT = node::RxNodeComponent, typename... ArgsT> - auto make_node_component(std::string name, ArgsT&&... ops) - { - return construct_object>(name, std::forward(ops)...); - } + auto make_node_component(std::string name, ArgsT&&... ops); /** * Instantiate a segment module of `ModuleTypeT`, intialize it, and return it to the caller @@ -202,21 +246,13 @@ class Builder final * @return Return a shared pointer to the new module, which is a derived class of SegmentModule */ template - std::shared_ptr make_module(std::string module_name, nlohmann::json config = {}) - { - static_assert(std::is_base_of_v); - - auto module = std::make_shared(std::move(module_name), std::move(config)); - init_module(module); - - return std::move(module); - } + std::shared_ptr make_module(std::string module_name, nlohmann::json config = {}); /** * Initialize a SegmentModule that was instantiated outside of the builder. * @param module Module to initialize */ - void init_module(std::shared_ptr module); + void init_module(std::shared_ptr smodule); /** * Register an input port on the given module -- note: this in generally only necessary for dynamically @@ -242,167 +278,266 @@ class Builder final */ void register_module_output(std::string output_name, std::shared_ptr object); + /** + * Load an existing, registered module, initialize it, and return it to the caller + * @param module_id Unique ID of the module to load + * @param registry_namespace Namespace where the module id is registered + * @param module_name Unique name of this instance of the module + * @param config Configuration to pass to the module + * @return Return a shared pointer to the new module, which is a derived class of SegmentModule + */ std::shared_ptr load_module_from_registry(const std::string& module_id, const std::string& registry_namespace, std::string module_name, nlohmann::json config = {}); - template - void make_edge(std::shared_ptr> source, std::shared_ptr> sink) - { - DVLOG(10) << "forming segment edge between two segment objects"; - mrc::make_edge(source->object(), sink->object()); - } + /** + * Create an edge between two things that are convertible to ObjectProperties + * @tparam SourceNodeTypeT Type hint for the source node -- optional -- this will be used if the type of the source + * object cannot be directly determined. + * @tparam SinkNodeTypeT Type hint for the sink node -- optional -- this will be used if the type of the sink object + * cannot be directly determined. + * @tparam SourceObjectT Concept conforming type of the source object + * @tparam SinkObjectT Concept conforming type of the sink object + * @param source Edge source + * @param sink Edge Sink + */ + template + void make_edge(SourceObjectT source, SinkObjectT sink); /** - * Partial dynamic edge construction: * - * Create edge using a fully constructed Object and a type erased Object - * We extract the underlying node object (Likely an RxNode) and call make_edge with it and the type erased - * object. This works via a cascaded type extraction process. - * @tparam SourceNodeTypeT - * @param source Fully typed, wrapped, object - * @param sink Type erased object -- assumed to be convertible to source type + * @tparam EdgeDataTypeT + * @tparam SourceObjectT Data type that can be resolved to an ObjectProperties, representing the source + * @tparam SinkObjectT Data type that can be resolved to an ObjectProperties, representing the sink + * @tparam SpliceInputObjectT Data type that can be resolved to an ObjectProperties, representing the splice's input + * @tparam SpliceOutputObjectT Data type that can be resolved to an ObjectProperties, representing the splice's + * output + * @param source Existing, connected, edge source + * @param sink Existing, connected, edge sink + * @param splice_input Existing, unconnected, splice input + * @param splice_output Existing, unconnected, splice output */ - template - void make_edge(std::shared_ptr>& source, std::shared_ptr sink) - { - if constexpr (is_base_of_template::value) - { - if (sink->is_writable_provider()) - { - mrc::make_edge(source->object(), - sink->template writable_provider_typed()); - return; - } - } + template + void splice_edge(SourceObjectT source, + SinkObjectT sink, + SpliceInputObjectT splice_input, + SpliceOutputObjectT splice_output); - if constexpr (is_base_of_template::value) - { - if (sink->is_readable_acceptor()) - { - mrc::make_edge(source->object(), - sink->template readable_acceptor_typed()); - return; - } - } + template + void add_throughput_counter(std::shared_ptr> segment_object); - LOG(ERROR) << "Incorrect node types"; - } + template + void add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable); - /** - * Partial dynamic edge construction: - * - * Create edge using a fully constructed Object and a type erased Object - * We extract the underlying node object (Likely an RxNode) and call make_edge with it and the type erased - * object. This works via a cascaded type extraction process. - * @tparam SinkNodeTypeT - * @param source Fully typed, wrapped, object - * @param sink Fully typed, wrapped, object - */ - template - void make_edge(std::shared_ptr source, std::shared_ptr>& sink) - { - if constexpr (is_base_of_template::value) - { - if (source->is_writable_acceptor()) - { - mrc::make_edge(source->template writable_acceptor_typed(), - sink->object()); - return; - } - } + private: + using sp_segment_module_t = std::shared_ptr; - if constexpr (is_base_of_template::value) - { - if (source->is_readable_provider()) - { - mrc::make_edge(source->template readable_provider_typed(), - sink->object()); - return; - } - } + std::string m_namespace_prefix; + std::vector m_namespace_stack{}; + std::vector m_module_stack{}; - LOG(ERROR) << "Incorrect node types"; - } + internal::segment::IBuilder& m_backend; + + void ns_push(sp_segment_module_t smodule); + + void ns_pop(); + + template + ObjectProperties& to_object_properties(ObjectReprT& repr); + + friend Definition; +}; + +template +std::shared_ptr> Builder::construct_object(std::string name, ArgsT&&... args) +{ + auto ns_name = m_namespace_prefix.empty() ? name : m_namespace_prefix + "/" + name; + auto uptr = std::make_unique(std::forward(args)...); - template - void make_dynamic_edge(const std::string& source_name, const std::string& sink_name) + ::add_stats_watcher_if_rx_source(*uptr, ns_name); + ::add_stats_watcher_if_rx_sink(*uptr, ns_name); + + return make_object(std::move(ns_name), std::move(uptr)); +} + +template class NodeTypeT, typename CreateFnT> +auto Builder::make_source(std::string name, CreateFnT&& create_fn) +{ + return construct_object>( + name, + rxcpp::observable<>::create(std::forward(create_fn))); +} + +template class NodeTypeT> +auto Builder::make_source(std::string name, rxcpp::observable obs) +{ + return construct_object>(name, obs); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_sink(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, rxcpp::make_observer(std::forward(ops)...)); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_sink_component(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, rxcpp::make_observer(std::forward(ops)...)); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_node(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template + class NodeTypeT, + typename... ArgsT> +auto Builder::make_node(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template class NodeTypeT, typename... ArgsT> +auto Builder::make_node_component(std::string name, ArgsT&&... ops) +{ + return construct_object>(name, std::forward(ops)...); +} + +template +std::shared_ptr Builder::make_module(std::string module_name, nlohmann::json config) +{ + static_assert(std::is_base_of_v); + + auto smodule = std::make_shared(std::move(module_name), std::move(config)); + init_module(smodule); + + return std::move(smodule); +} + +template +void Builder::make_edge(SourceObjectT source, SinkObjectT sink) + +{ + DVLOG(10) << "forming edge between two segment objects"; + using source_sp_type_t = typename mrc_object_sptr_type_t::source_type_t; // Might be void + using sink_sp_type_t = typename mrc_object_sptr_type_t::sink_type_t; // Might be void + + auto& source_object = to_object_properties(source); + auto& sink_object = to_object_properties(sink); + + // If we can determine the type from the actual object, use that, then fall back to hints or defaults. + using deduced_source_type_t = first_non_void_type_t; // Fallback to Sink explicit hint + using deduced_sink_type_t = first_non_void_type_t; // Fallback to Source explicit hint + + VLOG(2) << "Deduced source type: " << mrc::type_name() << std::endl; + VLOG(2) << "Deduced sink type: " << mrc::type_name() << std::endl; + + if (source_object.is_writable_acceptor() && sink_object.is_writable_provider()) { - auto& source_obj = m_backend.find_object(source_name); - auto& sink_obj = m_backend.find_object(sink_name); - this->make_dynamic_edge(source_obj, sink_obj); + mrc::make_edge(source_object.template writable_acceptor_typed(), + sink_object.template writable_provider_typed()); + return; } - template - void make_dynamic_edge(std::shared_ptr source, - std::shared_ptr sink) + if (source_object.is_readable_provider() && sink_object.is_readable_acceptor()) { - this->make_dynamic_edge(*source, *sink); + mrc::make_edge(source_object.template readable_provider_typed(), + sink_object.template readable_acceptor_typed()); + return; } - template - void make_dynamic_edge(segment::ObjectProperties& source, segment::ObjectProperties& sink) + LOG(ERROR) << "Incompatible node types"; +} + +template +void Builder::splice_edge(SourceObjectT source, + SinkObjectT sink, + SpliceInputObjectT splice_input, + SpliceOutputObjectT splice_output) + +{ + auto& source_object = to_object_properties(source); + auto& sink_object = to_object_properties(sink); + + auto& splice_input_object = to_object_properties(splice_input); + auto& splice_output_object = to_object_properties(splice_output); + + CHECK(source_object.is_source()) << "Source object is not a source"; + CHECK(sink_object.is_sink()) << "Sink object is not a sink"; + + // TODO(Devin): this is slightly more constrained that it needs to be. We don't actually need to know the + // type of a provider, but because of the way type testing is done on the edg ebuilder, its a bit of a pain + // to pass in an untyped Provider. We can fix this later. + if (source_object.is_writable_acceptor()) { - if (source.is_writable_acceptor() && sink.is_writable_provider()) + if (sink_object.is_writable_provider()) { - mrc::make_edge(source.template writable_acceptor_typed(), - sink.template writable_provider_typed()); - return; - } + CHECK(splice_input_object.is_writable_provider()) << "Splice input must be of type WritableProvider"; + CHECK(splice_output_object.is_writable_acceptor()) << "Splice output must be WritableAcceptor"; - if (source.is_readable_provider() && sink.is_readable_acceptor()) - { - mrc::make_edge(source.template readable_provider_typed(), - sink.template readable_acceptor_typed()); - return; - } + // Cast our object into something we can insert as a splice. + auto& splice_writable_provider = splice_input_object.template writable_provider_typed(); + auto& splice_writable_acceptor = splice_output_object.template writable_acceptor_typed(); - LOG(ERROR) << "Incorrect node types"; - } + auto& writable_acceptor = source_object.template writable_acceptor_typed(); + auto& writable_provider = sink_object.template writable_provider_typed(); - template - void add_throughput_counter(std::shared_ptr> segment_object) - { - auto runnable = std::dynamic_pointer_cast>(segment_object); - CHECK(runnable); - CHECK(segment_object->is_source()); - using source_type_t = typename ObjectT::source_type_t; - auto counter = m_backend.make_throughput_counter(runnable->name()); - runnable->object().add_epilogue_tap([counter](const source_type_t& data) { - counter(1); - }); - } + edge::EdgeBuilder::splice_edge(writable_acceptor, + writable_provider, + splice_writable_provider, + splice_writable_acceptor); - template - void add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable) - { - auto runnable = std::dynamic_pointer_cast>(segment_object); - CHECK(runnable); - CHECK(segment_object->is_source()); - using source_type_t = typename ObjectT::source_type_t; - using tick_fn_t = std::function; - tick_fn_t tick_fn = callable; - auto counter = m_backend.make_throughput_counter(runnable->name()); - runnable->object().add_epilogue_tap([counter, tick_fn](const source_type_t& data) { - counter(tick_fn(data)); - }); + return; + } } + else if (source_object.is_readable_provider()) + { + if (sink_object.is_readable_acceptor()) + { + CHECK(splice_input_object.is_readable_acceptor()) << "Splice input must be of type ReadableAcceptor"; + CHECK(splice_output_object.is_readable_provider()) << "Splice output must be ReadableProvider"; - private: - using sp_segment_module_t = std::shared_ptr; + // Cast our object into something we can insert as a splice. + auto& splice_readable_acceptor = splice_input_object.template readable_acceptor_typed(); + auto& splice_readable_provider = splice_output_object.template readable_provider_typed(); - std::string m_namespace_prefix; - std::vector m_namespace_stack{}; - std::vector m_module_stack{}; + auto& readable_provider = source_object.template readable_provider_typed(); + auto& readable_acceptor = sink_object.template readable_acceptor_typed(); - internal::segment::IBuilder& m_backend; + edge::EdgeBuilder::splice_edge(readable_provider, + readable_acceptor, + splice_readable_acceptor, + splice_readable_provider); - void ns_push(sp_segment_module_t module); - void ns_pop(); + return; + } + } - friend Definition; -}; + throw std::runtime_error("Attempt to splice unsupported edge types"); +} template std::shared_ptr> Builder::make_object(std::string name, std::unique_ptr node) @@ -472,4 +607,75 @@ std::shared_ptr>> Builder::get_ingress(std::string return port; } -} // namespace mrc::segment +template +void Builder::add_throughput_counter(std::shared_ptr> segment_object) +{ + auto runnable = std::dynamic_pointer_cast>(segment_object); + CHECK(runnable); + CHECK(segment_object->is_source()); + using source_type_t = typename ObjectT::source_type_t; + auto counter = m_backend.make_throughput_counter(runnable->name()); + runnable->object().add_epilogue_tap([counter](const source_type_t& data) { + counter(1); + }); +} + +template +void Builder::add_throughput_counter(std::shared_ptr> segment_object, CallableT&& callable) +{ + auto runnable = std::dynamic_pointer_cast>(segment_object); + CHECK(runnable); + CHECK(segment_object->is_source()); + using source_type_t = typename ObjectT::source_type_t; + using tick_fn_t = std::function; + tick_fn_t tick_fn = callable; + auto counter = m_backend.make_throughput_counter(runnable->name()); + runnable->object().add_epilogue_tap([counter, tick_fn](const source_type_t& data) { + counter(tick_fn(data)); + }); +} + +/* Private Member Functions */ +template +ObjectProperties& Builder::to_object_properties(ObjectReprT& repr) +{ + ObjectProperties* object_properties_ptr{nullptr}; + if constexpr (is_shared_ptr_v) + { + // SP to Object + if constexpr (MRCObjectSharedPtr) + { + auto object_properties_ptr_props_ptr = std::dynamic_pointer_cast(repr); + object_properties_ptr = std::addressof(*object_properties_ptr_props_ptr); + } + // SP to ObjectProperties + else if constexpr (MRCObjPropSharedPtr) + { + object_properties_ptr = std::addressof(*repr); + } + { + object_properties_ptr = std::addressof(*repr); + } + } + // Object + else if constexpr (MRCObject) + { + object_properties_ptr = std::addressof(dynamic_cast(repr)); + } + // ObjectProperties + else if constexpr (MRCObjProp) + { + object_properties_ptr = std::addressof(repr); + } + // String-like lookup + else + { + object_properties_ptr = std::addressof(m_backend.find_object(repr)); + } + + CHECK(object_properties_ptr != nullptr) << "If this fails, something is wrong with the concept definition"; + + return *object_properties_ptr; +} + +} // namespace mrc::segment \ No newline at end of file diff --git a/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp b/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp new file mode 100644 index 000000000..862b3f0ba --- /dev/null +++ b/cpp/mrc/include/mrc/segment/concepts/object_traits.hpp @@ -0,0 +1,90 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// Created by drobison on 1/20/23. +// + +#pragma once + +#include "mrc/segment/object.hpp" +#include "mrc/type_traits.hpp" + +#include + +namespace mrc { + +template +struct is_mrc_object_type : public std::false_type +{}; + +template +struct is_mrc_object_type> : public std::true_type +{}; + +template +inline constexpr bool is_mrc_object_v = is_mrc_object_type::value; // NOLINT + +template +struct is_mrc_object_shared_pointer : public std::false_type +{}; + +template +struct is_mrc_object_shared_pointer>> : public std::true_type +{}; + +template +inline constexpr bool is_mrc_object_shared_ptr_v = is_mrc_object_shared_pointer::value; // NOLINT + +struct mrc_object_null_type // NOLINT +{ + using source_type_t = void; + using sink_type_t = void; +}; + +template +struct mrc_object_sptr_type // NOLINT +{ + using type_t = mrc_object_null_type; +}; + +template +struct mrc_object_sptr_type>> +{ + using type_t = T; +}; + +template +using mrc_object_sptr_type_t = typename mrc_object_sptr_type::type_t; + +template +concept MRCObject = is_mrc_object_v; + +template +concept MRCObjectSharedPtr = is_mrc_object_shared_ptr_v; + +template +concept MRCObjProp = std::is_same_v, mrc::segment::ObjectProperties>; + +template +concept MRCObjPropSharedPtr = std::is_same_v, std::shared_ptr>; + +template +concept MRCObjectProxy = MRCObject || MRCObjectSharedPtr || MRCObjProp || + MRCObjPropSharedPtr || std::is_convertible_v; + +} // namespace mrc diff --git a/cpp/mrc/include/mrc/segment/object.hpp b/cpp/mrc/include/mrc/segment/object.hpp index 1b7a7dd50..9d389a88a 100644 --- a/cpp/mrc/include/mrc/segment/object.hpp +++ b/cpp/mrc/include/mrc/segment/object.hpp @@ -353,5 +353,4 @@ edge::IReadableProviderBase& Object::readable_provider_base() CHECK(base); return *base; } - } // namespace mrc::segment diff --git a/cpp/mrc/include/mrc/type_traits.hpp b/cpp/mrc/include/mrc/type_traits.hpp index 19f235fd5..d8bb459ad 100644 --- a/cpp/mrc/include/mrc/type_traits.hpp +++ b/cpp/mrc/include/mrc/type_traits.hpp @@ -115,4 +115,26 @@ template