Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Periodically remove expired transactions from mempool #2114

Merged
merged 7 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ console-subscriber = { version = "0.1.8", optional = true }
smallvec = "1.10.0"

serde = "1.0"
humantime-serde = "1"
thiserror = "1"
metrics = "0.22"
metrics-exporter-prometheus = "0.14"
Expand Down
4 changes: 4 additions & 0 deletions node/default.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ max_queue_size = 5000

[mempool]
max_queue_size = 5000
max_mempool_txn_count = 10000
idle_interval = '6h'
mempool_expiry = '3d'
mempool_download_redundancy = 5
10 changes: 8 additions & 2 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use node_data::message::payload::Vote;
use node_data::{Serializable, StepName};
use std::collections::BTreeMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use std::time::{self, Duration, UNIX_EPOCH};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -845,10 +845,16 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// Delete any rocksdb record related to this block
t.delete_block(&b)?;

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|n| n.as_secs())
.expect("valid timestamp");

// Attempt to resubmit transactions back to mempool.
// An error here is not considered critical.
// Txs timestamp is reset here
for tx in b.txs().iter() {
if let Err(e) = Mempool::add_tx(t, tx) {
if let Err(e) = Mempool::add_tx(t, tx, now) {
warn!("failed to resubmit transactions: {e}")
};
}
Expand Down
7 changes: 5 additions & 2 deletions node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ pub trait Candidate {
}

pub trait Mempool {
/// Adds a transaction to the mempool.
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()>;
/// Adds a transaction to the mempool with a timestamp.
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()>;

/// Gets a transaction from the mempool.
fn get_tx(&self, tx_id: [u8; 32]) -> Result<Option<ledger::Transaction>>;
Expand All @@ -147,6 +147,9 @@ pub trait Mempool {
/// Get all transactions hashes.
fn get_txs_ids(&self) -> Result<Vec<[u8; 32]>>;

/// Get all expired transactions.
fn get_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>>;

/// Number of persisted transactions
fn txs_count(&self) -> usize;
}
Expand Down
94 changes: 86 additions & 8 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
}

impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
fn add_tx(&self, tx: &ledger::Transaction) -> Result<()> {
fn add_tx(&self, tx: &ledger::Transaction, timestamp: u64) -> Result<()> {
// Map Hash to serialized transaction
let mut tx_data = vec![];
tx.write(&mut tx_data)?;
Expand All @@ -694,11 +694,15 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
self.put_cf(self.nullifiers_cf, key, hash)?;
}

// Map Fee_Hash to Null to facilitate sort-by-fee
let timestamp = timestamp.to_be_bytes();

// Map Fee_Hash to Timestamp
// Key pair is used to facilitate sort-by-fee
// Also, the timestamp is used to remove expired transactions
self.put_cf(
self.fees_cf,
serialize_key(tx.gas_price(), hash)?,
vec![0],
timestamp,
)?;

Ok(())
Expand Down Expand Up @@ -771,6 +775,44 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
Ok(Box::new(iter))
}

/// Get all expired transactions hashes.
fn get_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
iter.seek_to_first();
let mut txs_list = vec![];

while iter.valid() {
if let Some(key) = iter.key() {
let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;

let tx_timestamp = u64::from_be_bytes(
iter.value()
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"no value",
)
})?
.try_into()
.map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"invalid data",
)
})?,
);

if tx_timestamp <= timestamp {
txs_list.push(tx_id);
}
}

iter.next();
}

Ok(txs_list)
}

