Skip to content

Commit

Permalink
Fix: New leader must flush blank log
Browse files Browse the repository at this point in the history
This commit fixes a critical issue: if a new leader does not flush the
blank log to disk upon being established and then restarts immediately,
previously committed data may become invisible to readers.

Before the blank log is flushed, the leader (identified by vote `v3`)
assumes it will be flushed and commits this log once (|quorum|-1)
replication responses are received. If the blank log is lost and the
server is restarted, data committed by a new leader (vote `v2`) may
not be visible.

This issue is addressed by using `LeaderHandler::leader_append_entries()`
instead of `ReplicationHandler::append_blank_log()`; the latter does not
wait for the blank log to be flushed before updating the leader's local
progress.

Changes:

- When assigning log IDs to log entries, the `Leading.last_log_id`,
  which represents the state of the log proposer (equivalent term in
  Paxos is Proposer), should be used instead of `RaftState.last_log_id`,
  which represents the state of the log receiver (equivalent term in
  Paxos is Acceptor).

- Consequently, the method `assign_log_ids()` has been moved from
  `RaftState` to `Leading`.

- Avoid manual implementation of duplicated logic:

  - During `initialize()`, reuse `FollowingHandler::do_append_entries()`
    to submit the very first log to storage.

  - In `establish_leader()`, reuse
    `LeaderHandler::leader_append_entries()` to submit the blank log to
    storage, and remove `ReplicationHandler::append_blank_log()`.

  - Remove `Command::AppendEntry`.
  • Loading branch information
drmingdrmer committed May 16, 2024
1 parent a219475 commit 30cdf5f
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 162 deletions.
12 changes: 0 additions & 12 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1609,18 +1609,6 @@ where
Command::QuitLeader => {
self.leader_data = None;
}
Command::AppendEntry { vote, entry } => {
let log_id = *entry.get_log_id();
tracing::debug!("AppendEntry: {}", &entry);

self.append_to_log([entry], vote, log_id).await?;

// The leader may have changed.
// But reporting to a different leader is not a problem.
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().update_local_progress(Some(log_id));
}
}
Command::AppendInputEntries { vote, entries } => {
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);
Expand Down
11 changes: 0 additions & 11 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ where C: RaftTypeConfig
/// No longer a leader. Clean up leader's data.
QuitLeader,

/// Append one entry.
AppendEntry {
/// Same as the `vote` in [`Command::AppendInputEntries`].
vote: Vote<C::NodeId>,

entry: C::Entry,
},

