Skip to content

Commit

Permalink
Merge pull request #2114 from dusk-network/fix-2096
Browse files Browse the repository at this point in the history
node: Periodically remove expired transactions from mempool
  • Loading branch information
goshawk-3 authored Aug 21, 2024
2 parents 266d9c9 + 4c6a362 commit 3e7209a
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 28 deletions.
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ console-subscriber = { version = "0.1.8", optional = true }
smallvec = "1.10.0"

serde = "1.0"
humantime-serde = "1"
thiserror = "1"
metrics = "0.22"
metrics-exporter-prometheus = "0.14"
Expand Down
4 changes: 4 additions & 0 deletions node/default.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ max_queue_size = 5000

[mempool]
max_queue_size = 5000
max_mempool_txn_count = 10000
idle_interval = '6h'
mempool_expiry = '3d'
mempool_download_redundancy = 5
10 changes: 8 additions & 2 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use node_data::message::payload::Vote;
use node_data::{Serializable, StepName};
use std::collections::BTreeMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use std::time::{self, Duration, UNIX_EPOCH};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -845,10 +845,16 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Delete any rocksdb record related to this block
t.delete_block(&b)?;

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|n| n.as_secs())
.expect("valid timestamp");

// Attempt to resubmit transactions back to mempool.
// An error here is not considered critical.
// Txs timestamp is reset here
for tx in b.txs().iter() {
if let Err(e) = Mempool::add_tx(t, tx) {
if let Err(e) = Mempool::add_tx(t, tx, now) {
warn!("failed to resubmit transactions: {e}")
};
}
Expand Down
7 changes: 5 additions & 2 deletions node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ pub trait Candidate {
}

pub trait Mempool {
/// Adds a transaction to the mempool.
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()>;
/// Adds a transaction to the mempool with a timestamp.
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()>;

/// Gets a transaction from the mempool.
fn get_tx(&self, tx_id: [u8; 32]) -> Result<Option<ledger::Transaction>>;
Expand All @@ -147,6 +147,9 @@ pub trait Mempool {
/// Get all transactions hashes.
fn get_txs_ids(&self) -> Result<Vec<[u8; 32]>>;

/// Get all expired transactions.
fn get_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>>;

/// Number of persisted transactions
fn txs_count(&self) -> usize;
}
Expand Down
94 changes: 86 additions & 8 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
}

impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()> {
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()> {
// Map Hash to serialized transaction
let mut tx_data = vec![];
tx.write(&mut tx_data)?;
Expand All @@ -694,11 +694,15 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
self.put_cf(self.nullifiers_cf, key, hash)?;
}

// Map Fee_Hash to Null to facilitate sort-by-fee
let timestamp = timestamp.to_be_bytes();

// Map Fee_Hash to Timestamp
// Key pair is used to facilitate sort-by-fee
// Also, the timestamp is used to remove expired transactions
self.put_cf(
self.fees_cf,
serialize_key(tx.gas_price(), hash)?,
vec![0],
timestamp,
)?;

Ok(())
Expand Down Expand Up @@ -771,6 +775,44 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
Ok(Box::new(iter))
}

/// Get all expired transactions hashes.
fn get_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
iter.seek_to_first();
let mut txs_list = vec![];

while iter.valid() {
if let Some(key) = iter.key() {
let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;

let tx_timestamp = u64::from_be_bytes(
iter.value()
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"no value",
)
})?
.try_into()
.map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"invalid data",
)
})?,
);

if tx_timestamp <= timestamp {
txs_list.push(tx_id);
}
}

iter.next();
}

Ok(txs_list)
}

