Skip to content

Commit

Permalink
Feature: Retrieve Key Log IDs via RaftLogReader::get_key_log_ids()
Browse files Browse the repository at this point in the history
Key log IDs represent the first log IDs proposed by each Leader. These
IDs enable Openraft to efficiently access log IDs at each index with
a succinct storage.

Previously, key log IDs were obtained using a binary-search-like
algorithm through `RaftLogReader`. This commit introduces the
`RaftLogReader::get_key_log_ids()` method, allowing implementations to
directly return a list of key log IDs if the `RaftLogStorage` can
provide them.

For backward compatibility, a default implementation using the original
binary-search method is provided. No application changes are required
when upgrading to this version.

Tests verifying the implementation are included in
`openraft::testing::log::suite::Suite`.

- Fixes: #1261
  • Loading branch information
drmingdrmer committed Nov 8, 2024
1 parent 2c84eb4 commit 9178ef8
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 47 deletions.
54 changes: 54 additions & 0 deletions openraft/src/engine/leader_log_ids.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::fmt;
use std::ops::RangeInclusive;

use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;

/// The first and the last log id belonging to a Leader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LeaderLogIds<C: RaftTypeConfig> {
log_id_range: Option<RangeInclusive<LogIdOf<C>>>,
}

impl<C> fmt::Display for LeaderLogIds<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.log_id_range {
None => write!(f, "None"),
Some(rng) => write!(f, "({}, {})", rng.start(), rng.end()),
}
}
}

impl<C> LeaderLogIds<C>
where C: RaftTypeConfig
{
pub(crate) fn new(log_id_range: Option<RangeInclusive<LogIdOf<C>>>) -> Self {
Self { log_id_range }
}

/// Used only in tests
#[allow(dead_code)]
pub(crate) fn new_single(log_id: LogIdOf<C>) -> Self {
Self {
log_id_range: Some(log_id.clone()..=log_id),
}
}

/// Used only in tests
#[allow(dead_code)]
pub(crate) fn new_start_end(first: LogIdOf<C>, last: LogIdOf<C>) -> Self {
Self {
log_id_range: Some(first..=last),
}
}

pub(crate) fn first(&self) -> Option<&LogIdOf<C>> {
self.log_id_range.as_ref().map(|x| x.start())
}

pub(crate) fn last(&self) -> Option<&LogIdOf<C>> {
self.log_id_range.as_ref().map(|x| x.end())
}
}
43 changes: 23 additions & 20 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::ops::RangeInclusive;

use crate::engine::leader_log_ids::LeaderLogIds;
use crate::log_id::RaftLogId;
use crate::storage::RaftLogReaderExt;
use crate::type_config::alias::LogIdOf;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftLogReader;
use crate::RaftTypeConfig;
use crate::StorageError;

Expand Down Expand Up @@ -43,24 +48,17 @@ where C: RaftTypeConfig
/// A-------B-------C : find(A,B); find(B,C) // both find `B`, need to de-dup
/// A-------C-------C : find(A,C)
/// ```
pub(crate) async fn load_log_ids<LRX>(
last_purged_log_id: Option<LogId<C::NodeId>>,
last_log_id: Option<LogId<C::NodeId>>,
sto: &mut LRX,
) -> Result<LogIdList<C>, StorageError<C>>
pub(crate) async fn get_key_log_ids<LR>(
range: RangeInclusive<LogId<C::NodeId>>,
sto: &mut LR,
) -> Result<Vec<LogIdOf<C>>, StorageError<C>>
where
LRX: RaftLogReaderExt<C>,
LR: RaftLogReader<C> + ?Sized,
{
let mut res = vec![];
let first = range.start().clone();
let last = range.end().clone();

let last = match last_log_id {
None => return Ok(LogIdList::new(res)),
Some(x) => x,
};
let first = match last_purged_log_id {
None => sto.get_log_id(0).await?,
Some(x) => x,
};
let mut res: Vec<LogIdOf<C>> = vec![];

// Recursion stack
let mut stack = vec![(first, last.clone())];
Expand Down Expand Up @@ -114,13 +112,16 @@ where C: RaftTypeConfig
res.push(last);
}

