Skip to content

Commit

Permalink
Add replace member changes. Perist membership details.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Oct 22, 2024
1 parent 393b6bb commit 68eb07d
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 43 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.4"
version = "2.1.5"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
14 changes: 11 additions & 3 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <boost/uuid/uuid_io.hpp>
#include <sisl/utility/enum.hpp>
#include <sisl/logging/logging.h>

#include "common.hpp"

Expand All @@ -14,9 +15,15 @@ ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADE
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST);

struct PGMember {
// Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated.
static constexpr uint64_t max_name_len = 127;
explicit PGMember(peer_id_t _id) : id(_id) {}
PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {}
PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {}
PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {
RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length");
}
PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {
RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length");
}
peer_id_t id;
std::string name;
int32_t priority{0}; // <0 (Arbiter), ==0 (Follower), >0 (F|Leader)
Expand Down Expand Up @@ -78,7 +85,8 @@ struct PGStats {
class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, u_int32_t commit_quorum) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum = 0) = 0;

/**
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.
Expand Down
6 changes: 3 additions & 3 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class HomeObjectImpl : public HomeObject,

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0;
virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) = 0;
PGMember const& new_member, uint32_t commit_quorum) = 0;
virtual bool _get_stats(pg_id_t id, PGStats& stats) const = 0;
virtual void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const = 0;

Expand Down Expand Up @@ -144,8 +144,8 @@ class HomeObjectImpl : public HomeObject,

/// PgManager
PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum) final;
// see api comments in base class;
bool get_stats(pg_id_t id, PGStats& stats) const final;
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
Expand Down
9 changes: 5 additions & 4 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis

// Write to index table with key {shard id, blob id } and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}", blob_id, lsn,
exist_already, status, pbas.to_string());
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}",
msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
Expand Down Expand Up @@ -254,7 +254,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,

auto r = get_blob_from_index_table(index_table, shard.id, blob_id);
if (!r) {
BLOGW(shard.id, blob_id, "Blob not found in index");
BLOGE(shard.id, blob_id, "Blob not found in index during get blob");
return folly::makeUnexpected(r.error());
}

Expand Down Expand Up @@ -342,7 +342,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard, underlying engine will retry this later");
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand Down
14 changes: 12 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;

PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down Expand Up @@ -378,6 +378,16 @@ class HSHomeObject : public HomeObjectImpl {
void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Function invoked when a member is replaced by a new member
*
* @param group_id The group id of replication device.
* @param member_out Member which is removed from group
* @param member_in Member which is added to group
* */
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down
69 changes: 66 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,72 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
if (ctx) ctx->promise_.setValue(folly::Unit());
}

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id,
PGMember const& new_member, uint32_t commit_quorum) {

group_id_t group_id;
{
auto lg = std::shared_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG);
auto& repl_dev = pg_repl_dev(*iter->second);

if (!repl_dev.is_leader() && commit_quorum == 0) {
// Only leader can replace a member
return folly::makeUnexpected(PGError::NOT_LEADER);
}
group_id = repl_dev.group_id();
}

LOGI("PG replace member initated member_out={} member_in={}", boost::uuids::to_string(old_member_id),
boost::uuids::to_string(new_member.id));

homestore::replica_member_info in_replica, out_replica;
out_replica.id = old_member_id;
in_replica.id = new_member.id;
in_replica.priority = new_member.priority;
std::strncpy(in_replica.name, new_member.name.data(), new_member.name.size());
in_replica.name[new_member.name.size()] = '\0';

return hs_repl_service()
.replace_member(group_id, out_replica, in_replica, commit_quorum)
.via(executor_)
.thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult {
if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
return folly::Unit();
});
}

