Skip to content

Commit

Permalink
[pipeline] seq executor run one by one with error returning (#115)
Browse files Browse the repository at this point in the history
* [pipeline] seq executor run one by one with error returning

* Update solana-executor-builder dependency as of marinade-finance/solana-transaction-builder#6 merged
  • Loading branch information
ochaloup authored Sep 26, 2024
1 parent 8b726c9 commit 1970437
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 29 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions settlement-pipelines/src/bin/fund_settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,18 @@ async fn prepare_funding(
}
}

let execution_result = execute_in_sequence(
let execution_result_merging = execute_in_sequence(
rpc_client.clone(),
transaction_executor.clone(),
&mut transaction_builder,
priority_fee_policy,
false,
)
.await;
reporting.add_tx_execution_result(execution_result, "Fund Settlement - Merge Stake Accounts");
reporting.add_tx_execution_result(
execution_result_merging,
"Fund Settlement - Merge Stake Accounts",
);

Ok(())
}
Expand Down Expand Up @@ -648,14 +652,15 @@ async fn fund_settlements(
}
}

let execute_result = execute_in_sequence(
let execute_result_funding = execute_in_sequence(
rpc_client.clone(),
transaction_executor.clone(),
&mut transaction_builder,
priority_fee_policy,
true, // TODO: set false when contract 2.1.0 is deployed (https://github.com/marinade-finance/validator-bonds/pull/77)
)
.await;
reporting.add_tx_execution_result(execute_result, "FundSettlements");
reporting.add_tx_execution_result(execute_result_funding, "FundSettlements");

Ok(())
}
Expand Down
112 changes: 91 additions & 21 deletions settlement-pipelines/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use log::debug;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_transaction_builder::TransactionBuilder;
use solana_transaction_builder_executor::{
builder_to_execution_data, execute_transactions_in_parallel, execute_transactions_in_sequence,
builder_to_execution_data, execute_transaction_data_in_parallel,
execute_transaction_data_in_sequence, TransactionBuilderExecutionData,
TransactionBuilderExecutionErrors,
};
use solana_transaction_executor::{PriorityFeePolicy, TransactionExecutor};
use std::sync::Arc;
Expand All @@ -13,7 +16,7 @@ pub async fn execute_parallel(
executor: Arc<TransactionExecutor>,
builder: &mut TransactionBuilder,
priority_fee_policy: &PriorityFeePolicy,
) -> anyhow::Result<(usize, usize)> {
) -> Result<(usize, usize), TransactionBuilderExecutionErrors> {
execute_parallel_with_rate(
rpc_client,
executor,
Expand All @@ -30,42 +33,109 @@ pub async fn execute_parallel_with_rate(
builder: &mut TransactionBuilder,
priority_fee_policy: &PriorityFeePolicy,
parallel_execution_rate: usize,
) -> anyhow::Result<(usize, usize)> {
let executed_instruction_count = builder.instructions().len();
let execution_data =
builder_to_execution_data(rpc_client.url(), builder, Some(priority_fee_policy.clone()));
let executed_transaction_count = execution_data.len();
execute_transactions_in_parallel(
executor.clone(),
execution_data,
Some(parallel_execution_rate),
)
.await?;
) -> Result<(usize, usize), TransactionBuilderExecutionErrors> {
let execution_data = builder_to_execution_data(
rpc_client.url(),
builder,
Some(priority_fee_policy.clone()),
false,
);
// when all executed successfully then builder should be empty
assert_eq!(
builder.instructions().len(),
0,
"execute_parallel: expected to get all instructions from builder processed"
);
Ok((executed_transaction_count, executed_instruction_count))
let execution_results = execute_transaction_data_in_parallel(
executor.clone(),
&execution_data,
Some(parallel_execution_rate),
)
.await;
handle_execution_results(&execution_data, execution_results)
}

pub async fn execute_in_sequence(
rpc_client: Arc<RpcClient>,
executor: Arc<TransactionExecutor>,
builder: &mut TransactionBuilder,
priority_fee_policy: &PriorityFeePolicy,
) -> anyhow::Result<(usize, usize)> {
let executed_instruction_count = builder.instructions().len();
let execution_data =
builder_to_execution_data(rpc_client.url(), builder, Some(priority_fee_policy.clone()));
let executed_transaction_count = execution_data.len();
execute_transactions_in_sequence(executor.clone(), execution_data).await?;
execute_one_by_one: bool,
) -> Result<(usize, usize), TransactionBuilderExecutionErrors> {
let execution_data = builder_to_execution_data(
rpc_client.url(),
builder,
Some(priority_fee_policy.clone()),
execute_one_by_one,
);
// when all executed successfully then builder should be empty
assert_eq!(
builder.instructions().len(),
0,
"execute_in_sequence: expected to get all instructions from builder processed"
);
Ok((executed_transaction_count, executed_instruction_count))
let execution_results =
execute_transaction_data_in_sequence(executor.clone(), &execution_data, false).await;
handle_execution_results(&execution_data, execution_results)
}

/// Method takes list of data that were about to be executed
/// and the list of errors that came from that execution.
/// It matches the execution data to the list of errors and returns the count of executed transactions and instructions.
fn handle_execution_results(
transaction_builder_execution_data: &[TransactionBuilderExecutionData],
executed_result: Result<(), TransactionBuilderExecutionErrors>,
) -> Result<(usize, usize), TransactionBuilderExecutionErrors> {
let to_execute_transaction_count = transaction_builder_execution_data.len();
let to_execute_instruction_count: usize = transaction_builder_execution_data
.iter()
.map(|data| {
data.prepared_transaction
.transaction
.message
.instructions
.len()
})
.sum();
match executed_result {
Ok(_) => Ok((to_execute_transaction_count, to_execute_instruction_count)),
Err(errors) => {
let failed_transaction_count = errors.len();
let failed_instruction_count: usize = errors
.iter()
.map(|error| {
let failed_tx_uuid = error.tx_uuid.as_str();
transaction_builder_execution_data
.iter()
.find(|data| data.tx_uuid.as_str() == failed_tx_uuid)
.map_or_else(
|| 0,
|data| {
data.prepared_transaction
.transaction
.message
.instructions
.len()
},
)
})
.sum();
assert!(
to_execute_instruction_count >= failed_instruction_count,
"map_executed_data_to_execution_errors: failed_instruction_count should be less or equal to to_execute_instruction_count"
);
assert!(
to_execute_transaction_count >= failed_transaction_count,
"map_executed_data_to_execution_errors: failed_transaction_count should be less or equal to to_execute_transaction_count"
);
debug!(
"Execution errors: executed {}/{} transactions and {}/{} instructions",
to_execute_transaction_count - failed_transaction_count,
to_execute_transaction_count,
to_execute_instruction_count - failed_instruction_count,
to_execute_instruction_count
);
Err(errors)
}
}
}
7 changes: 5 additions & 2 deletions settlement-pipelines/src/reporting.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::cli_result::{CliError, CliResult};
use log::{error, info};
use solana_transaction_builder_executor::TransactionBuilderExecutionErrors;
use std::fmt::Display;
use std::future::Future;
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -75,15 +76,17 @@ impl ErrorHandler {

pub fn add_tx_execution_result<D: Display>(
&mut self,
execution_result: anyhow::Result<(usize, usize)>,
execution_result: Result<(usize, usize), TransactionBuilderExecutionErrors>,
message: D,
) {
match execution_result {
Ok((tx_count, ix_count)) => {
info!("{message}: txes {tx_count}/ixes {ix_count} executed successfully")
}
Err(err) => {
self.add_retry_able_error(err);
for single_error in err.into_iter() {
self.add_retry_able_error(anyhow::Error::from(single_error));
}
}
}
}
Expand Down

0 comments on commit 1970437

Please sign in to comment.