From a13cf31e8302f56cd500afbcab916cf478cb3f34 Mon Sep 17 00:00:00 2001 From: Ondra Chaloupka Date: Mon, 25 Mar 2024 14:48:20 +0100 Subject: [PATCH] [web3js] fixing split to have chance to define unsplittable tx stuff --- packages/lib/web3js-common/src/tx.ts | 188 ++++++++++++++++++----- packages/lib/web3js-common/src/txBulk.ts | 162 +++++++++++++------ 2 files changed, 271 insertions(+), 79 deletions(-) diff --git a/packages/lib/web3js-common/src/tx.ts b/packages/lib/web3js-common/src/tx.ts index 8eaa1b8..aaf6c76 100644 --- a/packages/lib/web3js-common/src/tx.ts +++ b/packages/lib/web3js-common/src/tx.ts @@ -26,6 +26,7 @@ import { isLevelEnabled, checkErrorMessage, sleep, + logError, } from '@marinade.finance/ts-common' import { Provider, @@ -515,6 +516,49 @@ export type TransactionData = { signers: (Wallet | Keypair | Signer)[] } +/** + * This is a special marker transaction for splitting instructions in a transaction + * for splitting execution. It is not meant to be executed on chain. + * It is used to mark that any following instructions should be executed in a separate + * transaction and not mixed with the previous instructions. + * + * If you want to use the marker and you need to setup some instructions + * cannot be splitted from each other then place this marker into the set + * of instructions before the instructions that should not be splitted. + * + * Using start marker means to finish the previous split block. + * Having the same meaning as use the end marker. + */ +export class TransactionInstructionSplitMarkerStart extends TransactionInstruction { + constructor() { + super({ + keys: [], + programId: PublicKey.default, + data: Buffer.alloc(0), + }) + } +} +export class TransactionInstructionSplitMarkerEnd extends TransactionInstruction { + constructor() { + super({ + keys: [], + programId: PublicKey.default, + data: Buffer.alloc(0), + }) + } +} +export const SPLIT_MARKER_START_INSTANCE = + new TransactionInstructionSplitMarkerStart() +export const SPLIT_MARKER_END_INSTANCE = + new TransactionInstructionSplitMarkerEnd() + +function isSplitMarkerInstruction(ix: TransactionInstruction): boolean { + return ( + ix instanceof TransactionInstructionSplitMarkerStart || + ix instanceof TransactionInstructionSplitMarkerEnd + ) +} + export type SplitAndExecuteTxData = TransactionData /** @@ -553,11 +597,18 @@ export async function splitAndExecuteTx({ computeUnitPrice, }: ExecuteTxParams): Promise<(ExecuteTxReturn & SplitAndExecuteTxData)[]> { const result: (ExecuteTxReturn & SplitAndExecuteTxData)[] = [] - if (transaction.instructions.length === 0) { + if ( + transaction.instructions.filter(ix => !isSplitMarkerInstruction(ix)) + .length === 0 + ) { return result } if (!simulate && printOnly) { + // remove non executable instructions + transaction.instructions = transaction.instructions.filter( + ix => !isSplitMarkerInstruction(ix) + ) // only to print in base64, returning empty array -> no execution await executeTx({ connection, @@ -614,29 +665,39 @@ export async function splitAndExecuteTx({ lastValidBlockHeight: transaction.lastValidBlockHeight, } } - let checkingTransaction = await getTransaction(feePayerDefined, blockhash) - addComputeBudgetIxes({ - transaction: checkingTransaction, + let lastValidTransaction = await generateNewTransaction({ + feePayer: feePayerDefined, + bh: blockhash, computeUnitLimit, computeUnitPrice, }) - let lastValidTransaction = await getLastValidTransaction( - checkingTransaction, - blockhash, - feePayerDefined - ) - for (const ix of transaction.instructions) { - checkingTransaction.add(ix) + let transactionStartIndex = 0 + let splitMarkerStartIdx = Number.MAX_SAFE_INTEGER + for (let i = 0; i < transaction.instructions.length; i++) { + // TODO: delete me! + logInfo(logger, 'processing index: ' + i) + const ix = transaction.instructions[i] + if (ix instanceof TransactionInstructionSplitMarkerStart) { + splitMarkerStartIdx = i + continue + } + if (ix instanceof TransactionInstructionSplitMarkerEnd) { + splitMarkerStartIdx = Number.MAX_SAFE_INTEGER + continue + } + // TODO: delete me! + logInfo(logger, 'not split marker index: ' + i) + lastValidTransaction.add(ix) const filteredSigners = filterSignersForInstruction( - checkingTransaction.instructions, + lastValidTransaction.instructions, signers, feePayerDefined ) const signaturesSize = filteredSigners.length * 64 let txSize: number | undefined = undefined try { - txSize = checkingTransaction.serialize({ + txSize = lastValidTransaction.serialize({ verifySignatures: false, requireAllSignatures: false, }).byteLength @@ -645,29 +706,76 @@ export async function splitAndExecuteTx({ logDebug(logger, 'Transaction size calculation failed: ' + e) } - // we tried to add the instruction to checkingTransaction + // we tried to add the instruction to lastValidTransaction // when it was already too big, so we need to split it if ( txSize === undefined || txSize + signaturesSize > TRANSACTION_SAFE_SIZE ) { // size was elapsed, need to split - transactions.push(lastValidTransaction) - // nulling data of the checking transaction, but using the latest ix from for cycle - // as it was kicked-off from the lastValidTransaction - checkingTransaction = await getTransaction(feePayerDefined, blockhash) - addComputeBudgetIxes({ - transaction: checkingTransaction, + // need to consider existence of nonPossibleToSplitMarker + const transactionAdd = await generateNewTransaction({ + feePayer: feePayerDefined, + bh: blockhash, + computeUnitLimit, + computeUnitPrice, + }) + let addIdx: number + for ( + addIdx = transactionStartIndex; + addIdx < i && addIdx <= splitMarkerStartIdx; + addIdx++ + ) { + // TODO: delete me! + logInfo( + logger, + `Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}` + ) + if (isSplitMarkerInstruction(transaction.instructions[addIdx])) { + continue + } + transactionAdd.add(transaction.instructions[addIdx]) + } + if (transactionAdd.instructions.length === 0) { + logError( + logger, + `Working with instructions number: ${transaction.instructions}, ` + + `current instruction index: ${i}, last split marker index: ${splitMarkerStartIdx}` + + ` and transaction start index: ${transactionStartIndex}, last valid transaction: ${JSON.stringify( + lastValidTransaction + )}` + ) + throw new Error( + 'splitAndExecuteTx: no instructions to be added to the transaction, ' + + 'most probably the transaction contains split markers ' + + TransactionInstructionSplitMarkerStart.name + + ' at indexes that the instructions cannot be split to executable chunks.' + ) + } + transactions.push(transactionAdd) + // TODO: delete me! + logInfo( + logger, + `transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}` + ) + // we processed until i minus one; + // next outer loop increases i and we need to start from the same instruction + // as the current position is + i = addIdx - 1 + transactionStartIndex = addIdx + // TODO: delete me! + logInfo( + logger, + `after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}` + ) + // nulling data of the next transaction to check + lastValidTransaction = await generateNewTransaction({ + feePayer: feePayerDefined, + bh: blockhash, computeUnitLimit, computeUnitPrice, }) - checkingTransaction.add(ix) } - lastValidTransaction = await getLastValidTransaction( - checkingTransaction, - blockhash, - feePayerDefined - ) } if (lastValidTransaction.instructions.length !== 0) { transactions.push(lastValidTransaction) @@ -719,17 +827,27 @@ export async function splitAndExecuteTx({ return result } -async function getLastValidTransaction( - checkingTransaction: Transaction, - blockhash: Readonly<{ +async function generateNewTransaction({ + feePayer, + bh, + computeUnitLimit, + computeUnitPrice, +}: { + feePayer: PublicKey + bh: Readonly<{ blockhash: string lastValidBlockHeight: number - }>, - feePayer: PublicKey -): Promise { - const lastValidTransaction = await getTransaction(feePayer, blockhash) - checkingTransaction.instructions.forEach(ix => lastValidTransaction.add(ix)) - return lastValidTransaction + }> + computeUnitLimit?: number + computeUnitPrice?: number +}): Promise { + const transaction = await getTransaction(feePayer, bh) + addComputeBudgetIxes({ + transaction, + computeUnitLimit, + computeUnitPrice, + }) + return transaction } /** diff --git a/packages/lib/web3js-common/src/txBulk.ts b/packages/lib/web3js-common/src/txBulk.ts index 5768e28..4f52213 100644 --- a/packages/lib/web3js-common/src/txBulk.ts +++ b/packages/lib/web3js-common/src/txBulk.ts @@ -1,5 +1,6 @@ import { LoggerPlaceholder, + logDebug, logError, logInfo, } from '@marinade.finance/ts-common' @@ -22,6 +23,7 @@ import { splitAndExecuteTx, } from './tx' import { instanceOfProvider } from './provider' +import { ExecutionError } from './error' export type ExecuteTxReturnExecutedUnknown = { signature?: string @@ -69,26 +71,38 @@ export async function splitAndBulkExecuteTx({ computeUnitLimit, computeUnitPrice, numberOfRetries = 0, -}: BulkExecuteTxInput): Promise< +}: Omit): Promise< (BulkExecuteTxSimulatedReturn | BulkExecuteTxExecutedReturn)[] > { connection = instanceOfProvider(connection) ? connection.connection : connection - const resultSimulated = await splitAndExecuteTx({ - connection, - transaction, - errMessage, - signers, - feePayer, - simulate: true, - printOnly, - logger, - sendOpts, - confirmOpts, - computeUnitLimit, - computeUnitPrice, - }) + + let resultSimulated: BulkExecuteTxSimulatedReturn[] = [] + const numberOfSimulation = numberOfRetries + 1 + for (let i = 1; i <= numberOfSimulation; i++) { + try { + resultSimulated = await splitAndExecuteTx({ + connection, + transaction, + errMessage, + signers, + feePayer, + simulate: true, + printOnly, + logger, + sendOpts, + confirmOpts, + computeUnitLimit, + computeUnitPrice, + }) + break + } catch (e) { + if (i >= numberOfSimulation) { + throw e + } + } + } if (printOnly || simulate) { return resultSimulated } @@ -113,22 +127,31 @@ export async function splitAndBulkExecuteTx({ } ) + let failures: ExecutionError[] = [] // let's send to land the transaction on blockchain const numberOfSends = numberOfRetries + 1 for (let i = 1; i <= numberOfSends; i++) { - try { - await bulkSend({ - connection, - logger, - sendOpts, - confirmOpts, - data: resultExecuted, - retryAttempt: i, - }) - } catch (e) { - logError(logger, `Bulk #${i} sending failed with error: ${e}`) + ;({ failures } = await bulkSend({ + connection, + logger, + sendOpts, + confirmOpts, + data: resultExecuted, + retryAttempt: i, + })) + if (failures.length === 0) { + break } } + if (failures.length > 0) { + for (const err of failures) { + logError(logger, err.messageWithCause()) + } + throw new Error( + 'splitAndBulkExecuteTx failed with errors, see logs above' + + `${failures.length} errors of ${resultExecuted.length} transactions` + ) + } return resultExecuted } @@ -149,7 +172,7 @@ async function bulkSend({ } & { data: BulkExecuteTxExecutedReturn[] retryAttempt: number -}): Promise { +}): Promise<{ failures: ExecutionError[] }> { // updating the recent blockhash of all transactions to be on top const workingTransactions: { index: number @@ -165,6 +188,7 @@ async function bulkSend({ } } + // --- SENDING --- logInfo( logger, `Bulk #${retryAttempt} sending ${workingTransactions.length} transactions` @@ -177,23 +201,48 @@ async function bulkSend({ }) txSendPromises.push({ index, promise }) } + + // --- CONFIRMING --- const confirmationPromises: { promise: Promise> index: number }[] = [] + const rpcErrors: ExecutionError[] = [] for (const { index, promise: signaturePromise } of txSendPromises) { - const signature = await signaturePromise - data[index].signature = signature - const promise = connection.confirmTransaction( - { - signature, - blockhash: currentBlockhash.blockhash, - lastValidBlockHeight: currentBlockhash.lastValidBlockHeight, - }, - confirmOpts - ) - confirmationPromises.push({ index, promise }) + try { + const signature = await signaturePromise + data[index].signature = signature + const promise = connection.confirmTransaction( + { + signature, + blockhash: currentBlockhash.blockhash, + lastValidBlockHeight: currentBlockhash.lastValidBlockHeight, + }, + confirmOpts + ) + confirmationPromises.push({ index, promise }) + promise.catch(e => { + // managing 'Promise rejection was handled asynchronously' error + rpcErrors.push( + new ExecutionError({ + msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) + }) + } catch (e) { + rpcErrors.push( + new ExecutionError({ + msg: `Transaction at [${index}] failed to be sent to blockchain`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) + } } + + // --- GETTING LOGS --- const responsePromises: { index: number promise: Promise @@ -215,17 +264,42 @@ async function bulkSend({ responsePromises.push({ index, promise }) } catch (e) { // transaction was not confirmed to be on blockchain - // by chance still can be landed but we do not care about it - // and considering it as not landed on chain + // by chance still can be landed but we do not know why we don't care + // we consider it as not landed on chain data[index].confirmationError = e as Error responsePromises.push({ index, promise: Promise.resolve(null) }) + rpcErrors.push( + new ExecutionError({ + msg: `Transaction '${data[index].signature}' at [${index}] failed to be confirmed`, + cause: e as Error, + transaction: data[index].transaction, + }) + ) } } + + // --- RETRIEVING LOGS PROMISE AND FINISH --- for (const { index, promise: responsePromise } of responsePromises) { - const awaitedResponse = await responsePromise - if (awaitedResponse !== null) { - data[index].response = awaitedResponse - data[index].confirmationError = undefined + try { + const awaitedResponse = await responsePromise + if (awaitedResponse !== null) { + data[index].response = awaitedResponse + data[index].confirmationError = undefined + } + } catch (e) { + rpcErrors.push( + new ExecutionError({ + msg: `Transaction ${data[index].signature} at [${index}] failed to be found on-chain`, + cause: e as Error, + transaction: data[index].transaction, + logs: data[index].response?.meta?.logMessages || undefined, + }) + ) } } + + for (const err of rpcErrors) { + logDebug(logger, err) + } + return { failures: rpcErrors } }