fn get_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
iter.seek_to_last();
Expand Down Expand Up @@ -1165,7 +1207,7 @@ mod tests {
Backend::create_or_open(path, DatabaseOptions::default());
let t: ledger::Transaction = Faker.fake();

assert!(db.update(|txn| { txn.add_tx(&t) }).is_ok());
assert!(db.update(|txn| { txn.add_tx(&t, 0) }).is_ok());

db.view(|vq| {
assert!(Mempool::get_tx_exists(&vq, t.id()).unwrap());
Expand Down Expand Up @@ -1199,7 +1241,7 @@ mod tests {
db.update(|txn| {
for _i in 0..10u32 {
let t: ledger::Transaction = Faker.fake();
txn.add_tx(&t)?;
txn.add_tx(&t, 0)?;
}
Ok(())
})
Expand Down Expand Up @@ -1238,8 +1280,9 @@ mod tests {

db.update(|db| {
assert_eq!(db.txs_count(), 0);
txs.iter()
.for_each(|t| db.add_tx(&t).expect("tx should be added"));
txs.iter().for_each(|t| {
db.add_tx(&t, 0).expect("tx should be added")
});
Ok(())
})
.unwrap();
Expand Down Expand Up @@ -1276,7 +1319,7 @@ mod tests {
db.update(|txn| {
for i in 0..10u32 {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t)?;
txn.add_tx(&t, 0)?;
}
Ok(())
})
Expand All @@ -1295,6 +1338,41 @@ mod tests {
});
}

#[test]
fn test_get_expired_txs() {
TestWrapper::new("test_get_expired_txs").run(|path| {
let db: Backend =
Backend::create_or_open(path, DatabaseOptions::default());

let mut expiry_list = HashSet::new();
let _ = db.update(|txn| {
(1..101).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
expiry_list.insert(t.id());
});

(1000..1100).for_each(|i| {
let t = ledger::faker::gen_dummy_tx(i as u64);
txn.add_tx(&t, i).expect("tx should be added");
});

Ok(())
});

db.view(|vq| {
let expired: HashSet<[u8; 32]> =
Mempool::get_expired_txs(&vq, 100)
.unwrap()
.into_iter()
.map(|id| id)
.collect();

assert_eq!(expiry_list, expired);
});
});
}

fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
txs.iter()
.map(|t| SpentTransaction {
Expand Down
99 changes: 85 additions & 14 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::{database, vm, LongLivedService, Message, Network};
use async_trait::async_trait;
use conf::{
DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
};
use node_data::events::{Event, TransactionEvent};
use node_data::ledger::Transaction;
use node_data::message::{AsyncQueue, Payload, Topics};
use node_data::message::{payload, AsyncQueue, Payload, Topics};
use std::sync::Arc;
use std::time::{self, UNIX_EPOCH};
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -81,22 +85,63 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
)
.await?;

// Request mempool update from N alive peers
self.request_mempool(&network).await;

let idle_interval =
self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);

let mempool_expiry =
self.conf.mempool_expiry.unwrap_or(DEFAULT_EXPIRY_TIME);

// Mempool service loop
let mut on_idle_event = tokio::time::interval(idle_interval);
loop {
if let Ok(msg) = self.inbound.recv().await {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx::<DB, VM>(&db, &vm, tx);
if let Err(e) = accept.await {
error!("{}", e);
continue;
tokio::select! {
biased;
_ = on_idle_event.tick() => {
info!(event = "mempool_idle", interval = ?idle_interval);

let expiration_time = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp")
.checked_sub(mempool_expiry)
.expect("valid duration");

// Remove expired transactions from the mempool
db.read().await.update(|db| {
let expired_txs = db.get_expired_txs(expiration_time.as_secs())?;
for tx_id in expired_txs {
info!(event = "expired_tx", hash = hex::encode(tx_id));
if db.delete_tx(tx_id)? {
let event = TransactionEvent::Removed(tx_id);
if let Err(e) = self.event_sender.try_send(event.into()) {
warn!("cannot notify mempool removed transaction {e}")
};
}
}
Ok(())
})?;

let network = network.read().await;
if let Err(e) = network.broadcast(&msg).await {
warn!("Unable to broadcast accepted tx: {e}")
};
},
msg = self.inbound.recv() => {
if let Ok(msg) = msg {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx::<DB, VM>(&db, &vm, tx);
if let Err(e) = accept.await {
error!("{}", e);
continue;
}

let network = network.read().await;
if let Err(e) = network.broadcast(&msg).await {
warn!("Unable to broadcast accepted tx: {e}")
};
}
_ => error!("invalid inbound message payload"),
}
}
_ => error!("invalid inbound message payload"),
}
}
}
Expand Down Expand Up @@ -170,7 +215,12 @@ impl MempoolSrv {

events.push(TransactionEvent::Included(tx));
// Persist transaction in mempool storage
db.add_tx(tx)

let now = time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("valid timestamp");

db.add_tx(tx, now.as_secs())
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved
})?;

tracing::info!(
Expand All @@ -187,4 +237,25 @@ impl MempoolSrv {

Ok(())
}

/// Requests full mempool data from N alive peers
///
/// Message flow:
/// GetMempool -> Inv -> GetResource -> Tx
async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
let max_peers = self
.conf
.mempool_download_redundancy
.unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);

let payload = payload::GetMempool {};
if let Err(err) = network
.read()
.await
.send_to_alive_peers(&Message::new_get_mempool(payload), max_peers)
.await
{
error!("could not request mempool from network: {err}");
}
}
}
Loading
Loading