Skip to content

Commit

Permalink
Merge pull request #2747 from dusk-network/presync
Browse files Browse the repository at this point in the history
node: fix OutOfSync handler
  • Loading branch information
herr-seppia authored Oct 24, 2024
2 parents f1dbf16 + ab6287f commit 376c9c0
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 128 deletions.
36 changes: 22 additions & 14 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,14 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
match msg.payload {
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_)
| Payload::Quorum(_) => {
debug!(
event = "Consensus message received",
topic = ?msg.topic(),
info = ?msg.header,
metadata = ?msg.metadata,
);
| Payload::Ratification(_) => {
self.reroute_acceptor(msg).await;
}
Payload::Quorum(ref q) => {
fsm.on_quorum(q, msg.metadata.as_ref()).await;
self.reroute_acceptor(msg).await;

// Re-route message to the Consensus
let acc = self.acceptor.as_ref().expect("initialize is called");
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Could not reroute msg to Consensus: {}", e);
}
},
}

Payload::Block(blk) => {
info!(
Expand Down Expand Up @@ -318,4 +311,19 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
.try_revert(acceptor::RevertTarget::LastFinalizedState)
.await
}

/// Logs an incoming consensus message and hands it over to the acceptor so
/// that it can be re-routed to the Consensus.
///
/// A failure to re-route is only logged as a warning; it is not propagated
/// to the caller.
///
/// # Panics
///
/// Panics if `self.acceptor` is not set (the `expect` message states that
/// `initialize` is expected to have been called beforehand).
async fn reroute_acceptor(&self, msg: Message) {
    debug!(
        event = "Consensus message received",
        topic = ?msg.topic(),
        info = ?msg.header,
        metadata = ?msg.metadata,
    );

    // The acceptor is the entry point used to reach the Consensus
    let acceptor = self.acceptor.as_ref().expect("initialize is called");
    let guard = acceptor.read().await;
    if let Err(err) = guard.reroute_msg(msg).await {
        warn!("Could not reroute msg to Consensus: {}", err);
    }
}
}
196 changes: 144 additions & 52 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,68 @@ const DEFAULT_HOPS_LIMIT: u16 = 16;

type SharedHashSet = Arc<RwLock<HashSet<[u8; 32]>>>;

/// `PresyncInfo` holds information about the presync process, which is used to
/// verify if a peer has valid block successors before switching the system into
/// out-of-sync mode.
/// This struct helps safeguard against syncing with malicious peers by tracking
/// and validating block continuity from a specific peer within a given
/// timeframe.
#[derive(Clone)]
struct PresyncInfo {
// The address of the peer that we're syncing with. This helps to identify
// which peer the presync information is associated with.
peer_addr: SocketAddr,
start_height: u64,
target_blk: Block,

// The current tip height of our local blockchain. This is the height
// of the highest block we know of before starting the presync process.
tip_height: u64,

// The remote height provided by the peer, which indicates the height of
// the last block the peer knows of. This is used to compare and determine
// whether the peer is ahead of us and if we're out of sync.
remote_height: u64,

// A timestamp indicating when the presync process should expire. If the
// peer doesn't provide valid blocks by this time, the presync is
// considered failed.
expiry: Instant,

// A pool of blocks that are collected from the peer during the presync
// process. These blocks will be validated to ensure that the peer has
// valid successors for the current tip.
pool: Vec<Block>,
}

impl PresyncInfo {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
fn new(

/// Builds a `PresyncInfo` out of a block received from `peer_addr`.
///
/// The remote height is taken from the block header, and the block itself
/// becomes the first entry of the pool.
fn from_block(
    peer_addr: SocketAddr,
    remote_block: Block,
    tip_height: u64,
) -> Self {
    let mut presync = Self::from_height(
        peer_addr,
        remote_block.header().height,
        tip_height,
    );
    presync.pool.push(remote_block);
    presync
}

fn from_height(
peer_addr: SocketAddr,
target_blk: Block,
start_height: u64,
remote_height: u64,
tip_height: u64,
) -> Self {
Self {
peer_addr,
target_blk,
remote_height,
expiry: Instant::now().checked_add(Self::DEFAULT_TIMEOUT).unwrap(),
start_height,
tip_height,
pool: vec![],
}
}

fn start_height(&self) -> u64 {
self.start_height
self.tip_height
}
}

Expand Down Expand Up @@ -116,6 +153,17 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
self.acc.write().await.restart_consensus().await;
}

