Skip to content

Commit

Permalink
feat: log index (#12)
Browse files Browse the repository at this point in the history
* feat: add binlog in floyd

* fix: format

* feat: return error status when time is out

* Add log index code

* get log index from sst file

* check if apply in multi cf

* add missing AddUserKey

* fix wrong function call

* fix: compile errors

* fix: adjust after cherry pick

* style

* test: generic cmake for tests

* test: log index test

* test: custom table property of rocksdb test

* test: log index property test

* refactor: adjust LogIndexOfCF::Init

* feat: check if apply and set before write binlog

* test: log index test

* test: commont in test for seq no

* perf: replace map with pair for cache

* refactor: move TimerGuard to utils.h

* refactor: use fmt

---------

Co-authored-by: qcloud <[email protected]>
Co-authored-by: ohmhong <[email protected]>
  • Loading branch information
3 people authored Mar 6, 2024
1 parent 0436f90 commit b6d54df
Show file tree
Hide file tree
Showing 19 changed files with 910 additions and 23 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ include(cmake/spdlog.cmake)
include(cmake/folly.cmake)
include(cmake/gtest.cmake)
include(cmake/rocksdb.cmake)
include(cmake/protobuf.cmake)

enable_testing()

Expand All @@ -126,7 +127,7 @@ SET(CMAKE_EXPORT_COMPILE_COMMANDS ON)
STRING(CONCAT FORMAT_DIRS "${PROJECT_SOURCE_DIR}/src,")
FILE(WRITE
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
"${PROJECT_SOURCE_DIR}/src/redis_intset.c\n${PROJECT_SOURCE_DIR}/src/redis_zip_list.c")
"${PROJECT_SOURCE_DIR}/src/redis_intset.c\n${PROJECT_SOURCE_DIR}/src/redis_zip_list.c\n${PROJECT_SOURCE_DIR}/src/storage/src/binlog.pb.h\n${PROJECT_SOURCE_DIR}/src/storage/src/binlog.pb.cc")
ADD_CUSTOM_TARGET(format
COMMAND ${BUILD_SUPPORT_DIR}/run_clang_format.py
${CLANG_FORMAT_BIN}
Expand Down
48 changes: 48 additions & 0 deletions cmake/protobuf.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.

set(Protobuf_USE_STATIC_LIBS ON)
set(Protobuf_MSVC_STATIC_RUNTIME OFF)

FetchContent_Declare(
protocolbuffers_protobuf
GIT_REPOSITORY https://github.com/protocolbuffers/protobuf
GIT_TAG v3.15.8
)
FetchContent_MakeAvailable(protocolbuffers_protobuf)

set(Protobuf_ROOT ${protocolbuffers_protobuf_SOURCE_DIR}/cmake)

message(STATUS "Setting up protobuf ...")
execute_process(
COMMAND
${CMAKE_COMMAND} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -D protobuf_BUILD_TESTS=OFF -D protobuf_BUILD_PROTOC_BINARIES=ON -D CMAKE_POSITION_INDEPENDENT_CODE=ON -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE result
WORKING_DIRECTORY ${Protobuf_ROOT})
if(result)
message(FATAL_ERROR "Failed to download protobuf (${result})!")
endif()

message(STATUS "Building protobuf ...")
execute_process(
COMMAND ${CMAKE_COMMAND} --build .
RESULT_VARIABLE result
WORKING_DIRECTORY ${Protobuf_ROOT})
if(result)
message(FATAL_ERROR "Failed to build protobuf (${result})!")
endif()

message(STATUS "Installing protobuf ...")

set(PROTOBUF_INCLUDE_DIR ${protocolbuffers_protobuf_SOURCE_DIR}/src)
set(PROTOBUF_LIBRARIES ${Protobuf_ROOT})
set(PROTOC_LIB ${Protobuf_ROOT})
set(PROTOBUF_PROTOC "${Protobuf_ROOT}/protoc" CACHE FILEPATH "Path to protoc executable" FORCE)
set(PROTOBUF_PROTOC_EXECUTABLE "${Protobuf_ROOT}/protoc" CACHE FILEPATH "Path to protoc executable" FORCE)
if(${CMAKE_BUILD_TYPE} EQUAL Debug)
set(PROTOBUF_LITE_LIBRARY ${PROTOBUF_LIBRARIES}/libprotobuf-lited.a)
else()
set(PROTOBUF_LITE_LIBRARY ${PROTOBUF_LIBRARIES}/libprotobuf-lite.a)
endif()
8 changes: 4 additions & 4 deletions src/pstd/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ThreadPool final {
void operator=(const ThreadPool&) = delete;

template <typename F, typename... Args>
auto ExecuteTask(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F(Args...)>::type>;
auto ExecuteTask(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F, Args...>::type>;

void JoinAll();
void SetMaxIdleThread(unsigned int m);
Expand All @@ -54,8 +54,8 @@ class ThreadPool final {
};

template <typename F, typename... Args>
auto ThreadPool::ExecuteTask(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F(Args...)>::type> {
using resultType = typename std::invoke_result<F(Args...)>::type;
auto ThreadPool::ExecuteTask(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F, Args...>::type> {
using resultType = typename std::invoke_result<F, Args...>::type;

auto task =
std::make_shared<std::packaged_task<resultType()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
Expand All @@ -77,4 +77,4 @@ auto ThreadPool::ExecuteTask(F&& f, Args&&... args) -> std::future<typename std:
return task->get_future();
}

} // namespace pstd
} // namespace pstd
15 changes: 12 additions & 3 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
#AUX_SOURCE_DIRECTORY(./src STORAGE_SRC)
FILE(GLOB BINLOG_PROTO
"${CMAKE_CURRENT_SOURCE_DIR}/src/binlog.proto"
)
EXECUTE_PROCESS(
COMMAND ${PROTOBUF_PROTOC} --cpp_out=${CMAKE_CURRENT_SOURCE_DIR}/src -I${CMAKE_CURRENT_SOURCE_DIR}/src ${BINLOG_PROTO}
)

FILE(GLOB STORAGE_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.h"
Expand All @@ -13,8 +19,11 @@ TARGET_INCLUDE_DIRECTORIES(storage
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${rocksdb_SOURCE_DIR}
PRIVATE ${rocksdb_SOURCE_DIR}/include
)
PRIVATE ${PROTOBUF_INCLUDE_DIR}
)

TARGET_LINK_LIBRARIES(storage pstd glog rocksdb)
TARGET_LINK_LIBRARIES(storage pstd glog rocksdb ${PROTOBUF_LITE_LIBRARY})

SET_TARGET_PROPERTIES(storage PROPERTIES LINKER_LANGUAGE CXX)

ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/tests)
8 changes: 8 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <unistd.h>
#include <list>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>
Expand Down Expand Up @@ -43,6 +44,8 @@ using Status = rocksdb::Status;
using Slice = rocksdb::Slice;

class Redis;
class Task;
class TaskQueue;
enum class OptionType;

template <typename T1, typename T2>
Expand All @@ -58,6 +61,7 @@ struct StorageOptions {
size_t small_compaction_duration_threshold = 10000;
size_t db_instance_num = 3; // default = 3
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
bool is_write_by_binlog = false;
};

struct KeyValue {
Expand Down Expand Up @@ -1073,6 +1077,7 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);
auto GetTaskQueue() const -> TaskQueue* { return task_queue_.get(); }

private:
std::vector<std::unique_ptr<Redis>> insts_;
Expand All @@ -1093,6 +1098,9 @@ class Storage {
// For scan keys in data base
std::atomic<bool> scan_keynum_exit_ = false;
int32_t db_instance_num_;

void DefaultWriteCallback(const Task& task);
std::unique_ptr<TaskQueue> task_queue_;
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum ColumnFamilyIndex {
kZsetsMetaCF = 7,
kZsetsDataCF = 8,
kZsetsScoreCF = 9,
kColumnFamilyNum
};

const static char kNeedTransformCharacter = '\u0000';
Expand Down
98 changes: 98 additions & 0 deletions src/storage/src/batch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <future>
#include <memory>

#include "rocksdb/db.h"

#include "src/binlog.pb.h"
#include "src/redis.h"
#include "src/task_queue.h"
#include "storage/storage.h"
#include "storage/storage_define.h"

namespace storage {

class Batch {
public:
virtual ~Batch() = default;

virtual void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& val) = 0;
virtual void Delete(ColumnFamilyIndex cf_idx, const Slice& key) = 0;
virtual auto Commit() -> Status = 0;

static auto CreateBatch(Redis* redis, bool use_binlog = false) -> std::unique_ptr<Batch>;
};

class RocksBatch : public Batch {
public:
RocksBatch(rocksdb::DB* db, const rocksdb::WriteOptions& options,
const std::vector<rocksdb::ColumnFamilyHandle*>& handles)
: db_(db), options_(options), handles_(handles) {}

void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& val) override {
batch_.Put(handles_[cf_idx], key, val);
}
void Delete(ColumnFamilyIndex cf_idx, const Slice& key) override { batch_.Delete(handles_[cf_idx], key); }
auto Commit() -> Status override { return db_->Write(options_, &batch_); }

private:
rocksdb::WriteBatch batch_;
rocksdb::DB* db_;
const rocksdb::WriteOptions& options_;
const std::vector<rocksdb::ColumnFamilyHandle*>& handles_;
};

class BinlogBatch : public Batch {
public:
BinlogBatch(TaskQueue* queue, int32_t index, int32_t seconds = 10) : task_queue_(queue), seconds_(seconds) {
binlog_.set_slot_idx(index);
}

void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& value) override {
auto entry = binlog_.add_entries();
entry->set_cf_idx(cf_idx);
entry->set_op_type(OperateType::kPut);
entry->set_key(key.ToString());
entry->set_value(value.ToString());
}

void Delete(ColumnFamilyIndex cf_idx, const Slice& key) override {
auto entry = binlog_.add_entries();
entry->set_cf_idx(cf_idx);
entry->set_op_type(OperateType::kDelete);
entry->set_key(key.ToString());
}

Status Commit() override {
std::promise<Status> promise;
auto future = promise.get_future();
Task task(binlog_.SerializeAsString(), std::move(promise));
task_queue_->Produce(std::move(task));
if (seconds_ == -1) { // if do not require timeout
return future.get();
}

auto status = future.wait_for(std::chrono::seconds(seconds_));
if (status == std::future_status::timeout) {
return Status::Incomplete("Wait for write timeout");
}
return future.get();
}

private:
Binlog binlog_;
TaskQueue* task_queue_;
int32_t seconds_;
};

inline auto Batch::CreateBatch(Redis* redis, bool use_binlog) -> std::unique_ptr<Batch> {
if (use_binlog) {
return std::make_unique<BinlogBatch>(redis->GetTaskQueue(), redis->GetIndex());
}
return std::make_unique<RocksBatch>(redis->GetDB(), redis->GetWriteOptions(), redis->GetColumnFamilyHandles());
}

} // namespace storage
23 changes: 23 additions & 0 deletions src/storage/src/binlog.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package storage;

option optimize_for = LITE_RUNTIME;

enum OperateType {
kNoOperate = 0;
kPut = 1;
kDelete = 2;
}

message BinlogEntry {
int32 cf_idx = 1;
OperateType op_type = 2;
bytes key = 3;
optional bytes value = 4;
}

message Binlog {
int32 slot_idx = 1;
repeated BinlogEntry entries = 2;
}
Loading

0 comments on commit b6d54df

Please sign in to comment.