Skip to content
This repository has been archived by the owner on Feb 1, 2022. It is now read-only.

Redis support #7

Open
wants to merge 4 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
[submodule "lib/FlameGraph"]
path = lib/FlameGraph
url = https://github.com/brendangregg/FlameGraph
[submodule "lib/cpp-bredis"]
path = lib/cpp-bredis
url = [email protected]:basiliscos/cpp-bredis.git
[submodule "lib/continuable"]
path = lib/continuable
url = https://github.com/Naios/continuable.git
[submodule "lib/function2"]
path = lib/function2
url = https://github.com/Naios/function2.git
10 changes: 8 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ endif()
cmake_minimum_required (VERSION 3.10)
project(MatchingEngine)

set(CMAKE_CXX_COMPILER g++)
set(CMAKE_CXX_COMPILER /usr/bin/g++)

include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)

Expand All @@ -26,7 +26,8 @@ option(ENABLE_BENCHMARKS "Build benchmarks" YES)

# Compiler configuration
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native")
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexceptions -Wdeprecated -Wextra -Wall -Wno-switch-enum -Wno-float-equal -Wno-unused-parameter -Wno-unused-member-function -Werror -fno-omit-frame-pointer -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -lpthread -lm -march=native")

set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(CMAKE_HAVE_THREADS_LIBRARY TRUE)
Expand All @@ -41,6 +42,11 @@ conan_basic_setup(TARGETS)

add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY -DBOOST_BEAST_USE_STD_STRING_VIEW)

add_subdirectory(lib/cpp-bredis)

add_subdirectory(lib/function2)
add_subdirectory(lib/continuable)

add_subdirectory(Matching)

add_subdirectory(Service)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ WORKDIR /opt/matching/
RUN conan profile new default --detect
RUN conan profile update settings.compiler=gcc default
RUN conan profile update settings.compiler.version=8 default
RUN conan profile update settings.compiler.libcxx=libstdc++ default
RUN conan profile update settings.compiler.libcxx=libstdc++11 default
RUN conan profile update env.CC=gcc default
RUN conan profile update env.CXX=g++ default
RUN conan profile update env.CXXFLAGS=-std=c++17 default
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ build: clean
conan remove -f -s -b -- '*'

tidy:
astyle --style=linux --exclude=build --recursive *.cpp,*.c,*.h,*.hpp
astyle --style=stroustrup --exclude=build --exclude=lib --recursive *.cpp,*.c,*.h,*.hpp
find . -type f -name '*.orig' -delete

