Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): sync pending data #1175

Merged
merged 2 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, StorageReader, StorageWriter};
use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource};
use papyrus_sync::sources::central::{CentralError, CentralSource};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tracing::metadata::LevelFilter;
use tracing::{error, info};
Expand All @@ -40,15 +42,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData::default()));

// JSON-RPC server.
let (_, server_handle) =
run_server(&config.rpc, shared_highest_block.clone(), storage_reader.clone(), VERSION_FULL)
.await?;
let server_handle_future = tokio::spawn(server_handle.stopped());

// Sync task.
let sync_future =
run_sync(config, shared_highest_block, storage_reader.clone(), storage_writer);
let sync_future = run_sync(
config,
shared_highest_block,
pending_data,
storage_reader.clone(),
storage_writer,
);
let sync_handle = tokio::spawn(sync_future);

tokio::select! {
Expand All @@ -71,19 +80,24 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
async fn run_sync(
config: NodeConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
storage_reader: StorageReader,
storage_writer: StorageWriter,
) -> Result<(), StateSyncError> {
let Some(sync_config) = config.sync else { return Ok(()) };
let central_source =
CentralSource::new(config.central, VERSION_FULL, storage_reader.clone())
CentralSource::new(config.central.clone(), VERSION_FULL, storage_reader.clone())
.map_err(CentralError::ClientCreation)?;
let pending_source = PendingSource::new(config.central, VERSION_FULL)
.map_err(CentralError::ClientCreation)?;
let base_layer_source = EthereumBaseLayerSource::new(config.base_layer)
.map_err(|e| BaseLayerSourceError::BaseLayerSourceCreationError(e.to_string()))?;
let mut sync = StateSync::new(
sync_config,
shared_highest_block,
pending_data,
central_source,
pending_source,
base_layer_source,
storage_reader.clone(),
storage_writer,
Expand Down
84 changes: 75 additions & 9 deletions crates/papyrus_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ use starknet_api::block::{Block, BlockHash, BlockNumber};
use starknet_api::core::{ClassHash, CompiledClassHash};
use starknet_api::deprecated_contract_class::ContractClass as DeprecatedContractClass;
use starknet_api::state::StateDiff;
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::sources::base_layer::{BaseLayerSourceTrait, EthereumBaseLayerSource};
use crate::sources::central::{CentralError, CentralSource, CentralSourceTrait};
use crate::sources::pending::{PendingError, PendingSource, PendingSourceTrait};

// TODO(dvir): add to config.
// Sleep duration between polling for pending data.
const PENDING_SLEEP_DURATION: Duration = Duration::from_millis(500);

// Sleep duration, in seconds, between sync progress checks.
const SLEEP_TIME_SYNC_PROGRESS: Duration = Duration::from_secs(300);
Expand Down Expand Up @@ -110,11 +116,14 @@ impl Default for SyncConfig {
// memory.
pub struct GenericStateSync<
TCentralSource: CentralSourceTrait + Sync + Send,
TPendingSource: PendingSourceTrait + Sync + Send,
TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
> {
config: SyncConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
central_source: Arc<TCentralSource>,
pending_source: Arc<TPendingSource>,
base_layer_source: Arc<TBaseLayerSource>,
reader: StorageReader,
writer: StorageWriter,
Expand All @@ -130,6 +139,8 @@ pub enum StateSyncError {
StorageError(#[from] StorageError),
#[error(transparent)]
CentralSourceError(#[from] CentralError),
#[error(transparent)]
PendingSourceError(#[from] PendingError),
#[error(
"Parent block hash of block {block_number} is not consistent with the stored block. \
Expected {expected_parent_block_hash}, found {stored_parent_block_hash}."
Expand Down Expand Up @@ -191,8 +202,9 @@ pub enum SyncEvent {

impl<
TCentralSource: CentralSourceTrait + Sync + Send + 'static,
TPendingSource: PendingSourceTrait + Sync + Send + 'static,
TBaseLayerSource: BaseLayerSourceTrait + Sync + Send,
> GenericStateSync<TCentralSource, TBaseLayerSource>
> GenericStateSync<TCentralSource, TPendingSource, TBaseLayerSource>
{
pub async fn run(&mut self) -> StateSyncResult {
info!("State sync started.");
Expand Down Expand Up @@ -259,8 +271,11 @@ impl<
let block_stream = stream_new_blocks(
self.reader.clone(),
self.central_source.clone(),
self.pending_source.clone(),
self.shared_highest_block.clone(),
self.pending_data.clone(),
self.config.block_propagation_sleep_duration,
PENDING_SLEEP_DURATION,
self.config.blocks_max_stream_size,
)
.fuse();
Expand Down Expand Up @@ -518,7 +533,7 @@ impl<
let mut last_block_in_storage = header_marker.prev();
while let Some(block_number) = last_block_in_storage {
if self.should_revert_block(block_number).await? {
self.revert_block(block_number)?;
self.revert_block(block_number).await?;
last_block_in_storage = block_number.prev();
} else {
break;
Expand All @@ -531,10 +546,11 @@ impl<
// Deletes the block data from the storage, moving it to the ommer tables.
#[allow(clippy::expect_fun_call)]
#[instrument(skip(self), level = "debug", err)]
fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
async fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
debug!("Reverting block.");
let mut txn = self.writer.begin_rw_txn()?;
*self.pending_data.write().await = PendingData::default();

let mut txn = self.writer.begin_rw_txn()?;
txn = txn.try_revert_base_layer_marker(block_number)?;
let res = txn.revert_header(block_number)?;
txn = res.0;
Expand Down Expand Up @@ -618,12 +634,19 @@ impl<
}
}
}

fn stream_new_blocks<TCentralSource: CentralSourceTrait + Sync + Send>(
// TODO(dvir): consider gathering in a single pending argument instead.
#[allow(clippy::too_many_arguments)]
fn stream_new_blocks<
TCentralSource: CentralSourceTrait + Sync + Send,
TPendingSource: PendingSourceTrait + Sync + Send,
>(
reader: StorageReader,
central_source: Arc<TCentralSource>,
pending_source: Arc<TPendingSource>,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
block_propagation_sleep_duration: Duration,
pending_sleep_duration: Duration,
max_stream_size: u32,
) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
try_stream! {
Expand All @@ -638,8 +661,16 @@ fn stream_new_blocks<TCentralSource: CentralSourceTrait + Sync + Send>(
papyrus_metrics::PAPYRUS_CENTRAL_BLOCK_MARKER, central_block_marker.0 as f64
);
if header_marker == central_block_marker {
debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
tokio::time::sleep(block_propagation_sleep_duration).await;
// Sync pending data only if the node has the last block and state (without casms).
if reader.begin_ro_txn()?.get_state_marker()? == header_marker{
// Here and when reverting a block are the only places where the pending data is updated.
debug!("Start polling for pending data.");
sync_pending_data(reader.clone(), pending_source.clone(), pending_data.clone(), pending_sleep_duration).await?;
}
else{
debug!("Blocks syncing reached the last known block, waiting for blockchain to advance.");
tokio::time::sleep(block_propagation_sleep_duration).await;
};
continue;
}
let up_to = min(central_block_marker, BlockNumber(header_marker.0 + max_stream_size as u64));
Expand Down Expand Up @@ -709,21 +740,26 @@ pub fn sort_state_diff(diff: &mut StateDiff) {
}
}

pub type StateSync = GenericStateSync<CentralSource, EthereumBaseLayerSource>;
pub type StateSync = GenericStateSync<CentralSource, PendingSource, EthereumBaseLayerSource>;

impl StateSync {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: SyncConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
central_source: CentralSource,
pending_source: PendingSource,
base_layer_source: EthereumBaseLayerSource,
reader: StorageReader,
writer: StorageWriter,
) -> Self {
Self {
config,
shared_highest_block,
pending_data,
central_source: Arc::new(central_source),
pending_source: Arc::new(pending_source),
base_layer_source: Arc::new(base_layer_source),
reader,
writer,
Expand Down Expand Up @@ -842,3 +878,33 @@ fn check_sync_progress(
}
}
}

// Update the pending data and return when a new block is discovered.
async fn sync_pending_data<TPendingSource: PendingSourceTrait + Sync + Send>(
reader: StorageReader,
pending_source: Arc<TPendingSource>,
pending_data: Arc<RwLock<PendingData>>,
sleep_duration: Duration,
) -> Result<(), StateSyncError> {
let txn = reader.begin_ro_txn()?;
let header_marker = txn.get_header_marker()?;
// TODO: Consider extracting this functionality to different а function.
let latest_block_hash = match header_marker {
// TODO: make sure this is the correct value for the genesis's parent block in all the
// environments.
BlockNumber(0) => BlockHash::default(),
_ => {
txn.get_block_header(header_marker.prev().expect("Header marker isn't zero."))?
.expect("Block before the header marker must have header in the database.")
.block_hash
}
};
loop {
let new_pending_data = pending_source.get_pending_data().await?;
if new_pending_data.block.parent_block_hash != latest_block_hash {
return Ok(());
};
*pending_data.write().await = new_pending_data;
tokio::time::sleep(sleep_duration).await;
}
}
8 changes: 8 additions & 0 deletions crates/papyrus_sync/src/sources/central_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use starknet_api::block::{Block, BlockBody, BlockHash, BlockHeader, BlockNumber}
use starknet_api::hash::StarkFelt;
use starknet_api::stark_felt;
use starknet_api::state::StateDiff;
use starknet_client::reader::PendingData;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error};

use super::pending::MockPendingSourceTrait;
use crate::sources::base_layer::{BaseLayerSourceTrait, MockBaseLayerSourceTrait};
use crate::sources::central::{
BlocksStream,
Expand Down Expand Up @@ -89,6 +91,10 @@ async fn run_sync(
central: impl CentralSourceTrait + Send + Sync + 'static,
base_layer: impl BaseLayerSourceTrait + Send + Sync,
) -> StateSyncResult {
// Mock to the pending source that always returns the default pending data.
let mut pending_source = MockPendingSourceTrait::new();
pending_source.expect_get_pending_data().returning(|| Ok(PendingData::default()));

let mut state_sync = GenericStateSync {
config: SyncConfig {
block_propagation_sleep_duration: SYNC_SLEEP_DURATION,
Expand All @@ -98,7 +104,9 @@ async fn run_sync(
state_updates_max_stream_size: STREAM_SIZE,
},
shared_highest_block: Arc::new(RwLock::new(None)),
pending_data: Arc::new(RwLock::new(PendingData::default())),
central_source: Arc::new(central),
pending_source: Arc::new(pending_source),
base_layer_source: Arc::new(base_layer),
reader,
writer,
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_sync/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod base_layer;
pub mod central;
#[cfg(test)]
mod central_sync_test;
pub mod pending;
74 changes: 74 additions & 0 deletions crates/papyrus_sync/src/sources/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#[cfg(test)]
#[path = "pending_test.rs"]
mod pending_test;

use std::sync::Arc;

use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;
use starknet_client::reader::{
PendingData,
ReaderClientError,
StarknetFeederGatewayClient,
StarknetReader,
};
use starknet_client::ClientCreationError;
use tracing::{debug, trace};

// TODO(dvir): add pending config.
use super::central::CentralSourceConfig;

/// A pending-data source that is generic over the Starknet reader client it queries.
/// The concrete, production alias is [`PendingSource`].
pub struct GenericPendingSource<TStarknetClient: StarknetReader + Send + Sync> {
    /// The client used to query the feeder gateway for pending data.
    pub starknet_client: Arc<TStarknetClient>,
}

/// Errors that a pending source can return.
#[derive(thiserror::Error, Debug)]
pub enum PendingError {
    /// Creating the underlying Starknet client failed.
    #[error(transparent)]
    ClientCreation(#[from] ClientCreationError),
    /// The client returned an error while fetching pending data.
    /// Wrapped in `Arc` because `ReaderClientError` is not `Clone`.
    #[error(transparent)]
    ClientError(#[from] Arc<ReaderClientError>),
    /// The client responded successfully, but no pending block was available.
    #[error("Pending block not found")]
    PendingBlockNotFound,
}
// In test builds, `automock` generates `MockPendingSourceTrait` for use in unit tests.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait PendingSourceTrait {
    /// Fetches the current pending data from the source.
    ///
    /// # Errors
    /// Returns a [`PendingError`] if the fetch fails or no pending block is available.
    async fn get_pending_data(&self) -> Result<PendingData, PendingError>;
}

#[async_trait]
impl<TStarknetClient: StarknetReader + Send + Sync + 'static> PendingSourceTrait
    for GenericPendingSource<TStarknetClient>
{
    /// Queries the client for pending data, translating client-level outcomes into
    /// [`PendingError`] variants.
    async fn get_pending_data(&self) -> Result<PendingData, PendingError> {
        let response = self.starknet_client.pending_data().await;
        let pending_data = response
            // A client failure becomes `ClientError` (wrapped in `Arc`).
            .map_err(|err| PendingError::ClientError(Arc::new(err)))?
            // A successful response with no pending block becomes `PendingBlockNotFound`.
            .ok_or(PendingError::PendingBlockNotFound)?;
        debug!("Received new pending data.");
        trace!("Pending data: {pending_data:#?}.");
        Ok(pending_data)
    }
}

pub type PendingSource = GenericPendingSource<StarknetFeederGatewayClient>;

impl PendingSource {
    /// Creates a pending source backed by a [`StarknetFeederGatewayClient`] built from the
    /// given central-source configuration.
    ///
    /// # Errors
    /// Returns a [`ClientCreationError`] if the client cannot be constructed.
    pub fn new(
        config: CentralSourceConfig,
        node_version: &'static str,
    ) -> Result<PendingSource, ClientCreationError> {
        let client = StarknetFeederGatewayClient::new(
            &config.url,
            config.http_headers,
            node_version,
            config.retry_config,
        )?;
        Ok(Self { starknet_client: Arc::new(client) })
    }
}
20 changes: 20 additions & 0 deletions crates/papyrus_sync/src/sources/pending_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::sync::Arc;

use pretty_assertions::assert_eq;
use starknet_client::reader::{MockStarknetReader, PendingData};

use crate::sources::pending::{GenericPendingSource, PendingSourceTrait};

#[tokio::test]
async fn get_pending_data() {
    let mut mock = MockStarknetReader::new();

    // All expectations must be registered before the mock is moved into the source.
    // TODO(dvir): use pending_data which isn't the default.
    mock.expect_pending_data().times(1).returning(|| Ok(Some(PendingData::default())));

    let source = GenericPendingSource { starknet_client: Arc::new(mock) };

    assert_eq!(source.get_pending_data().await.unwrap(), PendingData::default());
}
Loading
Loading