Skip to content

Commit

Permalink
try catch on async
Browse files Browse the repository at this point in the history
  • Loading branch information
ochaloup committed Mar 26, 2024
1 parent 3be5c6d commit cd86b5d
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 33 deletions.
61 changes: 42 additions & 19 deletions packages/lib/web3js-common/src/tx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,18 @@ export async function executeTxWithExceededBlockhashRetry(
txParams: ExecuteTxParams
): Promise<ExecuteTxReturn> {
try {
return await executeTx(txParams)
logInfo(txParams.logger, 'Executing transaction')
const promise = executeTx(txParams)
promise.catch(e => {
logInfo(txParams.logger, 'Fuck you! Failed transaction execution', e)
})
return await promise
} catch (e) {
const txSig =
e instanceof ExecutionError && e.txSignature !== undefined
? `${e.txSignature} `
: ''
if (checkErrorMessage(e, 'block height exceeded')) {
const txSig =
e instanceof ExecutionError && e.txSignature !== undefined
? `${e.txSignature} `
: ''
logDebug(
txParams.logger,
`Failed to execute transaction ${txSig}` +
Expand All @@ -409,7 +414,25 @@ export async function executeTxWithExceededBlockhashRetry(
)
txParams.transaction.recentBlockhash = undefined
return await executeTx(txParams)
}
if (checkErrorMessage(e, 'Too many requests')) {
logInfo(txParams.logger, 'too many requests execution')
logDebug(
txParams.logger,
`Failed to execute transaction ${txSig}` +
'due too many requests on RPC, retrying, ' +
'original error: ' +
e
)
txParams.transaction.recentBlockhash = undefined
await sleep(3_000)
return await executeTx(txParams)
} else {
logInfo(
txParams.logger,
'Failed transaction execution',
(e as Error).message
)
throw e
}
}
Expand Down Expand Up @@ -676,7 +699,7 @@ export async function splitAndExecuteTx({
let splitMarkerStartIdx = Number.MAX_SAFE_INTEGER
for (let i = 0; i < transaction.instructions.length; i++) {
// TODO: delete me!
logInfo(logger, 'processing index: ' + i)
// logInfo(logger, 'processing index: ' + i)
const ix = transaction.instructions[i]
if (ix instanceof TransactionInstructionSplitMarkerStart) {
splitMarkerStartIdx = i
Expand All @@ -687,7 +710,7 @@ export async function splitAndExecuteTx({
continue
}
// TODO: delete me!
logInfo(logger, 'not split marker index: ' + i)
// logInfo(logger, 'not split marker index: ' + i)
lastValidTransaction.add(ix)
const filteredSigners = filterSignersForInstruction(
lastValidTransaction.instructions,
Expand Down Expand Up @@ -727,10 +750,10 @@ export async function splitAndExecuteTx({
addIdx++
) {
// TODO: delete me!
logInfo(
logger,
`Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}`
)
// logInfo(
// logger,
// `Adding tx of index: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}, marker: ${splitMarkerStartIdx}`
// )
if (isSplitMarkerInstruction(transaction.instructions[addIdx])) {
continue
}
Expand All @@ -754,20 +777,20 @@ export async function splitAndExecuteTx({
}
transactions.push(transactionAdd)
// TODO: delete me!
logInfo(
logger,
`transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}`
)
// logInfo(
// logger,
// `transactions size: ${transactions.length}, additional tx ixes: ${transactionAdd.instructions.length}`
// )
// we processed until i minus one;
// next outer loop increases i and we need to start from the same instruction
// as the current position is
i = addIdx - 1
transactionStartIndex = addIdx
// TODO: delete me!
logInfo(
logger,
`after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}`
)
// logInfo(
// logger,
// `after: addIdx: ${addIdx}, i: ${i}, tx start index: ${transactionStartIndex}`
// )
// nulling data of the next transaction to check
lastValidTransaction = await generateNewTransaction({
feePayer: feePayerDefined,
Expand Down
98 changes: 84 additions & 14 deletions packages/lib/web3js-common/src/txBulk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
logDebug,
logError,
logInfo,
logWarn,
} from '@marinade.finance/ts-common'
import {
Connection,
Expand Down Expand Up @@ -83,6 +84,7 @@ export async function splitAndBulkExecuteTx({
const numberOfSimulations = numberOfRetries < 5 ? 5 : numberOfRetries
for (let i = 1; i <= numberOfSimulations; i++) {
try {
logWarn(logger, 'Simulating transactions: ' + i)
resultSimulated = await splitAndExecuteTx({
connection,
transaction,
Expand All @@ -97,12 +99,22 @@ export async function splitAndBulkExecuteTx({
computeUnitLimit,
computeUnitPrice,
})
logWarn(logger, 'Simulation was successful, proceeding to send')
break
} catch (e) {
if (
i >= numberOfSimulations ||
!checkErrorMessage(e, 'Too many requests for a specific RPC call')
!checkErrorMessage(e, 'Too many requests')
) {
logError(
logger,
'Too many retries for simulation, aborting... ' +
{
i,
numberOfSimulations,
tooMany: checkErrorMessage(e, 'Too many requests'),
}
)
throw e
} else {
logDebug(logger, `Error to split and execute transactions: ${e}`)
Expand Down Expand Up @@ -199,16 +211,38 @@ async function bulkSend({
logger,
`Bulk #${retryAttempt} sending ${workingTransactions.length} transactions`
)
let processed = 0
const txSendPromises: { promise: Promise<string>; index: number }[] = []
for (const { index, transaction } of workingTransactions) {
const promise = connection.sendTransaction(transaction, {
skipPreflight: true,
...sendOpts,
})
txSendPromises.push({ index, promise })
promise
.then(() => {
txSendPromises.push({ index, promise })
})
.catch(e => {
rpcErrors.push(
new ExecutionError({
msg: `Transaction at [${index}] failed to be sent to blockchain`,
cause: e as Error,
transaction: data[index].transaction,
})
)
})
.finally(() => {
processed++
})
}

// --- WAITING FOR ALL TO BE SENT ---
while (processed < workingTransactions.length) {
await new Promise(resolve => setTimeout(resolve, 100))
}

// --- CONFIRMING ---
processed = 0
const confirmationPromises: {
promise: Promise<RpcResponseAndContext<SignatureResult>>
index: number
Expand All @@ -226,17 +260,24 @@ async function bulkSend({
},
confirmOpts
)
confirmationPromises.push({ index, promise })
promise.catch(e => {
// managing 'Promise rejection was handled asynchronously' error
rpcErrors.push(
new ExecutionError({
msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`,
cause: e as Error,
transaction: data[index].transaction,
})
)
})

promise
.then(() => {
confirmationPromises.push({ index, promise })
})
.catch(e => {
// managing 'Promise rejection was handled asynchronously' error
rpcErrors.push(
new ExecutionError({
msg: `Transaction '${signature}' at [${index}] timed-out to be confirmed`,
cause: e as Error,
transaction: data[index].transaction,
})
)
})
.finally(() => {
processed++
})
} catch (e) {
rpcErrors.push(
new ExecutionError({
Expand All @@ -245,10 +286,18 @@ async function bulkSend({
transaction: data[index].transaction,
})
)

processed++
}
}

// --- WAITING FOR ALL TO BE CONFIRMED ---
while (processed < txSendPromises.length) {
await new Promise(resolve => setTimeout(resolve, 100))
}

// --- GETTING LOGS ---
processed = 0
const responsePromises: {
index: number
promise: Promise<VersionedTransactionResponse | null>
Expand All @@ -267,7 +316,22 @@ async function bulkSend({
commitment: confirmOpts,
maxSupportedTransactionVersion: 0,
})
responsePromises.push({ index, promise })
promise
.then(() => {
responsePromises.push({ index, promise })
})
.catch(e => {
rpcErrors.push(
new ExecutionError({
msg: `Transaction at [${index}] failed to be sent to blockchain`,
cause: e as Error,
transaction: data[index].transaction,
})
)
})
.finally(() => {
processed++
})
} catch (e) {
// transaction was not confirmed to be on blockchain
// by chance still can be landed but we do not know why we don't care
Expand All @@ -281,9 +345,15 @@ async function bulkSend({
transaction: data[index].transaction,
})
)
processed++
}
}

// --- WAITING FOR ALL LOGS BEING FETCHED ---
while (processed < confirmationPromises.length) {
await new Promise(resolve => setTimeout(resolve, 100))
}

// --- RETRIEVING LOGS PROMISE AND FINISH ---
for (const { index, promise: responsePromise } of responsePromises) {
try {
Expand Down

0 comments on commit cd86b5d

Please sign in to comment.