diff --git a/.gitmodules b/.gitmodules index 48aeaf7..efc3331 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,12 @@ [submodule "lib/FlameGraph"] path = lib/FlameGraph url = https://github.com/brendangregg/FlameGraph +[submodule "lib/cpp-bredis"] + path = lib/cpp-bredis + url = git@github.com:basiliscos/cpp-bredis.git +[submodule "lib/continuable"] + path = lib/continuable + url = https://github.com/Naios/continuable.git +[submodule "lib/function2"] + path = lib/function2 + url = https://github.com/Naios/function2.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 602fb65..6e13533 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ endif() cmake_minimum_required (VERSION 3.10) project(MatchingEngine) -set(CMAKE_CXX_COMPILER g++) +set(CMAKE_CXX_COMPILER /usr/bin/g++) include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) @@ -26,7 +26,8 @@ option(ENABLE_BENCHMARKS "Build benchmarks" YES) # Compiler configuration set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native") +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wall -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native") set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(CMAKE_HAVE_THREADS_LIBRARY TRUE) @@ -41,6 +42,11 @@ conan_basic_setup(TARGETS) add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY -DBOOST_BEAST_USE_STD_STRING_VIEW) +add_subdirectory(lib/cpp-bredis) + +add_subdirectory(lib/function2) +add_subdirectory(lib/continuable) + add_subdirectory(Matching) add_subdirectory(Service) diff --git a/Dockerfile b/Dockerfile index 867689a..f6f1165 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ WORKDIR /opt/matching/ RUN conan profile new default --detect RUN conan profile update settings.compiler=gcc default RUN conan profile update settings.compiler.version=8 default -RUN conan profile update settings.compiler.libcxx=libstdc++ default +RUN conan profile update settings.compiler.libcxx=libstdc++11 default RUN conan profile update env.CC=gcc default RUN conan profile update env.CXX=g++ default RUN conan profile update env.CXXFLAGS=-std=c++17 default diff --git a/Makefile b/Makefile index 2d785f8..528e36c 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ build: clean conan remove -f -s -b -- '*' tidy: - astyle --style=linux --exclude=build --recursive *.cpp,*.c,*.h,*.hpp + astyle --style=stroustrup --exclude=build --exclude=lib --recursive *.cpp,*.c,*.h,*.hpp find . -type f -name '*.orig' -delete clean: diff --git a/Matching/CMakeLists.txt b/Matching/CMakeLists.txt index fd2f200..1590696 100644 --- a/Matching/CMakeLists.txt +++ b/Matching/CMakeLists.txt @@ -4,6 +4,8 @@ target_sources(matching INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries( matching INTERFACE + bredis + continuable CONAN_PKG::jemalloc CONAN_PKG::boost CONAN_PKG::spdlog diff --git a/Matching/src/influxdb.hpp b/Matching/src/influxdb.hpp index 25c2702..d5c157c 100644 --- a/Matching/src/influxdb.hpp +++ b/Matching/src/influxdb.hpp @@ -34,8 +34,7 @@ inline __int64 writev(int sock, struct iovec* iov, int cnt) #define closesocket close #endif -namespace influxdb_cpp -{ +namespace influxdb_cpp { struct server_info { std::string host_; int port_; @@ -46,8 +45,7 @@ struct server_info { server_info(const std::string& host, int port, const std::string& db = "", const std::string& usr = "", const std::string& pwd = "", const std::string& precision="ms") : host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision) {} }; -namespace detail -{ +namespace detail { struct meas_caller; struct tag_caller; struct field_caller; @@ -169,8 +167,7 @@ struct builder { std::stringstream lines_; }; -namespace detail -{ +namespace detail { struct tag_caller : public builder { detail::tag_caller& tag(const std::string& k, const std::string& v) { @@ -362,7 +359,8 @@ inline int inner::http_request(const char* method, const char* uri, if(resp) resp->append(&header[len], iv[1].iov_len); len += iv[1].iov_len; } - } while(chunked); + } + while(chunked); } goto END; } diff --git a/Matching/src/order_router.hpp b/Matching/src/order_router.hpp index 7ad24b5..b0b62c0 100644 --- a/Matching/src/order_router.hpp +++ b/Matching/src/order_router.hpp @@ -13,18 +13,17 @@ #include -namespace matching_engine -{ -namespace router -{ -class consumer -{ +namespace matching_engine { +namespace router { +class consumer : public std::enable_shared_from_this { + using queue = moodycamel::BlockingConcurrentQueue; using ms = std::chrono::milliseconds; using ns = std::chrono::nanoseconds; public: consumer(std::shared_ptr console): should_exit_{false}, console_(console) {} consumer(const consumer&) = delete; + consumer& operator=(const consumer&) = delete; consumer() = delete; void shutdown() { @@ -55,7 +54,7 @@ class consumer auto elapsed = Time::now() - start; /* Post consumer stats */ if (std::chrono::duration_cast(start.time_since_epoch()).count() - - std::chrono::duration_cast(last_log.time_since_epoch()).count() >= 250) { + - std::chrono::duration_cast(last_log.time_since_epoch()).count() >= 250) { std::async(std::launch::async,[&,market=ob.market_name(),elapsed] { /* NOTE Does it actually make sense to use async here?*/ influxdb_cpp::builder() @@ -77,16 +76,16 @@ class consumer return !should_exit_ or queue_.size_approx() > 0; } std::unordered_map markets; - moodycamel::BlockingConcurrentQueue queue_; + queue queue_; std::atomic_bool should_exit_; std::shared_ptr console_; }; -class dispatcher -{ +class dispatcher : public std::enable_shared_from_this { public: dispatcher() = default; dispatcher(const dispatcher&) = delete; + dispatcher& operator=(const dispatcher&) = delete; dispatcher(std::vector markets, std::shared_ptr console = nullptr, const uint64_t available_cores = std::max(1u, std::thread::hardware_concurrency() - 1)): @@ -94,17 +93,17 @@ class dispatcher console_{console} { const auto markets_per_core = uint64_t(markets.size() / available_cores); - auto reminder = markets.size() % available_cores; + auto remainder = markets.size() % available_cores; std::vector> consumer_pool; for (unsigned int core = 0; core < available_cores; core++) { consumer_pool.emplace_back(std::make_shared(console_)); auto& market_consumer = consumer_pool.back(); - if (reminder > 0) { + if (remainder > 0) { auto market = markets.back(); market_registry_[market] = market_consumer; market_consumer->register_market(market); markets.pop_back(); - --reminder; + --remainder; } /* Costruct consumer for N markets distributed evenly across logical cores */ for (auto index = markets_per_core; index > 0; --index) { diff --git a/Matching/src/orderbook.hpp b/Matching/src/orderbook.hpp index e9aa2e1..504ed0a 100644 --- a/Matching/src/orderbook.hpp +++ b/Matching/src/orderbook.hpp @@ -26,8 +26,7 @@ #include #include -namespace matching_engine -{ +namespace matching_engine { using Price = unsigned long; using Quantity = double; @@ -41,8 +40,7 @@ enum STATE { INACTIVE, ACTIVE, CANCELLED, FULFILLED }; enum TIF { GTC }; -class Order -{ +class Order { private: const std::string_view market_name_; /* TODO Replace with std::string_view */ const SIDE side_; @@ -99,8 +97,7 @@ using queue_allocator = boost::container::allocator; using order_queue_type = boost::container::deque; class OrderQueue - : public order_queue_type -{ + : public order_queue_type { public: OrderQueue() = default; OrderQueue(const OrderQueue &) = delete; @@ -113,8 +110,7 @@ class OrderQueue Quantity accumulate() const; }; -class OrderBook -{ +class OrderBook { private: const std::string_view market_name_; struct Comp { @@ -255,7 +251,8 @@ bool OrderBook::cancel(const UUID uuid, const SIDE side, const Price price) if (order_queue.empty()) /* Drop price node */ tree.erase(price); return result; - } catch (const std::out_of_range &) { /* No price point exist */ + } + catch (const std::out_of_range &) { /* No price point exist */ return false; } } @@ -268,59 +265,61 @@ std::string_view OrderBook::market_name() const bool OrderBook::match(OrderPtr src) { auto &&src_tree = src->is_buy() ? buy_tree_ : sell_tree_; - auto &&dist_tree = src->is_buy() ? sell_tree_ : buy_tree_; + auto &&dest_tree = src->is_buy() ? sell_tree_ : buy_tree_; src->state(STATE::ACTIVE); auto should_exit_tree = false; - for (auto node = dist_tree.begin(), end = dist_tree.end(); - !should_exit_tree && node != end;) { - auto &&dist_queue = node->second; + for (auto node = dest_tree.begin(), end = dest_tree.end(); + !should_exit_tree && node != end;) { + auto &&dest_queue = node->second; /* Buy cheap; sell expensive – conduct price improvement */ if (src->is_buy() ? src->price() >= node->first - : src->price() <= node->first) { - for (auto exit_queue = false; !exit_queue && !dist_queue.empty();) { - auto dist = dist_queue.front().get(); - auto leftover = dist->leftover() - src->leftover(); + : src->price() <= node->first) { + for (auto exit_queue = false; !exit_queue && !dest_queue.empty();) { + auto dest = dest_queue.front().get(); + auto leftover = dest->leftover() - src->leftover(); - /* Fulfilled source; partially or fulfilled dist */ + /* Fulfilled source; partially or fulfilled dest */ if (leftover >= 0) { src->execute(src->leftover()); src->state(STATE::FULFILLED); /* Exact match */ if (leftover == 0) { - dist->execute(dist->leftover()); + dest->execute(dest->leftover()); } /* Partial match */ else { - dist->execute(dist->leftover() - leftover); + dest->execute(dest->leftover() - leftover); } /* Remove fulfilled order from queue */ - if (dist->leftover() == 0) { - dist->state(STATE::FULFILLED); - dist_queue.pop_front(); + if (dest->leftover() == 0) { + dest->state(STATE::FULFILLED); + dest_queue.pop_front(); } /* Matching is complete */ exit_queue = true; should_exit_tree = true; } - /* Partially-filled source; fulfilled dist */ + /* Partially-filled source; fulfilled dest */ else { - src->execute(dist->leftover()); - dist->execute(dist->leftover()); - dist->state(STATE::FULFILLED); + src->execute(dest->leftover()); + dest->execute(dest->leftover()); + dest->state(STATE::FULFILLED); /* Remove fulfilled order from queue */ - dist_queue.pop_front(); + dest_queue.pop_front(); /* Try next order in the queue */ } } /* Try next price node */ - if (dist_queue.empty()) { + if (dest_queue.empty()) { /* Purge the price point with empty queue */ - node = dist_tree.erase(node++); - } else { + node = dest_tree.erase(node++); + } + else { ++node; } - } else { + } + else { should_exit_tree = true; } } @@ -332,7 +331,8 @@ bool OrderBook::match(OrderPtr src) node, src->price(), std::move(src)); - } else { /* Insert in existing price node */ + } + else { /* Insert in existing price node */ node->second.emplace_back(std::move(src)); } return false; @@ -388,7 +388,5 @@ std::vector OrderBook::snapshot() const traverse(sell_tree_, SIDE::SELL); return snapshot; } - - } // namespace matching_engine diff --git a/Matching/src/tcp_server.hpp b/Matching/src/tcp_server.hpp deleted file mode 100644 index 25fee4e..0000000 --- a/Matching/src/tcp_server.hpp +++ /dev/null @@ -1,196 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace matching_engine -{ -namespace tcp -{ -using boost::asio::ip::tcp; -namespace http = boost::beast::http; -using request_t = http::request; -using response_t = http::response; - -class connection_handler - : public std::enable_shared_from_this -{ -public: - connection_handler(boost::asio::io_context &ioc, - const std::shared_ptr dispatcher, - const std::shared_ptr& console) - : strand_{std::make_unique(ioc)}, - socket_{ioc}, work_{ioc}, dispatcher_{dispatcher}, console_{console} {} - - connection_handler(const connection_handler&) = delete; - - void dispatch() - { - auto self = shared_from_this(); - http::async_read( - socket_, buffer_, request_, - [this, self](boost::system::error_code ec, std::size_t) { - if (ec == http::error::end_of_stream) { - return; - } - if (ec) { - console_->error("connection_handler::async_read: {}", ec.message()); - return; - } - //logger_->info("connection_handler::async_read: {}", request_.target().to_string()); - - std::string_view target = request_.target(); - //boost::trim_if(target, [](auto ch) { return ch == '/'; }); - std::vector params; - //boost::split(params, target, [](auto ch) { return ch == '/'; }, - // boost::token_compress_on); - - size_t first = 0; - while (first < target.size()) { - const auto second = target.find_first_of('/', first); - if (first != second) params.emplace_back(target.substr(first, second-first)); - if (second == std::string_view::npos) break; - first = second + 1; - } - - //std::ostringstream ss; - http::status status; - if (params.size() < 4 or target.find(u8"favicon.ico") != std::string_view::npos) { - console_->warn("connection_handler::async_read: Invalid request"); - //ss << nlohmann::json::parse("{\"target\":\""+target+"\",\"status\": \"FAILED\",\"origin\":\"" + - // boost::lexical_cast(socket_.remote_endpoint()) + "\"}"); - status = http::status::bad_request; - } else { - SIDE side = params[0] == u8"BUY" ? SIDE::BUY : SIDE::SELL; - std::string_view market = dispatcher_->registered_market_name(params[1]); - Price price = std::stod(params[2].data()); - Quantity quantity = std::stod(params[3].data()); - dispatcher_->send(std::move(std::make_unique(market, side, price, quantity))); - status = http::status::ok; - } - self->strand_->dispatch([self, status] { self->reply(status); }); - }); - } - - tcp::socket &socket() - { - return socket_; - } - - void start() - { - strand_->dispatch([self = shared_from_this()] { self->dispatch(); }); - } - -private: - response_t static build_response(http::status status, - http::request &req) - { - response_t res{status, req.version()}; - res.set(http::field::server, BOOST_BEAST_VERSION_STRING); - res.set(http::field::content_type, "application/json"); - res.keep_alive(req.keep_alive()); - //res.body() = std::string(msg); - res.prepare_payload(); - return res; - } - - void reply(http::status status) - { - auto self = shared_from_this(); - auto response = std::make_shared( - build_response(status, request_)); - http::async_write( - socket_, *response, - [this, self, response](boost::system::error_code ec, std::size_t) { - if (ec) { - console_->error("server::async_write: {}", ec.message()); - return; - } - if (response->need_eof()) - return; - - if (!ec) - self->strand_->dispatch([self] { self->dispatch(); }); - }); - } - - tcp::socket socket_; - std::unique_ptr strand_; - boost::asio::io_context::work work_; - boost::beast::flat_buffer buffer_; - request_t request_; - std::shared_ptr dispatcher_; - const std::shared_ptr& console_; -}; - -class server -{ -public: - server(boost::asio::io_context &ioc, - const std::shared_ptr dispatcher, - const std::shared_ptr console, - const short port = 8080, - const uint64_t available_cores = std::max(1u, std::thread::hardware_concurrency() - 1)) - : ioc_{ioc}, acceptor_{ioc, tcp::endpoint(tcp::v6(), port)}, - dispatcher_{dispatcher}, console_{console}, pool_{available_cores} - { - acceptor_.set_option(tcp::acceptor::reuse_address(true)); - async_start_(); - /* Start event loop for all threads */ - for (auto core = available_cores; core > 0; --core) { - boost::asio::post(pool_, [&] {ioc.run();}); - } - } - server(const server&) = delete; - - boost::system::error_code shutdown() - { - boost::system::error_code ec; - acceptor_.close(ec); - return ec; - } - -private: - void async_start_() - { - const auto endpoint = boost::lexical_cast(acceptor_.local_endpoint()); - acceptor_.listen(boost::asio::socket_base::max_listen_connections); - if (acceptor_.is_open()) { - console_->info("server::start: started on {}", endpoint); - } else { - console_->warn("server::start: failed to start on {}", endpoint); - } - async_accept_(); - } - void async_accept_() - { - auto handler = std::make_shared(ioc_, dispatcher_, console_); - acceptor_.async_accept( - handler->socket(), [this, handler](boost::system::error_code ec) { - //logger_->info("server::async_accept: {}", - // boost::lexical_cast( - // handler->socket().remote_endpoint())); - if (ec) { - console_->error("server::async_accept: {}", ec.message()); - } else { - handler->dispatch(); - } - async_accept_(); - }); - } - - tcp::acceptor acceptor_; - boost::asio::io_context &ioc_; - std::shared_ptr dispatcher_; - const std::shared_ptr console_; - boost::asio::thread_pool pool_; -}; -} // namespace tcp -} // namespace matching_engine diff --git a/Matching/src/transport.hpp b/Matching/src/transport.hpp new file mode 100644 index 0000000..17e8a8e --- /dev/null +++ b/Matching/src/transport.hpp @@ -0,0 +1,378 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace matching_engine { +namespace transport { +class exponential_backoff { + boost::asio::steady_timer timer; + unsigned int retry_count, max_retries; + + + exponential_backoff(auto from = 1s, unsigned int max_retries = 1e4): timer{ioc, from}, + retry_count{0}, max_retries{max_retries} + { + + } + + bool retry(func) + { + bool result = func(); + t.expires_at(std::pow(2, ++retry_count) - 1); + + + } + +}; + +namespace redis { +using socket_t = boost::asio::ip::tcp::socket; +using endpoint_t = boost::asio::ip::tcp::endpoint; +using context_t = boost::asio::io_context; +namespace r = bredis; +using buffer = boost::asio::streambuf; +using iterator_t = typename r::to_iterator::iterator_t; +using endpoint_t = boost::asio::ip::tcp::endpoint; +using tcp = boost::asio::ip::tcp; +using policy_t = r::parsing_policy::keep_result; +using parse_result_t = r::parse_result_mapper_t; +using read_callback_t = std::function; +using extractor_t = r::extractor; + +class client { +public: + client(const client&) = delete; + client& operator=(const client&) = delete; + client(context_t& ioc, + const endpoint_t& endpoint, + const std::shared_ptr& console): console_{console} + { + socket = std::make_shared(ioc, endpoint.protocol()); + subscribtion_socket = std::make_shared(ioc, endpoint.protocol()); + + socket->connect(endpoint); + subscribtion_socket->connect(endpoint); + + subscription_connection_ = std::make_shared< r::Connection >(subscribtion_socket); + connection_ = std::make_shared< r::Connection >(std::move(socket); + + subscription_connection_->async_write(mc_subscription_buffer_, + r::single_command_t{psubscribe_cmd.data(), channel_name.data()}, cti::use_continuable) + .then(validate_subscription).fail(recover_failed_subscription) + .then(handle_notifications); + } + auto validate_subscription(const boost::system::error_code& ec, std::size_t bytes) + { + mc_subscription_buffer_.consume(bytes); + return subscription_connection_->async_read(mc_subscription_buffer_, cti::use_continuable) + .then([&](const boost::system::error_code& ec, parse_result_t &&r) { + auto extract = boost::apply_visitor(extractor_t(), r.result); + mc_subscription_buffer_.consume(r.consumed); + auto& replies = boost::get(extract); + auto* type = boost::get(&replies.elements[0]); + if (type && type->str.compare(psubscribe_cmd) == 0) { + auto& channel = boost::get(replies.elements[1]); + auto& payload = boost::get(replies.elements[2]); + if (channel.str.compare(channel_name) == 0 && payload == 1) { + console_->info("redis::subscribed to {}", channel_name); + } + else { + console_->warn("redis::failed to subscribe to {}", channel_name); + throw std::exception("Something bad has just happend! Need to report and resubscribe again"); + } + } + else { + throw std::exception("Something bad has just happend! Need to report and resubscribe again"); + } + }); + } + auto recover_failed_subscription(std::exception_ptr e) + { + // TODO resubscribe + } + auto handle_notifications(const boost::system::error_code& ec, parse_result_t &&r) + { + auto extract = boost::apply_visitor(extractor_t(), r.result); + mc_subscription_buffer_.consume(r.consumed); + auto& replies = boost::get(extract); + auto* type = boost::get(&replies.elements[0]); + if (type && type->str.compare(psubscribe_cmd) == 0) { + auto& channel = boost::get(replies.elements[1]); + auto& payload = boost::get(replies.elements[2]); + if (channel.str.compare(channel_name) == 0 && payload == 1) { + console_->info("redis::subscribed to {}", channel_name); + } + else { + console_->warn("redis::failed to subscribe to {}", channel_name); + throw + } + } + else if (type && type->str.compare(pmessage_cmd) == 0) { + auto& payload = boost::get(replies.elements[3]); + if (payload.str.compare(lpush_cmd) == 0) { + console_->info("redis::notification::new element in {}", channel_name); + connection_->async_write(mc_list_buffer_, + r::single_command_t{rpoplpush_cmd.data(), queue_name.data(), processing_queue_name.data()}, + cti::use_continuable) + .then(unpack_order).fail(cti::stop) + .then(set_processing).fail(cti::stop) + .then(remove_order_from_processing_queue).fail(cti::stop) + .then(dispatch_order); + } + } + read_callback_t wait_for_notification = handle_notifications; + subscription_connection_->async_read(mc_subscription_buffer_, wait_for_notification); + } + + auto set_processing(OrderPtr order) + { + // TODO DB I/O bound query to update order status + // retry if failed to update the status + return std::move(order); + } + + auto remove_order_from_processing_queue(OrderPtr order) + { + // ocd on failure? It should never ever happen in theory! But in practice we will just ignore... + // Bcoz it can only be removed once the state is persisted in the db + return cti::make_continuable([order = std::move(order)](auto&& promise) { + promise.set_value(std::move(order)); + }); + } + + auto unpack_order(const boost::system::error_code& ec, std::size_t bytes) + { + mc_list_buffer_.consume(bytes); + return connection_->async_read(mc_list_buffer_, cti::use_continuable) + .then([&](const boost::system::error_code& ec, parse_result_t &&r) { + auto extract = boost::apply_visitor(extractor_t(), r.result); + mc_list_buffer_.consume(r.consumed); + auto &payload = boost::get(extract); + console_->info("redis::{}::{}", rpoplpush_cmd, payload.str); + // if unable to parse the object then stop the chain + return cti::make_continuable([raw_order = std::move(payload.str)](auto&& promise) { + auto order = std::make_unique(); // TODO parse JSON + promise.set_value(std::move(order)); + }); + }); + } +private: + const std::shared_ptr& console_; + + std::shared_ptr> subscription_connection_; + std::shared_ptr> connection_; + + buffer mc_subscription_buffer_; + buffer mc_list_buffer_; + + const std::string_view mc_list_name{"MARKET_CONSUMER"}; + const std::string_view oc_list_name{"ORDER_CANCELATION"}; + const std::string_view mcp_list_name{mc_list_name + "_PROCESSING"}; + const std::string_view ocp_list_name{oc_list_name + "_PROCESSING"}; + + const std::string_view mc_channel_name{"__keyspace@0__:" + mc_list_name}; + const std::string_view oc_channel_name{"__keyspace@0__:" + oc_list_name}; + + const std::string_view psubscribe_cmd{"psubscribe"}; + const std::string_view pmessage_cmd{"pmessage"}; + const std::string_view lpush_cmd{"lpush"}; + const std::string_view rpoplpush_cmd{"rpoplpush"}; +}; +} // namespace redis + +namespace http { +using boost::asio::ip::tcp; +namespace http = boost::beast::http; +using request_t = http::request; +using response_t = http::response; + +class connection_handler + : public std::enable_shared_from_this { +public: + connection_handler(boost::asio::io_context &ioc, + const std::shared_ptr dispatcher, + const std::shared_ptr& console) + : socket_{ioc}, strand_{std::make_unique(ioc)}, + work_{ioc}, dispatcher_{dispatcher}, console_{console} {} + + connection_handler(const connection_handler&) = delete; + connection_handler& operator=(const connection_handler&) = delete; + + void dispatch() + { + auto self = shared_from_this(); + http::async_read( + socket_, buffer_, request_, + [this, self](boost::system::error_code ec, std::size_t) { + if (ec == http::error::end_of_stream) { + return; + } + if (ec) { + console_->error("connection_handler::async_read: {}", ec.message()); + return; + } + //logger_->info("connection_handler::async_read: {}", request_.target().to_string()); + + std::string_view target = request_.target(); + //boost::trim_if(target, [](auto ch) { return ch == '/'; }); + std::vector params; + //boost::split(params, target, [](auto ch) { return ch == '/'; }, + // boost::token_compress_on); + + size_t first = 0; + while (first < target.size()) { + const auto second = target.find_first_of('/', first); + if (first != second) params.emplace_back(target.substr(first, second-first)); + if (second == std::string_view::npos) break; + first = second + 1; + } + + //std::ostringstream ss; + http::status status; + if (params.size() < 4 or target.find(u8"favicon.ico") != std::string_view::npos) { + console_->warn("connection_handler::async_read: Invalid request"); + //ss << nlohmann::json::parse("{\"target\":\""+target+"\",\"status\": \"FAILED\",\"origin\":\"" + + // boost::lexical_cast(socket_.remote_endpoint()) + "\"}"); + status = http::status::bad_request; + } + else { + SIDE side = params[0] == u8"BUY" ? SIDE::BUY : SIDE::SELL; + std::string_view market = dispatcher_->registered_market_name(params[1]); + Price price = std::stod(params[2].data()); + Quantity quantity = std::stod(params[3].data()); + dispatcher_->send(std::move(std::make_unique(market, side, price, quantity))); + status = http::status::ok; + } + self->strand_->dispatch([self, status] { self->reply(status); }); + }); + } + + tcp::socket &socket() + { + return socket_; + } + + void start() + { + strand_->dispatch([self = shared_from_this()] { self->dispatch(); }); + } + +private: + response_t static build_response(http::status status, + http::request &req) + { + response_t res{status, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "application/json"); + res.keep_alive(req.keep_alive()); + //res.body() = std::string(msg); + res.prepare_payload(); + return res; + } + + void reply(http::status status) + { + auto self = shared_from_this(); + auto response = std::make_shared( + build_response(status, request_)); + http::async_write( + socket_, *response, + [this, self, response](boost::system::error_code ec, std::size_t) { + if (ec) { + console_->error("server::async_write: {}", ec.message()); + return; + } + if (response->need_eof()) + return; + + if (!ec) + self->strand_->dispatch([self] { self->dispatch(); }); + }); + } + + tcp::socket socket_; + std::unique_ptr strand_; + boost::asio::io_context::work work_; + boost::beast::flat_buffer buffer_; + request_t request_; + std::shared_ptr dispatcher_; + const std::shared_ptr& console_; +}; + +class server { +public: + server(boost::asio::io_context &ioc, + const std::shared_ptr dispatcher, + const std::shared_ptr console, + const short port = 8080, + const uint64_t available_cores = std::max(1u, std::thread::hardware_concurrency() - 1)) + : ioc_{ioc}, acceptor_{ioc, tcp::endpoint(tcp::v6(), port)}, + dispatcher_{dispatcher}, console_{console}, pool_{available_cores} + { + acceptor_.set_option(tcp::acceptor::reuse_address(true)); + async_start_(); + /* Start event loop for all threads */ + for (auto core = available_cores; core > 0; --core) { + boost::asio::post(pool_, [&] {ioc.run();}); + } + } + server(const server&) = delete; + server& operator=(const server&) = delete; + + boost::system::error_code shutdown() + { + boost::system::error_code ec; + acceptor_.close(ec); + return ec; + } + +private: + void async_start_() + { + const auto endpoint = boost::lexical_cast(acceptor_.local_endpoint()); + acceptor_.listen(boost::asio::socket_base::max_listen_connections); + if (acceptor_.is_open()) { + console_->info("server::start: started on {}", endpoint); + } + else { + console_->warn("server::start: failed to start on {}", endpoint); + } + async_accept_(); + } + void async_accept_() + { + auto handler = std::make_shared(ioc_, dispatcher_, console_); + acceptor_.async_accept( + handler->socket(), [this, handler](boost::system::error_code ec) { + //logger_->info("server::async_accept: {}", + // boost::lexical_cast( + // handler->socket().remote_endpoint())); + if (ec) { + console_->error("server::async_accept: {}", ec.message()); + } + else { + handler->dispatch(); + } + async_accept_(); + }); + } + + boost::asio::io_context &ioc_; + tcp::acceptor acceptor_; + std::shared_ptr dispatcher_; + const std::shared_ptr console_; + boost::asio::thread_pool pool_; +}; +} // namespace http +} // namespace transport +} // namespace matching_engine diff --git a/Service/src/main.cpp b/Service/src/main.cpp index fd50ba3..2f96d93 100644 --- a/Service/src/main.cpp +++ b/Service/src/main.cpp @@ -19,10 +19,12 @@ #include #include #include -#include +#include #include using namespace std::chrono_literals; +using tcp_t = boost::asio::ip::tcp; +using endpoint_t = tcp_t::endpoint; namespace me = matching_engine; int main(int argc, char *argv[]) @@ -45,7 +47,9 @@ int main(int argc, char *argv[]) /* Initialise TCP transport layer */ boost::asio::io_context ioc{(int)std::thread::hardware_concurrency()}; - me::tcp::server server(ioc, dispatcher, console); + me::transport::http::server server(ioc, dispatcher, console); + auto redis_endpoint = endpoint_t(tcp_t::v4(), 6379); + me::transport::redis::client redis(ioc, redis_endpoint, console); ioc.run(); diff --git a/Testing/Benchmark/main.cpp b/Testing/Benchmark/main.cpp index 00b1ac2..a4205e6 100644 --- a/Testing/Benchmark/main.cpp +++ b/Testing/Benchmark/main.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include "markov.h" using namespace matching_engine; @@ -60,22 +60,22 @@ static void OrderMatching(benchmark::State& state) BENCHMARK(OrderMatching)->DenseRange(1, 1000, 250)->UseManualTime()->Complexity(benchmark::oLogN); // TODO Starts and hangs, issue with thread pool. Fix it. -static void OrderDispatching(benchmark::State& state) -{ - // Perform setup here - const std::vector markets = {u8"USD_JPY"}; - auto dispatcher = std::make_shared(markets); - auto prices = SimulateMarket(state.range(0)); - for(auto _ : state) { - for (auto price : prices) { - auto side = rand() % 2 ? SIDE::BUY : SIDE::SELL; - auto quantity = double(rand() % 10 + 1) / (rand() % 20 + 1); - auto order = std::make_unique(markets[0], side, price, quantity); - dispatcher->send(std::move(order)); - } - } - dispatcher->shutdown(); -} +//static void OrderDispatching(benchmark::State& state) +//{ +// // Perform setup here +// const std::vector markets = {u8"USD_JPY"}; +// auto dispatcher = std::make_shared(markets); +// auto prices = SimulateMarket(state.range(0)); +// for(auto _ : state) { +// for (auto price : prices) { +// auto side = rand() % 2 ? SIDE::BUY : SIDE::SELL; +// auto quantity = double(rand() % 10 + 1) / (rand() % 20 + 1); +// auto order = std::make_unique(markets[0], side, price, quantity); +// dispatcher->send(std::move(order)); +// } +// } +// dispatcher->shutdown(); +//} //BENCHMARK(OrderDispatching)->DenseRange(0, 1000, 250)->MeasureProcessCPUTime(); diff --git a/Testing/inc/markov.h b/Testing/inc/markov.h index d6f3991..96dbe60 100644 --- a/Testing/inc/markov.h +++ b/Testing/inc/markov.h @@ -86,8 +86,7 @@ std::vector DTMC (std::vector< std::vector > trans, int steps, int } // BEGIN CTMC CLASS: -class CTMC -{ +class CTMC { public: CTMC(std::vector< std::vector > initMatrix); ~CTMC(); @@ -153,7 +152,8 @@ void CTMC::simulate(double T, int state) //push time and state times.push_back(t); states.push_back(j); - } else { + } + else { j++; } } diff --git a/lib/continuable b/lib/continuable new file mode 160000 index 0000000..f57c589 --- /dev/null +++ b/lib/continuable @@ -0,0 +1 @@ +Subproject commit f57c5898ebfa2aa9fd52dfcd86454fe2a3cdfd57 diff --git a/lib/cpp-bredis b/lib/cpp-bredis new file mode 160000 index 0000000..b8218cb --- /dev/null +++ b/lib/cpp-bredis @@ -0,0 +1 @@ +Subproject commit b8218cb441065d38ee395035fda22431c632fe12 diff --git a/lib/function2 b/lib/function2 new file mode 160000 index 0000000..3a0746b --- /dev/null +++ b/lib/function2 @@ -0,0 +1 @@ +Subproject commit 3a0746bf5f601dfed05330aefcb6854354fce07d