Skip to content

Commit

Permalink
feat(sync): sync pending data (#1175)
Browse files Browse the repository at this point in the history
* feat(sync): sync pending data

* CR fix
  • Loading branch information
DvirYo-starkware authored Sep 28, 2023
1 parent 56f4eff commit 0debad0
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 13 deletions.
20 changes: 17 additions & 3 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, StorageReader, StorageWriter};
use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource};
use papyrus_sync::sources::central::{CentralError, CentralSource};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tracing::metadata::LevelFilter;
use tracing::{error, info};
Expand All @@ -40,15 +42,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData::default()));

// JSON-RPC server.
let (_, server_handle) =
run_server(&config.rpc, shared_highest_block.clone(), storage_reader.clone(), VERSION_FULL)
.await?;
let server_handle_future = tokio::spawn(server_handle.stopped());

// Sync task.
let sync_future =
run_sync(config, shared_highest_block, storage_reader.clone(), storage_writer);
let sync_future = run_sync(
config,
shared_highest_block,
pending_data,
storage_reader.clone(),
storage_writer,
);
let sync_handle = tokio::spawn(sync_future);

tokio::select! {
Expand All @@ -71,19 +80,24 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
async fn run_sync(
config: NodeConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
storage_reader: StorageReader,
storage_writer: StorageWriter,
) -> Result<(), StateSyncError> {
let Some(sync_config) = config.sync else { return Ok(()) };
let central_source =
CentralSource::new(config.central, VERSION_FULL, storage_reader.clone())
CentralSource::new(config.central.clone(), VERSION_FULL, storage_reader.clone())
.map_err(CentralError::ClientCreation)?;
let pending_source = PendingSource::new(config.central, VERSION_FULL)
.map_err(CentralError::ClientCreation)?;
let base_layer_source = EthereumBaseLayerSource::new(config.base_layer)
.map_err(|e| BaseLayerSourceError::BaseLayerSourceCreationError(e.to_string()))?;
let mut sync = StateSync::new(
sync_config,
shared_highest_block,
pending_data,
central_source,
pending_source,
base_layer_source,
storage_reader.clone(),
storage_writer,
Expand Down
84 changes: 75 additions & 9 deletions crates/papyrus_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ use starknet_api::block::{Block, BlockHash, BlockNumber};
use starknet_api::core::{ClassHash, CompiledClassHash};
use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
use starknet_api::state::StateDiff;
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::sources::base_layer::{BaseLayerSourceTrait, EthereumBaseLayerSource};
use crate::sources::central::{CentralError, CentralSource, CentralSourceTrait};
use crate::sources::pending::{PendingError, PendingSource, PendingSourceTrait};

// TODO(dvir): add to config.
// Sleep duration between polling for pending data.
const PENDING_SLEEP_DURATION: Duration = Duration::from_millis(500);

// Sleep duration, in seconds, between sync progress checks.
const SLEEP_TIME_SYNC_PROGRESS: Duration = Duration::from_secs(300);
Expand Down Expand Up @@ -110,11 +116,14 @@ impl Default for SyncConfig {
// memory.
pub struct GenericStateSync<
TCentralSource: CentralSourceTrait + Sync + Send,
TPendingSource: PendingSourceTrait + Sync + Send,
TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
> {
config: SyncConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
central_source: Arc<TCentralSource>,
pending_source: Arc<TPendingSource>,
base_layer_source: Arc<TBaseLayerSource>,
reader: StorageReader,
writer: StorageWriter,
Expand All @@ -130,6 +139,8 @@ pub enum StateSyncError {
StorageError(#[from] StorageError),
#[error(transparent)]
CentralSourceError(#[from] CentralError),
#[error(transparent)]
PendingSourceError(#[from] PendingError),
#[error(
"Parent block hash of block {block_number} is not consistent with the stored block. \
Expected {expected_parent_block_hash}, found {stored_parent_block_hash}."
Expand Down Expand Up @@ -191,8 +202,9 @@ pub enum SyncEvent {

impl<
TCentralSource: CentralSourceTrait + Sync + Send + 'static,
TPendingSource: PendingSourceTrait + Sync + Send + 'static,
TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
> GenericStateSync<TCentralSource, TBaseLayerSource>
> GenericStateSync<TCentralSource, TPendingSource, TBaseLayerSource>
{
pub async fn run(&mut self) -> StateSyncResult {
info!("State sync started.");
Expand Down Expand Up @@ -259,8 +271,11 @@ impl<
let block_stream = stream_new_blocks(
self.reader.clone(),
self.central_source.clone(),
self.pending_source.clone(),
self.shared_highest_block.clone(),
self.pending_data.clone(),
self.config.block_propagation_sleep_duration,
PENDING_SLEEP_DURATION,
self.config.blocks_max_stream_size,
)
.fuse();
Expand Down Expand Up @@ -518,7 +533,7 @@ impl<
let mut last_block_in_storage = header_marker.prev();
while let Some(block_number) = last_block_in_storage {
if self.should_revert_block(block_number).await? {
self.revert_block(block_number)?;
self.revert_block(block_number).await?;
last_block_in_storage = block_number.prev();
} else {
break;
Expand All @@ -531,10 +546,11 @@ impl<
// Deletes the block data from the storage, moving it to the ommer tables.
#[allow(clippy::expect_fun_call)]
#[instrument(skip(self), level = "debug", err)]
fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
async fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
debug!("Reverting block.");
let mut txn = self.writer.begin_rw_txn()?;
*self.pending_data.write().await = PendingData::default();

let mut txn = self.writer.begin_rw_txn()?;
txn = txn.try_revert_base_layer_marker(block_number)?;
let res = txn.revert_header(block_number)?;
txn = res.0;
Expand Down Expand Up @@ -618,12 +634,19 @@ impl<
}
}
}

fn stream_new_blocks<TCentralSource: CentralSourceTrait + Sync + Send>(
// TODO(dvir): consider gathering in a single pending argument instead.
#[allow(clippy::too_many_arguments)]
fn stream_new_blocks<
TCentralSource: CentralSourceTrait + Sync + Send,
TPendingSource: PendingSourceTrait + Sync + Send,
>(
reader: StorageReader,
central_source: Arc<TCentralSource>,
pending_source: Arc<TPendingSource>,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
block_propagation_sleep_duration: Duration,
pending_sleep_duration: Duration,
max_stream_size: u32,
) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
try_stream! {
Expand All @@ -638,8 +661,16 @@ fn stream_new_blocks<TCentralSource: CentralSourceTrait + Sync + Send>(
papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER, central_block_marker.0 as f64
);
if header_marker == central_block_marker {
debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
tokio::time::sleep(block_propagation_sleep_duration).await;
// Only if the node have the last block and state (without casms), sync pending data.
if reader.begin_ro_txn()?.get_state_marker()? == header_marker{
// Here and when reverting a block those are the only places we update the pending data.
debug!("Start polling for pending data.");
sync_pending_data(reader.clone(), pending_source.clone(), pending_data.clone(), pending_sleep_duration).await?;
}
else{
debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
tokio::time::sleep(block_propagation_sleep_duration).await;
};
continue;
}
let up_to = min(central_block_marker, BlockNumber(header_marker.0 + max_stream_size as u64));
Expand Down Expand Up @@ -709,21 +740,26 @@ pub fn sort_state_diff(diff: &mut StateDiff) {
}
}

pub type StateSync = GenericStateSync<CentralSource, EthereumBaseLayerSource>;
pub type StateSync = GenericStateSync<CentralSource, PendingSource, EthereumBaseLayerSource>;

impl StateSync {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: SyncConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
central_source: CentralSource,
pending_source: PendingSource,
base_layer_source: EthereumBaseLayerSource,
reader: StorageReader,
writer: StorageWriter,
) -> Self {
Self {
config,
shared_highest_block,
pending_data,
central_source: Arc::new(central_source),
pending_source: Arc::new(pending_source),
base_layer_source: Arc::new(base_layer_source),
reader,
writer,
Expand Down Expand Up @@ -842,3 +878,33 @@ fn check_sync_progress(
}
}
}

// Update the pending data and return when a new block is discovered.
async fn sync_pending_data<TPendingSource: PendingSourceTrait + Sync + Send>(
reader: StorageReader,
pending_source: Arc<TPendingSource>,
pending_data: Arc<RwLock<PendingData>>,
sleep_duration: Duration,
) -> Result<(), StateSyncError> {
let txn = reader.begin_ro_txn()?;
let header_marker = txn.get_header_marker()?;
// TODO: Consider extracting this functionality to different а function.
let latest_block_hash = match header_marker {
// TODO: make sure this is the correct value for the genesis's parent block in all the
// environments.
BlockNumber(0) => BlockHash::default(),
_ => {
txn.get_block_header(header_marker.prev().expect("Header marker isn't zero."))?
.expect("Block before the header marker must have header in the database.")
.block_hash
}
};
loop {
let new_pending_data = pending_source.get_pending_data().await?;
if new_pending_data.block.parent_block_hash != latest_block_hash {
return Ok(());
};
*pending_data.write().await = new_pending_data;
tokio::time::sleep(sleep_duration).await;
}
}
8 changes: 8 additions & 0 deletions crates/papyrus_sync/src/sources/central_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use starknet_api::block::{Block, BlockHash, BlockHeader, BlockNumber};
use starknet_api::hash::StarkFelt;
use starknet_api::stark_felt;
use starknet_api::state::StateDiff;
use starknet_client::reader::PendingData;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error};

use super::pending::MockPendingSourceTrait;
use crate::sources::base_layer::{BaseLayerSourceTrait, MockBaseLayerSourceTrait};
use crate::sources::central::{
BlocksStream,
Expand Down Expand Up @@ -89,6 +91,10 @@ async fn run_sync(
central: impl CentralSourceTrait + Send + Sync + 'static,
base_layer: impl BaseLayerSourceTrait + Send + Sync,
) -> StateSyncResult {
// Mock to the pending source that always returns the default pending data.
let mut pending_source = MockPendingSourceTrait::new();
pending_source.expect_get_pending_data().returning(|| Ok(PendingData::default()));

let mut state_sync = GenericStateSync {
config: SyncConfig {
block_propagation_sleep_duration: SYNC_SLEEP_DURATION,
Expand All @@ -98,7 +104,9 @@ async fn run_sync(
state_updates_max_stream_size: STREAM_SIZE,
},
shared_highest_block: Arc::new(RwLock::new(None)),
pending_data: Arc::new(RwLock::new(PendingData::default())),
central_source: Arc::new(central),
pending_source: Arc::new(pending_source),
base_layer_source: Arc::new(base_layer),
reader,
writer,
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod base_layer;
pub mod central;
#[cfg(test)]
mod central_sync_test;
pub mod pending;
74 changes: 74 additions & 0 deletions crates/papyrus_sync/src/sources/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#[cfg(test)]
#[path = "pending_test.rs"]
mod pending_test;

use std::sync::Arc;

use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;
use starknet_client::reader::{
PendingData,
ReaderClientError,
StarknetFeederGatewayClient,
StarknetReader,
};
use starknet_client::ClientCreationError;
use tracing::{debug, trace};

// TODO(dvir): add pending config.
use super::central::CentralSourceConfig;

pub struct GenericPendingSource<TStarknetClient: StarknetReader + Send + Sync> {
pub starknet_client: Arc<TStarknetClient>,
}

#[derive(thiserror::Error, Debug)]
pub enum PendingError {
#[error(transparent)]
ClientCreation(#[from] ClientCreationError),
#[error(transparent)]
ClientError(#[from] Arc<ReaderClientError>),
#[error("Pending block not found")]
PendingBlockNotFound,
}
#[cfg_attr(test, automock)]
#[async_trait]
pub trait PendingSourceTrait {
async fn get_pending_data(&self) -> Result<PendingData, PendingError>;
}

#[async_trait]
impl<TStarknetClient: StarknetReader + Send + Sync + 'static> PendingSourceTrait
for GenericPendingSource<TStarknetClient>
{
async fn get_pending_data(&self) -> Result<PendingData, PendingError> {
match self.starknet_client.pending_data().await {
Ok(Some(pending_data)) => {
debug!("Received new pending data.");
trace!("Pending data: {pending_data:#?}.");
Ok(pending_data)
}
Ok(None) => Err(PendingError::PendingBlockNotFound),
Err(err) => Err(PendingError::ClientError(Arc::new(err))),
}
}
}

pub type PendingSource = GenericPendingSource<StarknetFeederGatewayClient>;

impl PendingSource {
pub fn new(
config: CentralSourceConfig,
node_version: &'static str,
) -> Result<PendingSource, ClientCreationError> {
let starknet_client = StarknetFeederGatewayClient::new(
&config.url,
config.http_headers,
node_version,
config.retry_config,
)?;

Ok(PendingSource { starknet_client: Arc::new(starknet_client) })
}
}
20 changes: 20 additions & 0 deletions crates/papyrus_sync/src/sources/pending_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::sync::Arc;

use pretty_assertions::assert_eq;
use starknet_client::reader::{MockStarknetReader, PendingData};

use crate::sources::pending::{GenericPendingSource, PendingSourceTrait};

#[tokio::test]
async fn get_pending_data() {
let mut client_mock = MockStarknetReader::new();

// We need to perform all the mocks before moving the mock into pending_source.
// TODO(dvir): use pending_data which isn't the default.
client_mock.expect_pending_data().times(1).returning(|| Ok(Some(PendingData::default())));

let pending_source = GenericPendingSource { starknet_client: Arc::new(client_mock) };

let pending_data = pending_source.get_pending_data().await.unwrap();
assert_eq!(pending_data, PendingData::default());
}
Loading

0 comments on commit 0debad0

Please sign in to comment.