Skip to content

Commit

Permalink
node: improve readability of the PreSync struct
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Oct 24, 2024
1 parent 6065431 commit ab6287f
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 57 deletions.
144 changes: 88 additions & 56 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,68 @@ const DEFAULT_HOPS_LIMIT: u16 = 16;

type SharedHashSet = Arc<RwLock<HashSet<[u8; 32]>>>;

/// `PresyncInfo` holds information about the presync process, which is used to
/// verify if a peer has valid block successors before switching the system into
/// out-of-sync mode.
/// This struct helps safeguard against syncing with malicious peers by tracking
/// and validating block continuity from a specific peer within a given
/// timeframe.
#[derive(Clone)]
struct PresyncInfo {
// The address of the peer that we're syncing with. This helps to identify
// which peer the presync information is associated with.
peer_addr: SocketAddr,
start_height: u64,
target_height: u64,

// The current tip height of our local blockchain. This is the height
// of the highest block we know of before starting the presync process.
tip_height: u64,

// The remote height provided by the peer, which indicates the height of
// the last block the peer knows of. This is used to compare and determine
// whether the peer is ahead of us and if we're out of sync.
remote_height: u64,

// A timestamp indicating when the presync process should expire. If the
// peer doesn't provide valid blocks by this time, the presync is
// considered failed.
expiry: Instant,

// A pool of blocks that are collected from the peer during the presync
// process. These blocks will be validated to ensure that the peer has
// valid successors for the current tip.
pool: Vec<Block>,
}

impl PresyncInfo {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
fn new(

fn from_block(
peer_addr: SocketAddr,
target_height: u64,
target_blk: Option<Block>,
start_height: u64,
remote_block: Block,
tip_height: u64,
) -> Self {
let remote_height = remote_block.header().height;
let mut info = Self::from_height(peer_addr, remote_height, tip_height);
info.pool.push(remote_block);
info
}

fn from_height(
peer_addr: SocketAddr,
remote_height: u64,
tip_height: u64,
) -> Self {
let mut pool = vec![];
if let Some(blk) = target_blk {
pool.push(blk);
}
Self {
peer_addr,
target_height,
remote_height,
expiry: Instant::now().checked_add(Self::DEFAULT_TIMEOUT).unwrap(),
start_height,
pool,
tip_height,
pool: vec![],
}
}

fn start_height(&self) -> u64 {
self.start_height
self.tip_height
}
}

Expand Down Expand Up @@ -528,24 +558,34 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
// If remote_blk.height > tip.height+1, we might be out of sync.
// Before switching to outOfSync mode and download missing blocks,
// we ensure that the peer has a valid successor of tip
if let Some(metadata) = metadata {
if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
// If there's no active presync process, we proceed with validation
if self.presync.is_none() {
let tip_height = self.acc.read().await.get_curr_height().await;
let prev_height_remote = remote_quorum.header.round - 1;
if prev_height_remote > tip_height + 1 {
self.presync = Some(PresyncInfo::new(
metadata.src_addr,
prev_height_remote,
None,
// We use the quorum's previous block, to be sure that network
// already have the full block available
let remote_height = remote_quorum.header.round - 1;
// Don't compare with `= tip + 1` because that's supposed to be
// handled by the InSync
if remote_height > tip_height + 1 {
// Initialize the presync process, storing metadata about
// the peer, the remote height, and our current tip height.
// This serves as a safeguard to avoid switching into
// out-of-sync mode without verifying the peer's
// information.
self.presync = Some(PresyncInfo::from_height(
peer_addr,
remote_height,
tip_height,
));

Self::request_block_by_height(
&self.network,
tip_height + 1,
metadata.src_addr,
)
.await;
// Request the block immediately following our tip height
// from the peer to verify if the peer has a valid
// continuation of our chain.
// If the requested block (from the same peer) is accepted
// by the on_block_event before the presync timer expires,
// we will transition into out_of_sync mode.
self.request_block(tip_height + 1, peer_addr).await;
}
}
}
Expand All @@ -561,15 +601,16 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
) -> anyhow::Result<Option<PresyncInfo>> {
let mut acc = self.acc.write().await;
let tip_header = acc.tip_header().await;
let tip_height = tip_header.height;
let remote_header = remote_blk.header();
let remote_height = remote_header.height;

// If we already accepted a block with the same height as remote_blk,
// check if remote_blk has higher priority. If so, we revert to its
// prev_block, and accept it as the new tip
if remote_height <= tip_header.height {
if remote_height <= tip_height {
// Ensure the block is different from what we have in our chain
if remote_height == tip_header.height {
if remote_height == tip_height {
if remote_header.hash == tip_header.hash {
return Ok(None);
}
Expand Down Expand Up @@ -610,7 +651,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

// Fetch the chain block at the same height as remote_blk
let local_blk = if remote_height == tip_header.height {
let local_blk = if remote_height == tip_height {
acc.tip.read().await.inner().clone()
} else {
acc.db
Expand Down Expand Up @@ -718,7 +759,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

// If remote_blk is a successor of our tip, we try to accept it
if remote_height == tip_header.height + 1 {
if remote_height == tip_height + 1 {
let finalized = acc.try_accept_block(remote_blk, true).await?;

// On first final block accepted while we're inSync, clear
Expand Down Expand Up @@ -749,25 +790,24 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
// If remote_blk.height > tip.height+1, we might be out of sync.
// Before switching to outOfSync mode and download missing blocks,
// we ensure that the peer has a valid successor of tip
if let Some(metadata) = &metadata {
if let Some(peer_addr) = metadata.map(|m| m.src_addr) {
match self.presync.as_mut() {
// If there's no active presync process, we proceed with
// validation
None => {
self.presync = Some(PresyncInfo::new(
metadata.src_addr,
remote_blk.header().height,
Some(remote_blk.clone()),
tip_header.height,
self.presync = Some(PresyncInfo::from_block(
peer_addr,
remote_blk.clone(),
tip_height,
));

Self::request_block_by_height(
&self.network,
tip_header.height + 1,
metadata.src_addr,
)
.await;
self.request_block(tip_height + 1, peer_addr).await;
}
// If there's an active presync process, we add the received
// block to the pool so to process it when the sync procedure
// will start
Some(pre) => {
if pre.peer_addr == metadata.src_addr {
if pre.peer_addr == peer_addr {
pre.pool.push(remote_blk.clone())
}
}
Expand All @@ -778,23 +818,15 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

/// Requests a block by height from a `peer_addr`
async fn request_block_by_height(
network: &Arc<RwLock<N>>,
height: u64,
peer_addr: SocketAddr,
) {
async fn request_block(&self, height: u64, peer_addr: SocketAddr) {
let network = self.network.read().await;
let mut inv = Inv::new(1);
inv.add_block_from_height(height);
let this_peer = *network.read().await.public_addr();
let this_peer = *network.public_addr();
let req = GetResource::new(inv, Some(this_peer), u64::MAX, 1);
debug!(event = "request block by height", ?req, ?peer_addr);

if let Err(err) = network
.read()
.await
.send_to_peer(req.into(), peer_addr)
.await
{
if let Err(err) = network.send_to_peer(req.into(), peer_addr).await {
warn!("could not request block {err}")
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/chain/fsm/outofsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
let pool = presync.pool;
let curr_height = self.acc.read().await.get_curr_height().await;

self.range = (curr_height + 1, presync.target_height);
self.range = (curr_height + 1, presync.remote_height);

// add target_block to the pool
self.drain_pool().await;
Expand Down

0 comments on commit ab6287f

Please sign in to comment.