Skip to content

Commit

Permalink
Merge pull request #2592 from dusk-network/failed_finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia authored Oct 4, 2024
2 parents 28ced3e + 197a0b2 commit db53ed3
Show file tree
Hide file tree
Showing 8 changed files with 403 additions and 44 deletions.
4 changes: 2 additions & 2 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
if let Some((prev_final_state, mut new_finals)) = final_results {
let (_, new_final_state) =
new_finals.pop_last().expect("new_finals to be not empty");
let states_to_forget = new_finals
let old_finals_to_merge = new_finals
.into_values()
.chain([prev_final_state])
.collect::<Vec<_>>();
vm.finalize_state(new_final_state, states_to_forget)?;
vm.finalize_state(new_final_state, old_finals_to_merge)?;
}

anyhow::Ok((label, finalized))
Expand Down
42 changes: 20 additions & 22 deletions rusk-recovery/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,6 @@ fn generate_stake_state(
snapshot: &Snapshot,
) -> Result<(), Box<dyn Error>> {
let theme = Theme::default();
session
.call::<_, ()>(
STAKE_CONTRACT,
"insert_stake",
&(
*DUSK_CONSENSUS_KEY,
*DUSK_CONSENSUS_KEY,
StakeData::default(),
),
u64::MAX,
)
.expect("stake to be inserted into the state");
snapshot.stakes().enumerate().for_each(|(idx, staker)| {
info!("{} provisioner #{}", theme.action("Generating"), idx);

Expand Down Expand Up @@ -232,6 +220,19 @@ fn generate_empty_state<P: AsRef<Path>>(
u64::MAX,
)?;

session
.call::<_, ()>(
STAKE_CONTRACT,
"insert_stake",
&(
*DUSK_CONSENSUS_KEY,
*DUSK_CONSENSUS_KEY,
StakeData::default(),
),
u64::MAX,
)
.expect("stake to be inserted into the state");

session
.call::<_, ()>(
TRANSFER_CONTRACT,
Expand Down Expand Up @@ -274,12 +275,8 @@ where
let state_dir = state_dir.as_ref();
let state_id_path = rusk_profile::to_rusk_state_id_path(state_dir);

let mut loaded = false;
let (vm, old_commit_id) = match snapshot.base_state() {
Some(state) => {
loaded = true;
load_state(state_dir, state)
}
Some(state) => load_state(state_dir, state),
None => generate_empty_state(state_dir, snapshot),
}?;

Expand All @@ -300,12 +297,13 @@ where
fs::write(state_id_path, commit_id)?;

if old_commit_id != commit_id {
if !loaded {
vm.finalize_commit(old_commit_id)?;
}
vm.delete_commit(old_commit_id)?;
info!(
"{} {}",
theme.action("Finalizing"),
hex::encode(old_commit_id)
);
vm.finalize_commit(old_commit_id)?;
}
vm.finalize_commit(commit_id)?;

info!("{} {}", theme.action("Init Root"), hex::encode(commit_id));

Expand Down
24 changes: 5 additions & 19 deletions rusk/src/lib/node/rusk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use std::{fs, io};
use execution_core::stake::StakeKeys;
use execution_core::transfer::PANIC_NONCE_NOT_READY;
use parking_lot::RwLock;
use tokio::task;
use tracing::{debug, info};
use tracing::info;

use dusk_bytes::{DeserializableSlice, Serializable};
use dusk_consensus::config::{
Expand All @@ -34,7 +33,7 @@ use execution_core::{
use node::vm::bytecode_charge;
use node_data::events::contract::ContractTxEvent;
use node_data::ledger::{Hash, Slash, SpentTransaction, Transaction};
use rusk_abi::{CallReceipt, PiecrustError, Session, VM};
use rusk_abi::{CallReceipt, PiecrustError, Session};
use rusk_profile::to_rusk_state_id_path;
use tokio::sync::broadcast;
#[cfg(feature = "archive")]
Expand Down Expand Up @@ -482,15 +481,10 @@ impl Rusk {
base: [u8; 32],
to_delete: Vec<[u8; 32]>,
) -> Result<()> {
self.vm.finalize_commit(base)?;
self.tip.write().base = base;

// Deleting commits is blocking, meaning it will wait until any process
// using the commit is done. This includes any queries that are
// currently executing.
// Since we do want commits to be deleted, but don't want block
// finalization to wait, we spawn a new task to delete the commits.
task::spawn(delete_commits(self.vm.clone(), to_delete));
for d in to_delete {
self.vm.finalize_commit(d)?;
}
Ok(())
}

Expand All @@ -499,14 +493,6 @@ impl Rusk {
}
}

async fn delete_commits(vm: Arc<VM>, commits: Vec<[u8; 32]>) {
for commit in commits {
if let Err(err) = vm.delete_commit(commit) {
debug!("failed deleting commit {}: {err}", hex::encode(commit));
}
}
}

#[allow(clippy::too_many_arguments)]
fn accept(
session: Session,
Expand Down
124 changes: 124 additions & 0 deletions rusk/tests/assets/deploy.json

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions rusk/tests/assets/mempool.json

Large diffs are not rendered by default.

118 changes: 117 additions & 1 deletion rusk/tests/common/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ pub fn new_state<P: AsRef<Path>>(
dir: P,
snapshot: &Snapshot,
block_gas_limit: u64,
) -> Result<Rusk> {
new_state_with_chainid(dir, snapshot, block_gas_limit, CHAIN_ID)
}

// Creates a Rusk initial state in the given directory
pub fn new_state_with_chainid<P: AsRef<Path>>(
dir: P,
snapshot: &Snapshot,
block_gas_limit: u64,
chain_id: u8,
) -> Result<Rusk> {
let dir = dir.as_ref();

Expand All @@ -48,7 +58,7 @@ pub fn new_state<P: AsRef<Path>>(

let rusk = Rusk::new(
dir,
CHAIN_ID,
chain_id,
None,
DEFAULT_GAS_PER_DEPLOY_BYTE,
DEFAULT_MIN_DEPLOYMENT_GAS_PRICE,
Expand Down Expand Up @@ -183,3 +193,109 @@ pub fn generator_procedure(

Ok(accept_txs)
}

/// Executes the procedure a block generator will go through to generate a block
/// including all specified transactions, checking the outputs are as
/// expected. If not `expected` is specified, all txs must be included in the
/// block
#[allow(dead_code)]
pub fn generator_procedure2(
rusk: &Rusk,
txs: &[Transaction],
block_height: u64,
block_gas_limit: u64,
missed_generators: Vec<BlsPublicKey>,
expected: Option<ExecuteResult>,
) -> anyhow::Result<(Vec<SpentTransaction>, [u8; 32])> {
let expected = expected.unwrap_or(ExecuteResult {
executed: txs.len(),
discarded: 0,
});

let txs: Vec<_> = txs.iter().map(|t| t.clone().into()).collect();
for tx in &txs {
rusk.preverify(tx)?;
}

let generator_pubkey = node_data::bls::PublicKey::new(*DUSK_CONSENSUS_KEY);
let generator_pubkey_bytes = *generator_pubkey.bytes();
let round = block_height;

let mut failed_iterations = IterationsInfo::default();
for to_slash in &missed_generators {
failed_iterations.att_list.push(Some((
Attestation {
result: node_data::message::payload::RatificationResult::Fail(
Vote::NoCandidate,
),
..Default::default()
},
PublicKeyBytes(to_slash.to_bytes()),
)));
}

let faults = vec![];

let to_slash =
Slash::from_iterations_and_faults(&failed_iterations, &faults)?;

let voter = (generator_pubkey.clone(), 1);
let voters_size =
VALIDATION_COMMITTEE_CREDITS + RATIFICATION_COMMITTEE_CREDITS;
let voters = vec![voter; voters_size];

let call_params = CallParams {
round,
generator_pubkey,
to_slash,
voters_pubkey: voters.clone(),
max_txs_bytes: usize::MAX,
};

let (transfer_txs, discarded, execute_output) =
rusk.execute_state_transition(&call_params, txs.into_iter())?;

assert_eq!(transfer_txs.len(), expected.executed, "all txs accepted");
assert_eq!(discarded.len(), expected.discarded, "no discarded tx");

info!(
"execute_state_transition new verification: {}",
execute_output
);

let txs: Vec<_> = transfer_txs.into_iter().map(|tx| tx.inner).collect();

let block = Block::new(
Header {
height: block_height,
gas_limit: block_gas_limit,
generator_bls_pubkey: generator_pubkey_bytes,
state_hash: execute_output.state_root,
event_bloom: execute_output.event_bloom,
failed_iterations,
..Default::default()
},
txs,
vec![],
)
.expect("valid block");

let verify_output = rusk.verify_state_transition(&block, &voters)?;
info!("verify_state_transition new verification: {verify_output}",);

let (accept_txs, accept_output) = rusk.accept(&block, &voters)?;

assert_eq!(accept_txs.len(), expected.executed, "all txs accepted");

info!(
"accept block {} with new verification: {accept_output}",
block_height,
);

assert_eq!(
accept_output, execute_output,
"Verification outputs should be equal"
);

Ok((accept_txs, accept_output.state_root))
}
122 changes: 122 additions & 0 deletions rusk/tests/services/deploy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::collections::BTreeMap;
use std::path::Path;

use rusk::{Result, Rusk};
use serde_json::Value;
use tempfile::tempdir;
use tracing::info;

use crate::common::logger;
use crate::common::state::{generator_procedure2, new_state_with_chainid};

const BLOCK_GAS_LIMIT: u64 = 100_000_000_000;

// Creates the Rusk initial state for the tests below
fn initial_state<P: AsRef<Path>>(dir: P) -> Result<Rusk> {
let snapshot = toml::from_str(include_str!(
"../../../rusk-recovery/config/testnet.toml"
))
.expect("Cannot deserialize config");

new_state_with_chainid(dir, &snapshot, BLOCK_GAS_LIMIT, 0x3)
}

#[tokio::test(flavor = "multi_thread")]
pub async fn deploy_fail() -> Result<()> {
// Setup the logger
logger();

let tmp = tempdir().expect("Should be able to create temporary directory");
let rusk = initial_state(&tmp)?;

let txs = include_str!("../assets/deploy.json");
let txs: Value = serde_json::from_str(txs).unwrap();

let txs = txs
.as_object()
.unwrap()
.get("transactions")
.unwrap()
.as_array()
.unwrap();
let finalize_up_to = 352;
let finalize_at = 353;

let mut state_to_delete = vec![];
let mut new_base = [0u8; 32];
let mut txs_by_height = BTreeMap::new();
for t in txs.iter().rev() {
let block_height = t.get("blockHeight").unwrap().as_u64().unwrap();
let raw = t.get("tx").unwrap().get("raw").unwrap().as_str().unwrap();
let raw = hex::decode(raw).unwrap();
let tx =
execution_core::transfer::Transaction::from_slice(&raw).unwrap();
let txs = txs_by_height.entry(block_height).or_insert(vec![]);
txs.push(tx);
}

for (&block_height, txs) in txs_by_height.iter() {
let (_, state) = generator_procedure2(
&rusk,
txs,
block_height,
50000000,
vec![],
None,
)
.unwrap();
if block_height == finalize_at {
info!("finalizing state up to {finalize_up_to} ");
rusk.finalize_state(new_base, state_to_delete.clone())?;
} else if block_height < finalize_up_to {
state_to_delete.push(state);
} else if block_height == finalize_up_to {
new_base = state
} else {
unreachable!()
}
println!("height: {block_height} - state: {}", hex::encode(state));
}

rusk_recovery_tools::state::restore_state(&tmp)?;

let txs = include_str!("../assets/mempool.json");
let txs: Value = serde_json::from_str(txs).unwrap();

let txs = txs
.as_object()
.unwrap()
.get("mempoolTxs")
.unwrap()
.as_array()
.unwrap();

let mut mempool = vec![];
for t in txs.iter().rev() {
let raw = t.get("raw").unwrap().as_str().unwrap();
let raw = hex::decode(raw).unwrap();
let tx =
execution_core::transfer::Transaction::from_slice(&raw).unwrap();
mempool.push(tx);
}

generator_procedure2(
&rusk,
&mempool,
finalize_at + 1,
50000000,
vec![],
None,
)
.unwrap();

rusk_recovery_tools::state::restore_state(tmp)?;

Ok(())
}
Loading

0 comments on commit db53ed3

Please sign in to comment.