clean:
Expand Down
2 changes: 2 additions & 0 deletions Matching/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ target_sources(matching INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
matching
INTERFACE
bredis
continuable
CONAN_PKG::jemalloc
CONAN_PKG::boost
CONAN_PKG::spdlog
Expand Down
12 changes: 5 additions & 7 deletions Matching/src/influxdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ inline __int64 writev(int sock, struct iovec* iov, int cnt)
#define closesocket close
#endif

namespace influxdb_cpp
{
namespace influxdb_cpp {
struct server_info {
std::string host_;
int port_;
Expand All @@ -46,8 +45,7 @@ struct server_info {
server_info(const std::string& host, int port, const std::string& db = "", const std::string& usr = "", const std::string& pwd = "", const std::string& precision="ms")
: host_(host), port_(port), db_(db), usr_(usr), pwd_(pwd), precision_(precision) {}
};
namespace detail
{
namespace detail {
struct meas_caller;
struct tag_caller;
struct field_caller;
Expand Down Expand Up @@ -169,8 +167,7 @@ struct builder {
std::stringstream lines_;
};

namespace detail
{
namespace detail {
struct tag_caller : public builder {
detail::tag_caller& tag(const std::string& k, const std::string& v)
{
Expand Down Expand Up @@ -362,7 +359,8 @@ inline int inner::http_request(const char* method, const char* uri,
if(resp) resp->append(&header[len], iv[1].iov_len);
len += iv[1].iov_len;
}
} while(chunked);
}
while(chunked);
}
goto END;
}
Expand Down
25 changes: 12 additions & 13 deletions Matching/src/order_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
#include <moodycamel/blockingconcurrentqueue.h>


namespace matching_engine
{
namespace router
{
class consumer
{
namespace matching_engine {
namespace router {
class consumer : public std::enable_shared_from_this<consumer> {
using queue = moodycamel::BlockingConcurrentQueue<OrderPtr>;
using ms = std::chrono::milliseconds;
using ns = std::chrono::nanoseconds;
public:
consumer(std::shared_ptr<spdlog::logger> console):
should_exit_{false}, console_(console) {}
consumer(const consumer&) = delete;
consumer& operator=(const consumer&) = delete;
consumer() = delete;
void shutdown()
{
Expand Down Expand Up @@ -55,7 +54,7 @@ class consumer
auto elapsed = Time::now() - start;
/* Post consumer stats */
if (std::chrono::duration_cast<ms>(start.time_since_epoch()).count()
- std::chrono::duration_cast<ms>(last_log.time_since_epoch()).count() >= 250) {
- std::chrono::duration_cast<ms>(last_log.time_since_epoch()).count() >= 250) {
std::async(std::launch::async,[&,market=ob.market_name(),elapsed] {
/* NOTE Does it actually make sense to use async here?*/
influxdb_cpp::builder()
Expand All @@ -77,34 +76,34 @@ class consumer
return !should_exit_ or queue_.size_approx() > 0;
}
std::unordered_map<std::string_view, OrderBook> markets;
moodycamel::BlockingConcurrentQueue<OrderPtr> queue_;
queue queue_;
std::atomic_bool should_exit_;
std::shared_ptr<spdlog::logger> console_;
};

class dispatcher
{
class dispatcher : public std::enable_shared_from_this<dispatcher> {
public:
dispatcher() = default;
dispatcher(const dispatcher&) = delete;
dispatcher& operator=(const dispatcher&) = delete;
dispatcher(std::vector<std::string_view> markets,
std::shared_ptr<spdlog::logger> console = nullptr,
const uint64_t available_cores = std::max(1u, std::thread::hardware_concurrency() - 1)):
pool_{available_cores},
console_{console}
{
const auto markets_per_core = uint64_t(markets.size() / available_cores);
auto reminder = markets.size() % available_cores;
auto remainder = markets.size() % available_cores;
std::vector<std::shared_ptr<consumer>> consumer_pool;
for (unsigned int core = 0; core < available_cores; core++) {
consumer_pool.emplace_back(std::make_shared<consumer>(console_));
auto& market_consumer = consumer_pool.back();
if (reminder > 0) {
if (remainder > 0) {
auto market = markets.back();
market_registry_[market] = market_consumer;
market_consumer->register_market(market);
markets.pop_back();
--reminder;
--remainder;
}
/* Costruct consumer for N markets distributed evenly across logical cores */
for (auto index = markets_per_core; index > 0; --index) {
Expand Down
68 changes: 33 additions & 35 deletions Matching/src/orderbook.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>

namespace matching_engine
{
namespace matching_engine {

using Price = unsigned long;
using Quantity = double;
Expand All @@ -41,8 +40,7 @@ enum STATE { INACTIVE, ACTIVE, CANCELLED, FULFILLED };

enum TIF { GTC };

class Order
{
class Order {
private:
const std::string_view market_name_; /* TODO Replace with std::string_view */
const SIDE side_;
Expand Down Expand Up @@ -99,8 +97,7 @@ using queue_allocator = boost::container::allocator<OrderPtr>;
using order_queue_type = boost::container::deque<OrderPtr, queue_allocator>;

class OrderQueue
: public order_queue_type
{
: public order_queue_type {
public:
OrderQueue() = default;
OrderQueue(const OrderQueue &) = delete;
Expand All @@ -113,8 +110,7 @@ class OrderQueue
Quantity accumulate() const;
};

class OrderBook
{
class OrderBook {
private:
const std::string_view market_name_;
struct Comp {
Expand Down Expand Up @@ -255,7 +251,8 @@ bool OrderBook::cancel(const UUID uuid, const SIDE side, const Price price)
if (order_queue.empty()) /* Drop price node */
tree.erase(price);
return result;
} catch (const std::out_of_range &) { /* No price point exist */
}
catch (const std::out_of_range &) { /* No price point exist */
return false;
}
}
Expand All @@ -268,59 +265,61 @@ std::string_view OrderBook::market_name() const
bool OrderBook::match(OrderPtr src)
{
auto &&src_tree = src->is_buy() ? buy_tree_ : sell_tree_;
auto &&dist_tree = src->is_buy() ? sell_tree_ : buy_tree_;
auto &&dest_tree = src->is_buy() ? sell_tree_ : buy_tree_;
src->state(STATE::ACTIVE);

auto should_exit_tree = false;
for (auto node = dist_tree.begin(), end = dist_tree.end();
!should_exit_tree && node != end;) {
auto &&dist_queue = node->second;
for (auto node = dest_tree.begin(), end = dest_tree.end();
!should_exit_tree && node != end;) {
auto &&dest_queue = node->second;
/* Buy cheap; sell expensive – conduct price improvement */
if (src->is_buy() ? src->price() >= node->first
: src->price() <= node->first) {
for (auto exit_queue = false; !exit_queue && !dist_queue.empty();) {
auto dist = dist_queue.front().get();
auto leftover = dist->leftover() - src->leftover();
: src->price() <= node->first) {
for (auto exit_queue = false; !exit_queue && !dest_queue.empty();) {
auto dest = dest_queue.front().get();
auto leftover = dest->leftover() - src->leftover();

/* Fulfilled source; partially or fulfilled dist */
/* Fulfilled source; partially or fulfilled dest */
if (leftover >= 0) {
src->execute(src->leftover());
src->state(STATE::FULFILLED);
/* Exact match */
if (leftover == 0) {
dist->execute(dist->leftover());
dest->execute(dest->leftover());
}
/* Partial match */
else {
dist->execute(dist->leftover() - leftover);
dest->execute(dest->leftover() - leftover);
}
/* Remove fulfilled order from queue */
if (dist->leftover() == 0) {
dist->state(STATE::FULFILLED);
dist_queue.pop_front();
if (dest->leftover() == 0) {
dest->state(STATE::FULFILLED);
dest_queue.pop_front();
}
/* Matching is complete */
exit_queue = true;
should_exit_tree = true;
}
/* Partially-filled source; fulfilled dist */
/* Partially-filled source; fulfilled dest */
else {
src->execute(dist->leftover());
dist->execute(dist->leftover());
dist->state(STATE::FULFILLED);
src->execute(dest->leftover());
dest->execute(dest->leftover());
dest->state(STATE::FULFILLED);
/* Remove fulfilled order from queue */
dist_queue.pop_front();
dest_queue.pop_front();
/* Try next order in the queue */
}
}
/* Try next price node */
if (dist_queue.empty()) {
if (dest_queue.empty()) {
/* Purge the price point with empty queue */
node = dist_tree.erase(node++);
} else {
node = dest_tree.erase(node++);
}
else {
++node;
}
} else {
}
else {
should_exit_tree = true;
}
}
Expand All @@ -332,7 +331,8 @@ bool OrderBook::match(OrderPtr src)
node,
src->price(),
std::move(src));
} else { /* Insert in existing price node */
}
else { /* Insert in existing price node */
node->second.emplace_back(std::move(src));
}
return false;
Expand Down Expand Up @@ -388,7 +388,5 @@ std::vector<OrderBook::snapshot_point> OrderBook::snapshot() const
traverse(sell_tree_, SIDE::SELL);
return snapshot;
}


} // namespace matching_engine

Loading