Skip to content

Commit

Permalink
Merge pull request #2495 from dusk-network/tx-preverify
Browse files Browse the repository at this point in the history
rusk: fix HTTP preverification
  • Loading branch information
herr-seppia authored Sep 28, 2024
2 parents 165080a + aea54f7 commit 50bafe1
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 38 deletions.
5 changes: 5 additions & 0 deletions node/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub trait DB: Send + Sync + 'static {
where
F: for<'a> FnOnce(&Self::P<'a>) -> Result<T>;

fn update_dry_run<F, T>(&self, dry_run: bool, f: F) -> Result<T>
where
F: for<'a> FnOnce(&Self::P<'a>) -> Result<T>;

fn close(&mut self);
}

Expand Down Expand Up @@ -183,6 +187,7 @@ pub trait Persist:

fn clear_database(&self) -> Result<()>;
fn commit(self) -> Result<()>;
fn rollback(self) -> Result<()>;
}

pub fn into_array<const N: usize>(value: &[u8]) -> [u8; N] {
Expand Down
23 changes: 21 additions & 2 deletions node/src/database/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ impl DB for Backend {
}

fn update<F, T>(&self, execute: F) -> Result<T>
where
F: for<'a> FnOnce(&Self::P<'a>) -> Result<T>,
{
self.update_dry_run(false, execute)
}

fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
where
F: for<'a> FnOnce(&Self::P<'a>) -> Result<T>,
{
Expand All @@ -244,8 +251,12 @@ impl DB for Backend {
// storage
let ret = execute(&tx)?;

// Apply changes in atomic way
tx.commit()?;
if dry_run {
tx.rollback()?;
} else {
// Apply changes in atomic way
tx.commit()?;
}

Ok(ret)
}
Expand Down Expand Up @@ -702,6 +713,14 @@ impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {

Ok(())
}

fn rollback(self) -> Result<()> {
if let Err(e) = self.inner.rollback() {
return Err(anyhow::Error::new(e).context("failed to rollback"));
}

Ok(())
}
}

impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
Expand Down
53 changes: 34 additions & 19 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{error, info, warn};
const TOPICS: &[u8] = &[Topics::Tx as u8];

