From d1d8ff4b712415d977766ae7b80a693b29aaf04d Mon Sep 17 00:00:00 2001 From: 0xKurt Date: Mon, 9 Sep 2024 11:36:28 +0200 Subject: [PATCH 1/4] update schema name --- src/database/index.ts | 87 +++++++++++++++++++++++ src/http/app.ts | 156 ++++++++++++++++++++++++++++++++++++++++++ src/index.ts | 1 + 3 files changed, 244 insertions(+) diff --git a/src/database/index.ts b/src/database/index.ts index 0226366d..2a0b70a2 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -962,6 +962,93 @@ export class Database { return result ?? null; } + async deleteChainData(chainId: ChainId) { + this.#logger.info("Deleting chain data for chainId:", chainId); + + await this.#db.transaction().execute(async (trx) => { + this.#logger.info("Deleting pending round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingRoundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("roundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting pending project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingProjectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("donations") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("prices") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("rounds") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications payouts"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projects") + .where("chainId", "=", chainId) + .execute(); + }); + + this.#logger.info("Updating subscriptions indexed_to_block"); + const sqlQuery = ` + UPDATE ${this.chainDataSchemaName}.subscriptions + SET indexed_to_block = 0::bigint + WHERE chain_id = ${chainId} + `; + + await sql.raw(sqlQuery).execute(this.#db); + + this.#logger.info("Deleted chain data for chainId:", chainId); + } + async getDataByCid(cId: string) { const metadata = await this.#db .withSchema(this.ipfsDataSchemaName) diff --git a/src/http/app.ts b/src/http/app.ts index d5ef33ea..6c44a231 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -13,6 +13,8 @@ import { PassportProvider } from "../passport/index.js"; import { DataProvider } from "../calculator/dataProvider/index.js"; import { Chain } from "../config.js"; import { Database } from "../database/index.js"; +import { Indexer } from "chainsauce"; +import { recoverMessageAddress } from "viem"; type AsyncRequestHandler = ( req: express.Request, @@ -38,6 +40,7 @@ export interface HttpApiConfig { | { type: "in-thread" } | { type: "worker-pool"; workerPoolSize: number }; }; + indexedChains?: Indexer[] | null; } interface HttpApi { @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { res.send(config.dataVersion); }); + app.get("/config", (_req, res) => { + res.send(config); + }); + + app.post("/index", (req, res) => { + try { + const { chainId, address, timestamp, signature } = req.body as { + chainId: string; + address: string; + timestamp: number; + signature: `0x${string}`; + }; + + const reindex = async () => { + if (!chainId || !config.indexedChains) { + return res.status(400).send("chainId is required"); + } + + try { + const isAuthenticated = await recoverEthereumAddress({ + address, + timestamp, + signature, + }); + + config.logger.info( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp}` + ); + + if (isAuthenticated) { + await config.db.deleteChainData(Number(chainId)); + + const filteredIndexedChains = config.indexedChains.filter( + (chain) => + (chain as { context: { chainId: number } }).context.chainId === + Number(chainId) + ); + + if (filteredIndexedChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const filteredChains = config.chains.filter( + (chain) => chain.id === Number(chainId) + ); + + if (filteredChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const chain = filteredChains[0]; + const indexedChain = filteredIndexedChains[0]; + + chain.subscriptions.forEach((subscription) => { + indexedChain.unsubscribeFromContract({ + address: subscription.address, + }); + + const contractName = subscription.contractName; + const subscriptionFromBlock = + subscription.fromBlock === undefined + ? undefined + : BigInt(subscription.fromBlock); + + indexedChain.subscribeToContract({ + contract: contractName, + address: subscription.address, + fromBlock: subscriptionFromBlock || BigInt(0), + }); + }); + } else { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication` + ); + return res.status(401).send("Authentication failed"); + } + } catch { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error` + ); + return res.status(500).send("An error occurred"); + } + }; + + reindex() + .then(() => { + config.logger.info(`Reindexing of chain ${chainId} finished`); + res.send("Reindexing finished"); + }) + .catch(() => { + config.logger.error( + `Reindexing of chain ${chainId} failed with error` + ); + res.status(500).send("An error occurred"); + }); + } catch { + config.logger.error(`Reindexing failed with error`); + res.status(500).send("An error occurred"); + } + }); + app.use("/api/v1", api); if (config.graphqlHandler !== undefined) { @@ -149,3 +255,53 @@ function staticJsonDataHandler(dataProvider: DataProvider) { } }; } + +const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period + +const recoverEthereumAddress = async ({ + address, + timestamp, + signature, +}: { + address: string; + timestamp: number; + signature: `0x${string}`; +}) => { + if (!address || !timestamp || !signature) { + return false; + } + const whitelistedAddresses = + process.env.WHITELISTED_ADDRESSES?.split(",") || []; + + // Check timestamp validity + const currentTime = Date.now(); + if (currentTime - timestamp > VALIDITY_PERIOD) { + return false; + } + + // Construct the expected message to be signed + const expectedMessage = `Authenticate with timestamp: ${timestamp}`; + try { + // Recover address from signature and expected message + const recoveredAddress = await recoverMessageAddress({ + message: expectedMessage, + signature, + }); + + const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) => + addr.toLowerCase() + ); + + if ( + recoveredAddress.toLowerCase() === address.toLowerCase() && + whitelistedAddressesLowercase.includes(address.toLowerCase()) + ) { + return true; + } else { + return false; + } + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +}; diff --git a/src/index.ts b/src/index.ts index fc1df5d4..4e749fe1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -416,6 +416,7 @@ async function main(): Promise { workerPoolSize: config.estimatesLinearQfWorkerPoolSize, }, }, + indexedChains: await indexChainsPromise, }); await httpApi.start(); From 8acbedd25c8bc60b0207d661ca7352e9e8a2f5f1 Mon Sep 17 00:00:00 2001 From: 0xKurt Date: Mon, 9 Sep 2024 12:00:20 +0200 Subject: [PATCH 2/4] updates --- .env.example | 3 ++- src/config.ts | 2 +- src/database/index.ts | 7 ------- src/http/app.ts | 5 +++-- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/.env.example b/.env.example index 397a0336..743de715 100644 --- a/.env.example +++ b/.env.example @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer # METIS_ANDROMEDA_RPC_URL #COINGECKO_API_KEY= -#IPFS_GATEWAY= +#IPFS_GATEWAYs=[] +#WHITELISTED_ADDRESSES=["0x123..","0x456.."] # optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro #GRAPHILE_LICENSE diff --git a/src/config.ts b/src/config.ts index 04348d49..80cae777 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,7 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "81"; +const CHAIN_DATA_VERSION = "82"; const IPFS_DATA_VERSION = "1"; const PRICE_DATA_VERSION = "1"; diff --git a/src/database/index.ts b/src/database/index.ts index 2a0b70a2..2fddeabe 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -1008,13 +1008,6 @@ export class Database { .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("prices") - .where("chainId", "=", chainId) - .execute(); - this.#logger.info("Deleting applications payouts"); await trx .withSchema(this.chainDataSchemaName) diff --git a/src/http/app.ts b/src/http/app.ts index 6c44a231..34ecf97e 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -270,8 +270,9 @@ const recoverEthereumAddress = async ({ if (!address || !timestamp || !signature) { return false; } - const whitelistedAddresses = - process.env.WHITELISTED_ADDRESSES?.split(",") || []; + const whitelistedAddresses: string[] = JSON.parse( + process.env.WHITELISTED_ADDRESSES! + ); // Check timestamp validity const currentTime = Date.now(); From a20f9ca1e0b8e4679948e247a912e18f36af5845 Mon Sep 17 00:00:00 2001 From: 0xKurt Date: Mon, 9 Sep 2024 13:02:13 +0200 Subject: [PATCH 3/4] fix --- src/config.ts | 2 +- src/database/index.ts | 17 ++++++++++++----- src/http/app.ts | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/config.ts b/src/config.ts index 80cae777..1d156e1b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,7 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "82"; +const CHAIN_DATA_VERSION = "83"; const IPFS_DATA_VERSION = "1"; const PRICE_DATA_VERSION = "1"; diff --git a/src/database/index.ts b/src/database/index.ts index 2fddeabe..4d427399 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -994,35 +994,42 @@ export class Database { .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); + this.#logger.info("Deleting applications"); await trx .withSchema(this.chainDataSchemaName) .deleteFrom("applications") .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); + this.#logger.info("Deleting applications donations"); await trx .withSchema(this.chainDataSchemaName) .deleteFrom("donations") .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); + // this.#logger.info("Deleting donation prices"); + // await trx + // .withSchema(this.priceDataSchemaName) + // .deleteFrom("prices") + // .where("chainId", "=", chainId) + // .execute(); + + this.#logger.info("Deleting applications"); await trx .withSchema(this.chainDataSchemaName) .deleteFrom("applications") .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); + this.#logger.info("Deleting rounds"); await trx .withSchema(this.chainDataSchemaName) .deleteFrom("rounds") .where("chainId", "=", chainId) .execute(); - this.#logger.info("Deleting applications payouts"); + this.#logger.info("Deleting projects"); await trx .withSchema(this.chainDataSchemaName) .deleteFrom("projects") diff --git a/src/http/app.ts b/src/http/app.ts index 34ecf97e..602f7fc7 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -173,7 +173,7 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { contract: contractName, address: subscription.address, fromBlock: subscriptionFromBlock || BigInt(0), - }); + }); }); } else { config.logger.error( From c19cdad6df00cd6a114076b6bf640920bdf8b029 Mon Sep 17 00:00:00 2001 From: 0xKurt Date: Mon, 9 Sep 2024 13:02:16 +0200 Subject: [PATCH 4/4] Revert "Refactor lock acquisition and release logic (#661)" This reverts commit 9a2ffe10209a4f93c37c00aef4f258e5a7b046fc. --- src/database/index.ts | 63 ++++++++++++++----------------------------- 1 file changed, 20 insertions(+), 43 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 4d427399..2a37240f 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -116,60 +116,37 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; - // Helper function to force release a lock for a specific schema - const forceReleaseLockForSchema = async (lockId: number) => { - await client.query(` - SELECT pg_terminate_backend(pid) - FROM pg_locks - WHERE locktype = 'advisory' - AND objid = ${lockId} - AND pid != pg_backend_pid() - `); - }; - // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); const priceDataLockId = generateLockId(this.priceDataSchemaName); - // Lock acquisition status - let chainDataLockAcquired = false; - let ipfsDataLockAcquired = false; - let priceDataLockAcquired = false; - try { - chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - - return { - release: async () => { - if (chainDataLockAcquired) { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + + if ( + chainDataLockAcquired && + ipfsDataLockAcquired && + priceDataLockAcquired + ) { + return { + release: async () => { await releaseLockForSchema(chainDataLockId); - } - if (ipfsDataLockAcquired) { - await forceReleaseLockForSchema(ipfsDataLockId); - // await releaseLockForSchema(ipfsDataLockId); - } - if (priceDataLockAcquired) { - await forceReleaseLockForSchema(priceDataLockId); - // await releaseLockForSchema(priceDataLockId); - } - client.release(); - }, - client, - }; + await releaseLockForSchema(ipfsDataLockId); + await releaseLockForSchema(priceDataLockId); + client.release(); + }, + client, + }; + } } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); - } finally { - // Ensure any acquired locks are released if they were not all acquired - if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId); - if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId); - if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId); - - client.release(); } + client.release(); + return null; }