Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: distinguish between normal node startup and snapshot loading #319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmake/braft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ ExternalProject_Add(
extern_braft
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS brpc
# The pr on braft is not merged, so I am using my own warehouse to run the test for the time being
GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
GIT_TAG master
GIT_TAG v1.1.2-alpha2
GIT_SHALLOW true
CMAKE_ARGS
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
Expand Down
2 changes: 1 addition & 1 deletion cmake/openssl.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ SET_PROPERTY(TARGET crypto PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPENSSL_INCL
ADD_DEPENDENCIES(crypto OpenSSL)
SET(OPENSSL_CRYPTO_LIBRARY ${OPENSSL_INSTALL_DIR}/${OPENSSL_LIB}/libcrypto.a)

SET(OPENSSL_INCLUDE_DIR ${LIB_INCLUDE_DIR})
SET(OPENSSL_INCLUDE_DIR ${LIB_INCLUDE_DIR})
2 changes: 1 addition & 1 deletion pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ slowlog-max-len 128
# to the same DB is distributed among several RocksDB instances.

# RocksDB instances number per DB
db-instance-num 3
db-instance-num 1
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
# default is 86400 * 7
small-compaction-threshold 604800
# default is 86400 * 3
Expand Down
6 changes: 3 additions & 3 deletions save_load.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/bin/bash

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The save_load.sh script contains glob patterns (rm -fr *) that could potentially be affected by filenames with dashes. To prevent filenames with dashes from being interpreted as options, consider using ./* instead of *.

  • Line 2: rm -fr * should be updated to rm -fr ./ *
  • Line 7: rm -fr * should be updated to rm -fr ./ *
Analysis chain

Consider using ./ or -- with glob patterns to prevent filenames with dashes from being interpreted as options.

Also applies to: 7-7

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if filenames with dashes are being interpreted as options.

# Test: Check if filenames with dashes exist and could potentially be interpreted as options.
find . -name "*-*" -exec echo "Filename with dash found: {}" \;

Length of output: 3925



Script:

#!/bin/bash
# Description: Check for glob patterns in save_load.sh that could be affected by filenames with dashes.

# Test: Search for glob patterns in save_load.sh
grep -E '\*|\?' save_load.sh

Length of output: 217

killall -9 pikiwidb
mkdir leader follower1

Expand All @@ -10,9 +11,8 @@ redis-cli -p 7777 raft.cluster init

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot

sleep 10

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ TARGET_LINK_LIBRARIES(pikiwidb
"${LIB}"
)

SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX)
SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX)
4 changes: 3 additions & 1 deletion src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}

void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
auto s = PRAFT.DoSnapshot();
auto self_snapshot_index = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex();
INFO("DoCmdSnapshot self_snapshot_index:{}", self_snapshot_index);
auto s = PRAFT.DoSnapshot(self_snapshot_index);
Comment on lines +125 to +127
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

更改已批准

检索并使用 self_snapshot_index 是必要的,以便处理新的快照功能。

需要帮助完成 self_snapshot_index 的实现吗?

注释的代码表明 self_snapshot_index 计划被使用,但尚未实现。需要我帮助完成这部分代码吗?

if (s.ok()) {
client->SetRes(CmdRes::kOK);
}
Expand Down
14 changes: 11 additions & 3 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ rocksdb::Status DB::Open() {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
storage_options.do_snapshot_function =
std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};
Comment on lines +40 to +43
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:在 Lambda 表达式中使用值捕获。

使用引用捕获 raft 可能导致潜在的生命周期问题,尤其是在 raft 对象的生命周期可能比 storage_options 短的场景中。建议改为值捕获或确保 raft 的生命周期总是足够长。

- storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
+ storage_options.do_snapshot_function = [raft = pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
  raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
                   std::forward<decltype(is_sync)>(is_sync));
};
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};
storage_options.do_snapshot_function = [raft = pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};

}

storage_options.db_instance_num = g_config.db_instance_num.load();
Expand Down Expand Up @@ -92,6 +94,13 @@ void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[ma

std::lock_guard<std::shared_mutex> lock(storage_mutex_);
opened_ = false;
// close the old storage, then open the new storage
std::unique_ptr<storage::Storage> old_storage = std::move(storage_);
if (old_storage != nullptr) {
old_storage->Close();
old_storage.reset();
}
storage_ = std::make_unique<storage::Storage>();
Comment on lines +97 to +103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:在 DB::LoadDBFromCheckpoint 中使用 Lambda 表达式。

为了保持一致性,建议将 storage_options.do_snapshot_function 中的 std::bind 替换为 Lambda 表达式。

- storage_options.do_snapshot_function =
-     std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
+ storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
+   raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
+                    std::forward<decltype(is_sync)>(is_sync));
+ };

Committable suggestion was skipped due to low confidence.

auto result = storage_->LoadCheckpoint(checkpoint_sub_path, db_path_);

for (auto& r : result) {
Expand All @@ -114,7 +123,6 @@ void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[ma
storage_options.do_snapshot_function =
std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
ERROR("Storage open failed! {}", s.ToString());
Expand Down
1 change: 1 addition & 0 deletions src/praft/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TARGET_INCLUDE_DIRECTORIES(praft
PRIVATE ${BRAFT_INCLUDE_DIR}
PRIVATE ${BRPC_INCLUDE_DIR}
PRIVATE ${PROTO_OUTPUT_DIR}
PRIVATE ${PROJECT_SOURCE_DIR}/src/storage/include
)

IF (CMAKE_SYSTEM_NAME STREQUAL "Linux")
Expand Down
63 changes: 57 additions & 6 deletions src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
// the case that it becomes the leader while the service is unreacheable by
// clients.
// Notice the default options of server is used here. Check out details from
// the doc of brpc if you would like change some options;
// the doc of brpc if you would like change some option;
if (server_->Start(port, nullptr) != 0) {
server_.reset();
return ERROR_LOG_AND_STATUS("Failed to start server");
Expand Down Expand Up @@ -150,7 +150,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
node_options_.fsm = this;
node_options_.node_owns_fsm = false;
node_options_.snapshot_interval_s = 0;
std::string prefix = "local://" + g_config.db_path.ToString() + "_praft";
std::string prefix = "local://" + g_config.db_path.ToString() + std::to_string(db_id_) + "/_praft";
node_options_.log_uri = prefix + "/log";
node_options_.raft_meta_uri = prefix + "/raft_meta";
node_options_.snapshot_uri = prefix + "/snapshot";
Expand Down Expand Up @@ -241,6 +241,24 @@ butil::Status PRaft::GetListPeers(std::vector<braft::PeerId>* peers) {
return node_->list_peers(peers);
}

storage::LogIndex PRaft::GetTerm(uint64_t log_index) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
}

return node_->get_term(log_index);
}

storage::LogIndex PRaft::GetLastLogIndex(bool is_flush) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
}

return node_->get_last_log_index(is_flush);
}
Comment on lines +244 to +260
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

检查默认返回值和错误处理

函数 PRaft::GetTermPRaft::GetLastLogIndex 在节点未初始化时返回 0。考虑使用一个明确的错误代码或异常处理机制,而不是返回一个可能会被误解的魔术数字。

- return 0;
+ throw std::runtime_error("Node is not initialized");
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storage::LogIndex PRaft::GetTerm(uint64_t log_index) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
}
return node_->get_term(log_index);
}
storage::LogIndex PRaft::GetLastLogIndex(bool is_flush) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
}
return node_->get_last_log_index(is_flush);
}
storage::LogIndex PRaft::GetTerm(uint64_t log_index) {
if (!node_) {
ERROR("Node is not initialized");
throw std::runtime_error("Node is not initialized");
}
return node_->get_term(log_index);
}
storage::LogIndex PRaft::GetLastLogIndex(bool is_flush) {
if (!node_) {
ERROR("Node is not initialized");
throw std::runtime_error("Node is not initialized");
}
return node_->get_last_log_index(is_flush);
}


void PRaft::SendNodeRequest(PClient* client) {
assert(client);

Expand Down Expand Up @@ -521,10 +539,16 @@ butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
if (!node_) {
return ERROR_LOG_AND_STATUS("Node is not initialized");
}
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();

if (is_sync) {
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();
} else {
node_->snapshot(nullptr, self_snapshot_index);
return butil::Status{};
}
}

void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
Expand Down Expand Up @@ -629,10 +653,37 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done
int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
CHECK(!IsLeader()) << "Leader is not supposed to load snapshot";
assert(reader);

if (is_node_first_start_up_) {
// get replay point of one db's
/*
1. When a node starts normally, because all memory data is flushed to disks and
snapshots are truncated to the latest, the flush-index and apply-index are the
same when the node starts, so the maximum log index should be obtained.
2. When a node is improperly shut down and restarted, the minimum flush-index should
be obtained as the starting point for fault recovery.
*/
uint64_t replay_point = PSTORE.GetBackend(db_id_)->GetStorage()->GetSmallestFlushedLogIndex();
node_->set_self_playback_point(replay_point);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node_->set_self_playback_point(replay_point+1)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 set_self_playback_point 设置的 last_applied_index,所以就是 replay_point

is_node_first_start_up_ = false;
INFO("set replay_point: {}", replay_point);

/*
If a node has just joined the cluster and does not have any data,
it does not load the local snapshot at startup. Therefore,
LoadDBFromCheckPoint is required after loading the snapshot from the leader.
*/
if (GetLastLogIndex() != 0) {
return 0;
}
}
Comment on lines +657 to +679
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