Ok(LogIdList::new(res))
Ok(res)
}
}

impl<C> LogIdList<C>
where C: RaftTypeConfig
{
/// Create a new `LogIdList`.
///
/// It stores the last purged log id, and a series of key log ids.
pub fn new(key_log_ids: impl IntoIterator<Item = LogId<C::NodeId>>) -> Self {
Self {
key_log_ids: key_log_ids.into_iter().collect(),
Expand Down Expand Up @@ -310,18 +311,20 @@ where C: RaftTypeConfig
/// Note that the 0-th log does not belong to any leader(but a membership log to initialize a
/// cluster) but this method does not differentiate between them.
#[allow(dead_code)]
pub(crate) fn by_last_leader(&self) -> &[LogId<C::NodeId>] {
pub(crate) fn by_last_leader(&self) -> LeaderLogIds<C> {
let ks = &self.key_log_ids;
let l = ks.len();
if l < 2 {
return ks;
let last = self.last();
return LeaderLogIds::new(last.map(|x| x.clone()..=x.clone()));
}

// There are at most two(adjacent) key log ids with the same leader_id
if ks[l - 1].leader_id() == ks[l - 2].leader_id() {
&ks[l - 2..]
LeaderLogIds::new_start_end(ks[l - 2].clone(), ks[l - 1].clone())
} else {
&ks[l - 1..]
let last = self.last().cloned().unwrap();
LeaderLogIds::new_single(last)
}
}
}
3 changes: 2 additions & 1 deletion openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ mod command_kind;
mod engine_config;
mod engine_impl;
mod engine_output;
mod log_id_list;
mod replication_progress;

pub(crate) mod command;
pub(crate) mod handler;
pub(crate) mod leader_log_ids;
pub(crate) mod log_id_list;
pub(crate) mod time_state;

#[cfg(test)]
Expand Down
17 changes: 12 additions & 5 deletions openraft/src/engine/tests/log_id_list_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::engine::testing::UTConfig;
use crate::engine::LogIdList;
use crate::testing::log_id;
Expand Down Expand Up @@ -357,23 +358,29 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> {
fn test_log_id_list_by_last_leader() -> anyhow::Result<()> {
// len == 0
let ids = LogIdList::<UTConfig>::default();
assert_eq!(ids.by_last_leader(), &[]);
assert_eq!(ids.by_last_leader(), LeaderLogIds::new(None));

// len == 1
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1)]);
assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader());
assert_eq!(LeaderLogIds::new_single(log_id(1, 1, 1)), ids.by_last_leader());

// len == 2, the last leader has only one log
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(3, 1, 3)]);
assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader());
assert_eq!(LeaderLogIds::new_single(log_id(3, 1, 3)), ids.by_last_leader());

// len == 2, the last leader has two logs
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(1, 1, 3)]);
assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader());
assert_eq!(
LeaderLogIds::new_start_end(log_id(1, 1, 1), log_id(1, 1, 3)),
ids.by_last_leader()
);

// len > 2, the last leader has only more than one logs
let ids = LogIdList::<UTConfig>::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]);
assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader());
assert_eq!(
LeaderLogIds::new_start_end(log_id(7, 1, 8), log_id(7, 1, 10)),
ids.by_last_leader()
);

Ok(())
}
10 changes: 8 additions & 2 deletions openraft/src/proposer/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt;

use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::proposer::Leader;
Expand Down Expand Up @@ -110,8 +111,13 @@ where
vote.into_committed()
};

let last_leader_log_ids = self.last_log_id().cloned().into_iter().collect::<Vec<_>>();
// TODO: tricky: the new LeaderId is different from the last log id
// Thus only the last().index is used.
// Thus the first() is ignored.
// But we should not fake the first() there.
let last = self.last_log_id();
let last_leader_log_ids = LeaderLogIds::new(last.map(|last| last.clone()..=last.clone()));

