From 68eb07d6a9c6dbacfa1726ab981abbb69c5ee3c1 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Tue, 1 Oct 2024 12:25:43 -0700 Subject: [PATCH] Add replace member changes. Persist membership details. --- conanfile.py | 2 +- src/include/homeobject/pg_manager.hpp | 14 +++- src/lib/homeobject_impl.hpp | 6 +- src/lib/homestore_backend/hs_blob_manager.cpp | 9 +-- src/lib/homestore_backend/hs_homeobject.hpp | 14 +++- src/lib/homestore_backend/hs_pg_manager.cpp | 69 ++++++++++++++++++- .../homestore_backend/hs_shard_manager.cpp | 4 +- .../replication_state_machine.cpp | 5 ++ .../replication_state_machine.hpp | 5 +- src/lib/memory_backend/mem_homeobject.hpp | 4 +- src/lib/memory_backend/mem_pg_manager.cpp | 10 ++- src/lib/pg_manager.cpp | 12 ++-- src/lib/shard_manager.cpp | 1 + src/lib/tests/PGManagerTest.cpp | 17 ++--- 14 files changed, 129 insertions(+), 43 deletions(-) diff --git a/conanfile.py b/conanfile.py index b12eb6a9..46a7e08d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.1.4" + version = "2.1.5" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 3013d35c..698ef9eb 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -5,6 +5,7 @@ #include #include +#include #include "common.hpp" @@ -14,9 +15,15 @@ ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADE CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST); struct PGMember { + // Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated. 
+ static constexpr uint64_t max_name_len = 127; explicit PGMember(peer_id_t _id) : id(_id) {} - PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {} - PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {} + PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) { + RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length"); + } + PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) { + RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length"); + } peer_id_t id; std::string name; int32_t priority{0}; // <0 (Arbiter), ==0 (Follower), >0 (F|Leader) @@ -78,7 +85,8 @@ struct PGStats { class PGManager : public Manager< PGError > { public: virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0; - virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, u_int32_t commit_quorum) = 0; + virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, + u_int32_t commit_quorum = 0) = 0; /** * Retrieves the statistics for a specific PG (Placement Group) identified by its ID. 
diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 5722cc34..cc006a30 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -98,7 +98,7 @@ class HomeObjectImpl : public HomeObject, virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0; virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member) = 0; + PGMember const& new_member, uint32_t commit_quorum) = 0; virtual bool _get_stats(pg_id_t id, PGStats& stats) const = 0; virtual void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const = 0; @@ -144,8 +144,8 @@ class HomeObjectImpl : public HomeObject, /// PgManager PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final; - PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member, u_int32_t commit_quorum) final; + PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, + u_int32_t commit_quorum) final; // see api comments in base class; bool get_stats(pg_id_t id, PGStats& stats) const final; void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final; diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index a745f369..b3c7411c 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -208,8 +208,8 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis // Write to index table with key {shard id, blob id } and value {pba}. 
auto const [exist_already, status] = add_to_index_table(index_table, blob_info); - LOGTRACEMOD(blobmgr, "blob put commit blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}", blob_id, lsn, - exist_already, status, pbas.to_string()); + LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}", + msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string()); if (!exist_already) { if (status != homestore::btree_status_t::success) { LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status)); @@ -254,7 +254,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, auto r = get_blob_from_index_table(index_table, shard.id, blob_id); if (!r) { - BLOGW(shard.id, blob_id, "Blob not found in index"); + BLOGE(shard.id, blob_id, "Blob not found in index during get blob"); return folly::makeUnexpected(r.error()); } @@ -342,7 +342,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< std::scoped_lock lock_guard(_shard_lock); auto shard_iter = _shard_map.find(msg_header->shard_id); if (shard_iter == _shard_map.end()) { - LOGW("Received a blob_put on an unknown shard, underlying engine will retry this later"); + LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later", + msg_header->shard_id); return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 5b73db01..357f0317 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -49,8 +49,8 @@ class HSHomeObject : public HomeObjectImpl { BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override; PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override; - PGManager::NullAsyncResult 
_replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member) override; + PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, + uint32_t commit_quorum) override; bool _get_stats(pg_id_t id, PGStats& stats) const override; void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override; @@ -378,6 +378,16 @@ class HSHomeObject : public HomeObjectImpl { void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + /** + * @brief Function invoked when a member is replaced by a new member + * + * @param group_id The group id of replication device. + * @param member_out Member which is removed from group + * @param member_in Member which is added to group + * */ + void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out, + const homestore::replica_member_info& member_in); + /** * @brief Callback function invoked when a message is committed on a shard. 
* diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index e611f812..5a90bb93 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -145,9 +145,72 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he if (ctx) ctx->promise_.setValue(folly::Unit()); } -PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member) { - return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id, + PGMember const& new_member, uint32_t commit_quorum) { + + group_id_t group_id; + { + auto lg = std::shared_lock(_pg_lock); + auto iter = _pg_map.find(pg_id); + if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG); + auto& repl_dev = pg_repl_dev(*iter->second); + + if (!repl_dev.is_leader() && commit_quorum == 0) { + // Only leader can replace a member + return folly::makeUnexpected(PGError::NOT_LEADER); + } + group_id = repl_dev.group_id(); + } + + LOGI("PG replace member initated member_out={} member_in={}", boost::uuids::to_string(old_member_id), + boost::uuids::to_string(new_member.id)); + + homestore::replica_member_info in_replica, out_replica; + out_replica.id = old_member_id; + in_replica.id = new_member.id; + in_replica.priority = new_member.priority; + std::strncpy(in_replica.name, new_member.name.data(), new_member.name.size()); + in_replica.name[new_member.name.size()] = '\0'; + + return hs_repl_service() + .replace_member(group_id, out_replica, in_replica, commit_quorum) + .via(executor_) + .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { + if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } + return folly::Unit(); + }); +} + +void 
HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const replica_member_info& member_out, + const replica_member_info& member_in) { + auto lg = std::shared_lock(_pg_lock); + for (const auto& iter : _pg_map) { + auto& pg = iter.second; + if (pg_repl_dev(*pg).group_id() == group_id) { + // Remove the old member and add the new member + auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); + pg->pg_info_.members.erase(PGMember(member_out.id)); + pg->pg_info_.members.emplace(PGMember(member_in.id, member_in.name, member_in.priority)); + + uint32_t i{0}; + for (auto const& m : pg->pg_info_.members) { + hs_pg->pg_sb_->members[i].id = m.id; + std::strncpy(hs_pg->pg_sb_->members[i].name, m.name.c_str(), + std::min(m.name.size(), pg_members::max_name_len)); + hs_pg->pg_sb_->members[i].priority = m.priority; + ++i; + } + + // Update the latest membership info to pg superblk. + hs_pg->pg_sb_.write(); + LOGI("PG replace member done member_out={} member_in={}", boost::uuids::to_string(member_out.id), + boost::uuids::to_string(member_in.id)); + return; + } + } + + LOGE("PG replace member failed member_out={} member_in={}", boost::uuids::to_string(member_out.id), + boost::uuids::to_string(member_in.id)); } void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) { diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 35855a0f..8378087b 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -326,7 +326,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom } if (!shard_exist) { - add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(shard_info), blkids.chunk_num())); + add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num())); // select_specific_chunk() will do something only when we are relaying journal after restart, during the // runtime flow chunk is already been be mark busy 
when we write the shard info to the repldev. chunk_selector_->select_specific_chunk(blkids.chunk_num()); @@ -344,6 +344,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom hs_pg->durable_entities_update([&blkids](auto& de) { de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed); }); + LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id); break; } @@ -368,6 +369,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom } else LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id); if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } + LOGI("Commit done for SEAL_SHARD_MSG for shard {}", shard_info.id); break; } default: diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index ad7f4306..7f9164f5 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -153,6 +153,11 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t return homestore::blk_alloc_hints(); } +void ReplicationStateMachine::on_replace_member(const homestore::replica_member_info& member_out, + const homestore::replica_member_info& member_in) { + home_object_->on_pg_replace_member(repl_dev()->group_id(), member_out, member_in); +} + void ReplicationStateMachine::on_destroy() { // TODO:: add the logic to handle destroy LOGI("replica destroyed"); diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 6c18eaf8..25556e29 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -169,9 +169,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener { uint32_t data_size) override; /// @brief 
Called when replication module is replacing an existing member with a new member - void replace_member(homestore::replica_id_t member_out, homestore::replica_id_t member_in) override { - // TODO - } + void on_replace_member(const homestore::replica_member_info& member_out, + const homestore::replica_member_info& member_in) override; /// @brief Called when the replica is being destroyed by nuraft; void on_destroy() override; diff --git a/src/lib/memory_backend/mem_homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp index c029ed3d..84fb3450 100644 --- a/src/lib/memory_backend/mem_homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -46,8 +46,8 @@ class MemoryHomeObject : public HomeObjectImpl { // PGManager PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override; - PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member) override; + PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, + uint32_t commit_quorum) override; bool _get_stats(pg_id_t id, PGStats& stats) const override; void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override; diff --git a/src/lib/memory_backend/mem_pg_manager.cpp b/src/lib/memory_backend/mem_pg_manager.cpp index 5338fe0e..4d6aadfe 100644 --- a/src/lib/memory_backend/mem_pg_manager.cpp +++ b/src/lib/memory_backend/mem_pg_manager.cpp @@ -9,7 +9,12 @@ PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::s } PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member) { + PGMember const& new_member, uint32_t commit_quorum) { + auto lg = std::shared_lock(_pg_lock); + auto it = _pg_map.find(id); + if (_pg_map.end() == it) { + return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNKNOWN_PG)); + } return folly::makeSemiFuture< 
PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); } @@ -25,7 +30,8 @@ bool MemoryHomeObject::_get_stats(pg_id_t id, PGStats& stats) const { stats.open_shards = std::count_if(pg->shards_.begin(), pg->shards_.end(), [](auto const& s) { return s->is_open(); }); for (auto const& m : pg->pg_info_.members) { - stats.members.emplace_back(std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */)); + stats.members.emplace_back( + std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */)); } return true; diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index 6d3f94f4..bf0a203f 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -21,19 +21,15 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { } PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member, - PGMember const& new_member, u_int32_t commit_quorum) { - LOGI("[pg={}] replace member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id)); + PGMember const& new_member, uint32_t commit_quorum) { + LOGI("[pg={}] replace member [{}] with [{}] quorum [{}]", id, to_string(old_member), to_string(new_member.id), + commit_quorum); if (old_member == new_member.id) { LOGW("rejecting identical replacement SvcId [{}]!", to_string(old_member)); return folly::makeUnexpected(PGError::INVALID_ARG); } - if (old_member == our_uuid()) { - LOGW("refusing to remove ourself {}!", to_string(old_member)); - return folly::makeUnexpected(PGError::INVALID_ARG); - } - - return _replace_member(id, old_member, new_member); + return _replace_member(id, old_member, new_member, commit_quorum); } bool HomeObjectImpl::get_stats(pg_id_t id, PGStats& stats) const { return _get_stats(id, stats); } diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index 049cdaed..4de4ecd2 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -49,6 +49,7 @@ 
folly::Future< ShardManager::Result< ShardInfo > > HomeObjectImpl::_get_shard(sh return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > { auto lg = std::shared_lock(_shard_lock); if (auto it = _shard_map.find(id); _shard_map.end() != it) return (*it->second)->info; + LOGE("Couldnt find shard id in shard map {}", id); return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); }); } diff --git a/src/lib/tests/PGManagerTest.cpp b/src/lib/tests/PGManagerTest.cpp index c02eccdd..3eb589c3 100644 --- a/src/lib/tests/PGManagerTest.cpp +++ b/src/lib/tests/PGManagerTest.cpp @@ -36,22 +36,17 @@ TEST_F(TestFixture, Migrate) { PGMember{boost::uuids::random_generator()()}, 0) .get() .error(), - PGError::UNSUPPORTED_OP); - EXPECT_EQ( - homeobj_->pg_manager() - ->replace_member(_pg_id, boost::uuids::random_generator()(), PGMember{boost::uuids::random_generator()()}, 0) - .get() - .error(), - PGError::UNSUPPORTED_OP); - EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}, 0).get().error(), - PGError::INVALID_ARG); - EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer2}, 0).get().error(), + PGError::UNKNOWN_PG); + EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}).get().error(), PGError::INVALID_ARG); + // TODO enable after HO test framework is enabled +#if 0 EXPECT_EQ(homeobj_->pg_manager() ->replace_member(_pg_id, _peer1, PGMember{boost::uuids::random_generator()()}, 0) .get() .error(), PGError::INVALID_ARG); EXPECT_FALSE( - homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}, 0).get()); + homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}).get()); +#endif }