diff --git a/crates/papyrus_network/src/db_executor/mod.rs b/crates/papyrus_network/src/db_executor/mod.rs index 9f041f46b2..109c3e4b1e 100644 --- a/crates/papyrus_network/src/db_executor/mod.rs +++ b/crates/papyrus_network/src/db_executor/mod.rs @@ -130,7 +130,7 @@ pub trait DBExecutorTrait { &mut self, query: Query, data_type: impl FetchBlockDataFromDb + Send + 'static, - sender: Sender>, + sender: Sender, ); /// Polls incoming queries. @@ -154,7 +154,7 @@ impl DBExecutorTrait for DBExecutor { &mut self, query: Query, data_type: impl FetchBlockDataFromDb + Send + 'static, - mut sender: Sender>, + mut sender: Sender, ) { let storage_reader_clone = self.storage_reader.clone(); tokio::task::spawn(async move { @@ -179,8 +179,10 @@ impl DBExecutorTrait for DBExecutor { let data_vec = data_type.fetch_block_data_from_db(block_number, &txn)?; // Using poll_fn because Sender::poll_ready is not a future poll_fn(|cx| sender.poll_ready(cx)).await?; - // TODO: consider implement retry mechanism. - sender.start_send(data_vec)?; + for data in data_vec { + // TODO: consider implement retry mechanism. + sender.start_send(data)?; + } } Ok(()) }; diff --git a/crates/papyrus_network/src/db_executor/test.rs b/crates/papyrus_network/src/db_executor/test.rs index d4b6b94393..75a7338baa 100644 --- a/crates/papyrus_network/src/db_executor/test.rs +++ b/crates/papyrus_network/src/db_executor/test.rs @@ -32,7 +32,7 @@ async fn header_db_executor_can_register_and_run_a_query() { limit: NUM_OF_BLOCKS, step: 1, }; - type ReceiversType = Vec<(Receiver>, DataType)>; + type ReceiversType = Vec<(Receiver, DataType)>; let mut receivers: ReceiversType = enum_iterator::all::() .map(|data_type| { let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE); @@ -60,20 +60,20 @@ async fn header_db_executor_can_register_and_run_a_query() { _ = async { while let Some(res) = receivers_stream.next().await { let (data, requested_data_type) = res.await; - assert_eq!(data.len(), NUM_OF_BLOCKS as usize); + if matches!(requested_data_type, DataType::SignedBlockHeader) { + assert_eq!(data.len(), NUM_OF_BLOCKS as usize); + } for (i, data) in data.into_iter().enumerate() { - for data in data.iter() { - match data { - Data::BlockHeaderAndSignature(signed_header) => { - assert_eq!(signed_header.block_header.block_number.0, i as u64); - assert_eq!(*requested_data_type, DataType::SignedBlockHeader); - } - Data::StateDiffChunk (_state_diff) => { - // TODO: check the state diff. - assert_eq!(*requested_data_type, DataType::StateDiff); - } - _ => panic!("Unexpected data type"), + match data { + Data::BlockHeaderAndSignature(signed_header) => { + assert_eq!(signed_header.block_header.block_number.0, i as u64); + assert_eq!(*requested_data_type, DataType::SignedBlockHeader); + } + Data::StateDiffChunk (_state_diff) => { + // TODO: check the state diff. + assert_eq!(*requested_data_type, DataType::StateDiff); } + _ => panic!("Unexpected data type"), } } } @@ -117,12 +117,10 @@ async fn header_db_executor_start_block_given_by_hash() { res = receiver.collect::>() => { assert_eq!(res.len(), NUM_OF_BLOCKS as usize); for (i, data) in res.into_iter().enumerate() { - for data in data.iter() { - let Data::BlockHeaderAndSignature(signed_header) = data else { - panic!("Unexpected data type"); - }; - assert_eq!(signed_header.block_header.block_number.0, i as u64); - } + let Data::BlockHeaderAndSignature(signed_header) = data else { + panic!("Unexpected data type"); + }; + assert_eq!(signed_header.block_header.block_number.0, i as u64); } } } diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 8176406990..47df3cadf2 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -309,7 +309,6 @@ impl GenericNetworkManager>, + mut sender: Sender, ) { let headers = self.query_to_headers.get(&query).unwrap().clone(); self.query_execution_set.push(tokio::task::spawn(async move { @@ -236,9 +236,10 @@ impl DBExecutorTrait for MockDBExecutor { for header in headers.iter().cloned() { // Using poll_fn because Sender::poll_ready is not a future if let Ok(()) = poll_fn(|cx| sender.poll_ready(cx)).await { - sender.start_send(vec![Data::BlockHeaderAndSignature( - SignedBlockHeader { block_header: header, signatures: vec![] }, - )])?; + sender.start_send(Data::BlockHeaderAndSignature(SignedBlockHeader { + block_header: header, + signatures: vec![], + }))?; } } Ok(())