Skip to content

Commit

Permalink
refactor(consensus): state machine returns VecDeque instead of Vec (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware authored Jul 3, 2024
1 parent f4260ce commit c5b95d5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 46 deletions.
73 changes: 40 additions & 33 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
mod state_machine_test;

use std::collections::{HashMap, VecDeque};
use std::vec;

use starknet_api::block::BlockHash;

Expand Down Expand Up @@ -81,9 +80,9 @@ impl StateMachine {

/// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is needed
/// to trigger the first leader to propose. See [`StartRound`](StateMachineEvent::StartRound)
pub fn start(&mut self) -> Vec<StateMachineEvent> {
pub fn start(&mut self) -> VecDeque<StateMachineEvent> {
self.starting_round = true;
vec![StateMachineEvent::StartRound(None, self.round)]
VecDeque::from([StateMachineEvent::StartRound(None, self.round)])
}

/// Process the incoming event.
Expand All @@ -95,7 +94,7 @@ impl StateMachine {
/// events back to the state machine, as it makes sure to handle them before returning.
// This means that the StateMachine handles events the same regardless of whether it was sent by
// self or a peer. This is in line with the Algorithm 1 in the paper and keeps the code simpler.
pub fn handle_event(&mut self, event: StateMachineEvent) -> Vec<StateMachineEvent> {
pub fn handle_event(&mut self, event: StateMachineEvent) -> VecDeque<StateMachineEvent> {
// Mimic LOC 18 in the paper; the state machine doesn't
// handle any events until `getValue` completes.
if self.starting_round {
Expand All @@ -105,7 +104,7 @@ impl StateMachine {
}
_ => {
self.events_queue.push_back(event);
return Vec::new();
return VecDeque::new();
}
}
} else {
Expand All @@ -120,8 +119,8 @@ impl StateMachine {
fn handle_enqueued_events(
&mut self,
mut events_queue: VecDeque<StateMachineEvent>,
) -> Vec<StateMachineEvent> {
let mut output_events = Vec::new();
) -> VecDeque<StateMachineEvent> {
let mut output_events = VecDeque::new();
while let Some(event) = events_queue.pop_front() {
// Handle a specific event and then decide which of the output events should also be
// sent to self.
Expand All @@ -133,18 +132,18 @@ impl StateMachine {
events_queue.push_back(e.clone());
}
StateMachineEvent::Decision(_, _) => {
output_events.push(e);
output_events.push_back(e);
return output_events;
}
_ => {}
}
output_events.push(e);
output_events.push_back(e);
}
}
output_events
}

fn handle_event_internal(&mut self, event: StateMachineEvent) -> Vec<StateMachineEvent> {
fn handle_event_internal(&mut self, event: StateMachineEvent) -> VecDeque<StateMachineEvent> {
match event {
StateMachineEvent::StartRound(block_hash, round) => {
self.handle_start_round(block_hash, round)
Expand All @@ -168,70 +167,78 @@ impl StateMachine {
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> Vec<StateMachineEvent> {
) -> VecDeque<StateMachineEvent> {
// TODO(matan): Will we allow other events (timeoutPropose) to exit this state?
assert!(self.starting_round);
assert_eq!(round, self.round);
self.starting_round = false;

let Some(hash) = block_hash else {
// Validator.
return Vec::new();
return VecDeque::new();
};

// Proposer.
vec![StateMachineEvent::Proposal(hash, round)]
VecDeque::from([StateMachineEvent::Proposal(hash, round)])
}

// A proposal from a peer (or self) node.
fn handle_proposal(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
fn handle_proposal(
&mut self,
block_hash: BlockHash,
round: u32,
) -> VecDeque<StateMachineEvent> {
let old = self.proposals.insert(round, block_hash);
assert!(old.is_none(), "SHC should handle conflicts & replays");
if self.step != Step::Propose {
return Vec::new();
return VecDeque::new();
}

let mut output = vec![StateMachineEvent::Prevote(block_hash, round)];
let mut output = VecDeque::from([StateMachineEvent::Prevote(block_hash, round)]);
output.append(&mut self.advance_to_step(Step::Prevote));
output
}

// A prevote from a peer (or self) node.
fn handle_prevote(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
fn handle_prevote(&mut self, block_hash: BlockHash, round: u32) -> VecDeque<StateMachineEvent> {
assert_eq!(round, 0, "Only round 0 is supported in this milestone.");
let prevote_count = self.prevotes.entry(round).or_default().entry(block_hash).or_insert(0);
// TODO(matan): Use variable weight.
*prevote_count += 1;
if *prevote_count < self.quorum {
return Vec::new();
return VecDeque::new();
}
if self.step != Step::Prevote {
return Vec::new();
return VecDeque::new();
}

self.send_precommit(block_hash, round)
}

// A precommit from a peer (or self) node.
fn handle_precommit(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
fn handle_precommit(
&mut self,
block_hash: BlockHash,
round: u32,
) -> VecDeque<StateMachineEvent> {
assert_eq!(round, 0, "Only round 0 is supported in this milestone.");
let precommit_count =
self.precommits.entry(round).or_default().entry(block_hash).or_insert(0);
// TODO(matan): Use variable weight.
*precommit_count += 1;
if *precommit_count < self.quorum {
return Vec::new();
return VecDeque::new();
}
let Some(proposed_value) = self.proposals.get(&round) else {
return Vec::new();
return VecDeque::new();
};
// TODO(matan): Handle this due to malicious proposer.
assert_eq!(*proposed_value, block_hash, "Proposal should match quorum.");

vec![StateMachineEvent::Decision(block_hash, round)]
VecDeque::from([StateMachineEvent::Decision(block_hash, round)])
}

fn advance_to_step(&mut self, step: Step) -> Vec<StateMachineEvent> {
fn advance_to_step(&mut self, step: Step) -> VecDeque<StateMachineEvent> {
self.step = step;
// Check for an existing quorum in case messages arrived out of order.
match self.step {
Expand All @@ -243,28 +250,28 @@ impl StateMachine {
}
}

fn check_prevote_quorum(&mut self, round: u32) -> Vec<StateMachineEvent> {
fn check_prevote_quorum(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
let Some((block_hash, count)) = leading_vote(&self.prevotes, round) else {
return Vec::new();
return VecDeque::new();
};
if *count < self.quorum {
return Vec::new();
return VecDeque::new();
}
self.send_precommit(*block_hash, round)
}

fn check_precommit_quorum(&mut self, round: u32) -> Vec<StateMachineEvent> {
fn check_precommit_quorum(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
let Some((block_hash, count)) = leading_vote(&self.precommits, round) else {
return Vec::new();
return VecDeque::new();
};
if *count < self.quorum {
return Vec::new();
return VecDeque::new();
}
vec![StateMachineEvent::Decision(*block_hash, round)]
VecDeque::from([StateMachineEvent::Decision(*block_hash, round)])
}

fn send_precommit(&mut self, block_hash: BlockHash, round: u32) -> Vec<StateMachineEvent> {
let mut output = vec![StateMachineEvent::Precommit(block_hash, round)];
fn send_precommit(&mut self, block_hash: BlockHash, round: u32) -> VecDeque<StateMachineEvent> {
let mut output = VecDeque::from([StateMachineEvent::Precommit(block_hash, round)]);
output.append(&mut self.advance_to_step(Step::Precommit));
output
}
Expand Down
26 changes: 13 additions & 13 deletions crates/sequencing/papyrus_consensus/src/state_machine_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,30 @@ fn events_arrive_in_ideal_order(is_proposer: bool) {
let mut state_machine = StateMachine::new(4);

let mut events = state_machine.start();
assert_eq!(events.remove(0), StateMachineEvent::StartRound(None, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, ROUND));
if is_proposer {
events = state_machine.handle_event(StateMachineEvent::StartRound(Some(BLOCK_HASH), ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
} else {
state_machine.handle_event(StateMachineEvent::StartRound(None, ROUND));
assert!(events.is_empty());
events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
}
assert_eq!(events.remove(0), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert!(events.is_empty());

events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert!(events.is_empty());

events = state_machine.handle_event(StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert!(events.is_empty());

events = state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert!(events.is_empty());

events = state_machine.handle_event(StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Decision(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Decision(BLOCK_HASH, ROUND));
assert!(events.is_empty());
}

Expand All @@ -46,9 +46,9 @@ fn validator_receives_votes_first() {
let mut state_machine = StateMachine::new(4);

let mut events = state_machine.start();
assert_eq!(events.remove(0), StateMachineEvent::StartRound(None, 0));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, ROUND));
assert!(events.is_empty());
events.append(&mut state_machine.handle_event(StateMachineEvent::StartRound(None, 0)));
events = state_machine.handle_event(StateMachineEvent::StartRound(None, ROUND));
assert!(events.is_empty());

// Receives votes from all the other nodes first (more than minimum for a quorum).
Expand All @@ -62,17 +62,17 @@ fn validator_receives_votes_first() {

// Finally the proposal arrives.
events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Decision(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Decision(BLOCK_HASH, ROUND));
assert!(events.is_empty());
}

#[test]
fn buffer_events_during_start_round() {
let mut state_machine = StateMachine::new(4);
let mut events = state_machine.start();
assert_eq!(events.remove(0), StateMachineEvent::StartRound(None, 0));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, 0));
assert!(events.is_empty());

// TODO(matan): When we support NIL votes, we should send them. Real votes without the proposal
Expand All @@ -85,7 +85,7 @@ fn buffer_events_during_start_round() {

// Node finishes building the proposal.
events = state_machine.handle_event(StateMachineEvent::StartRound(None, 0));
assert_eq!(events.remove(0), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.remove(0), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert!(events.is_empty());
}

0 comments on commit c5b95d5

Please sign in to comment.