/// Append a `range` of entries.
AppendInputEntries {
/// The vote of the leader that submits the entries to write.
Expand Down Expand Up @@ -139,7 +131,6 @@ where
match (self, other) {
(Command::BecomeLeader, Command::BecomeLeader) => true,
(Command::QuitLeader, Command::QuitLeader) => true,
(Command::AppendEntry { vote, entry }, Command::AppendEntry { vote: vb, entry: b }, ) => vote == vb && entry == b,
(Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
Expand Down Expand Up @@ -168,7 +159,6 @@ where C: RaftTypeConfig
Command::RebuildReplicationStreams { .. } => CommandKind::Main,
Command::Respond { .. } => CommandKind::Main,

Command::AppendEntry { .. } => CommandKind::Log,
Command::AppendInputEntries { .. } => CommandKind::Log,
Command::SaveVote { .. } => CommandKind::Log,
Command::PurgeLog { .. } => CommandKind::Log,
Expand All @@ -192,7 +182,6 @@ where C: RaftTypeConfig
match self {
Command::BecomeLeader => None,
Command::QuitLeader => None,
Command::AppendEntry { .. } => None,
Command::AppendInputEntries { .. } => None,
Command::ReplicateCommitted { .. } => None,
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Expand Down
27 changes: 9 additions & 18 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::engine::handler::vote_handler::VoteHandler;
use crate::engine::Command;
use crate::engine::EngineOutput;
use crate::engine::Respond;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::error::ForwardToLeader;
use crate::error::Infallible;
Expand All @@ -28,7 +29,6 @@ use crate::error::NotAllowed;
use crate::error::NotInMembers;
use crate::error::RejectAppendEntries;
use crate::internal_server_state::InternalServerState;
use crate::membership::EffectiveMembership;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesResponse;
use crate::raft::SnapshotResponse;
Expand Down Expand Up @@ -155,25 +155,13 @@ where C: RaftTypeConfig
pub(crate) fn initialize(&mut self, mut entry: C::Entry) -> Result<(), InitializeError<C::NodeId, C::Node>> {
self.check_initialize()?;

self.state.assign_log_ids([&mut entry]);
let log_id = *entry.get_log_id();
self.state.extend_log_ids_from_same_leader(&[log_id]);
// The very first log id
entry.set_log_id(&LogId::default());

let m = entry.get_membership().expect("the only log entry for initializing has to be membership log");
self.check_members_contain_me(m)?;

tracing::debug!("update effective membership: log_id:{} {}", log_id, m.summary());

let em = EffectiveMembership::new_arc(Some(log_id), m.clone());
self.state.membership_state.append(em);

self.output.push_command(Command::AppendEntry {
// When initialize, it behaves as a learner.
vote: *self.state.vote_ref(),
entry,
});

self.server_state_handler().update_server_state_if_changed();
self.following_handler().do_append_entries(vec![entry], 0);

// With the new config, start to elect to become leader
self.elect();
Expand Down Expand Up @@ -663,8 +651,11 @@ where C: RaftTypeConfig
// Thus append_blank_log() can be moved before rebuild_replication_streams()

rh.rebuild_replication_streams();
rh.append_blank_log();
rh.initiate_replication(SendNone::False);

// Safe unwrap(): Leader is just established
self.leader_handler()
.unwrap()
.leader_append_entries(vec![C::Entry::new_blank(LogId::<C::NodeId>::default())]);
}

/// Check if a raft node is in a state that allows to initialize.
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/engine_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ where C: RaftTypeConfig
}
Command::BecomeLeader => {}
Command::QuitLeader => {}
Command::AppendEntry { .. } => {}
Command::AppendInputEntries { .. } => {}
Command::ReplicateCommitted { .. } => {}
Command::Commit { .. } => {}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where C: RaftTypeConfig
///
/// Membership config changes are also detected and applied here.
#[tracing::instrument(level = "debug", skip(self, entries))]
fn do_append_entries(&mut self, mut entries: Vec<C::Entry>, since: usize) {
pub(crate) fn do_append_entries(&mut self, mut entries: Vec<C::Entry>, since: usize) {
let l = entries.len();

if since == l {
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ where C: RaftTypeConfig
return;
}

self.state.assign_log_ids(&mut entries);
// Engine::leader_handler() ensures it is a valid leader.
self.leader.assign_log_ids(&mut entries).unwrap();

self.state.extend_log_ids_from_same_leader(&entries);

let mut membership_entry = None;
Expand Down
25 changes: 0 additions & 25 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntry;
use crate::internal_server_state::LeaderQuorumSet;
use crate::leader::Leading;
use crate::progress::entry::ProgressEntry;
Expand Down Expand Up @@ -60,30 +59,6 @@ pub(crate) enum SendNone {
impl<'x, C> ReplicationHandler<'x, C>
where C: RaftTypeConfig
{
/// Append a blank log.
///
/// It is used by the leader when leadership is established.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn append_blank_log(&mut self) {
let log_id = LogId::new(
self.state.vote_ref().committed_leader_id().unwrap(),
self.state.last_log_id().next_index(),
);
self.state.log_ids.append(log_id);
let entry = C::Entry::new_blank(log_id);

// TODO: with asynchronous IO in future,
// do not write log until vote being committed,
// or consistency is broken.
self.output.push_command(Command::AppendEntry {
// A leader should always use the leader's vote.
vote: self.leader.vote,
entry,
});

self.update_local_progress(Some(log_id));
}

/// Append a new membership and update related state such as replication streams.
///
/// It is called by the leader when a new membership log is appended to log store.
Expand Down
7 changes: 6 additions & 1 deletion openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,12 @@ impl<NID: NodeId> LogIdList<NID> {

// l >= 1

debug_assert!(new_log_id > self.key_log_ids[l - 1]);
debug_assert!(
new_log_id > self.key_log_ids[l - 1],
"new_log_id: {}, last: {}",
new_log_id,
self.key_log_ids[l - 1]
);

if l == 1 {
self.key_log_ids.push(new_log_id);
Expand Down
36 changes: 4 additions & 32 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,9 @@ fn test_elect() -> anyhow::Result<()> {
},
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::new_committed(1, 1),
entry: Entry::<UTConfig>::new_blank(log_id(1, 1, 1))
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},),
},
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},
entries: vec![Entry::<UTConfig>::new_blank(log_id(1, 1, 1))]
},
],
eng.output.take_commands()
Expand Down Expand Up @@ -127,23 +113,9 @@ fn test_elect() -> anyhow::Result<()> {
},
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::new_committed(2, 1),
entry: Entry::<UTConfig>::new_blank(log_id(2, 1, 1))
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: CommittedLeaderId::new(2, 1),
index: 1,
},),
},
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
leader_id: CommittedLeaderId::new(2, 1),
index: 1,
},
entries: vec![Entry::<UTConfig>::new_blank(log_id(2, 1, 1))]
},
],
eng.output.take_commands()
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
Command::RebuildReplicationStreams {
targets: vec![(2, ProgressEntry::empty(1))]
},
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::new_committed(2, 1),
entry: Entry::<UTConfig>::new_blank(log_id(2, 1, 1)),
entries: vec![Entry::<UTConfig>::new_blank(log_id(2, 1, 1))],
},
Command::Replicate {
target: 2,
Expand Down
26 changes: 6 additions & 20 deletions openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ fn test_initialize_single_node() -> anyhow::Result<()> {

assert_eq!(
vec![
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::default(),
entry: Entry::<UTConfig>::new_membership(LogId::default(), m1())
entries: vec![Entry::<UTConfig>::new_membership(LogId::default(), m1())],
},
// When update the effective membership, the engine set it to Follower.
// But when initializing, it will switch to Candidate at once, in the last output
Expand All @@ -71,23 +71,9 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
},
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::new_committed(1, 1),
entry: Entry::<UTConfig>::new_blank(log_id(1, 1, 1))
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},),
},
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},
entries: vec![Entry::<UTConfig>::new_blank(log_id(1, 1, 1))]
},
],
eng.output.take_commands()
Expand Down Expand Up @@ -131,9 +117,9 @@ fn test_initialize() -> anyhow::Result<()> {

assert_eq!(
vec![
Command::AppendEntry {
Command::AppendInputEntries {
vote: Vote::default(),
entry: Entry::new_membership(LogId::default(), m12())
entries: vec![Entry::new_membership(LogId::default(), m12())],
},
// When update the effective membership, the engine set it to Follower.
// But when initializing, it will switch to Candidate at once, in the last output
Expand Down
Loading

0 comments on commit 30cdf5f

Please sign in to comment.