Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Threadpools with Coroutines over D43752781. #2146

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <fbpcf/io/api/BufferedReader.h>
#include <fbpcf/io/api/BufferedWriter.h>
#include <fbpcf/io/api/FileReader.h>
#include <fbpcf/io/api/FileWriter.h>

#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h"

namespace global_parameters {

std::string serialize(const GlobalParameters& src) {
std::ostringstream s;
boost::archive::text_oarchive oa(s);
oa << src;
return s.str();
}

GlobalParameters deserialize(const std::string& src) {
GlobalParameters data;
std::istringstream s(src);
boost::archive::text_iarchive ia(s);
ia >> data;
return data;
}

void writeToFile(const std::string& file, const GlobalParameters& gp) {
auto writer = std::make_unique<fbpcf::io::BufferedWriter>(
std::make_unique<fbpcf::io::FileWriter>(file));
auto string = global_parameters::serialize(gp);
writer->writeString(string);
writer->close();
}

GlobalParameters readFromFile(const std::string& file) {
auto reader = std::make_unique<fbpcf::io::BufferedReader>(
std::make_unique<fbpcf::io::FileReader>(file));
auto serializedGlobalParameters = reader->readLine();
while (!reader->eof()) {
auto line = reader->readLine();
serializedGlobalParameters += "\n" + line;
}
reader->close();
return global_parameters::deserialize(serializedGlobalParameters);
}

} // namespace global_parameters
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <stdint.h>
#include <fstream>

#include <boost/serialization/unordered_map.hpp>
#include <boost/serialization/variant.hpp>
#include <memory>
#include <string>
#include <unordered_map>
#include <variant>

namespace global_parameters {

inline const std::string KAdvRowCount = "Advertiser_Row_Count";
inline const std::string KPubRowCount = "Publisher_Row_Count";

inline const std::string KAdvDataWidth = "Advertiser_Data_Width";
inline const std::string KPubDataWidth = "Publisher_Data_Width";

inline const std::string KMatchedUserCount = "Matched_User_Count";

using GlobalParameterType =
boost::variant<int32_t, std::unordered_map<int32_t, int32_t>>;

using GlobalParameters = std::unordered_map<std::string, GlobalParameterType>;

std::string serialize(const GlobalParameters& src);

GlobalParameters deserialize(const std::string& src);

void writeToFile(const std::string& file, const GlobalParameters& gp);

GlobalParameters readFromFile(const std::string& file);

} // namespace global_parameters
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h"
#include <gtest/gtest.h>
#include "folly/Format.h"
#include "folly/Random.h"

using namespace ::testing;

namespace global_parameters {

TEST(GlobalParametersSerialization, testSerializeAndDeserialize) {
const std::string file =
folly::sformat("./global_parameter_{}", folly::Random::rand32());

GlobalParameters gp;
gp.emplace("test1", 3);
gp.emplace(
"test2", std::unordered_map<int32_t, int32_t>({{1, 2}, {3, 4}, {5, 6}}));

writeToFile(file, gp);
auto gp1 = readFromFile(file);
std::remove(file.c_str());

EXPECT_EQ(
boost::get<int32_t>(gp.at("test1")),
boost::get<int32_t>(gp1.at("test1")));

EXPECT_EQ(
(boost::get<std::unordered_map<int32_t, int32_t>>(gp.at("test2"))),
(boost::get<std::unordered_map<int32_t, int32_t>>(gp1.at("test2"))));
}

} // namespace global_parameters
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h"
#include <thread>