/// Dispatches a received quorum to the handler of the current FSM state.
///
/// The `InSync` handler receives both the quorum and the message
/// `metadata`; the `OutOfSync` handler receives the quorum only.
pub async fn on_quorum(
    &mut self,
    quorum: &Quorum,
    metadata: Option<&Metadata>,
) {
    match self.curr {
        State::InSync(ref mut in_sync) => {
            in_sync.on_quorum(quorum, metadata).await
        }
        State::OutOfSync(ref mut out_of_sync) => {
            out_of_sync.on_quorum(quorum).await
        }
    }
}

/// Handles an event of a block occurrence.
///
/// A block event could originate from either local consensus execution, a
Expand Down Expand Up @@ -148,7 +196,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
if let Some(blk) = blk.as_ref() {
let fsm_res = match &mut self.curr {
State::InSync(ref mut curr) => {
if let Some((target_block, peer_addr)) =
if let Some(presync) =
curr.on_block_event(blk, metadata).await?
{
// Transition from InSync to OutOfSync state
Expand All @@ -160,13 +208,13 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
self.network.clone(),
)
.await;
next.on_entering(target_block, peer_addr).await;
next.on_entering(presync).await;
self.curr = State::OutOfSync(next);
}
anyhow::Ok(())
}
State::OutOfSync(ref mut curr) => {
if curr.on_block_event(blk, metadata).await? {
if curr.on_block_event(blk).await? {
// Transition from OutOfSync to InSync state
curr.on_exiting().await;

Expand Down Expand Up @@ -498,7 +546,50 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

/// performed when exiting the state
async fn on_exiting(&mut self) {}
async fn on_exiting(&mut self) {
// Discard any pending presync information when leaving this state
self.presync = None
}

/// Handles a quorum received while in sync, starting a presync check when
/// the sending peer appears to be ahead of our tip.
///
/// Nothing happens when `metadata` is `None` (there is no peer address to
/// query) or when a presync process is already active. Otherwise, if the
/// height derived from the quorum is greater than `tip + 1`, a new
/// `PresyncInfo` is stored and the block following our tip is requested
/// from the sender.
async fn on_quorum(
    &mut self,
    remote_quorum: &Quorum,
    metadata: Option<&Metadata>,
) {
    // If remote height > tip.height + 1, we might be out of sync.
    // Before switching to OutOfSync mode and downloading missing blocks,
    // we ensure that the peer has a valid successor of our tip.
    if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
        // Only start a validation if there is no active presync process
        if self.presync.is_none() {
            let tip_height = self.acc.read().await.get_curr_height().await;
            // We use the quorum's previous block, to be sure that the
            // network already has the full block available.
            //
            // The quorum comes from the network and may carry round 0:
            // a plain `round - 1` would panic in debug builds and wrap to
            // `u64::MAX` in release builds, triggering a bogus presync.
            // Saturating keeps the height at 0, which never passes the
            // check below.
            let remote_height = remote_quorum.header.round.saturating_sub(1);
            // Don't compare with `= tip + 1` because that's supposed to be
            // handled by the InSync
            if remote_height > tip_height + 1 {
                // Initialize the presync process, storing the peer, the
                // remote height, and our current tip height.
                // This serves as a safeguard to avoid switching into
                // out-of-sync mode without verifying the peer's
                // information.
                self.presync = Some(PresyncInfo::from_height(
                    peer_addr,
                    remote_height,
                    tip_height,
                ));

                // Request the block immediately following our tip height
                // from the peer to verify if the peer has a valid
                // continuation of our chain.
                // If the requested block (from the same peer) is accepted
                // by the on_block_event before the presync timer expires,
                // we will transition into out_of_sync mode.
                self.request_block(tip_height + 1, peer_addr).await;
            }
        }
    }
}

/// Return Some if there is the need to switch to OutOfSync mode.
/// This way the sync-up procedure to download all missing blocks from the
Expand All @@ -507,18 +598,19 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
&mut self,
remote_blk: &Block,
metadata: Option<Metadata>,
) -> anyhow::Result<Option<(Block, SocketAddr)>> {
) -> anyhow::Result<Option<PresyncInfo>> {
let mut acc = self.acc.write().await;
let tip_header = acc.tip_header().await;
let tip_height = tip_header.height;
let remote_header = remote_blk.header();
let remote_height = remote_header.height;

// If we already accepted a block with the same height as remote_blk,
// check if remote_blk has higher priority. If so, we revert to its
// prev_block, and accept it as the new tip
if remote_height <= tip_header.height {
if remote_height <= tip_height {
// Ensure the block is different from what we have in our chain
if remote_height == tip_header.height {
if remote_height == tip_height {
if remote_header.hash == tip_header.hash {
return Ok(None);
}
Expand Down Expand Up @@ -559,7 +651,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

// Fetch the chain block at the same height as remote_blk
let local_blk = if remote_height == tip_header.height {
let local_blk = if remote_height == tip_height {
acc.tip.read().await.inner().clone()
} else {
acc.db
Expand Down Expand Up @@ -667,7 +759,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

// If remote_blk is a successor of our tip, we try to accept it
if remote_height == tip_header.height + 1 {
if remote_height == tip_height + 1 {
let finalized = acc.try_accept_block(remote_blk, true).await?;

// On first final block accepted while we're inSync, clear
Expand All @@ -679,15 +771,16 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
// If the accepted block is the one requested to presync peer,
// switch to OutOfSync/Syncing mode
if let Some(metadata) = &metadata {
if let Some(presync) = &mut self.presync {
if metadata.src_addr == presync.peer_addr
&& remote_height == presync.start_height() + 1
{
let res =
(presync.target_blk.clone(), presync.peer_addr);
self.presync = None;
return Ok(Some(res));
}
let same = self
.presync
.as_ref()
.map(|presync| {
metadata.src_addr == presync.peer_addr
&& remote_height == presync.start_height() + 1
})
.unwrap_or_default();
if same {
return Ok(self.presync.take());
}
}

Expand All @@ -697,44 +790,43 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
// If remote_blk.height > tip.height+1, we might be out of sync.
// Before switching to outOfSync mode and download missing blocks,
// we ensure that the peer has a valid successor of tip
if let Some(metadata) = &metadata {
if self.presync.is_none() {
self.presync = Some(PresyncInfo::new(
metadata.src_addr,
remote_blk.clone(),
tip_header.height,
));
if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
match self.presync.as_mut() {
// If there's no active presync process, we proceed with
// validation
None => {
self.presync = Some(PresyncInfo::from_block(
peer_addr,
remote_blk.clone(),
tip_height,
));

self.request_block(tip_height + 1, peer_addr).await;
}
// If there's an active presync process, we add the received
// block to the pool so that it can be processed once the sync
// procedure starts
Some(pre) => {
if pre.peer_addr == peer_addr {
pre.pool.push(remote_blk.clone())
}
}
}

Self::request_block_by_height(
&self.network,
tip_header.height + 1,
metadata.src_addr,
)
.await;
}

Ok(None)
}

/// Requests a block by height from a `peer_addr`
async fn request_block_by_height(
network: &Arc<RwLock<N>>,
height: u64,
peer_addr: SocketAddr,
) {
async fn request_block(&self, height: u64, peer_addr: SocketAddr) {
let network = self.network.read().await;
let mut inv = Inv::new(1);
inv.add_block_from_height(height);
let this_peer = *network.read().await.public_addr();
let this_peer = *network.public_addr();
let req = GetResource::new(inv, Some(this_peer), u64::MAX, 1);
debug!(event = "request block by height", ?req, ?peer_addr);

if let Err(err) = network
.read()
.await
.send_to_peer(req.into(), peer_addr)
.await
{
if let Err(err) = network.send_to_peer(req.into(), peer_addr).await {
warn!("could not request block {err}")
}
}
Expand Down
Loading

0 comments on commit 376c9c0

Please sign in to comment.