diff --git a/config/src/queue.rs b/config/src/queue.rs index 37a0954a0e1..6b644c3ab56 100644 --- a/config/src/queue.rs +++ b/config/src/queue.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; const DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK: u32 = 2_u32.pow(9); const DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16); +const DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER: u32 = 2_u32.pow(16); // 24 hours const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000; const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; @@ -18,6 +19,8 @@ pub struct Configuration { pub maximum_transactions_in_block: u32, /// The upper limit of the number of transactions waiting in the queue. pub maximum_transactions_in_queue: u32, + /// The upper limit of the number of transactions waiting for more signatures. + pub maximum_transactions_in_waiting_buffer: u32, /// The transaction will be dropped after this time if it is still in the queue. pub transaction_time_to_live_ms: u64, /// The threshold to determine if a transaction has been tampered to have a future timestamp. @@ -29,6 +32,9 @@ impl Default for ConfigurationProxy { Self { maximum_transactions_in_block: Some(DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK), maximum_transactions_in_queue: Some(DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE), + maximum_transactions_in_waiting_buffer: Some( + DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER, + ), transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS), future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS), } @@ -46,11 +52,12 @@ pub mod tests { ( maximum_transactions_in_block in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK)), maximum_transactions_in_queue in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE)), + maximum_transactions_in_waiting_buffer in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER)), transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)), future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)), ) -> ConfigurationProxy { - ConfigurationProxy { maximum_transactions_in_block, maximum_transactions_in_queue, transaction_time_to_live_ms, future_threshold_ms } + ConfigurationProxy { maximum_transactions_in_block, maximum_transactions_in_queue, maximum_transactions_in_waiting_buffer, transaction_time_to_live_ms, future_threshold_ms } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index 617118bc2fd..5f72b93dd82 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -38,6 +38,7 @@ "QUEUE": { "MAXIMUM_TRANSACTIONS_IN_BLOCK": 512, "MAXIMUM_TRANSACTIONS_IN_QUEUE": 65536, + "MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/core/src/queue.rs b/core/src/queue.rs index 8d203afc051..3fdd94ca929 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -29,6 +29,8 @@ use crate::prelude::*; pub struct Queue { /// The queue proper queue: ArrayQueue>, + /// The queue for transactions that are still waiting for signatures + waiting_buffer: ArrayQueue>, /// [`VersionedAcceptedTransaction`]s addressed by `Hash`. txs: DashMap, VersionedAcceptedTransaction>, /// The maximum number of transactions in the block @@ -72,8 +74,10 @@ impl Queue { pub fn from_configuration(cfg: &Configuration) -> Self { Self { queue: ArrayQueue::new(cfg.maximum_transactions_in_queue as usize), + waiting_buffer: ArrayQueue::new(cfg.maximum_transactions_in_waiting_buffer as usize), txs: DashMap::new(), - max_txs: cfg.maximum_transactions_in_queue as usize, + max_txs: (cfg.maximum_transactions_in_queue + + cfg.maximum_transactions_in_waiting_buffer) as usize, txs_in_block: cfg.maximum_transactions_in_block as usize, tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms), future_threshold: Duration::from_millis(cfg.future_threshold_ms), @@ -114,7 +118,9 @@ impl Queue { tx: &VersionedAcceptedTransaction, wsv: &WorldStateView, ) -> Result, Error> { - if tx.is_expired(self.tx_time_to_live) { + if tx.is_in_future(self.future_threshold) { + Err(Error::InFuture) + } else if tx.is_expired(self.tx_time_to_live) { Err(Error::Expired) } else if tx.is_in_blockchain(wsv) { Err(Error::InBlockchain) @@ -141,65 +147,64 @@ impl Queue { tx: VersionedAcceptedTransaction, wsv: &WorldStateView, ) -> Result<(), (VersionedAcceptedTransaction, Error)> { - if tx.is_in_future(self.future_threshold) { - Err((tx, Error::InFuture)) - } else if let Err(e) = self.check_tx(&tx, wsv) { - Err((tx, e)) - } else if self.txs.len() >= self.max_txs { - Err((tx, Error::Full)) - } else { - let hash = tx.hash(); - let entry = match self.txs.entry(hash) { - Entry::Occupied(mut old_tx) => { - // MST case - old_tx - .get_mut() - .as_mut_v1() - .signatures - .extend(tx.as_v1().signatures.clone()); - return Ok(()); + match self.check_tx(&tx, wsv) { + Err(e) => Err((tx, e)), + Ok(MustUse(signature_check)) => { + // Get `txs_len` before entry to avoid deadlock + let txs_len = self.txs.len(); + let hash = tx.hash(); + let entry = match self.txs.entry(hash) { + Entry::Occupied(mut old_tx) => { + // MST case + old_tx + .get_mut() + .as_mut_v1() + .signatures + .extend(tx.as_v1().signatures.clone()); + return Ok(()); + } + Entry::Vacant(entry) => entry, + }; + if txs_len >= self.max_txs { + return Err((tx, Error::Full)); } - Entry::Vacant(entry) => entry, - }; - // Reason for such insertion order is to avoid situation - // when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs` - entry.insert(tx); - self.queue.push(hash).map_err(|err_hash| { - let (_, err_tx) = self - .txs - .remove(&err_hash) - .expect("Inserted just before match"); - (err_tx, Error::Full) - }) + // Reason for such insertion order is to avoid situation + // when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs` + entry.insert(tx); + let queue_to_push = if signature_check { + &self.queue + } else { + &self.waiting_buffer + }; + queue_to_push.push(hash).map_err(|err_hash| { + let (_, err_tx) = self + .txs + .remove(&err_hash) + .expect("Inserted just before match"); + (err_tx, Error::Full) + }) + } } } /// Pop single transaction. /// /// Records unsigned transaction in `seen`. - #[allow( - clippy::expect_used, - clippy::unwrap_in_result, - clippy::cognitive_complexity - )] fn pop( &self, seen: &mut Vec>, + queue_to_pop: &ArrayQueue>, wsv: &WorldStateView, ) -> Option { loop { - let hash = self.queue.pop()?; + let hash = queue_to_pop.pop()?; let entry = match self.txs.entry(hash) { Entry::Occupied(entry) => entry, // As practice shows this code is not `unreachable!()`. // When transactions are submitted quickly it can be reached. Entry::Vacant(_) => continue, }; - if self.check_tx(entry.get(), wsv).is_err() { - entry.remove_entry(); - continue; - } match self.check_tx(entry.get(), wsv) { Err(_) => { @@ -247,21 +252,115 @@ impl Queue { return; } - let mut seen = Vec::new(); + let mut seen_queue = Vec::new(); + let mut seen_waiting_buffer = Vec::new(); + + let pop_from_queue = core::iter::from_fn(|| self.pop(&mut seen_queue, &self.queue, wsv)); + let pop_from_waiting_buffer = + core::iter::from_fn(|| self.pop(&mut seen_waiting_buffer, &self.waiting_buffer, wsv)); let transactions_hashes: HashSet> = transactions .iter() .map(VersionedAcceptedTransaction::hash) .collect(); - let out = std::iter::from_fn(|| self.pop(&mut seen, wsv)) + let out = pop_from_queue + .round_robin(pop_from_waiting_buffer) .filter(|tx| !transactions_hashes.contains(&tx.hash())) .take(self.txs_in_block - transactions.len()); transactions.extend(out); - #[allow(clippy::expect_used)] - seen.into_iter() - .try_for_each(|hash| self.queue.push(hash)) - .expect("As we never exceed the number of transactions pending"); + for (seen, queue) in [ + (seen_queue, &self.queue), + (seen_waiting_buffer, &self.waiting_buffer), + ] { + #[allow(clippy::expect_used)] + seen.into_iter() + .try_for_each(|hash| queue.push(hash)) + .expect("As we never exceed the number of transactions pending"); + } + } +} + +/// Iterator which combine two iterators into the single one in round-robin fashion. +/// +/// ```ignore +/// [(a0,a1,a2,..),(b0,b1,b2,..)] -> (a0,b0,a1,b1,a2,b2,..) +/// ``` +struct RoundRobinIterator { + left_iter: A, + right_iter: B, + state: RoundRobinState, +} + +enum RoundRobinState { + CurrentLeft, + CurrentRight, + LeftExhausted, + RightExhausted, + BothExhausted, +} + +trait RoundRobinIter: Iterator + Sized { + fn round_robin>( + self, + iter: I, + ) -> RoundRobinIterator::IntoIter> { + RoundRobinIterator { + left_iter: self, + right_iter: iter.into_iter(), + state: RoundRobinState::CurrentLeft, + } + } +} + +impl RoundRobinIter for T {} + +impl Iterator for RoundRobinIterator +where + A: Iterator, + B: Iterator, +{ + type Item = T; + + fn next(&mut self) -> Option { + use RoundRobinState::*; + loop { + match self.state { + BothExhausted => break None, + LeftExhausted => { + let item = self.right_iter.next(); + if item.is_none() { + self.state = BothExhausted; + } + break item; + } + RightExhausted => { + let item = self.left_iter.next(); + if item.is_none() { + self.state = BothExhausted; + } + break item; + } + CurrentLeft => { + let item = self.left_iter.next(); + if item.is_none() { + self.state = LeftExhausted; + continue; + } + self.state = CurrentRight; + break item; + } + CurrentRight => { + let item = self.right_iter.next(); + if item.is_none() { + self.state = RightExhausted; + continue; + } + self.state = CurrentLeft; + break item; + } + } + } } } @@ -771,4 +870,54 @@ mod tests { assert!(matches!(queue.push(tx, &wsv), Err((_, Error::InFuture)))); assert_eq!(queue.txs.len(), 1); } + + #[test] + fn round_robin_iter_a_eq_b_size() { + let a = vec![0, 2, 4, 6, 8]; + let b = vec![1, 3, 5, 7, 9]; + assert_eq!( + vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + a.into_iter().round_robin(b).collect::>() + ); + } + + #[test] + fn round_robin_iter_a_gt_b_size() { + let a = vec![0, 2, 4, 6, 8]; + let b = vec![1, 3, 5]; + assert_eq!( + vec![0, 1, 2, 3, 4, 5, 6, 8], + a.into_iter().round_robin(b).collect::>() + ); + } + + #[test] + fn round_robin_iter_a_lt_b_size() { + let a = vec![0, 2, 4]; + let b = vec![1, 3, 5, 7, 9]; + assert_eq!( + vec![0, 1, 2, 3, 4, 5, 7, 9], + a.into_iter().round_robin(b).collect::>() + ); + } + + #[test] + fn round_robin_iter_a_empty() { + let a = vec![0, 2, 4, 6, 8]; + let b = Vec::new(); + assert_eq!( + vec![0, 2, 4, 6, 8], + a.into_iter().round_robin(b).collect::>() + ); + } + + #[test] + fn round_robin_iter_b_empty() { + let a = Vec::new(); + let b = vec![1, 3, 5, 7, 9]; + assert_eq!( + vec![1, 3, 5, 7, 9], + a.into_iter().round_robin(b).collect::>() + ); + } } diff --git a/docs/source/references/config.md b/docs/source/references/config.md index e354cdee0f6..4cc3851958e 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -69,6 +69,7 @@ The following is the default configuration used by Iroha. "QUEUE": { "MAXIMUM_TRANSACTIONS_IN_BLOCK": 512, "MAXIMUM_TRANSACTIONS_IN_QUEUE": 65536, + "MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, @@ -430,6 +431,7 @@ Has type `Option`[^1]. Can be configured via environm "FUTURE_THRESHOLD_MS": 1000, "MAXIMUM_TRANSACTIONS_IN_BLOCK": 512, "MAXIMUM_TRANSACTIONS_IN_QUEUE": 65536, + "MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000 } ``` @@ -464,6 +466,16 @@ Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MA 65536 ``` +### `queue.maximum_transactions_in_waiting_buffer` + +The upper limit of the number of transactions waiting for more signatures. + +Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER` + +```json +65536 +``` + ### `queue.transaction_time_to_live_ms` The transaction will be dropped after this time if it is still in the queue.