Leader::new(vote, self.quorum_set.clone(), self.learner_ids, &last_leader_log_ids)
Leader::new(vote, self.quorum_set.clone(), self.learner_ids, last_leader_log_ids)
}
}
60 changes: 45 additions & 15 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplaySliceExt;
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
Expand Down Expand Up @@ -82,19 +82,19 @@ where
vote: CommittedVote<C>,
quorum_set: QS,
learner_ids: impl IntoIterator<Item = C::NodeId>,
last_leader_log_id: &[LogIdOf<C>],
last_leader_log_id: LeaderLogIds<C>,
) -> Self {
debug_assert!(
Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()),
"vote {} must GE last_leader_log_id.last() {}",
vote,
last_leader_log_id.display()
last_leader_log_id
);
debug_assert!(
Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()),
"vote {} must GE last_leader_log_id.first() {}",
vote,
last_leader_log_id.display()
last_leader_log_id
);

let learner_ids = learner_ids.into_iter().collect::<Vec<_>>();
Expand Down Expand Up @@ -222,6 +222,7 @@ where

#[cfg(test)]
mod tests {
use crate::engine::leader_log_ids::LeaderLogIds;
use crate::engine::testing::UTConfig;
use crate::entry::RaftEntry;
use crate::progress::Progress;
Expand All @@ -238,7 +239,12 @@ mod tests {
tracing::info!("--- vote greater than last log id, create new noop_log_id");
{
let vote = Vote::new(2, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
LeaderLogIds::new_start_end(log_id(1, 2, 1), log_id(1, 2, 3)),
);

assert_eq!(leader.noop_log_id(), Some(&log_id(2, 2, 4)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -247,7 +253,12 @@ mod tests {
tracing::info!("--- vote equals last log id, reuse noop_log_id");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]);
let leader = Leader::<UTConfig, _>::new(
vote,
vec![1, 2, 3],
vec![],
LeaderLogIds::new_start_end(log_id(1, 2, 1), log_id(1, 2, 3)),
);

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 1)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -256,7 +267,8 @@ mod tests {
tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);
let leader =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 2, 3)));

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 3)));
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3)));
Expand All @@ -265,7 +277,7 @@ mod tests {
tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0");
{
let vote = Vote::new(1, 2).into_committed();
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);
let leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None));

assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 0)));
assert_eq!(leader.last_log_id(), None);
Expand All @@ -275,7 +287,8 @@ mod tests {
#[test]
fn test_leader_established() {
let vote = Vote::new(2, 2).into_committed();
let mut leader = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]);
let mut leader =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 2, 3)));

let mut entries = vec![Entry::<UTConfig>::new_blank(log_id(5, 5, 2))];
leader.assign_log_ids(&mut entries);
Expand All @@ -291,7 +304,7 @@ mod tests {
#[test]
fn test_1_entry_none_last_log_id() {
let vote = Vote::new(0, 0).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[]);
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None));

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1)];
leading.assign_log_ids(&mut entries);
Expand All @@ -303,7 +316,8 @@ mod tests {
#[test]
fn test_no_entries_provided() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]);
let mut leading =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new_single(log_id(1, 1, 8)));

let mut entries: Vec<Entry<UTConfig>> = vec![];
leading.assign_log_ids(&mut entries);
Expand All @@ -313,7 +327,8 @@ mod tests {
#[test]
fn test_multiple_entries() {
let vote = Vote::new(2, 2).into_committed();
let mut leading = Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], [], &[log_id(1, 1, 8)]);
let mut leading =
Leader::<UTConfig, _>::new(vote, vec![1, 2, 3], [], LeaderLogIds::new_single(log_id(1, 1, 8)));

let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)];

Expand All @@ -326,7 +341,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 1).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 1).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let now1 = UTConfig::<()>::now();

Expand All @@ -337,7 +357,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 4).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 4).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand All @@ -352,7 +377,12 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leader::<UTConfig, Vec<u64>>::new(Vote::new(2, 5).into_committed(), vec![1, 2, 3], [4], &[]);
let mut leading = Leader::<UTConfig, Vec<u64>>::new(
Vote::new(2, 5).into_committed(),
vec![1, 2, 3],
[4],
LeaderLogIds::new(None),
);

let t2 = UTConfig::<()>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand Down
Loading

0 comments on commit 9178ef8

Please sign in to comment.