diff --git a/pocket-gateway/ecs-task-us-east-2.json b/pocket-gateway/ecs-task-us-east-2.json index 93ef9a1b..894c7a87 100644 --- a/pocket-gateway/ecs-task-us-east-2.json +++ b/pocket-gateway/ecs-task-us-east-2.json @@ -25,7 +25,7 @@ "environment": [], "command": [], "linuxParameters": null, - "cpu": 2048, + "cpu": 4096, "resourceRequirements": null, "ulimits": [ { diff --git a/src/services/pocket-relayer.ts b/src/services/pocket-relayer.ts index 4312375f..9dba48ba 100644 --- a/src/services/pocket-relayer.ts +++ b/src/services/pocket-relayer.ts @@ -145,7 +145,7 @@ export class PocketRelayer { } // Send this relay attempt - const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck); + const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck, String(this.altruists[blockchain])); if (!(relayResponse instanceof Error)) { // Record success metric @@ -303,6 +303,7 @@ export class PocketRelayer { blockchain: string, blockchainEnforceResult: string, blockchainSyncCheck: string, + blockchainSyncBackup: string, ): Promise { logger.log('info', 'RELAYING ' + blockchain + ' req: ' + data, {requestID: requestID, relayType: 'APP', typeID: application.id, serviceNode: ''}); @@ -366,8 +367,11 @@ export class PocketRelayer { if (pocketSession instanceof Session) { let nodes: Node[] = pocketSession.sessionNodes; if (blockchainSyncCheck) { - nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration); - } + nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, blockchainSyncBackup, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration); + if (nodes.length === 0) { + return new Error('Sync check failure; using fallbacks'); + } + } node = await this.cherryPicker.cherryPickNode(application, nodes, blockchain, requestID); } diff --git a/src/services/sync-checker.ts b/src/services/sync-checker.ts index 63007b66..099ed972 100644 --- a/src/services/sync-checker.ts +++ b/src/services/sync-checker.ts @@ -4,6 +4,7 @@ import {Redis} from 'ioredis'; var crypto = require('crypto'); const logger = require('../services/logger'); +import axios from 'axios'; export class SyncChecker { redis: Redis; @@ -14,7 +15,7 @@ export class SyncChecker { this.metricsRecorder = metricsRecorder; } - async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise { + async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, blockchainSyncBackup: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise { let syncedNodes: Node[] = []; let syncedNodesList: String[] = []; @@ -48,34 +49,53 @@ export class SyncChecker { // Fires all 5 sync checks synchronously then assembles the results const nodeSyncLogs = await this.getNodeSyncLogs(nodes, requestID, syncCheck, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration); - + let errorState = false; + // This should never happen if (nodeSyncLogs.length <= 2) { logger.log('error', 'SYNC CHECK ERROR: fewer than 3 nodes returned sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''}); - return nodes; + errorState = true; } + let currentBlockHeight = 0; + // Sort NodeSyncLogs by blockHeight nodeSyncLogs.sort((a, b) => b.blockHeight - a.blockHeight); - + // If top node is still 0, or not a number, return all nodes due to check failure if ( + nodeSyncLogs.length === 0 || nodeSyncLogs[0].blockHeight === 0 || typeof nodeSyncLogs[0].blockHeight !== 'number' || (nodeSyncLogs[0].blockHeight %1 ) !== 0 ) { - logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + nodeSyncLogs[0].blockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''}); - return nodes; + logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + JSON.stringify(nodeSyncLogs), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''}); + errorState = true; + } else { + currentBlockHeight = nodeSyncLogs[0].blockHeight; } // Make sure at least 2 nodes agree on current highest block to prevent one node from being wildly off - if (nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance)) { + if ( + !errorState && + nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance) + ) { logger.log('error', 'SYNC CHECK ERROR: two highest nodes could not agree on sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''}); - return nodes; + errorState = true; } - const currentBlockHeight = nodeSyncLogs[0].blockHeight; + if (errorState) { + // Consult Altruist for sync source of truth + currentBlockHeight = await this.getSyncFromAltruist(syncCheck, blockchainSyncBackup); + if (currentBlockHeight === 0) { + // Failure to find sync from consensus and altruist + logger.log('info', 'SYNC CHECK ALTRUIST FAILURE: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''}); + return nodes; + } else { + logger.log('info', 'SYNC CHECK ALTRUIST CHECK: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''}); + } + } // Go through nodes and add all nodes that are current or within 1 block -- this allows for block processing times for (const nodeSyncLog of nodeSyncLogs) { @@ -113,7 +133,7 @@ export class SyncChecker { syncedNodesKey, JSON.stringify(syncedNodesList), 'EX', - 300, + (syncedNodes.length > 0) ? 300 : 30, // will retry sync check every 30 seconds if no nodes are in sync ); // If one or more nodes of this session are not in sync, fire a consensus relay with the same check. @@ -133,10 +153,38 @@ export class SyncChecker { ); logger.log('info', 'SYNC CHECK CHALLENGE: ' + JSON.stringify(consensusResponse), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''}); } - return syncedNodes; } + async getSyncFromAltruist(syncCheck: string, blockchainSyncBackup: string): Promise { + // Remove user/pass from the altruist URL + const redactedAltruistURL = blockchainSyncBackup.replace(/[\w]*:\/\/[^\/]*@/g, ''); + + try { + const syncResponse = await axios({ + method: 'POST', + url: blockchainSyncBackup, + data: syncCheck, + headers: {'Content-Type': 'application/json'} + }); + + if (!(syncResponse instanceof Error)) { + // Return decimal version of hex result as blockHeight + return parseInt(syncResponse.data.result, 16); + } + return 0; + } + catch (e) { + logger.log('error', e.message, { + requestID: '', + relayType: 'FALLBACK', + typeID: '', + serviceNode: 'fallback:' + redactedAltruistURL, + }); + } + return 0; + } + async getNodeSyncLogs(nodes: Node[], requestID: string, syncCheck: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise { const nodeSyncLogs: NodeSyncLog[] = []; const promiseStack: Promise[] = []; diff --git a/stacks/local.init.sql b/stacks/local.init.sql new file mode 100644 index 00000000..faa962f7 --- /dev/null +++ b/stacks/local.init.sql @@ -0,0 +1,19 @@ +-- Local setup of timescaledb relations + +CREATE TABLE relay ( + timestamp TIMESTAMPTZ NOT NULL, + app_pub_key TEXT NOT NULL, + blockchain TEXT NOT NULL, + service_node TEXT, + elapsed_time DOUBLE PRECISION NOT NULL, + result NUMERIC, + bytes NUMERIC NOT NULL, + method TEXT +); + +CREATE INDEX relay_app_pub_key_method_timestamp_idx ON relay(app_pub_key, method, timestamp); +CREATE INDEX relay_app_pub_key_result_timestamp_idx ON relay(app_pub_key, result, timestamp); +CREATE INDEX relay_app_pub_key_timestamp_idx ON relay(app_pub_key, timestamp DESC); +CREATE INDEX relay_service_node_timestamp_idx ON relay(service_node, timestamp DESC); +CREATE INDEX relay_timestamp_app_pub_key_idx ON relay(timestamp DESC, app_pub_key); +CREATE INDEX relay_timestamp_idx ON relay(timestamp DESC); diff --git a/stacks/local.yml b/stacks/local.yml index af4fbb38..d625b087 100644 --- a/stacks/local.yml +++ b/stacks/local.yml @@ -56,7 +56,9 @@ services: - 5432:5432 networks: - pocket - + volumes: + - ./local.init.sql:/docker-entrypoint-initdb.d/init.sql + networks: pocket: driver: bridge