From a8b99e48c8507eeb36781086ff033591e18b85e3 Mon Sep 17 00:00:00 2001 From: Ruiyu Zhu Date: Mon, 6 Mar 2023 11:03:14 -0800 Subject: [PATCH 1/5] logic for processing peer's data Differential Revision: D43576520 fbshipit-source-id: dd12e10bbcf10aa9f807494a2377ead042fb6ba8 --- .../UdpEncryptor/UdpEncryptor.cpp | 49 +++++++++++++++++++ .../UdpEncryptor/UdpEncryptor.h | 49 +++++++++++++++++++ .../UdpEncryptor/test/UdpEncryptionMock.h | 41 ++++++++++++++++ .../UdpEncryptor/test/UdpEncryptorTest.cpp | 36 ++++++++++++++ 4 files changed, 175 insertions(+) create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp new file mode 100644 index 000000000..9d639c31c --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" + +namespace unified_data_process { + +// load a line that is to be processed later. +void UdpEncryptor::pushOneLineFromMe( + std::vector&& /*serializedLine*/) { + throw std::runtime_error("not implemented"); +} + +// set the config for peer's data. 
+void UdpEncryptor::setPeerConfig( + size_t totalNumberOfPeerRows, + size_t peerDataWidth, + const std::vector& indexes) { + udpEncryption_->prepareToProcessPeerData(peerDataWidth, indexes); + auto loop = [this, totalNumberOfPeerRows]() { + size_t numberOfProcessedRow = 0; + while (numberOfProcessedRow < totalNumberOfPeerRows) { + udpEncryption_->processPeerData( + std::min(chunkSize_, totalNumberOfPeerRows - numberOfProcessedRow)); + numberOfProcessedRow += chunkSize_; + } + }; + udpThreadForPeer_ = std::make_unique(loop); +} + +UdpEncryptor::EncryptionResuts UdpEncryptor::getEncryptionResults() const { + if (udpThreadForPeer_ == nullptr) { + throw std::runtime_error("No thread to join for peer!"); + } + udpThreadForPeer_->join(); + + auto [ciphertexts, nonces, indexes] = udpEncryption_->getProcessedData(); + return EncryptionResuts{ciphertexts, nonces, indexes}; +} + +std::vector<__m128i> UdpEncryptor::getExpandedKey() const { + throw std::runtime_error("not implemented"); +} + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h new file mode 100644 index 000000000..16f4386ee --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/IUdpEncryption.h" + +namespace unified_data_process { + +class UdpEncryptor { + using UdpEncryption = + fbpcf::mpc_std_lib::unified_data_process::data_processor::IUdpEncryption; + + public: + using EncryptionResuts = UdpEncryption::EncryptionResuts; + + UdpEncryptor(std::unique_ptr udpEncryption, size_t chunkSize) + : udpEncryption_(std::move(udpEncryption)), + udpThreadForMySelf_(nullptr), + udpThreadForPeer_(nullptr), + chunkSize_(chunkSize) {} + + // load a line that is to be processed later. + void pushOneLineFromMe(std::vector&& serializedLine); + + // set the config for peer's data. + void setPeerConfig( + size_t totalNumberOfPeerRows, + size_t peerDataWidth, + const std::vector& indexes); + + EncryptionResuts getEncryptionResults() const; + + std::vector<__m128i> getExpandedKey() const; + + private: + std::unique_ptr udpEncryption_; + std::unique_ptr udpThreadForMySelf_; + std::unique_ptr udpThreadForPeer_; + size_t chunkSize_; +}; + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h new file mode 100644 index 000000000..b767e806c --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/IUdpEncryption.h" + +namespace unified_data_process { + +using namespace ::testing; + +class UdpEncryptionMock final + : public fbpcf::mpc_std_lib::unified_data_process::data_processor:: + IUdpEncryption { + public: + MOCK_METHOD(void, prepareToProcessMyData, (size_t)); + + MOCK_METHOD( + void, + processMyData, + (const std::vector>&)); + + MOCK_METHOD(std::vector<__m128i>, getExpandedKey, ()); + + MOCK_METHOD( + void, + prepareToProcessPeerData, + (size_t, const std::vector&)); + + MOCK_METHOD(void, processPeerData, (size_t)); + + MOCK_METHOD(EncryptionResuts, getProcessedData, ()); +}; + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp new file mode 100644 index 000000000..705d2fed5 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h" + +#include +#include + +using namespace ::testing; +namespace unified_data_process { + +TEST(UdpEncryptorTest, testProcessingPeerData) { + int chunkSize = 500; + int totalRow = 1200; + size_t dataWidth = 32; + std::vector indexes{3, 31, 6, 12, 5}; + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessPeerData(dataWidth, indexes)).Times(1); + EXPECT_CALL(*mock, processPeerData(chunkSize)).Times(totalRow / chunkSize); + if (totalRow % chunkSize != 0) { + EXPECT_CALL(*mock, processPeerData(totalRow % chunkSize)).Times(1); + } + EXPECT_CALL(*mock, getProcessedData()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + encryptor.setPeerConfig(totalRow, dataWidth, indexes); + encryptor.getEncryptionResults(); +} + +} // namespace unified_data_process From d4e04d57a25422fa7216d3b3912c644c8ee06c6e Mon Sep 17 00:00:00 2001 From: Ruiyu Zhu Date: Mon, 6 Mar 2023 11:03:14 -0800 Subject: [PATCH 2/5] logic for processing my data Differential Revision: D43576518 fbshipit-source-id: 5f041b01f875d68747743d9642aad5c04682bfaf --- .../UdpEncryptor/UdpEncryptor.cpp | 74 ++++++++++++- .../UdpEncryptor/UdpEncryptor.h | 23 +++- .../UdpEncryptor/test/UdpEncryptorTest.cpp | 101 +++++++++++++++++- 3 files changed, 191 insertions(+), 7 deletions(-) diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp index 9d639c31c..458cb1ce5 100644 --- a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp @@ -6,13 +6,74 @@ */ #include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" +#include 
namespace unified_data_process { +/** + * The idea here is to distribute the workload across more threads. This + * UdpEncryptor object will read in data in the main thread and buffer them. + * Once there are sufficient buffered data (defined by chunkSize_), the buffered + * data will be passed to the underlying udp encryption object to process in a + * background thread. + */ +void UdpEncryptor::processDataInBuffer() { + if (udpThreadForMySelf_ != nullptr) { + // this is not the first time of executing processingMyData + udpThreadForMySelf_->join(); + } else { + // this is the first time of executing processingMyData, need to call + // preparation first + udpEncryption_->prepareToProcessMyData( + bufferForMyDataInLoading_->at(0).size()); + } + if (bufferIndex_ < chunkSize_) { + bufferForMyDataInLoading_->resize(bufferIndex_); + } + std::swap(bufferForMyDataInLoading_, bufferForMyDataInProcessing_); + bufferIndex_ = 0; + udpThreadForMySelf_ = std::make_unique([this]() { + if (bufferForMyDataInProcessing_->size() == 0) { + return; + } + udpEncryption_->processMyData(*bufferForMyDataInProcessing_); + }); +} + // load a line that is to be processed later. void UdpEncryptor::pushOneLineFromMe( - std::vector&& /*serializedLine*/) { - throw std::runtime_error("not implemented"); + std::vector&& serializedLine) { + bufferForMyDataInLoading_->at(bufferIndex_++) = std::move(serializedLine); + if (bufferIndex_ >= chunkSize_) { + processDataInBuffer(); + } +} + +// load multiple lines into the buffer. 
+void UdpEncryptor::pushLinesFromMe( + std::vector>&& serializedLines) { + size_t inputIndex = 0; + + while (inputIndex < serializedLines.size()) { + if (chunkSize_ - bufferIndex_ <= serializedLines.size() - inputIndex) { + std::copy( + serializedLines.begin() + inputIndex, + serializedLines.begin() + inputIndex + chunkSize_ - bufferIndex_, + bufferForMyDataInLoading_->begin() + bufferIndex_); + inputIndex += chunkSize_ - bufferIndex_; + // the buffer is full, the index should be changed to chunkSize_ + bufferIndex_ = chunkSize_; + processDataInBuffer(); + } else { + std::copy( + serializedLines.begin() + inputIndex, + serializedLines.end(), + bufferForMyDataInLoading_->begin() + bufferIndex_); + + bufferIndex_ += serializedLines.size() - inputIndex; + inputIndex = serializedLines.size(); + } + } } // set the config for peer's data. @@ -42,8 +103,13 @@ UdpEncryptor::EncryptionResuts UdpEncryptor::getEncryptionResults() const { return EncryptionResuts{ciphertexts, nonces, indexes}; } -std::vector<__m128i> UdpEncryptor::getExpandedKey() const { - throw std::runtime_error("not implemented"); +std::vector<__m128i> UdpEncryptor::getExpandedKey() { + processDataInBuffer(); + if (udpThreadForMySelf_ == nullptr) { + throw std::runtime_error("No thread to join for peer!"); + } + udpThreadForMySelf_->join(); + return udpEncryption_->getExpandedKey(); } } // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h index 16f4386ee..ee6380552 100644 --- a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h @@ -24,11 +24,22 @@ class UdpEncryptor { : udpEncryption_(std::move(udpEncryption)), udpThreadForMySelf_(nullptr), udpThreadForPeer_(nullptr), - chunkSize_(chunkSize) {} + chunkSize_(chunkSize), + bufferIndex_(0), 
+ bufferForMyDataInLoading_{ + std::make_unique>>( + chunkSize_)}, + bufferForMyDataInProcessing_( + std::make_unique>>( + chunkSize_)) {} // load a line that is to be processed later. void pushOneLineFromMe(std::vector&& serializedLine); + // load a number of lines that is to be processed later. + void pushLinesFromMe( + std::vector>&& serializedLines); + // set the config for peer's data. void setPeerConfig( size_t totalNumberOfPeerRows, @@ -37,13 +48,21 @@ class UdpEncryptor { EncryptionResuts getEncryptionResults() const; - std::vector<__m128i> getExpandedKey() const; + std::vector<__m128i> getExpandedKey(); private: + void processDataInBuffer(); + std::unique_ptr udpEncryption_; std::unique_ptr udpThreadForMySelf_; std::unique_ptr udpThreadForPeer_; size_t chunkSize_; + + size_t bufferIndex_; + std::unique_ptr>> + bufferForMyDataInLoading_; + std::unique_ptr>> + bufferForMyDataInProcessing_; }; } // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp index 705d2fed5..bf7008780 100644 --- a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp @@ -14,7 +14,7 @@ using namespace ::testing; namespace unified_data_process { -TEST(UdpEncryptorTest, testProcessingPeerData) { +TEST(UdpEncryptorTestWithMock, testProcessingPeerData) { int chunkSize = 500; int totalRow = 1200; size_t dataWidth = 32; @@ -33,4 +33,103 @@ TEST(UdpEncryptorTest, testProcessingPeerData) { encryptor.getEncryptionResults(); } +TEST(UdpEncryptorTestWithMock, testProcessingMyData) { + size_t chunkSize = 200; + size_t sampleSize = 219; + size_t totalRow = 1200; + size_t width = 32; + + std::vector>> testData; + std::vector>> testData1; + for (size_t i = 0; i < totalRow; i += chunkSize) { + 
testData.push_back(std::vector>()); + testData.back().reserve(std::min(chunkSize, totalRow - i)); + for (size_t j = 0; (j < chunkSize) && (j + i < totalRow); j++) { + testData.back().push_back( + std::vector(width, (i + j) & 0xFF)); + if ((i + j) % sampleSize == 0) { + testData1.push_back(std::vector>()); + } + testData1.back().push_back( + std::vector(width, (i + j) & 0xFF)); + } + } + + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessMyData(width)).Times(1); + for (size_t i = 0; i < testData.size(); i++) { + EXPECT_CALL(*mock, processMyData(testData.at(i))).Times(1); + } + EXPECT_CALL(*mock, getExpandedKey()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + for (size_t i = 0; i < testData1.size() / 2; i++) { + for (size_t j = 0; j < testData1.at(i).size(); j++) { + encryptor.pushOneLineFromMe(std::move(testData1.at(i).at(j))); + } + } + for (size_t i = testData1.size() / 2; i < testData1.size(); i++) { + encryptor.pushLinesFromMe(std::move(testData1.at(i))); + } + encryptor.getExpandedKey(); +} + +TEST(UdpEncryptorTestWithMock, testProcessingBothSidesData) { + size_t chunkSize = 200; + size_t sampleSize = 219; + size_t myTotalRow = 1200; + size_t peerTotalRow = 1500; + size_t myWidth = 32; + size_t peerWidth = 35; + std::vector indexes{3, 31, 6, 12, 5}; + + std::vector>> testData; + std::vector>> testData1; + + for (size_t i = 0; i < myTotalRow; i += chunkSize) { + testData.push_back(std::vector>()); + testData.back().reserve(std::min(chunkSize, myTotalRow - i)); + for (size_t j = 0; j < chunkSize && j + i < myTotalRow; j++) { + testData.back().push_back( + std::vector(myWidth, (i + j) & 0xFF)); + if ((i + j) % sampleSize == 0) { + testData1.push_back(std::vector>()); + } + testData1.back().push_back( + std::vector(myWidth, (i + j) & 0xFF)); + } + } + + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessMyData(myWidth)).Times(1); + for (size_t i = 0; i < testData.size(); i++) { + EXPECT_CALL(*mock, 
processMyData(testData.at(i))).Times(1); + } + EXPECT_CALL(*mock, getExpandedKey()).Times(1); + + EXPECT_CALL(*mock, prepareToProcessPeerData(peerWidth, indexes)).Times(1); + EXPECT_CALL(*mock, processPeerData(chunkSize)) + .Times(peerTotalRow / chunkSize); + if (peerTotalRow % chunkSize != 0) { + EXPECT_CALL(*mock, processPeerData(peerTotalRow % chunkSize)).Times(1); + } + EXPECT_CALL(*mock, getProcessedData()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + encryptor.setPeerConfig(peerTotalRow, peerWidth, indexes); + + for (size_t i = 0; i < testData1.size() / 2; i++) { + for (size_t j = 0; j < testData1.at(i).size(); j++) { + encryptor.pushOneLineFromMe(std::move(testData1.at(i).at(j))); + } + } + for (size_t i = testData1.size() / 2; i < testData1.size(); i++) { + encryptor.pushLinesFromMe(std::move(testData1.at(i))); + } + encryptor.getExpandedKey(); + encryptor.getEncryptionResults(); +} + } // namespace unified_data_process From 39e434e0850cbc83bb249f46e59a33358cabe162 Mon Sep 17 00:00:00 2001 From: Ruiyu Zhu Date: Mon, 6 Mar 2023 11:03:14 -0800 Subject: [PATCH 3/5] Add global parameters Differential Revision: D43726208 fbshipit-source-id: dd0b3edf01ef1c4231d3a49650e6bc1fa9f9e584 --- .../global_parameters/GlobalParameters.cpp | 52 +++++++++++++++++++ .../global_parameters/GlobalParameters.h | 37 +++++++++++++ .../test/GlobalParametersTest.cpp | 39 ++++++++++++++ 3 files changed, 128 insertions(+) create mode 100644 fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp create mode 100644 fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h create mode 100644 fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp diff --git a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp new file mode 100644 index 000000000..7ff51096d --- /dev/null +++ 
b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" + +namespace global_parameters { + +std::string serialize(const GlobalParameters& src) { + std::ostringstream s; + boost::archive::text_oarchive oa(s); + oa << src; + return s.str(); +} + +GlobalParameters deserialize(const std::string& src) { + GlobalParameters data; + std::istringstream s(src); + boost::archive::text_iarchive ia(s); + ia >> data; + return data; +} + +void writeToFile(const std::string& file, const GlobalParameters& gp) { + auto writer = std::make_unique( + std::make_unique(file)); + auto string = global_parameters::serialize(gp); + writer->writeString(string); + writer->close(); +} + +GlobalParameters readFromFile(const std::string& file) { + auto reader = std::make_unique( + std::make_unique(file)); + auto serializedGlobalParameters = reader->readLine(); + while (!reader->eof()) { + auto line = reader->readLine(); + serializedGlobalParameters += "\n" + line; + } + reader->close(); + return global_parameters::deserialize(serializedGlobalParameters); +} + +} // namespace global_parameters diff --git a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h new file mode 100644 index 000000000..6bc4bac5f --- /dev/null +++ b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace global_parameters { + +using GlobalParameterType = + boost::variant>; + +using GlobalParameters = std::unordered_map; + +std::string serialize(const GlobalParameters& src); + +GlobalParameters deserialize(const std::string& src); + +void writeToFile(const std::string& file, const GlobalParameters& gp); + +GlobalParameters readFromFile(const std::string& file); + +} // namespace global_parameters diff --git a/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp b/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp new file mode 100644 index 000000000..383592fc9 --- /dev/null +++ b/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp @@ -0,0 +1,39 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" +#include +#include "folly/Format.h" +#include "folly/Random.h" + +using namespace ::testing; + +namespace global_parameters { + +TEST(GlobalParametersSerialization, testSerializeAndDeserialize) { + const std::string file = + folly::sformat("./global_parameter_{}", folly::Random::rand32()); + + GlobalParameters gp; + gp.emplace("test1", 3); + gp.emplace( + "test2", std::unordered_map({{1, 2}, {3, 4}, {5, 6}})); + + writeToFile(file, gp); + auto gp1 = readFromFile(file); + std::remove(file.c_str()); + + EXPECT_EQ( + boost::get(gp.at("test1")), + boost::get(gp1.at("test1"))); + + EXPECT_EQ( + (boost::get>(gp.at("test2"))), + (boost::get>(gp1.at("test2")))); +} + +} // namespace global_parameters From 7f7262f620c51d70a7e662d3d3d16c7e27dd4878 Mon Sep 17 00:00:00 2001 From: Ruiyu Zhu Date: Mon, 6 Mar 2023 11:03:14 -0800 Subject: [PATCH 4/5] Encryptor app Summary: UDP encryption app. This is an assembling object that hosts file read/write and calls into the encryptor. The integration test will be in a later diff. 
Differential Revision: D43752781 fbshipit-source-id: e9179cc6ce47904a1f5a9590f32682830fe21e15 --- .../global_parameters/GlobalParameters.h | 8 ++ .../UdpEncryptor/UdpEncryptorApp.cpp | 126 ++++++++++++++++++ .../UdpEncryptor/UdpEncryptorApp.h | 41 ++++++ 3 files changed, 175 insertions(+) create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp create mode 100644 fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h diff --git a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h index 6bc4bac5f..96847fd03 100644 --- a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h +++ b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h @@ -21,6 +21,14 @@ namespace global_parameters { +inline const std::string KAdvRowCount = "Advertiser_Row_Count"; +inline const std::string KPubRowCount = "Publisher_Row_Count"; + +inline const std::string KAdvDataWidth = "Advertiser_Data_Width"; +inline const std::string KPubDataWidth = "Publisher_Data_Width"; + +inline const std::string KMatchedUserCount = "Matched_User_Count"; + using GlobalParameterType = boost::variant>; diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp new file mode 100644 index 000000000..f1bec45a2 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp @@ -0,0 +1,126 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/UdpUtil.h" +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" +#include "folly/String.h" + +namespace unified_data_process { + +void UdpEncryptorApp::invokeUdpEncryption( + const std::vector& indexFiles, + const std::vector& serializedDataFiles, + const std::string& globalParameters, + const std::string& dataFile, + const std::string& expandedKeyFile) { + auto t = std::thread( + [this]( + const std::vector& indexFiles, + const std::string& globalParameters) { + processPeerData(indexFiles, globalParameters); + }, + indexFiles, + globalParameters); + std::vector>>> futures; + + for (size_t i = 1; i < serializedDataFiles.size(); i++) { + futures.push_back( + std::async(UdpEncryptorApp::readDataFile, serializedDataFiles.at(i))); + } + { + // process the first file in main thread. 
+ auto reader = std::make_unique( + std::make_unique(serializedDataFiles.at(0))); + reader->readLine(); // header, useless + while (!reader->eof()) { + auto line = reader->readLine(); + + encryptor_->pushOneLineFromMe( + std::vector(line.begin(), line.end())); + } + reader->close(); + } + for (auto& future : futures) { + encryptor_->pushLinesFromMe(future.get()); + } + t.join(); + fbpcf::mpc_std_lib::unified_data_process::data_processor:: + writeEncryptionResultsToFile( + encryptor_->getEncryptionResults(), dataFile); + fbpcf::mpc_std_lib::unified_data_process::data_processor:: + writeExpandedKeyToFile(encryptor_->getExpandedKey(), expandedKeyFile); +} + +std::vector UdpEncryptorApp::readIndexFile( + const std::string& fileName) { + auto reader = std::make_unique( + std::make_unique(fileName)); + reader->readLine(); // header, useless + + std::vector rst; + while (!reader->eof()) { + std::vector data; + auto line = reader->readLine(); + folly::split(",", std::move(line), data); + rst.push_back(stoi(data.at(1))); + } + reader->close(); + return rst; +} + +std::vector> UdpEncryptorApp::readDataFile( + const std::string& fileName) { + auto reader = std::make_unique( + std::make_unique(fileName)); + reader->readLine(); // header, useless + + std::vector> rst; + while (!reader->eof()) { + auto line = reader->readLine(); + rst.push_back(std::vector(line.begin(), line.end())); + } + reader->close(); + return rst; +} + +void UdpEncryptorApp::processPeerData( + const std::vector& indexFiles, + const std::string& globalParameterFile) const { + std::vector>> futures; + for (auto& file : indexFiles) { + futures.push_back(std::async(UdpEncryptorApp::readIndexFile, file)); + } + + auto globalParameters = global_parameters::readFromFile(globalParameterFile); + + std::vector indexes; + for (auto& future : futures) { + auto indexInFile = future.get(); + indexes.insert(indexes.end(), indexInFile.begin(), indexInFile.end()); + } + auto totalNumberOfPeerRows = boost::get( + 
amIPublisher_ ? globalParameters.at(global_parameters::KAdvRowCount) + : globalParameters.at(global_parameters::KPubRowCount)); + auto peerDataWidth = boost::get( + amIPublisher_ ? globalParameters.at(global_parameters::KAdvDataWidth) + : globalParameters.at(global_parameters::KPubDataWidth)); + + encryptor_->setPeerConfig(totalNumberOfPeerRows, peerDataWidth, indexes); +} + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h new file mode 100644 index 000000000..68d6fcb9f --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" + +namespace unified_data_process { + +class UdpEncryptorApp { + public: + ~UdpEncryptorApp() {} + + UdpEncryptorApp(std::unique_ptr encryptor, bool amIPublisher) + : encryptor_(std::move(encryptor)), amIPublisher_(amIPublisher) {} + + void invokeUdpEncryption( + const std::vector& indexFiles, + const std::vector& serializedDataFiles, + const std::string& globalParameters, + const std::string& dataFile, + const std::string& expandedKeyFile); + + private: + static std::vector readIndexFile(const std::string& fileName); + static std::vector> readDataFile( + const std::string& fileName); + + void processPeerData( + const std::vector& indexFiles, + const std::string& globalParameterFile) const; + + std::unique_ptr encryptor_; + bool amIPublisher_; +}; + +} // namespace unified_data_process From 9a4e42233a1393b0038e217126c1fabbc3d364f6 Mon Sep 17 00:00:00 2001 From: Mohammed Das Date: Mon, 6 Mar 2023 11:03:47 -0800 
Subject: [PATCH 5/5] Use Threadpools with Coroutines over D43752781. Summary: Example diff over D43752781 v1. Differential Revision: D43843234 fbshipit-source-id: 3e8c880ecfeeb5079b140e62337bc2e54fe5fb47 --- .../UdpEncryptor/UdpEncryptorApp.cpp | 55 ++++++++++--------- .../UdpEncryptor/UdpEncryptorApp.h | 10 ++-- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp index f1bec45a2..284938ca3 100644 --- a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp @@ -19,29 +19,29 @@ #include "fbpcf/mpc_std_lib/unified_data_process/data_processor/UdpUtil.h" #include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" #include "folly/String.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/experimental/coro/Collect.h" namespace unified_data_process { -void UdpEncryptorApp::invokeUdpEncryption( +folly::coro::Task UdpEncryptorApp::invokeUdpEncryption( const std::vector& indexFiles, const std::vector& serializedDataFiles, const std::string& globalParameters, const std::string& dataFile, const std::string& expandedKeyFile) { - auto t = std::thread( - [this]( - const std::vector& indexFiles, - const std::string& globalParameters) { - processPeerData(indexFiles, globalParameters); - }, - indexFiles, - globalParameters); - std::vector>>> futures; + auto executor = + std::make_shared(serializedDataFiles.size() + 1); + + auto task1 = + processPeerData(indexFiles, globalParameters).scheduleOn(executor.get()); + + std::vector>>> tasks; for (size_t i = 1; i < serializedDataFiles.size(); i++) { - futures.push_back( - std::async(UdpEncryptorApp::readDataFile, serializedDataFiles.at(i))); + 
tasks.push_back(readDataFile(serializedDataFiles[i]).scheduleOn(executor.get())); } + { // process the first file in main thread. auto reader = std::make_unique( @@ -55,10 +55,12 @@ void UdpEncryptorApp::invokeUdpEncryption( } reader->close(); } - for (auto& future : futures) { - encryptor_->pushLinesFromMe(future.get()); + + auto data = co_await folly::coro::collectAllRange(std::move(tasks)); + for (auto& datum : data) { + encryptor_->pushLinesFromMe(std::move(datum)); } - t.join(); + fbpcf::mpc_std_lib::unified_data_process::data_processor:: writeEncryptionResultsToFile( encryptor_->getEncryptionResults(), dataFile); @@ -66,7 +68,7 @@ void UdpEncryptorApp::invokeUdpEncryption( writeExpandedKeyToFile(encryptor_->getExpandedKey(), expandedKeyFile); } -std::vector UdpEncryptorApp::readIndexFile( +folly::coro::Task> UdpEncryptorApp::readIndexFile( const std::string& fileName) { auto reader = std::make_unique( std::make_unique(fileName)); @@ -80,11 +82,11 @@ std::vector UdpEncryptorApp::readIndexFile( rst.push_back(stoi(data.at(1))); } reader->close(); - return rst; + co_return rst; } -std::vector> UdpEncryptorApp::readDataFile( - const std::string& fileName) { +folly::coro::Task>> +UdpEncryptorApp::readDataFile(const std::string& fileName) { auto reader = std::make_unique( std::make_unique(fileName)); reader->readLine(); // header, useless @@ -95,22 +97,25 @@ std::vector> UdpEncryptorApp::readDataFile( rst.push_back(std::vector(line.begin(), line.end())); } reader->close(); - return rst; + co_return rst; } -void UdpEncryptorApp::processPeerData( +folly::coro::Task UdpEncryptorApp::processPeerData( const std::vector& indexFiles, const std::string& globalParameterFile) const { - std::vector>> futures; + auto executor = std::make_shared(indexFiles.size()); + + std::vector>> tasks; for (auto& file : indexFiles) { - futures.push_back(std::async(UdpEncryptorApp::readIndexFile, file)); + tasks.push_back(readIndexFile(file).scheduleOn(executor.get())); } + auto results = 
co_await folly::coro::collectAllRange(std::move(tasks)); + auto globalParameters = global_parameters::readFromFile(globalParameterFile); std::vector indexes; - for (auto& future : futures) { - auto indexInFile = future.get(); + for (auto& indexInFile : results) { indexes.insert(indexes.end(), indexInFile.begin(), indexInFile.end()); } auto totalNumberOfPeerRows = boost::get( diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h index 68d6fcb9f..a11f890e9 100644 --- a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h @@ -7,6 +7,7 @@ #pragma once +#include #include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" namespace unified_data_process { @@ -18,7 +19,7 @@ class UdpEncryptorApp { UdpEncryptorApp(std::unique_ptr encryptor, bool amIPublisher) : encryptor_(std::move(encryptor)), amIPublisher_(amIPublisher) {} - void invokeUdpEncryption( + folly::coro::Task invokeUdpEncryption( const std::vector& indexFiles, const std::vector& serializedDataFiles, const std::string& globalParameters, @@ -26,11 +27,12 @@ class UdpEncryptorApp { const std::string& expandedKeyFile); private: - static std::vector readIndexFile(const std::string& fileName); - static std::vector> readDataFile( + static folly::coro::Task> readIndexFile( const std::string& fileName); + static folly::coro::Task>> + readDataFile(const std::string& fileName); - void processPeerData( + folly::coro::Task processPeerData( const std::vector& indexFiles, const std::string& globalParameterFile) const;