diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs
index 570ea66f6b..6f4bbad8e0 100644
--- a/crates/papyrus_node/src/main.rs
+++ b/crates/papyrus_node/src/main.rs
@@ -14,7 +14,9 @@ use papyrus_rpc::run_server;
 use papyrus_storage::{open_storage, StorageReader, StorageWriter};
 use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource};
 use papyrus_sync::sources::central::{CentralError, CentralSource};
+use papyrus_sync::sources::pending::PendingSource;
 use papyrus_sync::{StateSync, StateSyncError};
+use starknet_client::reader::PendingData;
 use tokio::sync::RwLock;
 use tracing::metadata::LevelFilter;
 use tracing::{error, info};
@@ -40,6 +42,8 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     // The sync is the only writer of the syncing state.
     let shared_highest_block = Arc::new(RwLock::new(None));
 
+    let pending_data = Arc::new(RwLock::new(PendingData::default()));
+
     // JSON-RPC server.
     let (_, server_handle) =
         run_server(&config.rpc, shared_highest_block.clone(), storage_reader.clone(), VERSION_FULL)
@@ -47,8 +51,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     let server_handle_future = tokio::spawn(server_handle.stopped());
 
     // Sync task.
-    let sync_future =
-        run_sync(config, shared_highest_block, storage_reader.clone(), storage_writer);
+    let sync_future = run_sync(
+        config,
+        shared_highest_block,
+        pending_data,
+        storage_reader.clone(),
+        storage_writer,
+    );
     let sync_handle = tokio::spawn(sync_future);
 
     tokio::select! {
@@ -71,19 +80,24 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
 async fn run_sync(
     config: NodeConfig,
     shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
+    pending_data: Arc<RwLock<PendingData>>,
     storage_reader: StorageReader,
     storage_writer: StorageWriter,
 ) -> Result<(), StateSyncError> {
     let Some(sync_config) = config.sync else { return Ok(()) };
     let central_source =
-        CentralSource::new(config.central, VERSION_FULL, storage_reader.clone())
+        CentralSource::new(config.central.clone(), VERSION_FULL, storage_reader.clone())
            .map_err(CentralError::ClientCreation)?;
+    let pending_source = PendingSource::new(config.central, VERSION_FULL)
+        .map_err(CentralError::ClientCreation)?;
     let base_layer_source = EthereumBaseLayerSource::new(config.base_layer)
         .map_err(|e| BaseLayerSourceError::BaseLayerSourceCreationError(e.to_string()))?;
     let mut sync = StateSync::new(
         sync_config,
         shared_highest_block,
+        pending_data,
         central_source,
+        pending_source,
         base_layer_source,
         storage_reader.clone(),
         storage_writer,
diff --git a/crates/papyrus_sync/src/lib.rs b/crates/papyrus_sync/src/lib.rs
index 9a77376001..d4990b257c 100644
--- a/crates/papyrus_sync/src/lib.rs
+++ b/crates/papyrus_sync/src/lib.rs
@@ -35,11 +35,17 @@ use starknet_api::block::{Block, BlockHash, BlockNumber};
 use starknet_api::core::{ClassHash, CompiledClassHash};
 use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
 use starknet_api::state::StateDiff;
+use starknet_client::reader::PendingData;
 use tokio::sync::RwLock;
 use tracing::{debug, error, info, instrument, trace, warn};
 
 use crate::sources::base_layer::{BaseLayerSourceTrait, EthereumBaseLayerSource};
 use crate::sources::central::{CentralError, CentralSource, CentralSourceTrait};
+use crate::sources::pending::{PendingError, PendingSource, PendingSourceTrait};
+
+// TODO(dvir): add to config.
+// Sleep duration between polling for pending data.
+const PENDING_SLEEP_DURATION: Duration = Duration::from_millis(500);
 
 // Sleep duration, in seconds, between sync progress checks.
 const SLEEP_TIME_SYNC_PROGRESS: Duration = Duration::from_secs(300);
@@ -110,11 +116,14 @@ impl Default for SyncConfig {
 // memory.
 pub struct GenericStateSync<
     TCentralSource: CentralSourceTrait + Sync + Send,
+    TPendingSource: PendingSourceTrait + Sync + Send,
     TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
 > {
     config: SyncConfig,
     shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
+    pending_data: Arc<RwLock<PendingData>>,
     central_source: Arc<TCentralSource>,
+    pending_source: Arc<TPendingSource>,
     base_layer_source: Arc<TBaseLayerSource>,
     reader: StorageReader,
     writer: StorageWriter,
@@ -130,6 +139,8 @@ pub enum StateSyncError {
     StorageError(#[from] StorageError),
     #[error(transparent)]
     CentralSourceError(#[from] CentralError),
+    #[error(transparent)]
+    PendingSourceError(#[from] PendingError),
     #[error(
         "Parent block hash of block {block_number} is not consistent with the stored block. \
          Expected {expected_parent_block_hash}, found {stored_parent_block_hash}."
     )]
@@ -191,8 +202,9 @@ pub enum SyncEvent {
 impl<
     TCentralSource: CentralSourceTrait + Sync + Send + 'static,
+    TPendingSource: PendingSourceTrait + Sync + Send + 'static,
     TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
-> GenericStateSync<TCentralSource, TBaseLayerSource>
+> GenericStateSync<TCentralSource, TPendingSource, TBaseLayerSource>
 {
     pub async fn run(&mut self) -> StateSyncResult {
         info!("State sync started.");
@@ -259,8 +271,11 @@ impl<
         let block_stream = stream_new_blocks(
             self.reader.clone(),
             self.central_source.clone(),
+            self.pending_source.clone(),
             self.shared_highest_block.clone(),
+            self.pending_data.clone(),
             self.config.block_propagation_sleep_duration,
+            PENDING_SLEEP_DURATION,
             self.config.blocks_max_stream_size,
         )
         .fuse();
@@ -518,7 +533,7 @@ impl<
         let mut last_block_in_storage = header_marker.prev();
         while let Some(block_number) = last_block_in_storage {
             if self.should_revert_block(block_number).await? {
-                self.revert_block(block_number)?;
+                self.revert_block(block_number).await?;
                 last_block_in_storage = block_number.prev();
             } else {
                 break;
             }
@@ -531,10 +546,11 @@ impl<
     // Deletes the block data from the storage, moving it to the ommer tables.
     #[allow(clippy::expect_fun_call)]
     #[instrument(skip(self), level = "debug", err)]
-    fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
+    async fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
         debug!("Reverting block.");
-        let mut txn = self.writer.begin_rw_txn()?;
+        *self.pending_data.write().await = PendingData::default();
+
+        let mut txn = self.writer.begin_rw_txn()?;
         txn = txn.try_revert_base_layer_marker(block_number)?;
         let res = txn.revert_header(block_number)?;
         txn = res.0;
@@ -618,12 +634,19 @@ impl<
         }
     }
 }
-
-fn stream_new_blocks<TCentralSource: CentralSourceTrait + Sync + Send>(
+// TODO(dvir): consider gathering in a single pending argument instead.
+#[allow(clippy::too_many_arguments)]
+fn stream_new_blocks<
+    TCentralSource: CentralSourceTrait + Sync + Send,
+    TPendingSource: PendingSourceTrait + Sync + Send,
+>(
     reader: StorageReader,
     central_source: Arc<TCentralSource>,
+    pending_source: Arc<TPendingSource>,
     shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
+    pending_data: Arc<RwLock<PendingData>>,
     block_propagation_sleep_duration: Duration,
+    pending_sleep_duration: Duration,
     max_stream_size: u32,
 ) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
     try_stream! {
@@ -638,8 +661,16 @@ fn stream_new_blocks(
             papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER,
             central_block_marker.0 as f64
         );
         if header_marker == central_block_marker {
-            debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
-            tokio::time::sleep(block_propagation_sleep_duration).await;
+            // Sync pending data only if the node has the last block and state (without casms).
+            if reader.begin_ro_txn()?.get_state_marker()? == header_marker {
+                // This and the block revert are the only places where the pending data is updated.
+                debug!("Start polling for pending data.");
+                sync_pending_data(reader.clone(), pending_source.clone(), pending_data.clone(), pending_sleep_duration).await?;
+            }
+            else {
+                debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
+                tokio::time::sleep(block_propagation_sleep_duration).await;
+            };
             continue;
         }
         let up_to = min(central_block_marker, BlockNumber(header_marker.0 + max_stream_size as u64));
@@ -709,13 +740,16 @@ pub fn sort_state_diff(diff: &mut StateDiff) {
     }
 }
 
-pub type StateSync = GenericStateSync<CentralSource, EthereumBaseLayerSource>;
+pub type StateSync = GenericStateSync<CentralSource, PendingSource, EthereumBaseLayerSource>;
 
 impl StateSync {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         config: SyncConfig,
         shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
+        pending_data: Arc<RwLock<PendingData>>,
         central_source: CentralSource,
+        pending_source: PendingSource,
         base_layer_source: EthereumBaseLayerSource,
         reader: StorageReader,
         writer: StorageWriter,
@@ -723,7 +757,9 @@ impl StateSync {
         Self {
             config,
             shared_highest_block,
+            pending_data,
             central_source: Arc::new(central_source),
+            pending_source: Arc::new(pending_source),
             base_layer_source: Arc::new(base_layer_source),
             reader,
             writer,
@@ -842,3 +878,33 @@ fn check_sync_progress(
         }
     }
 }
+
+// Update the pending data and return when a new block is discovered.
+async fn sync_pending_data<TPendingSource: PendingSourceTrait + Sync + Send>(
+    reader: StorageReader,
+    pending_source: Arc<TPendingSource>,
+    pending_data: Arc<RwLock<PendingData>>,
+    sleep_duration: Duration,
+) -> Result<(), StateSyncError> {
+    let txn = reader.begin_ro_txn()?;
+    let header_marker = txn.get_header_marker()?;
+    // TODO: Consider extracting this functionality to a different function.
+    let latest_block_hash = match header_marker {
+        // TODO: make sure this is the correct value for the genesis's parent block in all the
+        // environments.
+        BlockNumber(0) => BlockHash::default(),
+        _ => {
+            txn.get_block_header(header_marker.prev().expect("Header marker isn't zero."))?
+ .expect("Block before the header marker must have header in the database.") + .block_hash + } + }; + loop { + let new_pending_data = pending_source.get_pending_data().await?; + if new_pending_data.block.parent_block_hash != latest_block_hash { + return Ok(()); + }; + *pending_data.write().await = new_pending_data; + tokio::time::sleep(sleep_duration).await; + } +} diff --git a/crates/papyrus_sync/src/sources/central_sync_test.rs b/crates/papyrus_sync/src/sources/central_sync_test.rs index ef6e9599b8..4c7b7f30b2 100644 --- a/crates/papyrus_sync/src/sources/central_sync_test.rs +++ b/crates/papyrus_sync/src/sources/central_sync_test.rs @@ -16,9 +16,11 @@ use starknet_api::block::{Block, BlockBody, BlockHash, BlockHeader, BlockNumber} use starknet_api::hash::StarkFelt; use starknet_api::stark_felt; use starknet_api::state::StateDiff; +use starknet_client::reader::PendingData; use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error}; +use super::pending::MockPendingSourceTrait; use crate::sources::base_layer::{BaseLayerSourceTrait, MockBaseLayerSourceTrait}; use crate::sources::central::{ BlocksStream, @@ -89,6 +91,10 @@ async fn run_sync( central: impl CentralSourceTrait + Send + Sync + 'static, base_layer: impl BaseLayerSourceTrait + Send + Sync, ) -> StateSyncResult { + // Mock to the pending source that always returns the default pending data. + let mut pending_source = MockPendingSourceTrait::new(); + pending_source.expect_get_pending_data().returning(|| Ok(PendingData::default())); + let mut state_sync = GenericStateSync { config: SyncConfig { block_propagation_sleep_duration: SYNC_SLEEP_DURATION, @@ -98,7 +104,9 @@ async fn run_sync( state_updates_max_stream_size: STREAM_SIZE, }, shared_highest_block: Arc::new(RwLock::new(None)), + pending_data: Arc::new(RwLock::new(PendingData::default())), central_source: Arc::new(central), + pending_source: Arc::new(pending_source), base_layer_source: Arc::new(base_layer), reader, writer, diff --git a/crates/papyrus_sync/src/sources/mod.rs b/crates/papyrus_sync/src/sources/mod.rs index 75140233f7..fe37526dcf 100644 --- a/crates/papyrus_sync/src/sources/mod.rs +++ b/crates/papyrus_sync/src/sources/mod.rs @@ -2,3 +2,4 @@ pub mod base_layer; pub mod central; #[cfg(test)] mod central_sync_test; +pub mod pending; diff --git a/crates/papyrus_sync/src/sources/pending.rs b/crates/papyrus_sync/src/sources/pending.rs new file mode 100644 index 0000000000..e6ca353eb9 --- /dev/null +++ b/crates/papyrus_sync/src/sources/pending.rs @@ -0,0 +1,74 @@ +#[cfg(test)] +#[path = "pending_test.rs"] +mod pending_test; + +use std::sync::Arc; + +use async_trait::async_trait; +#[cfg(test)] +use mockall::automock; +use starknet_client::reader::{ + PendingData, + ReaderClientError, + StarknetFeederGatewayClient, + StarknetReader, +}; +use starknet_client::ClientCreationError; +use tracing::{debug, trace}; + +// TODO(dvir): add pending config. 
+use super::central::CentralSourceConfig;
+
+pub struct GenericPendingSource<TStarknetClient: StarknetReader + Send + Sync> {
+    pub starknet_client: Arc<TStarknetClient>,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum PendingError {
+    #[error(transparent)]
+    ClientCreation(#[from] ClientCreationError),
+    #[error(transparent)]
+    ClientError(#[from] Arc<ReaderClientError>),
+    #[error("Pending block not found")]
+    PendingBlockNotFound,
+}
+#[cfg_attr(test, automock)]
+#[async_trait]
+pub trait PendingSourceTrait {
+    async fn get_pending_data(&self) -> Result<PendingData, PendingError>;
+}
+
+#[async_trait]
+impl<TStarknetClient: StarknetReader + Send + Sync + 'static> PendingSourceTrait
+    for GenericPendingSource<TStarknetClient>
+{
+    async fn get_pending_data(&self) -> Result<PendingData, PendingError> {
+        match self.starknet_client.pending_data().await {
+            Ok(Some(pending_data)) => {
+                debug!("Received new pending data.");
+                trace!("Pending data: {pending_data:#?}.");
+                Ok(pending_data)
+            }
+            Ok(None) => Err(PendingError::PendingBlockNotFound),
+            Err(err) => Err(PendingError::ClientError(Arc::new(err))),
+        }
+    }
+}
+
+pub type PendingSource = GenericPendingSource<StarknetFeederGatewayClient>;
+
+impl PendingSource {
+    pub fn new(
+        config: CentralSourceConfig,
+        node_version: &'static str,
+    ) -> Result<PendingSource, ClientCreationError> {
+        let starknet_client = StarknetFeederGatewayClient::new(
+            &config.url,
+            config.http_headers,
+            node_version,
+            config.retry_config,
+        )?;
+
+        Ok(PendingSource { starknet_client: Arc::new(starknet_client) })
+    }
+}
diff --git a/crates/papyrus_sync/src/sources/pending_test.rs b/crates/papyrus_sync/src/sources/pending_test.rs
new file mode 100644
index 0000000000..68e0140397
--- /dev/null
+++ b/crates/papyrus_sync/src/sources/pending_test.rs
@@ -0,0 +1,20 @@
+use std::sync::Arc;
+
+use pretty_assertions::assert_eq;
+use starknet_client::reader::{MockStarknetReader, PendingData};
+
+use crate::sources::pending::{GenericPendingSource, PendingSourceTrait};
+
+#[tokio::test]
+async fn get_pending_data() {
+    let mut client_mock = MockStarknetReader::new();
+
+    // We need to set all the mock expectations before moving the mock into pending_source.
+    // TODO(dvir): use pending_data which isn't the default.
+    client_mock.expect_pending_data().times(1).returning(|| Ok(Some(PendingData::default())));
+
+    let pending_source = GenericPendingSource { starknet_client: Arc::new(client_mock) };
+
+    let pending_data = pending_source.get_pending_data().await.unwrap();
+    assert_eq!(pending_data, PendingData::default());
+}
diff --git a/crates/papyrus_sync/src/sync_test.rs b/crates/papyrus_sync/src/sync_test.rs
index 647e8a1a31..09fdcef0f3 100644
--- a/crates/papyrus_sync/src/sync_test.rs
+++ b/crates/papyrus_sync/src/sync_test.rs
@@ -9,19 +9,22 @@ use papyrus_storage::header::HeaderStorageWriter;
 use papyrus_storage::test_utils::get_test_storage;
 use papyrus_storage::StorageWriter;
 use pretty_assertions::assert_eq;
-use starknet_api::block::{BlockHash, BlockHeader, BlockNumber};
+use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, GasPrice};
 use starknet_api::core::{ClassHash, CompiledClassHash, ContractAddress, Nonce, PatriciaKey};
 use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
 use starknet_api::hash::{StarkFelt, StarkHash};
 use starknet_api::state::{ContractClass, StateDiff, StorageKey};
 use starknet_api::{patricia_key, stark_felt};
+use starknet_client::reader::PendingData;
 use tokio::sync::RwLock;
 
 use crate::sources::base_layer::MockBaseLayerSourceTrait;
 use crate::sources::central::MockCentralSourceTrait;
+use crate::sources::pending::MockPendingSourceTrait;
 use crate::{
     sort_state_diff,
     stream_new_base_layer_block,
+    sync_pending_data,
     GenericStateSync,
     StateSyncError,
     SyncConfig,
@@ -177,7 +180,9 @@ fn store_base_layer_block_test() {
     let mut gen_state_sync = GenericStateSync {
         config: SyncConfig::default(),
         shared_highest_block: Arc::new(RwLock::new(None)),
+        pending_data: Arc::new(RwLock::new(PendingData::default())),
         central_source: Arc::new(MockCentralSourceTrait::new()),
+        pending_source: Arc::new(MockPendingSourceTrait::new()),
         base_layer_source: Arc::new(MockBaseLayerSourceTrait::new()),
         reader,
         writer,
@@ -217,3 +222,53 @@ fn add_headers(headers_num: u64, writer: &mut StorageWriter) {
             .unwrap();
     }
 }
+
+#[tokio::test]
+async fn pending_sync() {
+    // Storage with one default block header.
+    let (reader, mut writer) = get_test_storage().0;
+    writer
+        .begin_rw_txn()
+        .unwrap()
+        .append_header(BlockNumber(0), &BlockHeader::default())
+        .unwrap()
+        .commit()
+        .unwrap();
+
+    let mut mock_pending_source = MockPendingSourceTrait::new();
+
+    const PENDING_QUERIES: usize = 2;
+    for call_count in 0..=PENDING_QUERIES {
+        mock_pending_source.expect_get_pending_data().times(1).returning(move || {
+            let mut block = PendingData::default();
+            block.block.parent_block_hash = BlockHash::default();
+            block.block.gas_price = GasPrice(call_count as u128);
+            Ok(block)
+        });
+    }
+
+    // A parent block hash that differs from the last block in the database indicates that a new
+    // block was created; pending sync should wait until the new block is written to the storage,
+    // so this pending data should not be written.
+    mock_pending_source.expect_get_pending_data().times(1).returning(|| {
+        let mut block = PendingData::default();
+        block.block.parent_block_hash = BlockHash(stark_felt!("0x1"));
+        Ok(block)
+    });
+
+    let pending_data = Arc::new(RwLock::new(PendingData::default()));
+
+    sync_pending_data(
+        reader,
+        Arc::new(mock_pending_source),
+        pending_data.clone(),
+        Duration::from_millis(1),
+    )
+    .await
+    .unwrap();
+
+    // The last query for pending data (with parent block hash 0x1) should not be written, so the
+    // gas price should be PENDING_QUERIES.
+    assert_eq!(pending_data.read().await.block.parent_block_hash, BlockHash::default());
+    assert_eq!(pending_data.read().await.block.gas_price, GasPrice(PENDING_QUERIES as u128));
+}