namespace unified_data_process {

/**
* The idea here is to distribute the workload across more threads. This
* UdpEncryptor object will read in data in the main thread and buffer them.
* Once there are sufficient buffered data (defined by chunkSize_), the buffered
* data will be passed to the underlying udp encryption object to process in a
* background thread.
*/
void UdpEncryptor::processDataInBuffer() {
if (udpThreadForMySelf_ != nullptr) {
// this is not the first time of executing processingMyData
udpThreadForMySelf_->join();
} else {
// this is the first time of executing processingMyData, need to call
// preparation first
udpEncryption_->prepareToProcessMyData(
bufferForMyDataInLoading_->at(0).size());
}
if (bufferIndex_ < chunkSize_) {
bufferForMyDataInLoading_->resize(bufferIndex_);
}
std::swap(bufferForMyDataInLoading_, bufferForMyDataInProcessing_);
bufferIndex_ = 0;
udpThreadForMySelf_ = std::make_unique<std::thread>([this]() {
if (bufferForMyDataInProcessing_->size() == 0) {
return;
}
udpEncryption_->processMyData(*bufferForMyDataInProcessing_);
});
}

// load a line that is to be processed later.
void UdpEncryptor::pushOneLineFromMe(
std::vector<unsigned char>&& serializedLine) {
bufferForMyDataInLoading_->at(bufferIndex_++) = std::move(serializedLine);
if (bufferIndex_ >= chunkSize_) {
processDataInBuffer();
}
}

// load multiple lines into the buffer.
void UdpEncryptor::pushLinesFromMe(
std::vector<std::vector<unsigned char>>&& serializedLines) {
size_t inputIndex = 0;

while (inputIndex < serializedLines.size()) {
if (chunkSize_ - bufferIndex_ <= serializedLines.size() - inputIndex) {
std::copy(
serializedLines.begin() + inputIndex,
serializedLines.begin() + inputIndex + chunkSize_ - bufferIndex_,
bufferForMyDataInLoading_->begin() + bufferIndex_);
inputIndex += chunkSize_ - bufferIndex_;
// the buffer is full, the index should be changed to chunkSize_
bufferIndex_ = chunkSize_;
processDataInBuffer();
} else {
std::copy(
serializedLines.begin() + inputIndex,
serializedLines.end(),
bufferForMyDataInLoading_->begin() + bufferIndex_);

bufferIndex_ += serializedLines.size() - inputIndex;
inputIndex = serializedLines.size();
}
}
}

// set the config for peer's data.
void UdpEncryptor::setPeerConfig(
size_t totalNumberOfPeerRows,
size_t peerDataWidth,
const std::vector<int32_t>& indexes) {
udpEncryption_->prepareToProcessPeerData(peerDataWidth, indexes);
auto loop = [this, totalNumberOfPeerRows]() {
size_t numberOfProcessedRow = 0;
while (numberOfProcessedRow < totalNumberOfPeerRows) {
udpEncryption_->processPeerData(
std::min(chunkSize_, totalNumberOfPeerRows - numberOfProcessedRow));
numberOfProcessedRow += chunkSize_;
}
};
udpThreadForPeer_ = std::make_unique<std::thread>(loop);
}

UdpEncryptor::EncryptionResuts UdpEncryptor::getEncryptionResults() const {
if (udpThreadForPeer_ == nullptr) {
throw std::runtime_error("No thread to join for peer!");
}
udpThreadForPeer_->join();

auto [ciphertexts, nonces, indexes] = udpEncryption_->getProcessedData();
return EncryptionResuts{ciphertexts, nonces, indexes};
}

std::vector<__m128i> UdpEncryptor::getExpandedKey() {
processDataInBuffer();
if (udpThreadForMySelf_ == nullptr) {
throw std::runtime_error("No thread to join for peer!");
}
udpThreadForMySelf_->join();
return udpEncryption_->getExpandedKey();
}

} // namespace unified_data_process
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <memory>
#include <thread>
#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/IUdpEncryption.h"

namespace unified_data_process {

class UdpEncryptor {
using UdpEncryption =
fbpcf::mpc_std_lib::unified_data_process::data_processor::IUdpEncryption;

public:
using EncryptionResuts = UdpEncryption::EncryptionResuts;

UdpEncryptor(std::unique_ptr<UdpEncryption> udpEncryption, size_t chunkSize)
: udpEncryption_(std::move(udpEncryption)),
udpThreadForMySelf_(nullptr),
udpThreadForPeer_(nullptr),
chunkSize_(chunkSize),
bufferIndex_(0),
bufferForMyDataInLoading_{
std::make_unique<std::vector<std::vector<unsigned char>>>(
chunkSize_)},
bufferForMyDataInProcessing_(
std::make_unique<std::vector<std::vector<unsigned char>>>(
chunkSize_)) {}

// load a line that is to be processed later.
void pushOneLineFromMe(std::vector<unsigned char>&& serializedLine);

// load a number of lines that is to be processed later.
void pushLinesFromMe(
std::vector<std::vector<unsigned char>>&& serializedLines);

// set the config for peer's data.
void setPeerConfig(
size_t totalNumberOfPeerRows,
size_t peerDataWidth,
const std::vector<int32_t>& indexes);

EncryptionResuts getEncryptionResults() const;

std::vector<__m128i> getExpandedKey();

private:
void processDataInBuffer();

std::unique_ptr<UdpEncryption> udpEncryption_;
std::unique_ptr<std::thread> udpThreadForMySelf_;
std::unique_ptr<std::thread> udpThreadForPeer_;
size_t chunkSize_;

size_t bufferIndex_;
std::unique_ptr<std::vector<std::vector<unsigned char>>>
bufferForMyDataInLoading_;
std::unique_ptr<std::vector<std::vector<unsigned char>>>
bufferForMyDataInProcessing_;
};

} // namespace unified_data_process
Loading