Skip to content

Commit

Permalink
Merge pull request #2625 from dusk-network/consecutive_nonce
Browse files Browse the repository at this point in the history
node: mempool - reject moonlight txs with non consecutive nonce
  • Loading branch information
herr-seppia authored Oct 8, 2024
2 parents a3778df + 7c5fcaa commit 410012e
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 56 deletions.
9 changes: 9 additions & 0 deletions node-data/src/ledger/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ impl Transaction {
}
}
}

pub fn next_spending_id(&self) -> Option<SpendingId> {
match &self.inner {
ProtocolTransaction::Phoenix(_) => None,
ProtocolTransaction::Moonlight(m) => {
Some(SpendingId::AccountNonce(*m.sender(), m.nonce() + 1))
}
}
}
}

impl PartialEq<Self> for Transaction {
Expand Down
23 changes: 12 additions & 11 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,23 +631,24 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// block
for tx in tip.inner().txs().iter() {
let tx_id = tx.id();
let deleted = Mempool::delete_tx(t, tx_id)
for deleted in Mempool::delete_tx(t, tx_id, false)
.map_err(|e| warn!("Error while deleting tx: {e}"))
.unwrap_or_default();
if deleted {
events.push(TransactionEvent::Removed(tx_id).into());
.unwrap_or_default()
{
events.push(TransactionEvent::Removed(deleted).into());
}

let spend_ids = tx.to_spend_ids();
for orphan_tx in t.get_txs_by_spendable_ids(&spend_ids) {
let deleted = Mempool::delete_tx(t, orphan_tx)
.map_err(|e| {
warn!("Error while deleting orphan_tx: {e}")
})
.unwrap_or_default();
if deleted {
for deleted_tx in
Mempool::delete_tx(t, orphan_tx, false)
.map_err(|e| {
warn!("Error while deleting orphan_tx: {e}")
})
.unwrap_or_default()
{
events.push(
TransactionEvent::Removed(orphan_tx).into(),
TransactionEvent::Removed(deleted_tx).into(),
);
}
}
Expand Down
6 changes: 5 additions & 1 deletion node/src/chain/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,11 @@ impl<DB: database::DB, VM: vm::VMExecution> Operations for Executor<DB, VM> {
.map_err(OperationError::InvalidEST)?;
let _ = db.update(|m| {
for t in &discarded_txs {
let _ = m.delete_tx(t.id());
if let Ok(_removed) = m.delete_tx(t.id(), true) {
// TODO: `_removed` entries should be sent to rues to inform
// the subscribers that a transaction has been pruned from
// the mempool
}
}
Ok(())
});
Expand Down
10 changes: 9 additions & 1 deletion node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@ pub trait Mempool {
fn get_tx_exists(&self, tx_id: [u8; 32]) -> Result<bool>;

/// Deletes a transaction from the mempool.
fn delete_tx(&self, tx_id: [u8; 32]) -> Result<bool>;
///
/// If `cascade` is true, all dependant transactions are deleted
///
/// Return a vector with all the deleted tx_id
fn delete_tx(
&self,
tx_id: [u8; 32],
cascade: bool,
) -> Result<Vec<[u8; 32]>>;

/// Get transactions hash from the mempool, searching by spendable ids
fn get_txs_by_spendable_ids(&self, n: &[SpendingId]) -> HashSet<[u8; 32]>;
Expand Down
59 changes: 38 additions & 21 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
const CF_CANDIDATES: &str = "cf_candidates";
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
const CF_MEMPOOL: &str = "cf_mempool";
const CF_MEMPOOL_NULLIFIERS: &str = "cf_mempool_nullifiers";
const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
const CF_METADATA: &str = "cf_metadata";

Expand Down Expand Up @@ -99,10 +99,10 @@ impl Backend {
.cf_handle(CF_MEMPOOL)
.expect("mempool column family must exist");

let nullifiers_cf = self
let spending_id_cf = self
.rocksdb
.cf_handle(CF_MEMPOOL_NULLIFIERS)
.expect("CF_MEMPOOL_NULLIFIERS column family must exist");
.cf_handle(CF_MEMPOOL_SPENDING_ID)
.expect("CF_MEMPOOL_SPENDING_ID column family must exist");

let fees_cf = self
.rocksdb
Expand All @@ -129,7 +129,7 @@ impl Backend {
ledger_txs_cf,
ledger_faults_cf,
mempool_cf,
nullifiers_cf,
spending_id_cf,
fees_cf,
ledger_height_cf,
metadata_cf,
Expand Down Expand Up @@ -206,7 +206,10 @@ impl DB for Backend {
),
ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
ColumnFamilyDescriptor::new(CF_MEMPOOL_NULLIFIERS, mp_opts.clone()),
ColumnFamilyDescriptor::new(
CF_MEMPOOL_SPENDING_ID,
mp_opts.clone(),
),
ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
];

Expand Down Expand Up @@ -282,7 +285,7 @@ pub struct DBTransaction<'db, DB: DBAccess> {

// Mempool column families
mempool_cf: &'db ColumnFamily,
nullifiers_cf: &'db ColumnFamily,
spending_id_cf: &'db ColumnFamily,
fees_cf: &'db ColumnFamily,

metadata_cf: &'db ColumnFamily,
Expand Down Expand Up @@ -733,10 +736,10 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
self.put_cf(self.mempool_cf, hash, tx_data)?;

// Add Secondary indexes //
// Nullifiers
for n in tx.inner.nullifiers() {
// Spending Ids
for n in tx.to_spend_ids() {
let key = n.to_bytes();
self.put_cf(self.nullifiers_cf, key, hash)?;
self.put_cf(self.spending_id_cf, key, hash)?;
}

let timestamp = timestamp.to_be_bytes();
Expand Down Expand Up @@ -769,18 +772,19 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
Ok(self.snapshot.get_cf(self.mempool_cf, h)?.is_some())
}

fn delete_tx(&self, h: [u8; 32]) -> Result<bool> {
fn delete_tx(&self, h: [u8; 32], cascade: bool) -> Result<Vec<[u8; 32]>> {
let mut deleted = vec![];
let tx = self.get_tx(h)?;
if let Some(tx) = tx {
let hash = tx.id();

self.inner.delete_cf(self.mempool_cf, hash)?;

// Delete Secondary indexes
// Delete Nullifiers
for n in tx.inner.nullifiers() {
// Delete spendingids (nullifiers or nonce)
for n in tx.to_spend_ids() {
let key = n.to_bytes();
self.inner.delete_cf(self.nullifiers_cf, key)?;
self.inner.delete_cf(self.spending_id_cf, key)?;
}

// Delete Fee_Hash
Expand All @@ -789,16 +793,27 @@ impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
serialize_key(tx.gas_price(), hash)?,
)?;

return Ok(true);
deleted.push(h);

if cascade {
// Get the next spending id (aka next nonce tx)
// retrieve tx_id and delete it
if let Some(spending_id) = tx.next_spending_id() {
for tx_id in self.get_txs_by_spendable_ids(&[spending_id]) {
let cascade_deleted = self.delete_tx(tx_id, cascade)?;
deleted.extend(cascade_deleted);
}
}
}
}

Ok(false)
Ok(deleted)
}

fn get_txs_by_spendable_ids(&self, n: &[SpendingId]) -> HashSet<[u8; 32]> {
n.iter()
.filter_map(|n| {
match self.snapshot.get_cf(self.nullifiers_cf, n.to_bytes()) {
match self.snapshot.get_cf(self.spending_id_cf, n.to_bytes()) {
Ok(Some(tx_id)) => tx_id.try_into().ok(),
_ => None,
}
Expand Down Expand Up @@ -1290,7 +1305,8 @@ mod tests {

// Delete a contract call
db.update(|txn| {
assert!(txn.delete_tx(t.id()).expect("valid tx"));
let deleted = txn.delete_tx(t.id(), false).expect("valid tx");
assert!(deleted.len() == 1);
Ok(())
})
.unwrap();
Expand Down Expand Up @@ -1358,9 +1374,10 @@ mod tests {
assert_eq!(db.txs_count(), N);

txs.iter().take(D).for_each(|tx| {
assert!(db
.delete_tx(tx.id())
.expect("transaction should be deleted"));
let deleted = db
.delete_tx(tx.id(), false)
.expect("transaction should be deleted");
assert!(deleted.len() == 1);
});

Ok(())
Expand Down
64 changes: 47 additions & 17 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ pub mod conf;

use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::vm::PreverificationResult;
use crate::{database, vm, LongLivedService, Message, Network};
use async_trait::async_trait;
use conf::{
DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
};
use node_data::events::{Event, TransactionEvent};
use node_data::get_current_timestamp;
use node_data::ledger::Transaction;
use node_data::ledger::{SpendingId, Transaction};
use node_data::message::{payload, AsyncQueue, Payload, Topics};
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -118,8 +119,8 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
let expired_txs = db.get_expired_txs(expiration_time)?;
for tx_id in expired_txs {
info!(event = "expired_tx", hash = hex::encode(tx_id));
if db.delete_tx(tx_id)? {
let event = TransactionEvent::Removed(tx_id);
for deleted_tx_id in db.delete_tx(tx_id, true)? {
let event = TransactionEvent::Removed(deleted_tx_id);
if let Err(e) = self.event_sender.try_send(event.into()) {
warn!("cannot notify mempool removed transaction {e}")
};
Expand Down Expand Up @@ -235,21 +236,50 @@ impl MempoolSrv {
}

let txs_count = view.txs_count();
if txs_count > max_mempool_txn_count {
let tx_to_delete = view
.get_txs_ids_sorted_by_low_fee()?
.map(|(_, tx_id)| tx_id)
.next();
if txs_count >= max_mempool_txn_count {
// Get the lowest fee transaction to delete
Ok(tx_to_delete)
let (lowest_price, to_delete) = view
.get_txs_ids_sorted_by_low_fee()?
.next()
.ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;

if tx.gas_price() < lowest_price {
// Or error if the gas price proposed is the lowest of all
// the transactions in the mempool
Err(TxAcceptanceError::MaxTxnCountExceeded(
max_mempool_txn_count,
))
} else {
Ok(Some(to_delete))
}
} else {
Ok(None)
}
})?;

// VM Preverify call
if let Err(e) = vm.read().await.preverify(tx) {
Err(TxAcceptanceError::VerificationFailed(format!("{e:?}")))?;
let preverification_data =
vm.read().await.preverify(tx).map_err(|e| {
TxAcceptanceError::VerificationFailed(format!("{e:?}"))
})?;

if let PreverificationResult::FutureNonce {
account,
state,
nonce_used,
} = preverification_data
{
db.read().await.view(|db| {
for nonce in state.nonce + 1..nonce_used {
let spending_id = SpendingId::AccountNonce(account, nonce);
if db.get_txs_by_spendable_ids(&[spending_id]).is_empty() {
return Err(TxAcceptanceError::VerificationFailed(
format!("Missing intermediate nonce {nonce}"),
));
}
}
Ok(())
})?;
}

let mut events = vec![];
Expand All @@ -263,10 +293,10 @@ impl MempoolSrv {
for m_tx_id in db.get_txs_by_spendable_ids(&spend_ids) {
if let Some(m_tx) = db.get_tx(m_tx_id)? {
if m_tx.inner.gas_price() < tx.inner.gas_price() {
if db.delete_tx(m_tx_id)? {
events.push(TransactionEvent::Removed(m_tx_id));
for deleted_tx in db.delete_tx(m_tx_id, false)? {
events.push(TransactionEvent::Removed(deleted_tx));
replaced = true;
};
}
} else {
return Err(
TxAcceptanceError::SpendIdExistsInMempool.into()
Expand All @@ -279,9 +309,9 @@ impl MempoolSrv {

if !replaced {
if let Some(to_delete) = tx_to_delete {
if db.delete_tx(to_delete)? {
events.push(TransactionEvent::Removed(to_delete));
};
for deleted in db.delete_tx(to_delete, true)? {
events.push(TransactionEvent::Removed(deleted));
}
}
}
// Persist transaction in mempool storage
Expand Down
17 changes: 16 additions & 1 deletion node/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use dusk_consensus::{
};
use execution_core::signatures::bls::PublicKey as BlsPublicKey;
use execution_core::transfer::data::ContractBytecode;
use execution_core::transfer::moonlight::AccountData;
use node_data::ledger::{Block, SpentTransaction, Transaction};

#[derive(Default)]
Expand Down Expand Up @@ -45,7 +46,10 @@ pub trait VMExecution: Send + Sync + 'static {
to_delete: Vec<[u8; 32]>,
) -> anyhow::Result<()>;

fn preverify(&self, tx: &Transaction) -> anyhow::Result<()>;
fn preverify(
&self,
tx: &Transaction,
) -> anyhow::Result<PreverificationResult>;

fn get_provisioners(
&self,
Expand Down Expand Up @@ -80,6 +84,17 @@ pub trait VMExecution: Send + Sync + 'static {
fn min_gas_limit(&self) -> u64;
}

#[allow(clippy::large_enum_variant)]
pub enum PreverificationResult {
Valid,
// Current account state, nonce used by tx
FutureNonce {
account: BlsPublicKey,
state: AccountData,
nonce_used: u64,
},
}

// Returns gas charge for bytecode deployment.
pub fn bytecode_charge(
bytecode: &ContractBytecode,
Expand Down
Loading

0 comments on commit 410012e

Please sign in to comment.