优化快照加载逻辑

函数 PRaft::on_snapshot_load 使用布尔标志 is_node_first_start_up_ 来区分首次启动。考虑使用状态枚举而不是布尔值,以清晰地表示节点的不同状态。

- bool is_node_first_start_up_;
+ enum class NodeStartupState { First, Subsequent };
+ NodeStartupState startup_state_;

在逻辑中相应地替换这个状态的检查和设置。

Committable suggestion was skipped due to low confidence.


// 3. When a snapshot is installed on a node, you do not need to set a playback point.
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto path = g_config.db_path.ToString() + std::to_string(db_id_); // db/db_id
TasksVector tasks(1, {TaskType::kLoadDBFromCheckpoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
PSTORE.HandleTaskSpecificDB(tasks);
INFO("load snapshot success!");
return 0;
}

Expand Down
5 changes: 5 additions & 0 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "braft/raft.h"
#include "brpc/server.h"
#include "rocksdb/status.h"
#include "storage/storage.h"

#include "client.h"

Expand Down Expand Up @@ -137,6 +138,8 @@ class PRaft : public braft::StateMachine {
std::string GetGroupID() const;
braft::NodeStatus GetNodeStatus() const;
butil::Status GetListPeers(std::vector<braft::PeerId>* peers);
storage::LogIndex GetTerm(uint64_t log_index);
storage::LogIndex GetLastLogIndex(bool is_flush = false);

bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }

Expand Down Expand Up @@ -164,6 +167,8 @@ class PRaft : public braft::StateMachine {
ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command
std::string group_id_; // group id
int db_id_ = 0; // db_id

bool is_node_first_start_up_ = true;
};

} // namespace pikiwidb
27 changes: 24 additions & 3 deletions src/praft/psnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,44 @@
#include "psnapshot.h"

