Skip to content

Commit

Permalink
feat: add optional CBOR transaction representation to SUBSCRIBE_ADDRESS
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Oct 29, 2024
1 parent 2754777 commit 60e5e1e
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 77 deletions.
14 changes: 12 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@
"cSpell.words": [
"addresses",
"bech",
"bignumber",
"blockfrost",
"cardano",
"cbor",
"cbors",
"compat",
"emurgo",
"from",
"HEALTHCHECK",
"keyhash",
"libc",
"lovelaces",
"memoizee",
"tailwindcss",
"to",
"timelock",
"txid",
"txids",
"uninstantiated",
"utxos"
"utxo",
"utxos",
"uuidv4"
],
"search.exclude": {
"**/.yarn": true,
Expand Down
1 change: 1 addition & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export default [
'**/.eslintrc.js',
'**/jest.config.js',
'**/babel.config.js',
'**/test',
],
},
...fixupConfigRules(
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
"start": "yarn build && yarn node ./dist/server.js",
"start:profiler": "yarn build && NODE_ENV=production yarn node --prof ./dist/server.js",
"test-performance": "yarn build && yarn node ./dist/scripts/performance/index.js",
"test": "BLOCKFROST_PROJECT_ID='none' vitest",
"test": "BLOCKFROST_PROJECT_ID='mainnet_none' vitest",
"coverage": "vitest run --coverage",
"type-check": "tsc --project tsconfig.json"
},
"dependencies": {
"@blockfrost/blockfrost-js": "5.5.0",
"@blockfrost/blockfrost-js": "5.6.0",
"@emurgo/cardano-serialization-lib-nodejs": "^11.5.0",
"@sentry/integrations": "^7.85.0",
"@sentry/node": "^7.85.0",
Expand Down
25 changes: 19 additions & 6 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { prepareMessage } from './utils/message.js';
import { blockfrostAPI } from './utils/blockfrost-api.js';
import { Responses } from '@blockfrost/blockfrost-js';
import { promiseTimeout } from './utils/common.js';
import { getTransactionsWithUtxo } from './utils/transaction.js';
import { getTransactionsWithDetails } from './utils/transaction.js';
import { TxNotification } from './types/response.js';
import { EMIT_MAX_MISSED_BLOCKS } from './constants/config.js';
import { logger } from './utils/logger.js';
Expand All @@ -14,6 +14,11 @@ interface EmitBlockOptions {
maxMissedBlocks?: number;
}

export interface SubscribedAddress {
address: string;
cbor?: boolean;
}

// eslint-disable-next-line unicorn/prefer-event-target
const events = new EventEmitter();

Expand Down Expand Up @@ -94,7 +99,7 @@ export const onBlock = async (
latestBlock: Responses['block_content'],
affectedAddressesInBlock: Responses['block_content_addresses'],
activeSubscriptions: Server.Subscription[] | undefined,
subscribedAddresses: string[] | undefined,
subscribedAddresses: SubscribedAddress[] | undefined,
) => {
// client has no subscription
if (!activeSubscriptions) return;
Expand All @@ -113,7 +118,7 @@ export const onBlock = async (

if (activeAddressSub && subscribedAddresses) {
const affectedAddresses = affectedAddressesInBlock.filter(a =>
subscribedAddresses.includes(a.address),
subscribedAddresses.some(addr => addr.address === a.address),
);

if (affectedAddresses.length === 0) {
Expand All @@ -122,15 +127,22 @@ export const onBlock = async (
}

// get list of unique txids (same tx could affect multiple client's addresses, but we want to fetch it only once)
const txIdsSet = new Set<string>();
const txsCbor: Record<string, boolean | undefined> = {};

for (const address of affectedAddresses) {
const { cbor } = subscribedAddresses.find(
subscription => subscription.address === address.address,
)!;

for (const tx of address.transactions) {
txIdsSet.add(tx.tx_hash);
txsCbor[tx.tx_hash] ||= cbor;
}
}

// fetch txs that include client's address with their utxo data
const txs = await getTransactionsWithUtxo([...txIdsSet]);
const txs = await getTransactionsWithDetails(
Object.entries(txsCbor).map(([txid, cbor]) => ({ txid, cbor })),
);

const notifications: TxNotification[] = [];

Expand All @@ -149,6 +161,7 @@ export const onBlock = async (
txData: enhancedTx.txData,
txUtxos: enhancedTx.txUtxos,
txHash: enhancedTx.txData.hash,
txCbor: enhancedTx.txCbor,
});
}
}
Expand Down
21 changes: 14 additions & 7 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import * as Server from './types/server.js';
import { MESSAGES, WELCOME_MESSAGE, REPOSITORY_URL } from './constants/index.js';
import { getMessage, prepareErrorMessage, prepareMessage } from './utils/message.js';
import { MetricsCollector } from './utils/prometheus.js';
import { events, onBlock, startEmitter } from './events.js';
import { SubscribedAddress, events, onBlock, startEmitter } from './events.js';
import getServerInfo from './methods/get-server-info.js';
import getAccountInfo from './methods/get-account-info.js';
import getAccountUtxo from './methods/get-account-utxo.js';
Expand Down Expand Up @@ -67,7 +67,7 @@ const wss = new WebSocketServer({ server });
server.keepAliveTimeout = 65_000;

const activeSubscriptions: Record<string, Server.Subscription[]> = {};
const addressesSubscribed: Record<string, string[]> = {};
const addressesSubscribed: Record<string, SubscribedAddress[]> = {};

const clients: Array<{
clientId: string;
Expand Down Expand Up @@ -305,11 +305,18 @@ wss.on('connection', async (ws: Server.Ws) => {
}

case MESSAGES.SUBSCRIBE_ADDRESS: {
if (data.params.addresses && data.params.addresses.length > 0) {
for (const addressInput of data.params.addresses) {
if (!addressesSubscribed[clientId].includes(addressInput)) {
addressesSubscribed[clientId].push(addressInput);
}
const { addresses, cbor } = { ...data.params };

if (addresses && addresses.length > 0) {
for (const address of addresses) {
const subscriptionIndex = addressesSubscribed[clientId].findIndex(
addr => addr.address === address,
);

// Subscribe to new address...
if (subscriptionIndex === -1) addressesSubscribed[clientId].push({ address, cbor });
// ... or update the cbor option
else addressesSubscribed[clientId][subscriptionIndex].cbor ||= cbor;
}

const activeAddressSubIndex = activeSubscriptions[clientId].findIndex(
Expand Down
5 changes: 4 additions & 1 deletion src/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ export type Messages =
| {
id: number;
command: 'SUBSCRIBE_ADDRESS';
params: { addresses: string[] };
params: {
addresses: string[];
cbor?: boolean;
};
}
| {
id: number;
Expand Down
1 change: 1 addition & 0 deletions src/types/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface TxNotification {
txHash: string;
txData: TransformedTransaction;
txUtxos: TransformedTransactionUtxo;
txCbor?: string;
}

export interface BalanceHistoryData {
Expand Down
1 change: 1 addition & 0 deletions src/types/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface TxIdsToTransactionsResponse {
txData: TransformedTransaction;
address: string;
txHash: string;
txCbor?: string;
}

export interface TransformedTransactionUtxo {
Expand Down
30 changes: 23 additions & 7 deletions src/utils/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BlockfrostServerError, Responses } from '@blockfrost/blockfrost-js';
import * as Types from '../types/transactions.js';
import { TransformedTransaction, TransformedTransactionUtxo } from '../types/transactions.js';
import { TxIdsToTransactionsResponse } from '../types/transactions.js';
import { blockfrostAPI } from '../utils/blockfrost-api.js';
import { getAssetData, transformAsset } from './asset.js';
import { assetMetadataLimiter, pLimiter } from './limiter.js';
Expand Down Expand Up @@ -68,28 +68,44 @@ export const txIdsToTransactions = async (
return sortedTxs;
};

export const getTransactionsWithUtxo = async (
txids: string[],
): Promise<{ txData: TransformedTransaction; txUtxos: TransformedTransactionUtxo }[]> => {
export interface GetTransactionsDetails {
txid: string;
cbor?: boolean;
}

export const getTransactionsWithDetails = async (
txs: GetTransactionsDetails[],
): Promise<Pick<TxIdsToTransactionsResponse, 'txData' | 'txUtxos' | 'txCbor'>[]> => {
const txsData = await Promise.all(
txids.map(txid =>
txs.map(({ txid }) =>
pLimiter.add(() => blockfrostAPI.txs(txid).then(data => transformTransactionData(data)), {
throwOnTimeout: true,
}),
),
);
const txsUtxo = await Promise.all(
txids.map(txid =>
txs.map(({ txid }) =>
pLimiter.add(
() => blockfrostAPI.txsUtxos(txid).then(data => transformTransactionUtxo(data)),
{ throwOnTimeout: true },
),
),
);
const txsCbors = await Promise.all(
txs.map(({ txid, cbor }) =>
cbor
? pLimiter.add(() => blockfrostAPI.txsCbor(txid).then(data => data.cbor), {
throwOnTimeout: true,
})
: // eslint-disable-next-line unicorn/no-useless-undefined
Promise.resolve<undefined>(undefined),
),
);

return txids.map((_txid, index) => ({
return txs.map((_tx, index) => ({
txData: txsData[index],
txUtxos: txsUtxo[index],
txCbor: txsCbors[index],
}));
};

Expand Down
16 changes: 11 additions & 5 deletions test/unit/fixtures/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// cSpell: disable

export const emitBlock = [
{
description: 'one block',
Expand Down Expand Up @@ -44,7 +46,7 @@ export const emitMissedBlock = [
export const onBlock = [
{
description: "1 of client's addresses affected in a block (real data)",
subscribedAddresses: ['addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h'],
subscribedAddresses: [{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h',cbor:true}],
mocks: {
block: {
time: 1639491936,
Expand Down Expand Up @@ -337,6 +339,9 @@ export const onBlock = [
},
],
},
txCbor: {
cbor: '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'
}
},
],
},
Expand Down Expand Up @@ -592,6 +597,7 @@ export const onBlock = [
],
},
txHash: '4d5beb45fe37b44b46f839811a3d3a1ac4a20911850740867a64f77d09372d0b',
txCbor: {cbor:'0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'}
},
],
},
Expand All @@ -600,9 +606,9 @@ export const onBlock = [
{
description: "2 of the client's addresses affected, 1 address affected in multiple txs",
subscribedAddresses: [
'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h',
'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t',
'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h', // random address, should not be affected
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g77pj9h'},
{address:'addr_test1wrsexavz37208qda7mwwu4k7hcpg26cz0ce86f5e9kul3hqzlh22t'},
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected
],
mocks: {
block: {
Expand Down Expand Up @@ -788,7 +794,7 @@ export const onBlock = [
{
description: 'subscribed address was not affected',
subscribedAddresses: [
'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h', // random address, should not be affected
{address:'addr_test1wpfzvzpa046hkfy65mp4ez6vgjunmytzg0ye0ds7mm26v0g770j9h'}, // random address, should not be affected
],
mocks: {
block: {
Expand Down
33 changes: 18 additions & 15 deletions test/unit/tests/utils/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
TransformedTransaction,
TransformedTransactionUtxo,
} from '../../../../src/types/transactions.js';
import { GetTransactionsDetails } from '../../../../src/utils/transaction.js';

describe('events', () => {
for (const fixture of fixtures.emitBlock) {
Expand Down Expand Up @@ -139,22 +140,24 @@ describe('events', () => {
send: (message: string) => mockedSend(message),
};

vi.spyOn(txUtils, 'getTransactionsWithUtxo').mockImplementation((txids: string[]) => {
return new Promise(resolve => {
// sanity check that the test really wanted to fetch transactions that we expected
for (const mockedTx of fixture.mocks.txsWithUtxo) {
if (!txids.find(txid => mockedTx.txData.hash === txid)) {
throw new Error('Unexpected list of affected addresses');
vi.spyOn(txUtils, 'getTransactionsWithDetails').mockImplementation(
(txs: GetTransactionsDetails[]) => {
return new Promise(resolve => {
// sanity check that the test really wanted to fetch transactions that we expected
for (const mockedTx of fixture.mocks.txsWithUtxo) {
if (!txs.find(({ txid }) => mockedTx.txData.hash === txid)) {
throw new Error('Unexpected list of affected addresses');
}
}
}
resolve(
fixture.mocks.txsWithUtxo as unknown as {
txData: TransformedTransaction;
txUtxos: TransformedTransactionUtxo;
}[],
);
});
});
resolve(
fixture.mocks.txsWithUtxo as unknown as {
txData: TransformedTransaction;
txUtxos: TransformedTransactionUtxo;
}[],
);
});
},
);

// subscribe both to block and addresses
const subscription = [
Expand Down
Loading

0 comments on commit 60e5e1e

Please sign in to comment.