fn get_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
iter.seek_to_last();
Expand Down Expand Up @@ -1165,7 +1207,7 @@ mod tests {
Backend::create_or_open(path, DatabaseOptions::default());
let t: ledger::Transaction = Faker.fake();

assert!(db.update(|txn| { txn.add_tx(&t) }).is_ok());
assert!(db.update(|txn| { txn.add_tx(&t, 0) }).is_ok());

db.view(|vq| {
assert!(Mempool::get_tx_exists(&vq, t.id()).unwrap());
Expand Down Expand Up @@ -1199,7 +1241,7 @@ mod tests {
db.update(|txn| {
for _i in 0..10u32 {
let t: ledger::Transaction = Faker.fake();
txn.add_tx(&t)?;
txn.add_tx(&t, 0)?;
}
Ok(())
})
Expand Down Expand Up @@ -1238,8 +1280,9 @@ mod tests {

db.update(|db| {
assert_eq!(db.txs_count(), 0);
txs.iter()
.for_each(|t| db.add_tx(&t).expect("tx should be added"));
txs.iter().for_each(|t| {
db.add_tx(&t, 0).expect("tx should be added")
});
Ok(())
})
.unwrap();
Expand Down Expand Up @@ -1276,7 +1319,7 @@ mod tests {
db.update(|txn| {
for i in 0..10u32 {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t)?;
txn.add_tx(&t, 0)?;
}
Ok(())
})
Expand All @@ -1295,6 +1338,41 @@ mod tests {
});
}

#[test]
fn test_get_expired_txs() {
TestWrapper::new("test_get_expired_txs").run(|path| {
let db: Backend =
Backend::create_or_open(path, DatabaseOptions::default());

let mut expiry_list = HashSet::new();
let _ = db.update(|txn| {
(1..101).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
expiry_list.insert(t.id());
});

(1000..1100).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
});

Ok(())
});

db.view(|vq| {
let expired: HashSet<[u8; 32]> =
Mempool::get_expired_txs(&vq, 100)
.unwrap()
.into_iter()
.map(|id| id)
.collect();

assert_eq!(expiry_list, expired);
});
});
}

fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
txs.iter()
.map(|t| SpentTransaction {
Expand Down
99 changes: 85 additions & 14 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::{database, vm, LongLivedService, Message, Network};
use async_trait::async_trait;
use conf::{
DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
};
use node_data::events::{Event, TransactionEvent};
use node_data::ledger::Transaction;
use node_data::message::{AsyncQueue, Payload, Topics};
use node_data::message::{payload, AsyncQueue, Payload, Topics};
use std::sync::Arc;
use std::time::{self, UNIX_EPOCH};
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -81,22 +85,63 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
)
.await?;

// Request mempool update from N alive peers
self.request_mempool(&network).await;

let idle_interval =
self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);

let mempool_expiry =
self.conf.mempool_expiry.unwrap_or(DEFAULT_EXPIRY_TIME);

// Mempool service loop
let mut on_idle_event = tokio::time::interval(idle_interval);
loop {
if let Ok(msg) = self.inbound.recv().await {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx::<DB, VM>(&db, &vm, tx);
if let Err(e) = accept.await {
error!("{}", e);
continue;
tokio::select! {
biased;
_ = on_idle_event.tick() => {
info!(event = "mempool_idle", interval = ?idle_interval);

let expiration_time = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp")
.checked_sub(mempool_expiry)
.expect("valid duration");

// Remove expired transactions from the mempool
db.read().await.update(|db| {
let expired_txs = db.get_expired_txs(expiration_time.as_secs())?;
for tx_id in expired_txs {
info!(event = "expired_tx", hash = hex::encode(tx_id));
if db.delete_tx(tx_id)? {
let event = TransactionEvent::Removed(tx_id);
if let Err(e) = self.event_sender.try_send(event.into()) {
warn!("cannot notify mempool removed transaction {e}")
};
}
}
Ok(())
})?;

let network = network.read().await;
if let Err(e) = network.broadcast(&msg).await {
warn!("Unable to broadcast accepted tx: {e}")
};
},
msg = self.inbound.recv() => {
if let Ok(msg) = msg {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx::<DB, VM>(&db, &vm, tx);
if let Err(e) = accept.await {
error!("{}", e);
continue;
}

let network = network.read().await;
if let Err(e) = network.broadcast(&msg).await {
warn!("Unable to broadcast accepted tx: {e}")
};
}
_ => error!("invalid inbound message payload"),
}
}
_ => error!("invalid inbound message payload"),
}
}
}
Expand Down Expand Up @@ -170,7 +215,12 @@ impl MempoolSrv {

events.push(TransactionEvent::Included(tx));
// Persist transaction in mempool storage
db.add_tx(tx)

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp");

db.add_tx(tx, now.as_secs())
})?;

tracing::info!(
Expand All @@ -187,4 +237,25 @@ impl MempoolSrv {

Ok(())
}

/// Requests full mempool data from N alive peers
///
/// Message flow:
/// GetMempool -> Inv -> GetResource -> Tx
async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
let max_peers = self
.conf
.mempool_download_redundancy
.unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);

let payload = payload::GetMempool {};
if let Err(err) = network
.read()
.await
.send_to_alive_peers(&Message::new_get_mempool(payload), max_peers)
.await
{
error!("could not request mempool from network: {err}");
}
}
}
Loading

0 comments on commit 3e7209a

Please sign in to comment.