#include "braft/local_file_meta.pb.h"
#include "braft/snapshot.h"
#include "butil/files/file_path.h"

#include "pstd/log.h"
#include "pstd/pstd_string.h"

#include "config.h"
#include "praft.h"
Comment on lines +14 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The additions and changes to snapshot path parsing are necessary for the new functionality. Consider adding unit tests to cover potential edge cases in the parsing logic.

Would you like me to help in creating unit tests for the snapshot path parsing logic?

Also applies to: 31-51

#include "store.h"

namespace pikiwidb {

extern PConfig g_config;

braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag,
const ::google::protobuf::Message* file_meta, butil::File::Error* e) {
if ((oflag & IS_RDONLY) == 0) { // This is a read operation
bool snapshots_exists = false;
std::string snapshot_path;
int db_id = -1;

// parse snapshot path
butil::FilePath parse_snapshot_path(path);
std::vector<std::string> components;
bool is_find_db = false;
parse_snapshot_path.GetComponents(&components);
for (auto component : components) {
for (const auto& component : components) {
snapshot_path += component + "/";

if (is_find_db && pstd::String2int(component, &db_id)) {
is_find_db = false;
}

if (component.find("snapshot_") != std::string::npos) {
break;
} else if (component == "db") {
is_find_db = true;
}
}

Comment on lines +31 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:为快照路径解析逻辑添加单元测试

快照路径解析逻辑对于新功能是必要的。建议添加单元测试以覆盖潜在的边界情况。

需要我帮助创建快照路径解析逻辑的单元测试吗?

// check whether snapshots have been created
std::lock_guard<braft::raft_mutex_t> guard(mutex_);
if (!snapshot_path.empty()) {
Expand All @@ -55,6 +66,8 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o

// Snapshot generation
if (!snapshots_exists) {
assert(db_id >= 0);

braft::LocalSnapshotMetaTable snapshot_meta_memtable;
std::string meta_path = snapshot_path + "/" PRAFT_SNAPSHOT_META_FILE;
INFO("start to generate snapshot in path {}", snapshot_path);
Expand All @@ -66,6 +79,14 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o
PSTORE.HandleTaskSpecificDB(tasks);
AddAllFiles(snapshot_path, &snapshot_meta_memtable, snapshot_path);

// update snapshot last log index and last_log_term
auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经维护了一个 last_flush_index_ ,这里使用 GetLastFlushIndex 可以少很多计算。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯嗯,已经用了

new_meta.set_last_included_index(last_log_index);
auto last_log_term = PRAFT.GetTerm(last_log_index);
new_meta.set_last_included_term(last_log_term);
INFO("Succeed to fix db_{} snapshot meta: {}, {}", db_id, last_log_index, last_log_term);

auto rc = snapshot_meta_memtable.save_to_file(fs, meta_path);
if (rc == 0) {
INFO("Succeed to save snapshot in path {}", snapshot_path);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,8 @@ class Storage {
void GetRocksDBInfo(std::string& info);
Status OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx);

LogIndex GetSmallestFlushedLogIndex() const;

private:
std::vector<std::unique_ptr<Redis>> insts_;
std::unique_ptr<SlotIndexer> slot_indexer_;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "log_index.h"

#include "pstd/log.h"

#include <algorithm>
#include <cinttypes>
#include <set>
Expand Down Expand Up @@ -167,6 +169,7 @@ void LogIndexAndSequenceCollectorPurger::OnFlushCompleted(rocksdb::DB *db,
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
INFO("do snapshot after flush: {}", smallest_flushed_log_index);
callback_(smallest_flushed_log_index, false);
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/log_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ class LogIndexOfColumnFamilies {
last_flush_index_.SetLogIndexSeqnoPair(lastest_flush_log_index, lastest_flush_sequence_number);
}

// for gtest
LogIndexSeqnoPair &GetLastFlushIndex() { return last_flush_index_; }
// get the the latest global minimum flushed_index
const LogIndexSeqnoPair &GetLastFlushIndex() const { return last_flush_index_; }

LogIndexPair &GetCFStatus(size_t cf) { return cf_[cf]; }

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class Redis {
void UpdateAppliedLogIndexOfColumnFamily(size_t cf_idx, LogIndex logidx, SequenceNumber seqno) {
log_index_of_all_cfs_.Update(cf_idx, logidx, seqno);
}
LogIndex GetSmallestFlushedLogIndex() const { return log_index_of_all_cfs_.GetLastFlushIndex().GetLogIndex(); }
bool IsRestarting() const { return is_starting_; }
void StartingPhaseEnd() { is_starting_ = false; }

Expand Down
10 changes: 9 additions & 1 deletion src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ Status Storage::LoadCheckpointInternal(const std::string& checkpoint_sub_path, c
int index) {
auto rocksdb_path = AppendSubDirectory(db_sub_path, index); // ./db/db_id/index
auto tmp_rocksdb_path = rocksdb_path + ".tmp"; // ./db/db_id/index.tmp
insts_[index].reset();

auto source_dir = AppendSubDirectory(checkpoint_sub_path, index);
// 1) Rename the original db to db.tmp, and only perform the maximum possible recovery of data
Expand Down Expand Up @@ -2517,4 +2516,13 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {
return s;
}

LogIndex Storage::GetSmallestFlushedLogIndex() const {
LogIndex smallest_flushed_log_index = INT64_MAX;
for (auto& inst : insts_) {
smallest_flushed_log_index = std::min(smallest_flushed_log_index, inst->GetSmallestFlushedLogIndex());
}

return smallest_flushed_log_index;
}

} // namespace storage
Loading