void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in) {
auto lg = std::shared_lock(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
if (pg_repl_dev(*pg).group_id() == group_id) {
// Remove the old member and add the new member
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
pg->pg_info_.members.erase(PGMember(member_out.id));
pg->pg_info_.members.emplace(PGMember(member_in.id, member_in.name, member_in.priority));

uint32_t i{0};
for (auto const& m : pg->pg_info_.members) {
hs_pg->pg_sb_->members[i].id = m.id;
std::strncpy(hs_pg->pg_sb_->members[i].name, m.name.c_str(),
std::min(m.name.size(), pg_members::max_name_len));
hs_pg->pg_sb_->members[i].priority = m.priority;
++i;
}

// Update the latest membership info to pg superblk.
hs_pg->pg_sb_.write();
LOGI("PG replace member done member_out={} member_in={}", boost::uuids::to_string(member_out.id),
boost::uuids::to_string(member_in.id));
return;
}
}

LOGE("PG replace member failed member_out={} member_in={}", boost::uuids::to_string(member_out.id),
boost::uuids::to_string(member_in.id));
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(shard_info), blkids.chunk_num()));
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(blkids.chunk_num());
Expand All @@ -344,6 +344,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
}
Expand All @@ -368,6 +369,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
} else
LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
LOGI("Commit done for SEAL_SHARD_MSG for shard {}", shard_info.id);
break;
}
default:
Expand Down
5 changes: 5 additions & 0 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
return homestore::blk_alloc_hints();
}

void ReplicationStateMachine::on_replace_member(const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in) {
home_object_->on_pg_replace_member(repl_dev()->group_id(), member_out, member_in);
}

void ReplicationStateMachine::on_destroy() {
// TODO:: add the logic to handle destroy
LOGI("replica destroyed");
Expand Down
5 changes: 2 additions & 3 deletions src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
uint32_t data_size) override;

/// @brief Called when replication module is replacing an existing member with a new member
void replace_member(homestore::replica_id_t member_out, homestore::replica_id_t member_in) override {
// TODO
}
void on_replace_member(const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in) override;

/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy() override;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class MemoryHomeObject : public HomeObjectImpl {

// PGManager
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down
10 changes: 8 additions & 2 deletions src/lib/memory_backend/mem_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::s
}

PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
PGMember const& new_member, uint32_t commit_quorum) {
auto lg = std::shared_lock(_pg_lock);
auto it = _pg_map.find(id);
if (_pg_map.end() == it) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNKNOWN_PG));
}
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}

Expand All @@ -25,7 +30,8 @@ bool MemoryHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
stats.open_shards =
std::count_if(pg->shards_.begin(), pg->shards_.end(), [](auto const& s) { return s->is_open(); });
for (auto const& m : pg->pg_info_.members) {
stats.members.emplace_back(std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
stats.members.emplace_back(
std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
}

return true;
Expand Down
12 changes: 4 additions & 8 deletions src/lib/pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {
}

PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum) {
LOGI("[pg={}] replace member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id));
PGMember const& new_member, uint32_t commit_quorum) {
LOGI("[pg={}] replace member [{}] with [{}] quorum [{}]", id, to_string(old_member), to_string(new_member.id),
commit_quorum);
if (old_member == new_member.id) {
LOGW("rejecting identical replacement SvcId [{}]!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

if (old_member == our_uuid()) {
LOGW("refusing to remove ourself {}!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

return _replace_member(id, old_member, new_member);
return _replace_member(id, old_member, new_member, commit_quorum);
}

bool HomeObjectImpl::get_stats(pg_id_t id, PGStats& stats) const { return _get_stats(id, stats); }
Expand Down
1 change: 1 addition & 0 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ folly::Future< ShardManager::Result< ShardInfo > > HomeObjectImpl::_get_shard(sh
return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > {
auto lg = std::shared_lock(_shard_lock);
if (auto it = _shard_map.find(id); _shard_map.end() != it) return (*it->second)->info;
LOGE("Couldnt find shard id in shard map {}", id);
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
});
}
Expand Down
17 changes: 6 additions & 11 deletions src/lib/tests/PGManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,17 @@ TEST_F(TestFixture, Migrate) {
PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::UNSUPPORTED_OP);
EXPECT_EQ(
homeobj_->pg_manager()
->replace_member(_pg_id, boost::uuids::random_generator()(), PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::UNSUPPORTED_OP);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}, 0).get().error(),
PGError::INVALID_ARG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer2}, 0).get().error(),
PGError::UNKNOWN_PG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}).get().error(),
PGError::INVALID_ARG);
// TODO enable after HO test framework is enabled
#if 0
EXPECT_EQ(homeobj_->pg_manager()
->replace_member(_pg_id, _peer1, PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::INVALID_ARG);
EXPECT_FALSE(
homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}, 0).get());
homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}).get());
#endif
}

0 comments on commit 68eb07d

Please sign in to comment.