#[derive(Debug, Error)]
enum TxAcceptanceError {
pub enum TxAcceptanceError {
#[error("this transaction exists in the mempool")]
AlreadyExistsInMempool,
#[error("this transaction exists in the ledger")]
Expand Down Expand Up @@ -129,9 +129,9 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
if let Ok(msg) = msg {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx::<DB, VM>(&db, &vm, tx);
let accept = self.accept_tx(&db, &vm, tx);
if let Err(e) = accept.await {
error!("{}", e);
error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
continue;
}

Expand Down Expand Up @@ -161,12 +161,40 @@ impl MempoolSrv {
vm: &Arc<RwLock<VM>>,
tx: &Transaction,
) -> Result<(), TxAcceptanceError> {
let max_mempool_txn_count = self.conf.max_mempool_txn_count;

let events =
MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
.await?;

tracing::info!(
event = "transaction accepted",
hash = hex::encode(tx.id())
);

for tx_event in events {
let node_event = tx_event.into();
if let Err(e) = self.event_sender.try_send(node_event) {
warn!("cannot notify mempool accepted transaction {e}")
};
}

Ok(())
}

pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
db: &Arc<RwLock<DB>>,
vm: &Arc<RwLock<VM>>,
tx: &'t Transaction,
dry_run: bool,
max_mempool_txn_count: usize,
) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
let tx_id = tx.id();

// Perform basic checks on the transaction
db.read().await.view(|view| {
let count = view.txs_count();
if count >= self.conf.max_mempool_txn_count {
if count >= max_mempool_txn_count {
return Err(TxAcceptanceError::MaxTxnCountExceeded(count));
}

Expand All @@ -191,7 +219,7 @@ impl MempoolSrv {
let mut events = vec![];

// Try to add the transaction to the mempool
db.read().await.update(|db| {
db.read().await.update_dry_run(dry_run, |db| {
let spend_ids = tx.to_spend_ids();

// ensure spend_ids do not exist in the mempool
Expand All @@ -216,20 +244,7 @@ impl MempoolSrv {

db.add_tx(tx, now)
})?;

tracing::info!(
event = "transaction accepted",
hash = hex::encode(tx_id)
);

for tx_event in events {
let node_event = tx_event.into();
if let Err(e) = self.event_sender.try_send(node_event) {
warn!("cannot notify mempool accepted transaction {e}")
};
}

Ok(())
Ok(events)
}

/// Requests full mempool data from N alive peers
Expand Down
2 changes: 1 addition & 1 deletion rusk-wallet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub enum Error {
#[error("Cache database corrupted")]
CacheDatabaseCorrupted,
/// Prover errors from execution-core
#[error("Prover Error")]
#[error("Prover Error: {0}")]
ProverError(String),
/// Memo provided is too large
#[error("Memo too large {0}")]
Expand Down
26 changes: 25 additions & 1 deletion rusk/src/lib/http/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;
use execution_core::transfer::Transaction as ProtocolTransaction;
use node::database::rocksdb::{Backend, DBTransaction};
use node::database::{Mempool, DB};
use node::mempool::MempoolSrv;
use node::network::Kadcast;
use node::Network;
use node_data::ledger::Transaction;
Expand Down Expand Up @@ -51,13 +52,18 @@ fn variables_from_headers(headers: &Map<String, Value>) -> Variables {
#[async_trait]
impl HandleRequest for RuskNode {
fn can_handle(&self, request: &MessageRequest) -> bool {
matches!(request.event.to_route(), (Target::Host(_), "Chain", _))
let route = request.event.to_route();
if matches!(route, (Target::Host(_), "rusk", "preverify")) {
return true;
}
matches!(route, (Target::Host(_), "Chain", _))
}

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
#[allow(clippy::match_like_matches_macro)]
match request.uri.inner() {
("graphql", _, "query") => true,
("transactions", _, "preverify") => true,
("transactions", _, "propagate") => true,
("network", _, "peers") => true,
("node", _, "info") => true,
Expand All @@ -73,6 +79,9 @@ impl HandleRequest for RuskNode {
("graphql", _, "query") => {
self.handle_gql(&request.data, &request.headers).await
}
("transactions", _, "preverify") => {
self.handle_preverify(request.data.as_bytes()).await
}
("transactions", _, "propagate") => {
self.propagate_tx(request.data.as_bytes()).await
}
Expand Down Expand Up @@ -101,6 +110,9 @@ impl HandleRequest for RuskNode {
(Target::Host(_), "Chain", "gql") => {
self.handle_gql(&request.event.data, &request.headers).await
}
(Target::Host(_), "rusk", "preverify") => {
self.handle_preverify(request.event_data()).await
}
(Target::Host(_), "Chain", "propagate_tx") => {
self.propagate_tx(request.event_data()).await
}
Expand Down Expand Up @@ -153,6 +165,18 @@ impl RuskNode {
Ok(ResponseData::new(data))
}

async fn handle_preverify(
&self,
data: &[u8],
) -> anyhow::Result<ResponseData> {
let tx = execution_core::transfer::Transaction::from_slice(data)
.map_err(|e| anyhow::anyhow!("Invalid Data {e:?}"))?;
let db = self.inner().database();
let vm = self.inner().vm_handler();
MempoolSrv::check_tx(&db, &vm, &tx.into(), true, usize::MAX).await?;
Ok(ResponseData::new(DataType::None))
}

async fn propagate_tx(&self, tx: &[u8]) -> anyhow::Result<ResponseData> {
let tx: Transaction = ProtocolTransaction::from_slice(tx)
.map_err(|e| anyhow::anyhow!("Invalid Data {e:?}"))?
Expand Down
25 changes: 10 additions & 15 deletions rusk/src/lib/http/rusk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,24 @@ const RUSK_FEEDER_HEADER: &str = "Rusk-Feeder";
#[async_trait]
impl HandleRequest for Rusk {
fn can_handle(&self, request: &MessageRequest) -> bool {
let route = request.event.to_route();
if matches!(route, (Target::Host(_), "rusk", "preverify")) {
// moved to chain
// here just for backward compatibility
return false;
}
if route.2.starts_with("prove_") {
return false;
}
matches!(
&request.event.to_route(),
route,
(Target::Contract(_), ..) | (Target::Host(_), "rusk", _)
)
}
fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
#[allow(clippy::match_like_matches_macro)]
match request.uri.inner() {
("contracts", Some(_), _) => true,
("transactions", _, "preverify") => true,
("node", _, "provisioners") => true,
("node", _, "crs") => true,
_ => false,
Expand All @@ -50,9 +58,6 @@ impl HandleRequest for Rusk {
let data = request.data.as_bytes();
self.handle_contract_query(contract_id, method, data, feeder)
}
("transactions", _, "preverify") => {
self.handle_preverify(request.data.as_bytes())
}
("node", _, "provisioners") => self.get_provisioners(),
("node", _, "crs") => self.get_crs(),
_ => Err(anyhow::anyhow!("Unsupported")),
Expand All @@ -68,9 +73,6 @@ impl HandleRequest for Rusk {
let feeder = request.header(RUSK_FEEDER_HEADER).is_some();
self.handle_contract_query_legacy(&request.event, feeder)
}
(Target::Host(_), "rusk", "preverify") => {
self.handle_preverify(request.event_data())
}
(Target::Host(_), "rusk", "provisioners") => {
self.get_provisioners()
}
Expand Down Expand Up @@ -124,13 +126,6 @@ impl Rusk {
}
}

fn handle_preverify(&self, data: &[u8]) -> anyhow::Result<ResponseData> {
let tx = execution_core::transfer::Transaction::from_slice(data)
.map_err(|e| anyhow::anyhow!("Invalid Data {e:?}"))?;
self.preverify(&tx.into())?;
Ok(ResponseData::new(DataType::None))
}

fn get_provisioners(&self) -> anyhow::Result<ResponseData> {
let prov: Vec<_> = self
.provisioners(None)
Expand Down

0 comments on commit 50bafe1

Please sign in to comment.