diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 7087fa88b..77870bfc8 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1609,18 +1609,6 @@ where Command::QuitLeader => { self.leader_data = None; } - Command::AppendEntry { vote, entry } => { - let log_id = *entry.get_log_id(); - tracing::debug!("AppendEntry: {}", &entry); - - self.append_to_log([entry], vote, log_id).await?; - - // The leader may have changed. - // But reporting to a different leader is not a problem. - if let Ok(mut lh) = self.engine.leader_handler() { - lh.replication_handler().update_local_progress(Some(log_id)); - } - } Command::AppendInputEntries { vote, entries } => { let last_log_id = *entries.last().unwrap().get_log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 15bb05516..887ebd60a 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -33,14 +33,6 @@ where C: RaftTypeConfig /// No longer a leader. Clean up leader's data. QuitLeader, - /// Append one entry. - AppendEntry { - /// Same as the `vote` in [`Command::AppendInputEntries`]. - vote: Vote, - - entry: C::Entry, - }, - /// Append a `range` of entries. AppendInputEntries { /// The vote of the leader that submits the entries to write. @@ -139,7 +131,6 @@ where match (self, other) { (Command::BecomeLeader, Command::BecomeLeader) => true, (Command::QuitLeader, Command::QuitLeader) => true, - (Command::AppendEntry { vote, entry }, Command::AppendEntry { vote: vb, entry: b }, ) => vote == vb && entry == b, (Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b, (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto, @@ -168,7 +159,6 @@ where C: RaftTypeConfig Command::RebuildReplicationStreams { .. } => CommandKind::Main, Command::Respond { .. } => CommandKind::Main, - Command::AppendEntry { .. } => CommandKind::Log, Command::AppendInputEntries { .. } => CommandKind::Log, Command::SaveVote { .. } => CommandKind::Log, Command::PurgeLog { .. } => CommandKind::Log, @@ -192,7 +182,6 @@ where C: RaftTypeConfig match self { Command::BecomeLeader => None, Command::QuitLeader => None, - Command::AppendEntry { .. } => None, Command::AppendInputEntries { .. } => None, Command::ReplicateCommitted { .. } => None, // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 3c062443c..c63978e4b 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -20,6 +20,7 @@ use crate::engine::handler::vote_handler::VoteHandler; use crate::engine::Command; use crate::engine::EngineOutput; use crate::engine::Respond; +use crate::entry::RaftEntry; use crate::entry::RaftPayload; use crate::error::ForwardToLeader; use crate::error::Infallible; @@ -28,7 +29,6 @@ use crate::error::NotAllowed; use crate::error::NotInMembers; use crate::error::RejectAppendEntries; use crate::internal_server_state::InternalServerState; -use crate::membership::EffectiveMembership; use crate::raft::responder::Responder; use crate::raft::AppendEntriesResponse; use crate::raft::SnapshotResponse; @@ -155,25 +155,13 @@ where C: RaftTypeConfig pub(crate) fn initialize(&mut self, mut entry: C::Entry) -> Result<(), InitializeError> { self.check_initialize()?; - self.state.assign_log_ids([&mut entry]); - let log_id = *entry.get_log_id(); - self.state.extend_log_ids_from_same_leader(&[log_id]); + // The very first log id + entry.set_log_id(&LogId::default()); let m = entry.get_membership().expect("the only log entry for initializing has to be membership log"); self.check_members_contain_me(m)?; - tracing::debug!("update effective membership: log_id:{} {}", log_id, m.summary()); - - let em = EffectiveMembership::new_arc(Some(log_id), m.clone()); - self.state.membership_state.append(em); - - self.output.push_command(Command::AppendEntry { - // When initialize, it behaves as a learner. - vote: *self.state.vote_ref(), - entry, - }); - - self.server_state_handler().update_server_state_if_changed(); + self.following_handler().do_append_entries(vec![entry], 0); // With the new config, start to elect to become leader self.elect(); @@ -663,8 +651,11 @@ where C: RaftTypeConfig // Thus append_blank_log() can be moved before rebuild_replication_streams() rh.rebuild_replication_streams(); - rh.append_blank_log(); - rh.initiate_replication(SendNone::False); + + // Safe unwrap(): Leader is just established + self.leader_handler() + .unwrap() + .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); } /// Check if a raft node is in a state that allows to initialize. diff --git a/openraft/src/engine/engine_output.rs b/openraft/src/engine/engine_output.rs index 1125c5094..878e17cb0 100644 --- a/openraft/src/engine/engine_output.rs +++ b/openraft/src/engine/engine_output.rs @@ -47,7 +47,6 @@ where C: RaftTypeConfig } Command::BecomeLeader => {} Command::QuitLeader => {} - Command::AppendEntry { .. } => {} Command::AppendInputEntries { .. } => {} Command::ReplicateCommitted { .. } => {} Command::Commit { .. } => {} diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 604a4fe32..77a900f62 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -118,7 +118,7 @@ where C: RaftTypeConfig /// /// Membership config changes are also detected and applied here. #[tracing::instrument(level = "debug", skip(self, entries))] - fn do_append_entries(&mut self, mut entries: Vec, since: usize) { + pub(crate) fn do_append_entries(&mut self, mut entries: Vec, since: usize) { let l = entries.len(); if since == l { diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 83b7be5a9..cfb30f247 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -50,7 +50,9 @@ where C: RaftTypeConfig return; } - self.state.assign_log_ids(&mut entries); + // Engine::leader_handler() ensures it is a valid leader. + self.leader.assign_log_ids(&mut entries).unwrap(); + self.state.extend_log_ids_from_same_leader(&entries); let mut membership_entry = None; diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index b5cd4326a..4a84e7237 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -6,7 +6,6 @@ use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; -use crate::entry::RaftEntry; use crate::internal_server_state::LeaderQuorumSet; use crate::leader::Leading; use crate::progress::entry::ProgressEntry; @@ -60,30 +59,6 @@ pub(crate) enum SendNone { impl<'x, C> ReplicationHandler<'x, C> where C: RaftTypeConfig { - /// Append a blank log. - /// - /// It is used by the leader when leadership is established. - #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn append_blank_log(&mut self) { - let log_id = LogId::new( - self.state.vote_ref().committed_leader_id().unwrap(), - self.state.last_log_id().next_index(), - ); - self.state.log_ids.append(log_id); - let entry = C::Entry::new_blank(log_id); - - // TODO: with asynchronous IO in future, - // do not write log until vote being committed, - // or consistency is broken. - self.output.push_command(Command::AppendEntry { - // A leader should always use the leader's vote. - vote: self.leader.vote, - entry, - }); - - self.update_local_progress(Some(log_id)); - } - /// Append a new membership and update related state such as replication streams. /// /// It is called by the leader when a new membership log is appended to log store. diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index b952d2d7c..1fef9fd2c 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -185,7 +185,12 @@ impl LogIdList { // l >= 1 - debug_assert!(new_log_id > self.key_log_ids[l - 1]); + debug_assert!( + new_log_id > self.key_log_ids[l - 1], + "new_log_id: {}, last: {}", + new_log_id, + self.key_log_ids[l - 1] + ); if l == 1 { self.key_log_ids.push(new_log_id); diff --git a/openraft/src/engine/tests/elect_test.rs b/openraft/src/engine/tests/elect_test.rs index 7f50e4b74..3fa27040d 100644 --- a/openraft/src/engine/tests/elect_test.rs +++ b/openraft/src/engine/tests/elect_test.rs @@ -68,23 +68,9 @@ fn test_elect() -> anyhow::Result<()> { }, Command::BecomeLeader, Command::RebuildReplicationStreams { targets: vec![] }, - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::new_committed(1, 1), - entry: Entry::::new_blank(log_id(1, 1, 1)) - }, - Command::ReplicateCommitted { - committed: Some(LogId { - leader_id: CommittedLeaderId::new(1, 1), - index: 1, - },), - }, - Command::Commit { - seq: 1, - already_committed: None, - upto: LogId { - leader_id: CommittedLeaderId::new(1, 1), - index: 1, - }, + entries: vec![Entry::::new_blank(log_id(1, 1, 1))] }, ], eng.output.take_commands() @@ -127,23 +113,9 @@ fn test_elect() -> anyhow::Result<()> { }, Command::BecomeLeader, Command::RebuildReplicationStreams { targets: vec![] }, - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::new_committed(2, 1), - entry: Entry::::new_blank(log_id(2, 1, 1)) - }, - Command::ReplicateCommitted { - committed: Some(LogId { - leader_id: CommittedLeaderId::new(2, 1), - index: 1, - },), - }, - Command::Commit { - seq: 1, - already_committed: None, - upto: LogId { - leader_id: CommittedLeaderId::new(2, 1), - index: 1, - }, + entries: vec![Entry::::new_blank(log_id(2, 1, 1))] }, ], eng.output.take_commands() diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 9efaacd21..910665c33 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -258,9 +258,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> { Command::RebuildReplicationStreams { targets: vec![(2, ProgressEntry::empty(1))] }, - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::new_committed(2, 1), - entry: Entry::::new_blank(log_id(2, 1, 1)), + entries: vec![Entry::::new_blank(log_id(2, 1, 1))], }, Command::Replicate { target: 2, diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index b104e9745..963fabbae 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -56,9 +56,9 @@ fn test_initialize_single_node() -> anyhow::Result<()> { assert_eq!( vec![ - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::default(), - entry: Entry::::new_membership(LogId::default(), m1()) + entries: vec![Entry::::new_membership(LogId::default(), m1())], }, // When update the effective membership, the engine set it to Follower. // But when initializing, it will switch to Candidate at once, in the last output @@ -71,23 +71,9 @@ fn test_initialize_single_node() -> anyhow::Result<()> { }, Command::BecomeLeader, Command::RebuildReplicationStreams { targets: vec![] }, - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::new_committed(1, 1), - entry: Entry::::new_blank(log_id(1, 1, 1)) - }, - Command::ReplicateCommitted { - committed: Some(LogId { - leader_id: CommittedLeaderId::new(1, 1), - index: 1, - },), - }, - Command::Commit { - seq: 1, - already_committed: None, - upto: LogId { - leader_id: CommittedLeaderId::new(1, 1), - index: 1, - }, + entries: vec![Entry::::new_blank(log_id(1, 1, 1))] }, ], eng.output.take_commands() @@ -131,9 +117,9 @@ fn test_initialize() -> anyhow::Result<()> { assert_eq!( vec![ - Command::AppendEntry { + Command::AppendInputEntries { vote: Vote::default(), - entry: Entry::new_membership(LogId::default(), m12()) + entries: vec![Entry::new_membership(LogId::default(), m12())], }, // When update the effective membership, the engine set it to Follower. // But when initializing, it will switch to Candidate at once, in the last output diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index 996bffabb..f9b23ed88 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -9,6 +9,7 @@ use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::NodeId; +use crate::RaftLogId; use crate::Vote; /// Leading state data. @@ -31,6 +32,8 @@ pub(crate) struct Leading, I: Instant> { /// The vote this leader works in. pub(crate) vote: Vote, + last_log_id: Option>, + /// The log id of the first log entry proposed by this leader, /// i.e., the `noop` log(AKA blank log) after leader established. /// @@ -62,13 +65,14 @@ where pub(crate) fn new( vote: Vote, quorum_set: QS, - learner_ids: impl Iterator, + learner_ids: impl IntoIterator, last_log_id: Option>, ) -> Self { - let learner_ids = learner_ids.collect::>(); + let learner_ids = learner_ids.into_iter().collect::>(); Self { vote, + last_log_id, noop_log_id: None, quorum_set: quorum_set.clone(), voting: None, @@ -91,6 +95,46 @@ where self.voting.as_mut() } + /// Return the last log id this leader knows of. + /// + /// The leader's last log id may be different from the local RaftState.last_log_id. + /// The later is used by the `Acceptor` part of a Raft node. + pub(crate) fn last_log_id(&self) -> Option<&LogId> { + self.last_log_id.as_ref() + } + + /// Assign log ids to the entries. + /// + /// Return `()` if successful. + /// Otherwise, return `Err(current_vote)` if this Leader is not yet established(by being + /// accepted by a quorum). + /// + /// This method update the `self.last_log_id`. + pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( + &mut self, + entries: impl IntoIterator, + ) -> Result<(), Vote> { + let Some(committed_leader_id) = self.vote.committed_leader_id() else { + return Err(self.vote); + }; + + let first = LogId::new(committed_leader_id, self.last_log_id().next_index()); + let mut last = first; + + for entry in entries { + entry.set_log_id(&last); + tracing::debug!("assign log id: {}", last); + last.index += 1; + } + + if last.index > first.index { + last.index -= 1; + self.last_log_id = Some(last); + } + + Ok(()) + } + pub(crate) fn initialize_voting(&mut self, last_log_id: Option>, now: I) -> &mut Voting { self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone())); self.voting.as_mut().unwrap() @@ -142,19 +186,78 @@ where #[cfg(test)] mod tests { use crate::engine::testing::UTConfig; + use crate::entry::RaftEntry; use crate::leader::Leading; use crate::progress::Progress; + use crate::testing::blank_ent; + use crate::testing::log_id; use crate::type_config::alias::InstantOf; + use crate::Entry; + use crate::RaftLogId; use crate::Vote; #[test] - fn test_leading_last_quorum_acked_time_leader_is_voter() { - let mut leading = Leading::, InstantOf>::new( - Vote::new_committed(2, 1), - vec![1, 2, 3], - vec![4].into_iter(), - None, + fn test_leader_not_established() { + let vote = Vote::new(2, 2); + let mut leading = Leading::, InstantOf>::new(vote, vec![1, 2, 3], vec![], None); + + let mut entries = vec![Entry::::new_blank(log_id(5, 5, 2))]; + let res = leading.assign_log_ids(&mut entries); + + assert_eq!( + entries[0].get_log_id(), + &log_id(5, 5, 2), + "entry log id does not change" ); + assert_eq!(Err(Vote::new(2, 2)), res); + assert_eq!(None, leading.last_log_id); + } + + #[test] + fn test_1_entry_none_last_log_id() { + let vote = Vote::new(0, 0); + let mut leading = Leading::, InstantOf>::new(vote, vec![1, 2, 3], vec![], None); + + let mut entries: Vec> = vec![blank_ent(1, 1, 1)]; + let result = leading.assign_log_ids(&mut entries); + + assert!(result.is_ok()); + assert_eq!(entries[0].get_log_id(), &log_id(0, 0, 0),); + assert_eq!(Some(log_id(0, 0, 0)), leading.last_log_id); + } + + #[test] + fn test_no_entries_provided() { + let vote = Vote::new_committed(2, 2); + let mut leading = + Leading::, InstantOf>::new(vote, vec![1, 2, 3], vec![], Some(log_id(1, 1, 8))); + + let mut entries: Vec> = vec![]; + let result = leading.assign_log_ids(&mut entries); + assert!(result.is_ok()); + assert_eq!(Some(log_id(1, 1, 8)), leading.last_log_id); + } + + #[test] + fn test_multiple_entries() { + let vote = Vote::new_committed(2, 2); + let mut leading = + Leading::, InstantOf>::new(vote, vec![1, 2, 3], [], Some(log_id(1, 1, 8))); + + let mut entries: Vec> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)]; + + let result = leading.assign_log_ids(&mut entries); + assert!(result.is_ok()); + assert_eq!(entries[0].get_log_id(), &log_id(2, 2, 9)); + assert_eq!(entries[1].get_log_id(), &log_id(2, 2, 10)); + assert_eq!(entries[2].get_log_id(), &log_id(2, 2, 11)); + assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id); + } + + #[test] + fn test_leading_last_quorum_acked_time_leader_is_voter() { + let mut leading = + Leading::, InstantOf>::new(Vote::new_committed(2, 1), vec![1, 2, 3], [4], None); let now1 = InstantOf::::now(); @@ -165,12 +268,8 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_learner() { - let mut leading = Leading::, InstantOf>::new( - Vote::new_committed(2, 4), - vec![1, 2, 3], - vec![4].into_iter(), - None, - ); + let mut leading = + Leading::, InstantOf>::new(Vote::new_committed(2, 4), vec![1, 2, 3], [4], None); let t2 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); @@ -185,12 +284,8 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_not_member() { - let mut leading = Leading::, InstantOf>::new( - Vote::new_committed(2, 5), - vec![1, 2, 3], - vec![4].into_iter(), - None, - ); + let mut leading = + Leading::, InstantOf>::new(Vote::new_committed(2, 5), vec![1, 2, 3], [4], None); let t2 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 04b2ce46e..6da89988d 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -4,7 +4,6 @@ use std::ops::Deref; use validit::Validate; use crate::engine::LogIdList; -use crate::entry::RaftEntry; use crate::error::ForwardToLeader; use crate::log_id::RaftLogId; use crate::node::Node; @@ -392,21 +391,6 @@ where self.is_leading(id) && self.vote.is_committed() } - pub(crate) fn assign_log_ids<'a, Ent: RaftEntry + 'a>( - &mut self, - entries: impl IntoIterator, - ) { - let mut log_id = LogId::new( - self.vote_ref().committed_leader_id().unwrap(), - self.last_log_id().next_index(), - ); - for entry in entries { - entry.set_log_id(&log_id); - tracing::debug!("assign log id: {}", log_id); - log_id.index += 1; - } - } - /// Build a ForwardToLeader error that contains the leader id and node it knows. pub(crate) fn forward_to_leader(&self) -> ForwardToLeader { let vote = self.vote_ref(); diff --git a/tests/tests/append_entries/t10_see_higher_vote.rs b/tests/tests/append_entries/t10_see_higher_vote.rs index 726c992a6..4f709a9c4 100644 --- a/tests/tests/append_entries/t10_see_higher_vote.rs +++ b/tests/tests/append_entries/t10_see_higher_vote.rs @@ -34,9 +34,9 @@ async fn append_sees_higher_vote() -> Result<()> { let mut router = RaftRouter::new(config.clone()); - let _log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?; + let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?; - tracing::info!("--- upgrade vote on node-1"); + tracing::info!(log_index, "--- upgrade vote on node-1"); { // Let leader lease expire sleep(Duration::from_millis(800)).await;