From 3f4fbedb8ac05fa6aae4bd4d085fa7d7f4719b47 Mon Sep 17 00:00:00 2001 From: Dmitry Sirotkin Date: Sun, 12 Apr 2020 02:57:32 +0800 Subject: [PATCH 1/4] Add Ivan's cpp-redis as submodule Signed-off-by: Dmitry Sirotkin --- .gitmodules | 6 ++++++ lib/cpp-bredis | 1 + 2 files changed, 7 insertions(+) create mode 160000 lib/cpp-bredis diff --git a/.gitmodules b/.gitmodules index 48aeaf7..098e790 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,9 @@ [submodule "lib/FlameGraph"] path = lib/FlameGraph url = https://github.com/brendangregg/FlameGraph +[submodule "lib/--force"] + path = lib/--force + url = git@github.com:basiliscos/cpp-bredis.git +[submodule "lib/cpp-bredis"] + path = lib/cpp-bredis + url = git@github.com:basiliscos/cpp-bredis.git diff --git a/lib/cpp-bredis b/lib/cpp-bredis new file mode 160000 index 0000000..395830c --- /dev/null +++ b/lib/cpp-bredis @@ -0,0 +1 @@ +Subproject commit 395830c1b68d47ff54b93724ac22e2255765015d From aa1b1dd3e99fda269d6d05366b4e30ff43760404 Mon Sep 17 00:00:00 2001 From: Dmitry Sirotkin Date: Mon, 13 Apr 2020 06:28:23 +0800 Subject: [PATCH 2/4] Add redis to transport layer: pub/sub on keyspace, RPOPLPUSH Signed-off-by: Dmitry Sirotkin --- CMakeLists.txt | 7 +- Dockerfile | 2 +- Matching/CMakeLists.txt | 1 + Matching/src/order_router.hpp | 15 ++- Matching/src/orderbook.hpp | 38 +++---- .../src/{tcp_server.hpp => transport.hpp} | 106 +++++++++++++++++- Service/src/main.cpp | 8 +- Testing/Benchmark/main.cpp | 34 +++--- lib/cpp-bredis | 2 +- 9 files changed, 160 insertions(+), 53 deletions(-) rename Matching/src/{tcp_server.hpp => transport.hpp} (60%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 602fb65..57a5992 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ endif() cmake_minimum_required (VERSION 3.10) project(MatchingEngine) -set(CMAKE_CXX_COMPILER g++) +set(CMAKE_CXX_COMPILER /usr/bin/g++) include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) @@ -26,7 +26,8 @@ option(ENABLE_BENCHMARKS "Build benchmarks" YES) # Compiler configuration set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native") +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wall -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native") set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(CMAKE_HAVE_THREADS_LIBRARY TRUE) @@ -41,6 +42,8 @@ conan_basic_setup(TARGETS) add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY -DBOOST_BEAST_USE_STD_STRING_VIEW) +add_subdirectory(lib/cpp-bredis) + add_subdirectory(Matching) add_subdirectory(Service) diff --git a/Dockerfile b/Dockerfile index 867689a..f6f1165 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ WORKDIR /opt/matching/ RUN conan profile new default --detect RUN conan profile update settings.compiler=gcc default RUN conan profile update settings.compiler.version=8 default -RUN conan profile update settings.compiler.libcxx=libstdc++ default +RUN conan profile update settings.compiler.libcxx=libstdc++11 default RUN conan profile update env.CC=gcc default RUN conan profile update env.CXX=g++ default RUN conan profile update env.CXXFLAGS=-std=c++17 default diff --git a/Matching/CMakeLists.txt b/Matching/CMakeLists.txt index fd2f200..3674b01 100644 --- a/Matching/CMakeLists.txt +++ b/Matching/CMakeLists.txt @@ -4,6 +4,7 @@ target_sources(matching INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries( matching INTERFACE + bredis CONAN_PKG::jemalloc CONAN_PKG::boost CONAN_PKG::spdlog diff --git a/Matching/src/order_router.hpp b/Matching/src/order_router.hpp index 7ad24b5..99fb4da 100644 --- a/Matching/src/order_router.hpp +++ b/Matching/src/order_router.hpp @@ -17,14 +17,16 @@ namespace matching_engine { namespace router { -class consumer +class consumer : public std::enable_shared_from_this { + using queue = moodycamel::BlockingConcurrentQueue; using ms = std::chrono::milliseconds; using ns = std::chrono::nanoseconds; public: consumer(std::shared_ptr console): should_exit_{false}, console_(console) {} consumer(const consumer&) = delete; + consumer& operator=(const consumer&) = delete; consumer() = delete; void shutdown() { @@ -77,16 +79,17 @@ class consumer return !should_exit_ or queue_.size_approx() > 0; } std::unordered_map markets; - moodycamel::BlockingConcurrentQueue queue_; + queue queue_; std::atomic_bool should_exit_; std::shared_ptr console_; }; -class dispatcher +class dispatcher : public std::enable_shared_from_this { public: dispatcher() = default; dispatcher(const dispatcher&) = delete; + dispatcher& operator=(const dispatcher&) = delete; dispatcher(std::vector markets, std::shared_ptr console = nullptr, const uint64_t available_cores = std::max(1u, std::thread::hardware_concurrency() - 1)): @@ -94,17 +97,17 @@ class dispatcher console_{console} { const auto markets_per_core = uint64_t(markets.size() / available_cores); - auto reminder = markets.size() % available_cores; + auto remainder = markets.size() % available_cores; std::vector> consumer_pool; for (unsigned int core = 0; core < available_cores; core++) { consumer_pool.emplace_back(std::make_shared(console_)); auto& market_consumer = consumer_pool.back(); - if (reminder > 0) { + if (remainder > 0) { auto market = markets.back(); market_registry_[market] = market_consumer; market_consumer->register_market(market); markets.pop_back(); - --reminder; + --remainder; } /* Costruct consumer for N markets distributed evenly across logical cores */ for (auto index = markets_per_core; index > 0; --index) { diff --git a/Matching/src/orderbook.hpp b/Matching/src/orderbook.hpp index e9aa2e1..4ab04e2 100644 --- a/Matching/src/orderbook.hpp +++ b/Matching/src/orderbook.hpp @@ -268,55 +268,55 @@ std::string_view OrderBook::market_name() const bool OrderBook::match(OrderPtr src) { auto &&src_tree = src->is_buy() ? buy_tree_ : sell_tree_; - auto &&dist_tree = src->is_buy() ? sell_tree_ : buy_tree_; + auto &&dest_tree = src->is_buy() ? sell_tree_ : buy_tree_; src->state(STATE::ACTIVE); auto should_exit_tree = false; - for (auto node = dist_tree.begin(), end = dist_tree.end(); + for (auto node = dest_tree.begin(), end = dest_tree.end(); !should_exit_tree && node != end;) { - auto &&dist_queue = node->second; + auto &&dest_queue = node->second; /* Buy cheap; sell expensive – conduct price improvement */ if (src->is_buy() ? src->price() >= node->first : src->price() <= node->first) { - for (auto exit_queue = false; !exit_queue && !dist_queue.empty();) { - auto dist = dist_queue.front().get(); - auto leftover = dist->leftover() - src->leftover(); + for (auto exit_queue = false; !exit_queue && !dest_queue.empty();) { + auto dest = dest_queue.front().get(); + auto leftover = dest->leftover() - src->leftover(); - /* Fulfilled source; partially or fulfilled dist */ + /* Fulfilled source; partially or fulfilled dest */ if (leftover >= 0) { src->execute(src->leftover()); src->state(STATE::FULFILLED); /* Exact match */ if (leftover == 0) { - dist->execute(dist->leftover()); + dest->execute(dest->leftover()); } /* Partial match */ else { - dist->execute(dist->leftover() - leftover); + dest->execute(dest->leftover() - leftover); } /* Remove fulfilled order from queue */ - if (dist->leftover() == 0) { - dist->state(STATE::FULFILLED); - dist_queue.pop_front(); + if (dest->leftover() == 0) { + dest->state(STATE::FULFILLED); + dest_queue.pop_front(); } /* Matching is complete */ exit_queue = true; should_exit_tree = true; } - /* Partially-filled source; fulfilled dist */ + /* Partially-filled source; fulfilled dest */ else { - src->execute(dist->leftover()); - dist->execute(dist->leftover()); - dist->state(STATE::FULFILLED); + src->execute(dest->leftover()); + dest->execute(dest->leftover()); + dest->state(STATE::FULFILLED); /* Remove fulfilled order from queue */ - dist_queue.pop_front(); + dest_queue.pop_front(); /* Try next order in the queue */ } } /* Try next price node */ - if (dist_queue.empty()) { + if (dest_queue.empty()) { /* Purge the price point with empty queue */ - node = dist_tree.erase(node++); + node = dest_tree.erase(node++); } else { ++node; } diff --git a/Matching/src/tcp_server.hpp b/Matching/src/transport.hpp similarity index 60% rename from Matching/src/tcp_server.hpp rename to Matching/src/transport.hpp index 25fee4e..0d870b6 100644 --- a/Matching/src/tcp_server.hpp +++ b/Matching/src/transport.hpp @@ -6,12 +6,105 @@ #include #include #include +#include #include #include +#include namespace matching_engine { -namespace tcp +namespace transport +{ +namespace redis +{ +using socket_t = boost::asio::ip::tcp::socket; +using endpoint_t = boost::asio::ip::tcp::endpoint; +using context_t = boost::asio::io_context; +namespace r = bredis; +using buffer = boost::asio::streambuf; +using iterator_t = typename r::to_iterator::iterator_t; +using endpoint_t = boost::asio::ip::tcp::endpoint; +using tcp = boost::asio::ip::tcp; +using policy_t = r::parsing_policy::keep_result; +using parse_result_t = r::parse_result_mapper_t; +using read_callback_t = std::function; +using extractor_t = r::extractor; + +class client +{ +public: + client(const client&) = delete; + client& operator=(const client&) = delete; + client(context_t& ioc, + const endpoint_t& endpoint, + const std::shared_ptr& console): console_{console} + { + socket_t socket{ioc, endpoint.protocol()}; + socket_t subscribtion_socket{ioc, endpoint.protocol()}; + socket.connect(endpoint); + subscribtion_socket.connect(endpoint); + c_ = std::make_shared< r::Connection >(std::move(subscribtion_socket)); + conn_ = std::make_shared< r::Connection >(std::move(socket)); + c_->write(r::single_command_t{psubscribe_cmd.data(), channel_name.data()}); + c_->async_read(b_, notification_handler_); + }; +private: + const std::shared_ptr& console_; + std::shared_ptr> c_; + std::shared_ptr> conn_; + buffer b_; + buffer queue_buff_; + const std::string_view channel_name{"__keyspace@0__:CONSUMER"}; + const std::string_view queue_name{"CONSUMER"}; + const std::string_view processing_queue_name{"CONSUMER_PROCESSING"}; + const std::string_view psubscribe_cmd{"psubscribe"}; + const std::string_view pmessage_cmd{"pmessage"}; + const std::string_view lpush_cmd{"lpush"}; + const std::string_view rpoplpush_cmd{"rpoplpush"}; + + read_callback_t notification_handler_ = [&](const boost::system::error_code& ec, parse_result_t &&r) + { + if (ec) { + throw ec; + } + auto extract = boost::apply_visitor(extractor_t(), r.result); + b_.consume(r.consumed); + auto& replies = boost::get(extract); + auto* type_reply = boost::get(&replies.elements[0]); + if (type_reply && type_reply->str.compare(psubscribe_cmd) == 0) { + auto& channel = boost::get(replies.elements[1]); + auto& payload = boost::get(replies.elements[2]); + if (channel.str.compare(channel_name) == 0 && payload == 1) { + console_->info("redis::subscribed to {}", channel_name); + } else { + console_->warn("redis::failed to subscribe to {}", channel_name); + } + } else if (type_reply && type_reply->str.compare(pmessage_cmd) == 0) { + auto& payload = boost::get(replies.elements[3]); + if (payload.str.compare(lpush_cmd) == 0) { + console_->info("redis::notification::new element in {}", channel_name); + conn_->async_write(queue_buff_, + r::single_command_t{rpoplpush_cmd.data(), queue_name.data(), processing_queue_name.data()}, + [&](const boost::system::error_code& ec, std::size_t bytes) { + if (ec) throw ec; + queue_buff_.consume(bytes); + conn_->async_read(queue_buff_, queue_handler_); + }); + } + } + c_->async_read(b_, notification_handler_); + }; + read_callback_t queue_handler_ = [&](const boost::system::error_code& ec, parse_result_t &&r) + { + if(ec) throw ec; + auto extract = boost::apply_visitor(extractor_t(), r.result); + queue_buff_.consume(r.consumed); + auto &payload = boost::get(extract); + console_->info("redis::rpoplpush::{}", payload.str); + }; +}; +} // namespace redis +namespace http { using boost::asio::ip::tcp; namespace http = boost::beast::http; @@ -25,10 +118,11 @@ class connection_handler connection_handler(boost::asio::io_context &ioc, const std::shared_ptr dispatcher, const std::shared_ptr& console) - : strand_{std::make_unique(ioc)}, - socket_{ioc}, work_{ioc}, dispatcher_{dispatcher}, console_{console} {} + : socket_{ioc}, strand_{std::make_unique(ioc)}, + work_{ioc}, dispatcher_{dispatcher}, console_{console} {} connection_handler(const connection_handler&) = delete; + connection_handler& operator=(const connection_handler&) = delete; void dispatch() { @@ -149,6 +243,7 @@ class server } } server(const server&) = delete; + server& operator=(const server&) = delete; boost::system::error_code shutdown() { @@ -186,11 +281,12 @@ class server }); } - tcp::acceptor acceptor_; boost::asio::io_context &ioc_; + tcp::acceptor acceptor_; std::shared_ptr dispatcher_; const std::shared_ptr console_; boost::asio::thread_pool pool_; }; -} // namespace tcp +} // namespace http +} // namespace transport } // namespace matching_engine diff --git a/Service/src/main.cpp b/Service/src/main.cpp index fd50ba3..2f96d93 100644 --- a/Service/src/main.cpp +++ b/Service/src/main.cpp @@ -19,10 +19,12 @@ #include #include #include -#include +#include #include using namespace std::chrono_literals; +using tcp_t = boost::asio::ip::tcp; +using endpoint_t = tcp_t::endpoint; namespace me = matching_engine; int main(int argc, char *argv[]) @@ -45,7 +47,9 @@ int main(int argc, char *argv[]) /* Initialise TCP transport layer */ boost::asio::io_context ioc{(int)std::thread::hardware_concurrency()}; - me::tcp::server server(ioc, dispatcher, console); + me::transport::http::server server(ioc, dispatcher, console); + auto redis_endpoint = endpoint_t(tcp_t::v4(), 6379); + me::transport::redis::client redis(ioc, redis_endpoint, console); ioc.run(); diff --git a/Testing/Benchmark/main.cpp b/Testing/Benchmark/main.cpp index 00b1ac2..a4205e6 100644 --- a/Testing/Benchmark/main.cpp +++ b/Testing/Benchmark/main.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include "markov.h" using namespace matching_engine; @@ -60,22 +60,22 @@ static void OrderMatching(benchmark::State& state) BENCHMARK(OrderMatching)->DenseRange(1, 1000, 250)->UseManualTime()->Complexity(benchmark::oLogN); // TODO Starts and hangs, issue with thread pool. Fix it. -static void OrderDispatching(benchmark::State& state) -{ - // Perform setup here - const std::vector markets = {u8"USD_JPY"}; - auto dispatcher = std::make_shared(markets); - auto prices = SimulateMarket(state.range(0)); - for(auto _ : state) { - for (auto price : prices) { - auto side = rand() % 2 ? SIDE::BUY : SIDE::SELL; - auto quantity = double(rand() % 10 + 1) / (rand() % 20 + 1); - auto order = std::make_unique(markets[0], side, price, quantity); - dispatcher->send(std::move(order)); - } - } - dispatcher->shutdown(); -} +//static void OrderDispatching(benchmark::State& state) +//{ +// // Perform setup here +// const std::vector markets = {u8"USD_JPY"}; +// auto dispatcher = std::make_shared(markets); +// auto prices = SimulateMarket(state.range(0)); +// for(auto _ : state) { +// for (auto price : prices) { +// auto side = rand() % 2 ? SIDE::BUY : SIDE::SELL; +// auto quantity = double(rand() % 10 + 1) / (rand() % 20 + 1); +// auto order = std::make_unique(markets[0], side, price, quantity); +// dispatcher->send(std::move(order)); +// } +// } +// dispatcher->shutdown(); +//} //BENCHMARK(OrderDispatching)->DenseRange(0, 1000, 250)->MeasureProcessCPUTime(); diff --git a/lib/cpp-bredis b/lib/cpp-bredis index 395830c..b8218cb 160000 --- a/lib/cpp-bredis +++ b/lib/cpp-bredis @@ -1 +1 @@ -Subproject commit 395830c1b68d47ff54b93724ac22e2255765015d +Subproject commit b8218cb441065d38ee395035fda22431c632fe12 From 4ca54f08a1280627f23c2a71cf267b92dc048e4a Mon Sep 17 00:00:00 2001 From: Dmitry Sirotkin Date: Mon, 13 Apr 2020 06:55:39 +0800 Subject: [PATCH 3/4] Remove junk from .gitmodules Signed-off-by: Dmitry Sirotkin --- .gitmodules | 3 --- 1 file changed, 3 deletions(-) diff --git a/.gitmodules b/.gitmodules index 098e790..4427987 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,6 @@ [submodule "lib/FlameGraph"] path = lib/FlameGraph url = https://github.com/brendangregg/FlameGraph -[submodule "lib/--force"] - path = lib/--force - url = git@github.com:basiliscos/cpp-bredis.git [submodule "lib/cpp-bredis"] path = lib/cpp-bredis url = git@github.com:basiliscos/cpp-bredis.git From a487bd4560cfafc33ab2ea83f59f0d7f6eb1dd17 Mon Sep 17 00:00:00 2001 From: Dmitry Sirotkin Date: Fri, 17 Apr 2020 16:46:06 +0800 Subject: [PATCH 4/4] Use Continuable for future chains Signed-off-by: Dmitry Sirotkin --- .gitmodules | 6 + CMakeLists.txt | 3 + Makefile | 2 +- Matching/CMakeLists.txt | 1 + Matching/src/influxdb.hpp | 12 +- Matching/src/order_router.hpp | 14 +-- Matching/src/orderbook.hpp | 30 +++-- Matching/src/transport.hpp | 214 ++++++++++++++++++++++++---------- Testing/inc/markov.h | 6 +- lib/continuable | 1 + lib/function2 | 1 + 11 files changed, 190 insertions(+), 100 deletions(-) create mode 160000 lib/continuable create mode 160000 lib/function2 diff --git a/.gitmodules b/.gitmodules index 4427987..efc3331 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,9 @@ [submodule "lib/cpp-bredis"] path = lib/cpp-bredis url = git@github.com:basiliscos/cpp-bredis.git +[submodule "lib/continuable"] + path = lib/continuable + url = https://github.com/Naios/continuable.git +[submodule "lib/function2"] + path = lib/function2 + url = https://github.com/Naios/function2.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 57a5992..6e13533 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,9 @@ add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY -DBOOST_BEAST_USE_STD_STRING_VIEW add_subdirectory(lib/cpp-bredis) +add_subdirectory(lib/function2) +add_subdirectory(lib/continuable) + add_subdirectory(Matching) add_subdirectory(Service) diff --git a/Makefile b/Makefile index 2d785f8..528e36c 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ build: clean conan remove -f -s -b -- '*' tidy: - astyle --style=linux --exclude=build --recursive *.cpp,*.c,*.h,*.hpp + astyle --style=stroustrup --exclude=build --exclude=lib --recursive *.cpp,*.c,*.h,*.hpp find . -type f -name '*.orig' -delete clean: diff --git a/Matching/CMakeLists.txt b/Matching/CMakeLists.txt index 3674b01..1590696 100644 --- a/Matching/CMakeLists.txt +++ b/Matching/CMakeLists.txt @@ -5,6 +5,7 @@ target_link_libraries( matching INTERFACE bredis + continuable CONAN_PKG::jemalloc CONAN_PKG::boost CONAN_PKG::spdlog diff --git a/Matching/src/influxdb.hpp b/Matching/src/influxdb.hpp index 25c2702..d5c157c 100644 --- a/Matching/src/influxdb.hpp +++ b/Matching/src/influxdb.hpp @@ -34,8 +34,7 @@ inline __int64 writev(int sock, struct iovec* iov, int cnt) #define closesocket close #endif -namespace influxdb_cpp -{ +namespace influxdb_cpp { struct server_info { std::string host_; int port_; @@ -46,8 +45,7 @@ struct server_info { server_info(const std::string& host, int port, const std::string& db = "", const std::string& usr = "", const std::string& pwd = "", const std::string& precision="ms") : host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision) {} }; -namespace detail -{ +namespace detail { struct meas_caller; struct tag_caller; struct field_caller; @@ -169,8 +167,7 @@ struct builder { std::stringstream lines_; }; -namespace detail -{ +namespace detail { struct tag_caller : public builder { detail::tag_caller& tag(const std::string& k, const std::string& v) { @@ -362,7 +359,8 @@ inline int inner::http_request(const char* method, const char* uri, if(resp) resp->append(&header[len], iv[1].iov_len); len += iv[1].iov_len; } - } while(chunked); + } + while(chunked); } goto END; } diff --git a/Matching/src/order_router.hpp b/Matching/src/order_router.hpp index 99fb4da..b0b62c0 100644 --- a/Matching/src/order_router.hpp +++ b/Matching/src/order_router.hpp @@ -13,12 +13,9 @@ #include -namespace matching_engine -{ -namespace router -{ -class consumer : public std::enable_shared_from_this -{ +namespace matching_engine { +namespace router { +class consumer : public std::enable_shared_from_this { using queue = moodycamel::BlockingConcurrentQueue; using ms = std::chrono::milliseconds; using ns = std::chrono::nanoseconds; @@ -57,7 +54,7 @@ class consumer : public std::enable_shared_from_this auto elapsed = Time::now() - start; /* Post consumer stats */ if (std::chrono::duration_cast(start.time_since_epoch()).count() - - std::chrono::duration_cast(last_log.time_since_epoch()).count() >= 250) { + - std::chrono::duration_cast(last_log.time_since_epoch()).count() >= 250) { std::async(std::launch::async,[&,market=ob.market_name(),elapsed] { /* NOTE Does it actually make sense to use async here?*/ influxdb_cpp::builder() @@ -84,8 +81,7 @@ class consumer : public std::enable_shared_from_this std::shared_ptr console_; }; -class dispatcher : public std::enable_shared_from_this -{ +class dispatcher : public std::enable_shared_from_this { public: dispatcher() = default; dispatcher(const dispatcher&) = delete; diff --git a/Matching/src/orderbook.hpp b/Matching/src/orderbook.hpp index 4ab04e2..504ed0a 100644 --- a/Matching/src/orderbook.hpp +++ b/Matching/src/orderbook.hpp @@ -26,8 +26,7 @@ #include #include -namespace matching_engine -{ +namespace matching_engine { using Price = unsigned long; using Quantity = double; @@ -41,8 +40,7 @@ enum STATE { INACTIVE, ACTIVE, CANCELLED, FULFILLED }; enum TIF { GTC }; -class Order -{ +class Order { private: const std::string_view market_name_; /* TODO Replace with std::string_view */ const SIDE side_; @@ -99,8 +97,7 @@ using queue_allocator = boost::container::allocator; using order_queue_type = boost::container::deque; class OrderQueue - : public order_queue_type -{ + : public order_queue_type { public: OrderQueue() = default; OrderQueue(const OrderQueue &) = delete; @@ -113,8 +110,7 @@ class OrderQueue Quantity accumulate() const; }; -class OrderBook -{ +class OrderBook { private: const std::string_view market_name_; struct Comp { @@ -255,7 +251,8 @@ bool OrderBook::cancel(const UUID uuid, const SIDE side, const Price price) if (order_queue.empty()) /* Drop price node */ tree.erase(price); return result; - } catch (const std::out_of_range &) { /* No price point exist */ + } + catch (const std::out_of_range &) { /* No price point exist */ return false; } } @@ -273,11 +270,11 @@ bool OrderBook::match(OrderPtr src) auto should_exit_tree = false; for (auto node = dest_tree.begin(), end = dest_tree.end(); - !should_exit_tree && node != end;) { + !should_exit_tree && node != end;) { auto &&dest_queue = node->second; /* Buy cheap; sell expensive – conduct price improvement */ if (src->is_buy() ? src->price() >= node->first - : src->price() <= node->first) { + : src->price() <= node->first) { for (auto exit_queue = false; !exit_queue && !dest_queue.empty();) { auto dest = dest_queue.front().get(); auto leftover = dest->leftover() - src->leftover(); @@ -317,10 +314,12 @@ bool OrderBook::match(OrderPtr src) if (dest_queue.empty()) { /* Purge the price point with empty queue */ node = dest_tree.erase(node++); - } else { + } + else { ++node; } - } else { + } + else { should_exit_tree = true; } } @@ -332,7 +331,8 @@ bool OrderBook::match(OrderPtr src) node, src->price(), std::move(src)); - } else { /* Insert in existing price node */ + } + else { /* Insert in existing price node */ node->second.emplace_back(std::move(src)); } return false; @@ -388,7 +388,5 @@ std::vector OrderBook::snapshot() const traverse(sell_tree_, SIDE::SELL); return snapshot; } - - } // namespace matching_engine diff --git a/Matching/src/transport.hpp b/Matching/src/transport.hpp index 0d870b6..17e8a8e 100644 --- a/Matching/src/transport.hpp +++ b/Matching/src/transport.hpp @@ -10,13 +10,32 @@ #include #include #include +#include -namespace matching_engine -{ -namespace transport -{ -namespace redis -{ +namespace matching_engine { +namespace transport { +class exponential_backoff { + boost::asio::steady_timer timer; + unsigned int retry_count, max_retries; + + + exponential_backoff(auto from = 1s, unsigned int max_retries = 1e4): timer{ioc, from}, + retry_count{0}, max_retries{max_retries} + { + + } + + bool retry(func) + { + bool result = func(); + t.expires_at(std::pow(2, ++retry_count) - 1); + + + } + +}; + +namespace redis { using socket_t = boost::asio::ip::tcp::socket; using endpoint_t = boost::asio::ip::tcp::endpoint; using context_t = boost::asio::io_context; @@ -30,8 +49,7 @@ using parse_result_t = r::parse_result_mapper_t; using read_callback_t = std::function; using extractor_t = r::extractor; -class client -{ +class client { public: client(const client&) = delete; client& operator=(const client&) = delete; @@ -39,81 +57,147 @@ class client const endpoint_t& endpoint, const std::shared_ptr& console): console_{console} { - socket_t socket{ioc, endpoint.protocol()}; - socket_t subscribtion_socket{ioc, endpoint.protocol()}; - socket.connect(endpoint); - subscribtion_socket.connect(endpoint); - c_ = std::make_shared< r::Connection >(std::move(subscribtion_socket)); - conn_ = std::make_shared< r::Connection >(std::move(socket)); - c_->write(r::single_command_t{psubscribe_cmd.data(), channel_name.data()}); - c_->async_read(b_, notification_handler_); - }; -private: - const std::shared_ptr& console_; - std::shared_ptr> c_; - std::shared_ptr> conn_; - buffer b_; - buffer queue_buff_; - const std::string_view channel_name{"__keyspace@0__:CONSUMER"}; - const std::string_view queue_name{"CONSUMER"}; - const std::string_view processing_queue_name{"CONSUMER_PROCESSING"}; - const std::string_view psubscribe_cmd{"psubscribe"}; - const std::string_view pmessage_cmd{"pmessage"}; - const std::string_view lpush_cmd{"lpush"}; - const std::string_view rpoplpush_cmd{"rpoplpush"}; + socket = std::make_shared(ioc, endpoint.protocol()); + subscribtion_socket = std::make_shared(ioc, endpoint.protocol()); - read_callback_t notification_handler_ = [&](const boost::system::error_code& ec, parse_result_t &&r) + socket->connect(endpoint); + subscribtion_socket->connect(endpoint); + + subscription_connection_ = std::make_shared< r::Connection >(subscribtion_socket); + connection_ = std::make_shared< r::Connection >(std::move(socket); + + subscription_connection_->async_write(mc_subscription_buffer_, + r::single_command_t{psubscribe_cmd.data(), channel_name.data()}, cti::use_continuable) + .then(validate_subscription).fail(recover_failed_subscription) + .then(handle_notifications); + } + auto validate_subscription(const boost::system::error_code& ec, std::size_t bytes) + { + mc_subscription_buffer_.consume(bytes); + return subscription_connection_->async_read(mc_subscription_buffer_, cti::use_continuable) + .then([&](const boost::system::error_code& ec, parse_result_t &&r) { + auto extract = boost::apply_visitor(extractor_t(), r.result); + mc_subscription_buffer_.consume(r.consumed); + auto& replies = boost::get(extract); + auto* type = boost::get(&replies.elements[0]); + if (type && type->str.compare(psubscribe_cmd) == 0) { + auto& channel = boost::get(replies.elements[1]); + auto& payload = boost::get(replies.elements[2]); + if (channel.str.compare(channel_name) == 0 && payload == 1) { + console_->info("redis::subscribed to {}", channel_name); + } + else { + console_->warn("redis::failed to subscribe to {}", channel_name); + throw std::exception("Something bad has just happend! Need to report and resubscribe again"); + } + } + else { + throw std::exception("Something bad has just happend! Need to report and resubscribe again"); + } + }); + } + auto recover_failed_subscription(std::exception_ptr e) + { + // TODO resubscribe + } + auto handle_notifications(const boost::system::error_code& ec, parse_result_t &&r) { - if (ec) { - throw ec; - } auto extract = boost::apply_visitor(extractor_t(), r.result); - b_.consume(r.consumed); + mc_subscription_buffer_.consume(r.consumed); auto& replies = boost::get(extract); - auto* type_reply = boost::get(&replies.elements[0]); - if (type_reply && type_reply->str.compare(psubscribe_cmd) == 0) { + auto* type = boost::get(&replies.elements[0]); + if (type && type->str.compare(psubscribe_cmd) == 0) { auto& channel = boost::get(replies.elements[1]); auto& payload = boost::get(replies.elements[2]); if (channel.str.compare(channel_name) == 0 && payload == 1) { console_->info("redis::subscribed to {}", channel_name); - } else { + } + else { console_->warn("redis::failed to subscribe to {}", channel_name); + throw } - } else if (type_reply && type_reply->str.compare(pmessage_cmd) == 0) { + } + else if (type && type->str.compare(pmessage_cmd) == 0) { auto& payload = boost::get(replies.elements[3]); if (payload.str.compare(lpush_cmd) == 0) { console_->info("redis::notification::new element in {}", channel_name); - conn_->async_write(queue_buff_, - r::single_command_t{rpoplpush_cmd.data(), queue_name.data(), processing_queue_name.data()}, - [&](const boost::system::error_code& ec, std::size_t bytes) { - if (ec) throw ec; - queue_buff_.consume(bytes); - conn_->async_read(queue_buff_, queue_handler_); - }); + connection_->async_write(mc_list_buffer_, + r::single_command_t{rpoplpush_cmd.data(), queue_name.data(), processing_queue_name.data()}, + cti::use_continuable) + .then(unpack_order).fail(cti::stop) + .then(set_processing).fail(cti::stop) + .then(remove_order_from_processing_queue).fail(cti::stop) + .then(dispatch_order); } } - c_->async_read(b_, notification_handler_); - }; - read_callback_t queue_handler_ = [&](const boost::system::error_code& ec, parse_result_t &&r) + read_callback_t wait_for_notification = handle_notifications; + subscription_connection_->async_read(mc_subscription_buffer_, wait_for_notification); + } + + auto set_processing(OrderPtr order) { - if(ec) throw ec; - auto extract = boost::apply_visitor(extractor_t(), r.result); - queue_buff_.consume(r.consumed); - auto &payload = boost::get(extract); - console_->info("redis::rpoplpush::{}", payload.str); - }; + // TODO DB I/O bound query to update order status + // retry if failed to update the status + return std::move(order); + } + + auto remove_order_from_processing_queue(OrderPtr order) + { + // ocd on failure? It should never ever happen in theory! But in practice we will just ignore... + // Bcoz it can only be removed once the state is persisted in the db + return cti::make_continuable([order = std::move(order)](auto&& promise) { + promise.set_value(std::move(order)); + }); + } + + auto unpack_order(const boost::system::error_code& ec, std::size_t bytes) + { + mc_list_buffer_.consume(bytes); + return connection_->async_read(mc_list_buffer_, cti::use_continuable) + .then([&](const boost::system::error_code& ec, parse_result_t &&r) { + auto extract = boost::apply_visitor(extractor_t(), r.result); + mc_list_buffer_.consume(r.consumed); + auto &payload = boost::get(extract); + console_->info("redis::{}::{}", rpoplpush_cmd, payload.str); + // if unable to parse the object then stop the chain + return cti::make_continuable([raw_order = std::move(payload.str)](auto&& promise) { + auto order = std::make_unique(); // TODO parse JSON + promise.set_value(std::move(order)); + }); + }); + } +private: + const std::shared_ptr& console_; + + std::shared_ptr> subscription_connection_; + std::shared_ptr> connection_; + + buffer mc_subscription_buffer_; + buffer mc_list_buffer_; + + const std::string_view mc_list_name{"MARKET_CONSUMER"}; + const std::string_view oc_list_name{"ORDER_CANCELATION"}; + const std::string_view mcp_list_name{mc_list_name + "_PROCESSING"}; + const std::string_view ocp_list_name{oc_list_name + "_PROCESSING"}; + + const std::string_view mc_channel_name{"__keyspace@0__:" + mc_list_name}; + const std::string_view oc_channel_name{"__keyspace@0__:" + oc_list_name}; + + const std::string_view psubscribe_cmd{"psubscribe"}; + const std::string_view pmessage_cmd{"pmessage"}; + const std::string_view lpush_cmd{"lpush"}; + const std::string_view rpoplpush_cmd{"rpoplpush"}; }; } // namespace redis -namespace http -{ + +namespace http { using boost::asio::ip::tcp; namespace http = boost::beast::http; using request_t = http::request; using response_t = http::response; class connection_handler - : public std::enable_shared_from_this -{ + : public std::enable_shared_from_this { public: connection_handler(boost::asio::io_context &ioc, const std::shared_ptr dispatcher, @@ -160,7 +244,8 @@ class connection_handler //ss << nlohmann::json::parse("{\"target\":\""+target+"\",\"status\": \"FAILED\",\"origin\":\"" + // boost::lexical_cast(socket_.remote_endpoint()) + "\"}"); status = http::status::bad_request; - } else { + } + else { SIDE side = params[0] == u8"BUY" ? SIDE::BUY : SIDE::SELL; std::string_view market = dispatcher_->registered_market_name(params[1]); Price price = std::stod(params[2].data()); @@ -224,8 +309,7 @@ class connection_handler const std::shared_ptr& console_; }; -class server -{ +class server { public: server(boost::asio::io_context &ioc, const std::shared_ptr dispatcher, @@ -259,7 +343,8 @@ class server acceptor_.listen(boost::asio::socket_base::max_listen_connections); if (acceptor_.is_open()) { console_->info("server::start: started on {}", endpoint); - } else { + } + else { console_->warn("server::start: failed to start on {}", endpoint); } async_accept_(); @@ -274,7 +359,8 @@ class server // handler->socket().remote_endpoint())); if (ec) { console_->error("server::async_accept: {}", ec.message()); - } else { + } + else { handler->dispatch(); } async_accept_(); diff --git a/Testing/inc/markov.h b/Testing/inc/markov.h index d6f3991..96dbe60 100644 --- a/Testing/inc/markov.h +++ b/Testing/inc/markov.h @@ -86,8 +86,7 @@ std::vector DTMC (std::vector< std::vector > trans, int steps, int } // BEGIN CTMC CLASS: -class CTMC -{ +class CTMC { public: CTMC(std::vector< std::vector > initMatrix); ~CTMC(); @@ -153,7 +152,8 @@ void CTMC::simulate(double T, int state) //push time and state times.push_back(t); states.push_back(j); - } else { + } + else { j++; } } diff --git a/lib/continuable b/lib/continuable new file mode 160000 index 0000000..f57c589 --- /dev/null +++ b/lib/continuable @@ -0,0 +1 @@ +Subproject commit f57c5898ebfa2aa9fd52dfcd86454fe2a3cdfd57 diff --git a/lib/function2 b/lib/function2 new file mode 160000 index 0000000..3a0746b --- /dev/null +++ b/lib/function2 @@ -0,0 +1 @@ +Subproject commit 3a0746bf5f601dfed05330aefcb6854354fce07d