diff --git a/node/src/chain.rs b/node/src/chain.rs index 08ef9b1ee..3cc328f15 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -146,21 +146,14 @@ impl match msg.payload { Payload::Candidate(_) | Payload::Validation(_) - | Payload::Ratification(_) - | Payload::Quorum(_) => { - debug!( - event = "Consensus message received", - topic = ?msg.topic(), - info = ?msg.header, - metadata = ?msg.metadata, - ); + | Payload::Ratification(_) => { + self.reroute_acceptor(msg).await; + } + Payload::Quorum(ref q) => { + fsm.on_quorum(q, msg.metadata.as_ref()).await; + self.reroute_acceptor(msg).await; - // Re-route message to the Consensus - let acc = self.acceptor.as_ref().expect("initialize is called"); - if let Err(e) = acc.read().await.reroute_msg(msg).await { - warn!("Could not reroute msg to Consensus: {}", e); - } - }, + } Payload::Block(blk) => { info!( @@ -318,4 +311,19 @@ impl ChainSrv { .try_revert(acceptor::RevertTarget::LastFinalizedState) .await } + + async fn reroute_acceptor(&self, msg: Message) { + debug!( + event = "Consensus message received", + topic = ?msg.topic(), + info = ?msg.header, + metadata = ?msg.metadata, + ); + + // Re-route message to the Consensus + let acc = self.acceptor.as_ref().expect("initialize is called"); + if let Err(e) = acc.read().await.reroute_msg(msg).await { + warn!("Could not reroute msg to Consensus: {}", e); + } + } } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 7f2295809..97522c5c9 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -41,31 +41,68 @@ const DEFAULT_HOPS_LIMIT: u16 = 16; type SharedHashSet = Arc>>; +/// `PresyncInfo` holds information about the presync process, which is used to +/// verify if a peer has valid block successors before switching the system into +/// out-of-sync mode. +/// This struct helps safeguard against syncing with malicious peers by tracking +/// and validating block continuity from a specific peer within a given +/// timeframe. #[derive(Clone)] struct PresyncInfo { + // The address of the peer that we're syncing with. This helps to identify + // which peer the presync information is associated with. peer_addr: SocketAddr, - start_height: u64, - target_blk: Block, + + // The current tip height of our local blockchain. This is the height + // of the highest block we know of before starting the presync process. + tip_height: u64, + + // The remote height provided by the peer, which indicates the height of + // the last block the peer knows of. This is used to compare and determine + // whether the peer is ahead of us and if we're out of sync. + remote_height: u64, + + // A timestamp indicating when the presync process should expire. If the + // peer doesn't provide valid blocks by this time, the presync is + // considered failed. expiry: Instant, + + // A pool of blocks that are collected from the peer during the presync + // process. These blocks will be validated to ensure that the peer has + // valid successors for the current tip. + pool: Vec, } impl PresyncInfo { const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); - fn new( + + fn from_block( + peer_addr: SocketAddr, + remote_block: Block, + tip_height: u64, + ) -> Self { + let remote_height = remote_block.header().height; + let mut info = Self::from_height(peer_addr, remote_height, tip_height); + info.pool.push(remote_block); + info + } + + fn from_height( peer_addr: SocketAddr, - target_blk: Block, - start_height: u64, + remote_height: u64, + tip_height: u64, ) -> Self { Self { peer_addr, - target_blk, + remote_height, expiry: Instant::now().checked_add(Self::DEFAULT_TIMEOUT).unwrap(), - start_height, + tip_height, + pool: vec![], } } fn start_height(&self) -> u64 { - self.start_height + self.tip_height } } @@ -116,6 +153,17 @@ impl SimpleFSM { self.acc.write().await.restart_consensus().await; } + pub async fn on_quorum( + &mut self, + quorum: &Quorum, + metadata: Option<&Metadata>, + ) { + match &mut self.curr { + State::OutOfSync(oos) => oos.on_quorum(quorum).await, + State::InSync(is) => is.on_quorum(quorum, metadata).await, + } + } + /// Handles an event of a block occurrence. /// /// A block event could originate from either local consensus execution, a @@ -148,7 +196,7 @@ impl SimpleFSM { if let Some(blk) = blk.as_ref() { let fsm_res = match &mut self.curr { State::InSync(ref mut curr) => { - if let Some((target_block, peer_addr)) = + if let Some(presync) = curr.on_block_event(blk, metadata).await? { // Transition from InSync to OutOfSync state @@ -160,13 +208,13 @@ impl SimpleFSM { self.network.clone(), ) .await; - next.on_entering(target_block, peer_addr).await; + next.on_entering(presync).await; self.curr = State::OutOfSync(next); } anyhow::Ok(()) } State::OutOfSync(ref mut curr) => { - if curr.on_block_event(blk, metadata).await? { + if curr.on_block_event(blk).await? { // Transition from OutOfSync to InSync state curr.on_exiting().await; @@ -498,7 +546,50 @@ impl InSyncImpl { } /// performed when exiting the state - async fn on_exiting(&mut self) {} + async fn on_exiting(&mut self) { + self.presync = None + } + + async fn on_quorum( + &mut self, + remote_quorum: &Quorum, + metadata: Option<&Metadata>, + ) { + // If remote_blk.height > tip.height+1, we might be out of sync. + // Before switching to outOfSync mode and download missing blocks, + // we ensure that the peer has a valid successor of tip + if let Some(peer_addr) = metadata.map(|m| m.src_addr) { + // If there's no active presync process, we proceed with validation + if self.presync.is_none() { + let tip_height = self.acc.read().await.get_curr_height().await; + // We use the quorum's previous block, to be sure that network + // already have the full block available + let remote_height = remote_quorum.header.round - 1; + // Don't compare with `= tip + 1` because that's supposed to be + // handled by the InSync + if remote_height > tip_height + 1 { + // Initialize the presync process, storing metadata about + // the peer, the remote height, and our current tip height. + // This serves as a safeguard to avoid switching into + // out-of-sync mode without verifying the peer's + // information. + self.presync = Some(PresyncInfo::from_height( + peer_addr, + remote_height, + tip_height, + )); + + // Request the block immediately following our tip height + // from the peer to verify if the peer has a valid + // continuation of our chain. + // If the requested block (from the same peer) is accepted + // by the on_block_event before the presync timer expires, + // we will transition into out_of_sync mode. + self.request_block(tip_height + 1, peer_addr).await; + } + } + } + } /// Return Some if there is the need to switch to OutOfSync mode. /// This way the sync-up procedure to download all missing blocks from the @@ -507,18 +598,19 @@ impl InSyncImpl { &mut self, remote_blk: &Block, metadata: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let mut acc = self.acc.write().await; let tip_header = acc.tip_header().await; + let tip_height = tip_header.height; let remote_header = remote_blk.header(); let remote_height = remote_header.height; // If we already accepted a block with the same height as remote_blk, // check if remote_blk has higher priority. If so, we revert to its // prev_block, and accept it as the new tip - if remote_height <= tip_header.height { + if remote_height <= tip_height { // Ensure the block is different from what we have in our chain - if remote_height == tip_header.height { + if remote_height == tip_height { if remote_header.hash == tip_header.hash { return Ok(None); } @@ -559,7 +651,7 @@ impl InSyncImpl { } // Fetch the chain block at the same height as remote_blk - let local_blk = if remote_height == tip_header.height { + let local_blk = if remote_height == tip_height { acc.tip.read().await.inner().clone() } else { acc.db @@ -667,7 +759,7 @@ impl InSyncImpl { } // If remote_blk is a successor of our tip, we try to accept it - if remote_height == tip_header.height + 1 { + if remote_height == tip_height + 1 { let finalized = acc.try_accept_block(remote_blk, true).await?; // On first final block accepted while we're inSync, clear @@ -679,15 +771,16 @@ impl InSyncImpl { // If the accepted block is the one requested to presync peer, // switch to OutOfSync/Syncing mode if let Some(metadata) = &metadata { - if let Some(presync) = &mut self.presync { - if metadata.src_addr == presync.peer_addr - && remote_height == presync.start_height() + 1 - { - let res = - (presync.target_blk.clone(), presync.peer_addr); - self.presync = None; - return Ok(Some(res)); - } + let same = self + .presync + .as_ref() + .map(|presync| { + metadata.src_addr == presync.peer_addr + && remote_height == presync.start_height() + 1 + }) + .unwrap_or_default(); + if same { + return Ok(self.presync.take()); } } @@ -697,44 +790,43 @@ impl InSyncImpl { // If remote_blk.height > tip.height+1, we might be out of sync. // Before switching to outOfSync mode and download missing blocks, // we ensure that the peer has a valid successor of tip - if let Some(metadata) = &metadata { - if self.presync.is_none() { - self.presync = Some(PresyncInfo::new( - metadata.src_addr, - remote_blk.clone(), - tip_header.height, - )); + if let Some(peer_addr) = metadata.map(|m| m.src_addr) { + match self.presync.as_mut() { + // If there's no active presync process, we proceed with + // validation + None => { + self.presync = Some(PresyncInfo::from_block( + peer_addr, + remote_blk.clone(), + tip_height, + )); + + self.request_block(tip_height + 1, peer_addr).await; + } + // If there's an active presync process, we add the received + // block to the pool so to process it when the sync procedure + // will start + Some(pre) => { + if pre.peer_addr == peer_addr { + pre.pool.push(remote_blk.clone()) + } + } } - - Self::request_block_by_height( - &self.network, - tip_header.height + 1, - metadata.src_addr, - ) - .await; } Ok(None) } /// Requests a block by height from a `peer_addr` - async fn request_block_by_height( - network: &Arc>, - height: u64, - peer_addr: SocketAddr, - ) { + async fn request_block(&self, height: u64, peer_addr: SocketAddr) { + let network = self.network.read().await; let mut inv = Inv::new(1); inv.add_block_from_height(height); - let this_peer = *network.read().await.public_addr(); + let this_peer = *network.public_addr(); let req = GetResource::new(inv, Some(this_peer), u64::MAX, 1); debug!(event = "request block by height", ?req, ?peer_addr); - if let Err(err) = network - .read() - .await - .send_to_peer(req.into(), peer_addr) - .await - { + if let Err(err) = network.send_to_peer(req.into(), peer_addr).await { warn!("could not request block {err}") } } diff --git a/node/src/chain/fsm/outofsync.rs b/node/src/chain/fsm/outofsync.rs index 9d358d643..07ef17038 100644 --- a/node/src/chain/fsm/outofsync.rs +++ b/node/src/chain/fsm/outofsync.rs @@ -13,12 +13,13 @@ use tokio::sync::RwLock; use tracing::{debug, info, warn}; use node_data::ledger::Block; -use node_data::message::payload::{GetResource, Inv}; -use node_data::message::Metadata; +use node_data::message::payload::{GetResource, Inv, Quorum}; use crate::chain::acceptor::Acceptor; use crate::{database, vm, Network}; +use super::PresyncInfo; + const MAX_POOL_BLOCKS_SIZE: usize = 1000; const MAX_BLOCKS_TO_REQUEST: u64 = 100; const SYNC_TIMEOUT: Duration = Duration::from_secs(5); @@ -176,24 +177,24 @@ impl attempts: 3, } } + /// Performed when entering the OutOfSync state /// /// Handles the logic for entering the out-of-sync state. Sets the target /// block range, adds the `target_block` to the pool, updates the /// `remote_peer` address, and starts to request missing blocks - pub async fn on_entering( - &mut self, - target_block: Block, - peer_addr: SocketAddr, - ) { + pub async fn on_entering(&mut self, presync: PresyncInfo) { + let peer_addr = presync.peer_addr; + let pool = presync.pool; let curr_height = self.acc.read().await.get_curr_height().await; - self.range = (curr_height + 1, target_block.header().height); + self.range = (curr_height + 1, presync.remote_height); // add target_block to the pool - let key = target_block.header().height; self.drain_pool().await; - self.pool.insert(key, target_block); + for b in &pool { + self.pool.insert(b.header().height, b.clone()); + } self.remote_peer = peer_addr; if let Some(last_request) = self.request_pool_missing_blocks().await { @@ -201,7 +202,10 @@ impl } let (from, to) = &self.range; - info!(event = "entering out-of-sync", from, to, ?peer_addr); + info!(event = "entering", from, to, ?peer_addr); + for (_, b) in self.pool.clone() { + let _ = self.on_block_event(&b).await; + } } /// performed when exiting the state @@ -216,6 +220,19 @@ impl self.pool.retain(|h, _| h >= &curr_height); } + pub async fn on_quorum(&mut self, quorum: &Quorum) { + let prev_quorum_height = quorum.header.round - 1; + if self.range.1 < prev_quorum_height { + debug!( + event = "update sync target due to quorum", + prev = self.range.1, + new = prev_quorum_height, + ); + self.range.1 = prev_quorum_height; + self.request_pool_missing_blocks().await; + } + } + /// Processes incoming blocks during the out-of-sync state. Determines /// whether a block should be accepted, added to the pool, or skipped. /// Handles consecutive block acceptance, pool draining, and state @@ -225,7 +242,6 @@ impl pub async fn on_block_event( &mut self, blk: &Block, - metadata: Option, ) -> anyhow::Result { let mut acc = self.acc.write().await; let block_height = blk.header().height; @@ -247,7 +263,6 @@ impl event = "update sync target", prev = self.range.1, new = block_height, - mode = "out_of_sync" ); self.range.1 = block_height } @@ -255,33 +270,27 @@ impl // Try accepting consecutive block if block_height == current_height + 1 { acc.try_accept_block(blk, false).await?; + // reset expiry_time only if we receive a valid block + self.start_time = SystemTime::now(); debug!( event = "accepted block", - block_height = block_height, + block_height, last_request = self.last_request, - mode = "out_of_sync" ); self.range.0 = block_height + 1; - if let Some(metadata) = &metadata { - if metadata.src_addr == self.remote_peer { - // reset expiry_time only if we receive a valid block from - // the syncing peer. - self.start_time = SystemTime::now(); - } - } - // Try to accept other consecutive blocks from the pool, if // available for height in self.range.0..=self.range.1 { if let Some(blk) = self.pool.get(&height) { acc.try_accept_block(blk, false).await?; + // reset expiry_time only if we receive a valid block + self.start_time = SystemTime::now(); self.range.0 += 1; debug!( - event = "accepting next block", + event = "accepted next block", block_height = height, last_request = self.last_request, - mode = "out_of_sync" ); } else { // This means we accepted a block and the next block @@ -308,17 +317,12 @@ impl event = "pool drain", pool_len = self.pool.len(), last_request = self.last_request, - mode = "out_of_sync" ); let tip = acc.get_curr_height().await; // Check target height is reached if tip >= self.range.1 { - debug!( - event = "sync target reached", - height = tip, - mode = "out_of_sync" - ); + debug!(event = "sync target reached", height = tip); self.pool.clear(); // Block sync-up procedure manages to download all requested @@ -337,9 +341,7 @@ impl if self.pool.contains_key(&block_height) { debug!( event = "block skipped (already present)", - block_height, - pool_len, - mode = "out_of_sync" + block_height, pool_len, ); return Ok(false); } @@ -359,19 +361,11 @@ impl if stored_height > block_height { debug!( event = "block removed", - block_height, - stored_height, - pool_len, - mode = "out_of_sync" + block_height, stored_height, pool_len, ); entry.remove(); } else { - debug!( - event = "block skipped", - block_height, - pool_len, - mode = "out_of_sync" - ); + debug!(event = "block skipped", block_height, pool_len); return Ok(false); } } @@ -384,7 +378,6 @@ impl event = "block saved", block_height, pool_len = self.pool.len(), - mode = "out_of_sync" ); Ok(false) @@ -397,11 +390,7 @@ impl pub async fn on_heartbeat(&mut self) -> anyhow::Result { if self.is_timeout_expired() { if self.attempts == 0 { - debug!( - event = "out_of_sync timer expired", - attempts = self.attempts, - mode = "out_of_sync" - ); + debug!(event = "timer expired", attempts = self.attempts); // sync-up has timed out, recover consensus task self.acc.write().await.restart_consensus().await; @@ -429,7 +418,7 @@ impl let get_resource = GetResource::new(inv, Some(self.local_peer), u64::MAX, 1); - debug!(event = "request block", height, mode = "out_of_sync",); + debug!(event = "request block", height); if let Err(e) = self .network .read() @@ -437,11 +426,7 @@ impl .send_to_alive_peers(get_resource.into(), 2) .await { - warn!( - event = "Unable to request missing block", - ?e, - mode = "out_of_sync", - ); + warn!(event = "Unable to request missing block", ?e); } } @@ -471,7 +456,6 @@ impl debug!( event = "request blocks", target = last_request.unwrap_or_default(), - mode = "out_of_sync", ); let get_resource = @@ -484,11 +468,7 @@ impl .send_to_peer(get_resource.into(), self.remote_peer) .await { - debug!( - event = "Unable to request missing blocks", - ?e, - mode = "out_of_sync", - ); + debug!(event = "Unable to request missing blocks", ?e); warn!("Unable to request missing blocks {e}"); return None; }