Skip to content

Commit

Permalink
refactor(consensus): the SHC goes from a task to an object which is c…
Browse files Browse the repository at this point in the history
…alled directly (#2079)
  • Loading branch information
matan-starkware authored Jun 13, 2024
1 parent a83fe00 commit 2bdd596
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 168 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ starknet_api.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2" }
papyrus_protobuf = { path = "../../papyrus_protobuf", version = "0.4.0-dev.2" }

[dev-dependencies]
mockall.workspace = true
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
#[allow(dead_code)]
pub mod single_height_consensus;
#[cfg(test)]
pub mod test_utils;
pub(crate) mod test_utils;
#[allow(dead_code)]
pub mod types;
115 changes: 58 additions & 57 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,33 @@
#[path = "single_height_consensus_test.rs"]
mod single_height_consensus_test;

use std::cell::RefCell;
use std::ops::DerefMut;
use std::sync::Arc;

use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockHash, BlockNumber};

use crate::types::{
ConsensusBlock,
ConsensusContext,
ConsensusError,
NodeId,
PeeringConsensusMessage,
NetworkSender,
ProposalInit,
ValidatorId,
};

/// Struct which represents a single height of consensus. Each height is expected to be begun with a
/// call to `start`, which is relevant if we are the proposer for this height's first round. SHC
/// receives messages directly as parameters to function calls. It can send out messages "directly"
/// to the network, and returning a decision to the caller.
pub(crate) struct SingleHeightConsensus<BlockT>
where
BlockT: ConsensusBlock,
{
height: BlockNumber,
context: Arc<dyn ConsensusContext<Block = BlockT>>,
validators: Vec<NodeId>,
id: NodeId,
to_peering_sender: mpsc::Sender<PeeringConsensusMessage<BlockT::ProposalChunk>>,
// This is a RefCell since peering sends to the same receiver permanently, but a new SHC runs
// for each height.
from_peering_receiver: RefCell<mpsc::Receiver<PeeringConsensusMessage<BlockT::ProposalChunk>>>,
validators: Vec<ValidatorId>,
id: ValidatorId,
to_network_sender: Box<dyn NetworkSender<ProposalChunk = BlockT::ProposalChunk>>,
}

impl<BlockT> SingleHeightConsensus<BlockT>
Expand All @@ -40,30 +38,25 @@ where
pub(crate) async fn new(
height: BlockNumber,
context: Arc<dyn ConsensusContext<Block = BlockT>>,
id: NodeId,
to_peering_sender: mpsc::Sender<PeeringConsensusMessage<BlockT::ProposalChunk>>,
from_peering_receiver: RefCell<
mpsc::Receiver<PeeringConsensusMessage<BlockT::ProposalChunk>>,
>,
id: ValidatorId,
to_network_sender: Box<dyn NetworkSender<ProposalChunk = BlockT::ProposalChunk>>,
) -> Self {
let validators = context.validators(height).await;
Self { height, context, validators, id, to_peering_sender, from_peering_receiver }
Self { height, context, validators, id, to_network_sender }
}

pub(crate) async fn run(mut self) -> Result<BlockT, ConsensusError> {
// TODO(matan): In the future this logic will be encapsulated in the state machine, and SHC
// will await a signal from SHC to propose.
pub(crate) async fn start(&mut self) -> Result<Option<BlockT>, ConsensusError> {
let proposer_id = self.context.proposer(&self.validators, self.height);
if proposer_id == self.id { self.propose().await } else { self.validate(proposer_id).await }
}
if proposer_id != self.id {
return Ok(None);
}

async fn propose(&mut self) -> Result<BlockT, ConsensusError> {
let (content_receiver, block_receiver) = self.context.build_proposal(self.height).await;
let (fin_sender, fin_receiver) = oneshot::channel();
let init = ProposalInit { height: self.height, proposer: self.id };
// Peering is a permanent component, so if sending to it fails we cannot continue.
self.to_peering_sender
.send(PeeringConsensusMessage::Proposal((init, content_receiver, fin_receiver)))
self.to_network_sender
.propose(init, content_receiver, fin_receiver)
.await
.expect("Failed sending Proposal to Peering");
let block = block_receiver.await.expect("Block building failed.");
Expand All @@ -73,45 +66,53 @@ where
//
// TODO(matan): Switch this to the Proposal signature.
fin_sender.send(block.id()).expect("Failed to send ProposalFin to Peering.");
Ok(block)
Ok(Some(block))
}

async fn validate(&mut self, proposer_id: NodeId) -> Result<BlockT, ConsensusError> {
let mut receiver = self
.from_peering_receiver
.try_borrow_mut()
.expect("Couldn't get exclusive access to Peering receiver.");
// Peering is a permanent component, so if receiving from it fails we cannot continue.
let msg = receiver.deref_mut().next().await.expect("Cannot receive from Peering");
let (init, content_receiver, fin_receiver) = match msg {
PeeringConsensusMessage::Proposal((init, content_receiver, block_hash_receiver)) => {
(init, content_receiver, block_hash_receiver)
}
};
if init.height != self.height || init.proposer != proposer_id {
// Ignore the proposal.
// TODO(matan): Do we want to handle this gracefully and "retry" in milestone 1?
return Err(ConsensusError::InvalidProposal(proposer_id, self.height));
/// Receive a proposal from a peer node. Returns only once the proposal has been fully received
/// and processed.
pub(crate) async fn handle_proposal(
&mut self,
init: ProposalInit,
content_receiver: mpsc::Receiver<<BlockT as ConsensusBlock>::ProposalChunk>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<Option<BlockT>, ConsensusError> {
let proposer_id = self.context.proposer(&self.validators, self.height);
if init.height != self.height {
let msg = format!("invalid height: expected {:?}, got {:?}", self.height, init.height);
return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
}
if init.proposer != proposer_id {
let msg =
format!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
}

let block_receiver = self.context.validate_proposal(self.height, content_receiver).await;
// Receive the block which was build by Context. If the channel is closed without a block
// being sent, this is an invalid proposal.
// TODO(matan): Actual Tendermint should handle invalid proposals.
let block = block_receiver
.await
.map_err(|_| ConsensusError::InvalidProposal(proposer_id, self.height))?;
// Receive the fin message from the proposer. If this is not received, this is an invalid
// proposal.
let block = block_receiver.await.map_err(|_| {
ConsensusError::InvalidProposal(
proposer_id,
self.height,
"block validation failed".into(),
)
})?;
// TODO(matan): Actual Tendermint should handle invalid proposals.
let fin = fin_receiver
.await
.map_err(|_| ConsensusError::InvalidProposal(proposer_id, self.height))?;
let fin = fin_receiver.await.map_err(|_| {
ConsensusError::InvalidProposal(
proposer_id,
self.height,
"proposal fin never received".into(),
)
})?;
// TODO(matan): Switch to signature validation and handle invalid proposals.
// An invalid signature means the proposal is invalid. The signature validation on the
// proposal's init prevents a malicious node from sending proposals during another's round.
if block.id() != fin {
return Err(ConsensusError::InvalidProposal(proposer_id, self.height));
return Err(ConsensusError::InvalidProposal(
proposer_id,
self.height,
"block signature doesn't match expected block hash".into(),
));
}
Ok(block)
Ok(Some(block))
}
}
Original file line number Diff line number Diff line change
@@ -1,133 +1,102 @@
use std::cell::RefCell;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
use tokio;

use super::SingleHeightConsensus;
use crate::test_utils::{MockTestContext, TestBlock};
use crate::types::{ConsensusBlock, NodeId, PeeringConsensusMessage, ProposalInit};

type Shc = SingleHeightConsensus<TestBlock>;
type ProposalChunk = <TestBlock as ConsensusBlock>::ProposalChunk;
type PeeringMessage = PeeringConsensusMessage<ProposalChunk>;

struct TestSetup {
pub context: MockTestContext,
pub shc_to_peering_sender: mpsc::Sender<PeeringConsensusMessage<u32>>,
pub shc_to_peering_receiver: mpsc::Receiver<PeeringConsensusMessage<u32>>,
pub peering_to_shc_sender: mpsc::Sender<PeeringConsensusMessage<u32>>,
pub peering_to_shc_receiver: mpsc::Receiver<PeeringConsensusMessage<u32>>,
}

impl TestSetup {
fn new() -> TestSetup {
let (shc_to_peering_sender, shc_to_peering_receiver) = mpsc::channel(1);
let (peering_to_shc_sender, peering_to_shc_receiver) = mpsc::channel(1);
let context = MockTestContext::new();
TestSetup {
context,
shc_to_peering_sender,
shc_to_peering_receiver,
peering_to_shc_sender,
peering_to_shc_receiver,
}
}

// This should be called after all of the mock's expectations have been set. This is because SHC
// holds `Arc<dyn ConsentContext>`. Setting mock expectations, though, require exclusive access
// (`&mut self`).
async fn new_shc(
self,
height: BlockNumber,
id: NodeId,
) -> (
Shc,
mpsc::Receiver<PeeringConsensusMessage<u32>>,
mpsc::Sender<PeeringConsensusMessage<u32>>,
) {
let shc = Shc::new(
height,
Arc::new(self.context),
id,
self.shc_to_peering_sender,
RefCell::new(self.peering_to_shc_receiver),
)
.await;
(shc, self.shc_to_peering_receiver, self.peering_to_shc_sender)
}
}
use crate::types::{ConsensusBlock, MockNetworkSender, ProposalInit, ValidatorId};

#[tokio::test]
async fn propose() {
let mut test_fields = TestSetup::new();
let node_id: NodeId = 1_u32.into();
let block = TestBlock { content: vec![1, 2, 3], id: 1 };
async fn proposer() {
let mut context = MockTestContext::new();
let mut network_sender = MockNetworkSender::new();

let node_id: ValidatorId = 1_u32.into();
let block = TestBlock { content: vec![1, 2, 3], id: BlockHash(Felt::ONE) };
// Set expectations for how the test should run:
test_fields
.context
context
.expect_validators()
.returning(move |_| vec![node_id, 2_u32.into(), 3_u32.into(), 4_u32.into()]);
test_fields.context.expect_proposer().returning(move |_, _| node_id);
context.expect_proposer().returning(move |_, _| node_id);
let block_clone = block.clone();
test_fields.context.expect_build_proposal().returning(move |_| {
// SHC doesn't actually handle the content, so ignore for unit tests.
context.expect_build_proposal().returning(move |_| {
let (_, content_receiver) = mpsc::channel(1);
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(block_clone.clone()).unwrap();
(content_receiver, block_receiver)
});

// Creation calls to `context.validators`.
let (shc, mut shc_to_peering_receiver, _) = test_fields.new_shc(BlockNumber(0), node_id).await;
let fin_receiver = Arc::new(OnceLock::new());
let fin_receiver_clone = Arc::clone(&fin_receiver);
network_sender.expect_propose().return_once(move |init, _, fin_receiver| {
// Ignore content receiver, since this is the context's responsibility.
assert_eq!(init.height, BlockNumber(0));
assert_eq!(init.proposer, node_id);
fin_receiver_clone.set(fin_receiver).unwrap();
Ok(())
});

// This calls to `context.proposer` and `context.build_proposal`.
assert_eq!(shc.run().await.unwrap(), block);
let mut shc = SingleHeightConsensus::new(
BlockNumber(0),
Arc::new(context),
node_id,
Box::new(network_sender),
)
.await;

// Check what was sent to peering. We don't check the content stream as that is filled by
// ConsensusContext, not SHC.
let PeeringMessage::Proposal((init, _, block_id_receiver)) =
shc_to_peering_receiver.next().await.unwrap();
assert_eq!(init, ProposalInit { height: BlockNumber(0), proposer: node_id });
assert_eq!(block_id_receiver.await.unwrap(), block.id());
let decision = shc.start().await.unwrap().unwrap();
assert_eq!(decision, block);

// Check the fin sent to the network.
let fin = Arc::into_inner(fin_receiver).unwrap().take().unwrap().await.unwrap();
assert_eq!(fin, block.id());
}

#[tokio::test]
async fn validate() {
let mut test_fields = TestSetup::new();
let node_id: NodeId = 1_u32.into();
let proposer: NodeId = 2_u32.into();
let block = TestBlock { content: vec![1, 2, 3], id: 1 };
async fn validator() {
let mut context = MockTestContext::new();

let node_id: ValidatorId = 1_u32.into();
let proposer: ValidatorId = 2_u32.into();
let block = TestBlock { content: vec![1, 2, 3], id: BlockHash(Felt::ONE) };

// Set expectations for how the test should run:
test_fields
.context
context
.expect_validators()
.returning(move |_| vec![node_id, proposer, 3_u32.into(), 4_u32.into()]);
test_fields.context.expect_proposer().returning(move |_, _| proposer);
context.expect_proposer().returning(move |_, _| proposer);
let block_clone = block.clone();
test_fields.context.expect_validate_proposal().returning(move |_, _| {
context.expect_validate_proposal().returning(move |_, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(block_clone.clone()).unwrap();
block_receiver
});

// Creation calls to `context.validators`.
let (shc, _, mut peering_to_shc_sender) = test_fields.new_shc(BlockNumber(0), node_id).await;
let mut shc = SingleHeightConsensus::new(
BlockNumber(0),
Arc::new(context),
node_id,
Box::new(MockNetworkSender::new()),
)
.await;

// Send the proposal from the peer.
let (fin_sender, fin_receiver) = oneshot::channel();
peering_to_shc_sender
.send(PeeringMessage::Proposal((
fin_sender.send(block.id()).unwrap();
let decision = shc
.handle_proposal(
ProposalInit { height: BlockNumber(0), proposer },
mpsc::channel(1).1, // content - ignored by SHC.
fin_receiver,
)))
)
.await
.unwrap()
.unwrap();
fin_sender.send(block.id()).unwrap();

// This calls to `context.proposer` and `context.build_proposal`.
assert_eq!(shc.run().await.unwrap(), block);
assert_eq!(decision, block);
}
Loading

0 comments on commit 2bdd596

Please sign in to comment.