From 3ddfa8faec4f9084eab9b843815edfd87d889218 Mon Sep 17 00:00:00 2001 From: Hussein Martinez Date: Wed, 23 Oct 2024 11:40:10 +0700 Subject: [PATCH] Revert "improvments for GG (#662)" This reverts commit 311400f030ee95774c078ff2a91b26c6b864461c. --- .env.example | 3 +- docs/reindexing.md | 10 +- indexer-compose.yml | 4 +- src/config.ts | 47 +------ src/database/changeset.ts | 5 - src/database/index.ts | 277 ++++---------------------------------- src/database/migrate.ts | 55 +++----- src/database/schema.ts | 10 -- src/http/api/v1/status.ts | 4 +- src/http/app.ts | 161 ---------------------- src/index.ts | 125 ++++------------- src/prices/provider.ts | 25 +--- 12 files changed, 88 insertions(+), 638 deletions(-) diff --git a/.env.example b/.env.example index 26db4482..d8c4afb2 100644 --- a/.env.example +++ b/.env.example @@ -47,8 +47,7 @@ DD_ENV=development #GNOSIS_RPC_URL #COINGECKO_API_KEY= -#IPFS_GATEWAYs=[] -#WHITELISTED_ADDRESSES=["0x123..","0x456.."] +#IPFS_GATEWAY= # optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro #GRAPHILE_LICENSE diff --git a/docs/reindexing.md b/docs/reindexing.md index 8ea7689e..00f6fc67 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,15 +12,9 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Dropping Schemas in Development +### Using `--drop-db` in Development -- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. - -- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes chain schema and migrates from scratch. - -- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes ipfs schema and migrates from scratch. - -- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes price schema and migrates from scratch. +- During development, you can use the `--drop-db` flag to ensure the indexer always deletes the existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. ### Important Notes diff --git a/indexer-compose.yml b/indexer-compose.yml index a2a33f3b..d2cf9758 100644 --- a/indexer-compose.yml +++ b/indexer-compose.yml @@ -20,7 +20,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAYS: ${IPFS_GATEWAYS} + IPFS_GATEWAY: ${IPFS_GATEWAY} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} @@ -62,7 +62,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAYS: ${IPFS_GATEWAYS} + IPFS_GATEWAY: ${IPFS_GATEWAY} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} diff --git a/src/config.ts b/src/config.ts index aff8882d..c2f0964f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,9 +21,7 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "86"; -const IPFS_DATA_VERSION = "1"; -const PRICE_DATA_VERSION = "1"; +const CHAIN_DATA_VERSION = "000999"; export type Token = { code: string; @@ -1883,7 +1881,7 @@ export type Config = { httpServerWaitForSync: boolean; httpServerEnabled: boolean; indexerEnabled: boolean; - ipfsGateways: string[]; + ipfsGateway: string; coingeckoApiKey: string | null; coingeckoApiUrl: string; chains: Chain[]; @@ -1894,18 +1892,11 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; - ipfsDataVersion: string; - ipfsDatabaseSchemaName: string; - priceDataVersion: string; - priceDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; enableResourceMonitor: boolean; dropDb: boolean; - dropChainDb: boolean; - dropIpfsDb: boolean; - dropPriceDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -1919,18 +1910,9 @@ export function getConfig(): Config { "from-block": { type: "string", }, - "drop-chain-db": { - type: "boolean", - }, - "drop-ipfs-db": { - type: "boolean", - }, "drop-db": { type: "boolean", }, - "drop-price-db": { - type: "boolean", - }, "rm-cache": { type: "boolean", }, @@ -2062,11 +2044,10 @@ export function getConfig(): Config { const runOnce = z.boolean().default(false).parse(args["run-once"]); - const ipfsGateways = z + const ipfsGateway = z .string() - .array() - .default(["https://ipfs.io"]) - .parse(JSON.parse(process.env.IPFS_GATEWAYS!)); + .default("https://ipfs.io") + .parse(process.env.IPFS_GATEWAY); const sentryDsn = z .union([z.string(), z.null()]) @@ -2083,16 +2064,7 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; - const ipfsDataVersion = IPFS_DATA_VERSION; - const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; - - const priceDataVersion = PRICE_DATA_VERSION; - const priceDatabaseSchemaName = `price_data_${priceDataVersion}`; - const dropDb = z.boolean().default(false).parse(args["drop-db"]); - const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); - const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); - const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2132,7 +2104,7 @@ export function getConfig(): Config { cacheDir, logLevel, runOnce, - ipfsGateways, + ipfsGateway, passportScorerId, apiHttpPort, pinoPretty, @@ -2141,16 +2113,9 @@ export function getConfig(): Config { databaseUrl, readOnlyDatabaseUrl, dropDb, - dropChainDb, - dropIpfsDb, - dropPriceDb, removeCache, dataVersion, databaseSchemaName, - ipfsDataVersion, - ipfsDatabaseSchemaName, - priceDataVersion, - priceDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 3fb89864..d9979697 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,7 +16,6 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, - NewIpfsData, NewAttestationData, } from "./schema.js"; @@ -143,10 +142,6 @@ export type DataChange = type: "InsertApplicationPayout"; payout: NewApplicationPayout; } - | { - type: "InsertIpfsData"; - ipfs: NewIpfsData; - } | { type: "InsertAttestation"; attestation: NewAttestationData; diff --git a/src/database/index.ts b/src/database/index.ts index c403cd48..4bba8598 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,11 +14,10 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, - IpfsDataTable, AttestationTable, AttestationTxnTable, } from "./schema.js"; -import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js"; +import { migrate } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -40,7 +39,6 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; - ipfsData: IpfsDataTable; attestations: AttestationTable; attestationTxns: AttestationTxnTable; } @@ -59,17 +57,13 @@ export class Database { #statsTimeout: ReturnType | null = null; #logger: Logger; - readonly chainDataSchemaName: string; - readonly ipfsDataSchemaName: string; - readonly priceDataSchemaName: string; + readonly databaseSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - chainDataSchemaName: string; - ipfsDataSchemaName: string; - priceDataSchemaName: string; + schemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -82,12 +76,10 @@ export class Database { plugins: [new CamelCasePlugin()], }); - // Initialize schema names - this.chainDataSchemaName = options.chainDataSchemaName; - this.ipfsDataSchemaName = options.ipfsDataSchemaName; - this.priceDataSchemaName = options.priceDataSchemaName; + this.#db = this.#db.withSchema(options.schemaName); this.#logger = options.logger; + this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -100,7 +92,7 @@ export class Database { const client = await this.#connectionPool.connect(); // generate lock id based on schema - const lockId = this.chainDataSchemaName.split("").reduce((acc, char) => { + const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { return acc + char.charCodeAt(0); }, 0); @@ -144,12 +136,12 @@ export class Database { } private async updateStats() { - const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; + const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.chainDataSchemaName}"."rounds" AS r + UPDATE "${this.databaseSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -172,7 +164,7 @@ export class Database { await sql .raw( ` - UPDATE "${this.chainDataSchemaName}"."applications" AS a + UPDATE "${this.databaseSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -235,86 +227,38 @@ export class Database { } } - async dropChainDataSchemaIfExists() { + async dropSchemaIfExists() { await this.#db.schema - .withSchema(this.chainDataSchemaName) - .dropSchema(this.chainDataSchemaName) + .dropSchema(this.databaseSchemaName) .ifExists() .cascade() .execute(); } - async dropIpfsDataSchemaIfExists() { - await this.#db.schema - .withSchema(this.ipfsDataSchemaName) - .dropSchema(this.ipfsDataSchemaName) - .ifExists() - .cascade() - .execute(); - } - - async dropPriceDataSchemaIfExists() { - await this.#db.schema - .withSchema(this.priceDataSchemaName) - .dropSchema(this.priceDataSchemaName) - .ifExists() - .cascade() - .execute(); - } - - async dropAllSchemaIfExists() { - await this.dropChainDataSchemaIfExists(); - await this.dropIpfsDataSchemaIfExists(); - await this.dropPriceDataSchemaIfExists(); - } - - async createSchemaIfNotExists( - schemaName: string, - migrateFn: (tx: any, schemaName: string) => Promise, - logger: Logger - ) { + async createSchemaIfNotExists(logger: Logger) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE schema_name = ${schemaName} - )`.execute(this.#db.withSchema(schemaName)); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${this.databaseSchemaName} + )`.execute(this.#db); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${schemaName}" exists, skipping creation`, + msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, }); + return; } logger.info({ - msg: `schema "${schemaName}" does not exist, creating schema`, + msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, }); - await this.#db - .withSchema(schemaName) - .transaction() - .execute(async (tx) => { - await tx.schema.createSchema(schemaName).execute(); - await migrateFn(tx, schemaName); - }); - } + await this.#db.transaction().execute(async (tx) => { + await tx.schema.createSchema(this.databaseSchemaName).execute(); - async createAllSchemas(logger: Logger) { - await this.createSchemaIfNotExists( - this.chainDataSchemaName, - migrate, - logger - ); - await this.createSchemaIfNotExists( - this.ipfsDataSchemaName, - migrateDataFetcher, - logger - ); - await this.createSchemaIfNotExists( - this.priceDataSchemaName, - migratePriceFetcher, - logger - ); + await migrate(tx, this.databaseSchemaName); + }); } async applyChanges(changes: DataChange[]): Promise { @@ -327,7 +271,6 @@ export class Database { switch (change.type) { case "InsertPendingProjectRole": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -336,7 +279,6 @@ export class Database { case "DeletePendingProjectRoles": { await this.#db - .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -345,7 +287,6 @@ export class Database { case "InsertPendingRoundRole": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -354,7 +295,6 @@ export class Database { case "DeletePendingRoundRoles": { await this.#db - .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -362,17 +302,12 @@ export class Database { } case "InsertProject": { - await this.#db - .withSchema(this.chainDataSchemaName) - .insertInto("projects") - .values(change.project) - .execute(); + await this.#db.insertInto("projects").values(change.project).execute(); break; } case "UpdateProject": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -383,7 +318,6 @@ export class Database { case "InsertProjectRole": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -392,7 +326,6 @@ export class Database { case "DeleteAllProjectRolesByRole": { await this.#db - .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -403,7 +336,6 @@ export class Database { case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db - .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -414,17 +346,12 @@ export class Database { } case "InsertRound": { - await this.#db - .withSchema(this.chainDataSchemaName) - .insertInto("rounds") - .values(change.round) - .execute(); + await this.#db.insertInto("rounds").values(change.round).execute(); break; } case "UpdateRound": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -435,7 +362,6 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -453,7 +379,6 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -464,7 +389,6 @@ export class Database { case "InsertRoundRole": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -473,7 +397,6 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db - .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -492,11 +415,7 @@ export class Database { }; } - await this.#db - .withSchema(this.chainDataSchemaName) - .insertInto("applications") - .values(application) - .execute(); + await this.#db.insertInto("applications").values(application).execute(); break; } @@ -510,7 +429,6 @@ export class Database { } await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -527,7 +445,6 @@ export class Database { case "InsertManyDonations": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -536,17 +453,12 @@ export class Database { } case "InsertManyPrices": { - await this.#db - .withSchema(this.priceDataSchemaName) - .insertInto("prices") - .values(change.prices) - .execute(); + await this.#db.insertInto("prices").values(change.prices).execute(); break; } case "IncrementRoundDonationStats": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -564,7 +476,6 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -577,7 +488,6 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db - .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -596,7 +506,6 @@ export class Database { case "NewLegacyProject": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -605,29 +514,19 @@ export class Database { case "InsertApplicationPayout": { await this.#db - .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } - case "InsertIpfsData": { - await this.#db - .withSchema(this.ipfsDataSchemaName) - .insertInto("ipfsData") - .values(change.ipfs) - .execute(); - break; - } - case "InsertAttestation": { const attestationData = change.attestation.attestationData; const transactionsData = change.attestation.transactionsData; // Insert into attestations await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.databaseSchemaName) .insertInto("attestations") .values(attestationData) .execute(); @@ -644,7 +543,7 @@ export class Database { }); } await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.databaseSchemaName) .insertInto("attestationTxns") .values(attestationTxns) .execute(); @@ -658,7 +557,6 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -670,7 +568,6 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -682,7 +579,6 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -694,7 +590,6 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -706,7 +601,6 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -718,7 +612,6 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -734,7 +627,6 @@ export class Database { roleValue: string ) { const round = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -757,7 +649,6 @@ export class Database { } const round = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -774,7 +665,6 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -785,7 +675,6 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -795,7 +684,6 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -809,7 +697,6 @@ export class Database { applicationId: string ) { const application = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -826,7 +713,6 @@ export class Database { projectId: string ) { const application = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -843,7 +729,6 @@ export class Database { anchorAddress: Address ) { const application = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -856,7 +741,6 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db - .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -873,7 +757,6 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db - .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -892,7 +775,6 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db - .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -902,7 +784,6 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() @@ -914,7 +795,6 @@ export class Database { donorAddress: Address ) { const donations = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -938,7 +818,6 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db - .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -946,102 +825,4 @@ export class Database { return result ?? null; } - - async deleteChainData(chainId: ChainId) { - this.#logger.info("Deleting chain data for chainId:", chainId); - - await this.#db.transaction().execute(async (trx) => { - this.#logger.info("Deleting pending round roles"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("pendingRoundRoles") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting round roles"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("roundRoles") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting pending project roles"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("pendingProjectRoles") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting project roles"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("projectRoles") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting applications"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("applications") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting applications donations"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("donations") - .where("chainId", "=", chainId) - .execute(); - - // this.#logger.info("Deleting donation prices"); - // await trx - // .withSchema(this.priceDataSchemaName) - // .deleteFrom("prices") - // .where("chainId", "=", chainId) - // .execute(); - - this.#logger.info("Deleting applications"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("applications") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting rounds"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("rounds") - .where("chainId", "=", chainId) - .execute(); - - this.#logger.info("Deleting projects"); - await trx - .withSchema(this.chainDataSchemaName) - .deleteFrom("projects") - .where("chainId", "=", chainId) - .execute(); - }); - - this.#logger.info("Updating subscriptions indexed_to_block"); - const sqlQuery = ` - UPDATE ${this.chainDataSchemaName}.subscriptions - SET indexed_to_block = 0::bigint - WHERE chain_id = ${chainId} - `; - - await sql.raw(sqlQuery).execute(this.#db); - - this.#logger.info("Deleted chain data for chainId:", chainId); - } - - async getDataByCid(cId: string) { - const metadata = await this.#db - .withSchema(this.ipfsDataSchemaName) - .selectFrom("ipfsData") - .where("cid", "=", cId) - .selectAll() - .executeTakeFirst(); - - return metadata ?? null; - } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index 2c828393..bcd49d80 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -339,6 +339,22 @@ export async function migrate(db: Kysely, schemaName: string) { ) .execute(); + await schema + .createTable("prices") + .addColumn("id", "serial", (cb) => cb.primaryKey()) + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("tokenAddress", ADDRESS_TYPE) + .addColumn("priceInUSD", "real") + .addColumn("timestamp", "timestamptz") + .addColumn("blockNumber", BIGINT_TYPE) + .execute(); + + await db.schema + .createIndex("idx_prices_chain_token_block") + .on("prices") + .expression(sql`chain_id, token_address, block_number DESC`) + .execute(); + await schema .createTable("legacy_projects") .addColumn("id", "serial", (col) => col.primaryKey()) @@ -428,42 +444,3 @@ export async function migrate(db: Kysely, schemaName: string) { $$ language sql stable; `.execute(db); } - -export async function migrateDataFetcher(db: Kysely, schemaName: string) { - const schema = db.withSchema(schemaName).schema; - - await schema - .createTable("ipfs_data") - .addColumn("cid", "text") - .addColumn("data", "jsonb") - .addUniqueConstraint("unique_cid", ["cid"]) - .execute(); -} - -export async function migratePriceFetcher( - db: Kysely, - schemaName: string -) { - const schema = db.withSchema(schemaName).schema; - - await schema - .createTable("prices") - .addColumn("id", "serial", (cb) => cb.primaryKey()) - .addColumn("chainId", CHAIN_ID_TYPE) - .addColumn("tokenAddress", ADDRESS_TYPE) - .addColumn("priceInUSD", "real") - .addColumn("timestamp", "timestamptz") - .addColumn("blockNumber", BIGINT_TYPE) - .addUniqueConstraint("unique_chainId_tokenAddress_blockNumber", [ - "chainId", - "tokenAddress", - "blockNumber", - ]) - .execute(); - - await db.schema - .createIndex("idx_prices_chain_token_block") - .on("prices") - .expression(sql`chain_id, token_address, block_number DESC`) - .execute(); -} diff --git a/src/database/schema.ts b/src/database/schema.ts index cd4c377b..29e6762e 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -130,11 +130,6 @@ export type ProjectTable = { projectType: ProjectType; }; -export type IpfsDataTable = { - cid: string; - data: unknown; -}; - export type Project = Selectable; export type NewProject = Insertable; export type PartialProject = Updateable; @@ -264,11 +259,6 @@ export type ApplicationPayout = { export type NewApplicationPayout = Insertable; -export type NewIpfsData = { - cid: string; - data: unknown; -}; - // Attestations export type AttestationTable = { uid: string; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index f29c33a9..e4ad3c08 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,9 +9,7 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - chainDataSchemaName: config.db.chainDataSchemaName, - ipfsDataSchema: config.db.ipfsDataSchemaName, - priceDataSchema: config.db.priceDataSchemaName, + databaseSchema: config.db.databaseSchemaName, }); }); diff --git a/src/http/app.ts b/src/http/app.ts index 22a36bd1..d5ef33ea 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -13,8 +13,6 @@ import { PassportProvider } from "../passport/index.js"; import { DataProvider } from "../calculator/dataProvider/index.js"; import { Chain } from "../config.js"; import { Database } from "../database/index.js"; -import { Indexer } from "chainsauce"; -import { recoverMessageAddress } from "viem"; type AsyncRequestHandler = ( req: express.Request, @@ -40,7 +38,6 @@ export interface HttpApiConfig { | { type: "in-thread" } | { type: "worker-pool"; workerPoolSize: number }; }; - indexedChains?: Indexer[] | null; } interface HttpApi { @@ -103,109 +100,6 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { res.send(config.dataVersion); }); - app.get("/config", (_req, res) => { - res.send(config); - }); - - app.post("/index", (req, res) => { - try { - const { chainId, address, timestamp, signature } = req.body as { - chainId: string; - address: string; - timestamp: number; - signature: `0x${string}`; - }; - - const reindex = async () => { - if (!chainId || !config.indexedChains) { - return res.status(400).send("chainId is required"); - } - - try { - const isAuthenticated = await recoverEthereumAddress({ - address, - timestamp, - signature, - }); - - config.logger.info( - `Reindexing chain ${chainId} requested by ${address} at ${timestamp}` - ); - - if (isAuthenticated) { - await config.db.deleteChainData(Number(chainId)); - - const filteredIndexedChains = config.indexedChains.filter( - (chain) => - (chain as { context: { chainId: number } }).context.chainId === - Number(chainId) - ); - - if (filteredIndexedChains.length === 0) { - config.logger.error(`Chain ${chainId} not found`); - return res.status(400).send("chain not found"); - } - - const filteredChains = config.chains.filter( - (chain) => chain.id === Number(chainId) - ); - - if (filteredChains.length === 0) { - config.logger.error(`Chain ${chainId} not found`); - return res.status(400).send("chain not found"); - } - - const chain = filteredChains[0]; - const indexedChain = filteredIndexedChains[0]; - - chain.subscriptions.forEach((subscription) => { - indexedChain.unsubscribeFromContract({ - address: subscription.address, - }); - - const contractName = subscription.contractName; - const subscriptionFromBlock = - subscription.fromBlock === undefined - ? undefined - : BigInt(subscription.fromBlock); - - indexedChain.subscribeToContract({ - contract: contractName, - address: subscription.address, - fromBlock: subscriptionFromBlock || BigInt(0), - }); - }); - } else { - config.logger.error( - `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication` - ); - return res.status(401).send("Authentication failed"); - } - } catch { - config.logger.error( - `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error` - ); - return res.status(500).send("An error occurred"); - } - }; - - reindex() - .then(() => { - config.logger.info(`Reindexing of chain ${chainId} finished`); - res.send("Reindexing finished"); - }) - .catch(() => { - config.logger.error( - `Reindexing of chain ${chainId} failed with error` - ); - res.status(500).send("An error occurred"); - }); - } catch { - config.logger.error(`Reindexing failed with error`); - res.status(500).send("An error occurred"); - } - }); - app.use("/api/v1", api); if (config.graphqlHandler !== undefined) { @@ -255,58 +149,3 @@ function staticJsonDataHandler(dataProvider: DataProvider) { } }; } - -const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period - -const recoverEthereumAddress = async ({ - address, - timestamp, - signature, -}: { - address: string; - timestamp: number; - signature: `0x${string}`; -}) => { - if (!address || !timestamp || !signature) { - return false; - } - const whitelistedAddresses: string[] = process.env.WHITELISTED_ADDRESSES - ? (JSON.parse(process.env.WHITELISTED_ADDRESSES) as string[]) - : []; - - if (!whitelistedAddresses) { - return false; - } - - // Check timestamp validity - const currentTime = Date.now(); - if (currentTime - timestamp > VALIDITY_PERIOD) { - return false; - } - - // Construct the expected message to be signed - const expectedMessage = `Authenticate with timestamp: ${timestamp}`; - try { - // Recover address from signature and expected message - const recoveredAddress = await recoverMessageAddress({ - message: expectedMessage, - signature, - }); - - const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) => - addr.toLowerCase() - ); - - if ( - recoveredAddress.toLowerCase() === address.toLowerCase() && - whitelistedAddressesLowercase.includes(address.toLowerCase()) - ) { - return true; - } else { - return false; - } - } catch (error) { - console.error("Error verifying signature:", error); - return false; - } -}; diff --git a/src/index.ts b/src/index.ts index e73e99a2..b30bd225 100644 --- a/src/index.ts +++ b/src/index.ts @@ -161,9 +161,7 @@ async function main(): Promise { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - chainDataSchemaName: config.databaseSchemaName, - ipfsDataSchemaName: config.ipfsDatabaseSchemaName, - priceDataSchemaName: config.priceDatabaseSchemaName, + schemaName: config.databaseSchemaName, }); baseLogger.info({ @@ -252,22 +250,15 @@ async function main(): Promise { const lock = await db.acquireWriteLock(); if (lock !== null) { + baseLogger.info("acquired write lock"); + if (isFirstRun) { if (config.dropDb) { - baseLogger.info("dropping all schemas"); - await db.dropAllSchemaIfExists(); - } else if (config.dropChainDb) { - baseLogger.info("resetting chain data schema"); - await db.dropChainDataSchemaIfExists(); - } else if (config.dropIpfsDb) { - baseLogger.info("resetting ipfs data schema"); - await db.dropIpfsDataSchemaIfExists(); - } else if (config.dropPriceDb) { - baseLogger.info("resetting price data schema"); - await db.dropPriceDataSchemaIfExists(); + baseLogger.info("dropping schema"); + await db.dropSchemaIfExists(); } - await db.createAllSchemas(baseLogger); + await db.createSchemaIfNotExists(baseLogger); await subscriptionStore.init(); } @@ -343,11 +334,7 @@ async function main(): Promise { const graphqlHandler = postgraphile( readOnlyDatabaseConnectionPool, - [ - config.databaseSchemaName, - config.ipfsDatabaseSchemaName, - config.priceDatabaseSchemaName, - ], + config.databaseSchemaName, { watchPg: false, graphqlRoute: "/graphql", @@ -424,7 +411,6 @@ async function main(): Promise { workerPoolSize: config.estimatesLinearQfWorkerPoolSize, }, }, - indexedChains: await indexChainsPromise, }); await httpApi.start(); @@ -489,86 +475,29 @@ async function catchupAndWatchChain( return undefined; } - // Check if data is already in the IPFS database - const ipfsData = await db.getDataByCid(cid); - if (ipfsData) { - // chainLogger.info(`Found IPFS data in database for CID: ${cid}`); - return Promise.resolve(ipfsData.data as string as T); - } - - // Fetch from a single IPFS gateway - const fetchFromGateway = async (url: string): Promise => { - try { - const res = await fetch(url, { - timeout: 2000, - onRetry(cause) { - chainLogger.debug({ - msg: "Retrying IPFS request", - url: url, - err: cause, - }); - }, - retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, - // IPFS data is immutable, we can rely entirely on the cache when present - cache: "force-cache", - cachePath: - config.cacheDir !== null - ? path.join(config.cacheDir, "ipfs") - : undefined, - }); + const url = `${config.ipfsGateway}/ipfs/${cid}`; - if (res.ok) { - return (await res.json()) as T; // Return the fetched data - } else { - chainLogger.warn( - `Failed to fetch from ${url}, status: ${res.status} ${res.statusText}` - ); - } - } catch (err) { - chainLogger.error( - `Error fetching from gateway ${url}: ${String(err)}` - ); - } - }; - - // Iterate through each gateway and attempt to fetch data - for (const gateway of config.ipfsGateways) { - const url = `${gateway}/ipfs/${cid}`; - // chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); - - const result = await fetchFromGateway(url); - if (result !== undefined) { - // chainLogger.info( - // `Fetch successful from gateway: ${gateway} for CID: ${cid}` - // ); - - // Save to IpfsData table - try { - await db.applyChange({ - type: "InsertIpfsData", - ipfs: { - cid, - data: result, // TODO: check is JSON.parse is needed - }, - }); - } catch (err) { - chainLogger.error( - `Error saving IPFS data to database: ${String(err)}` - ); - } + // chainLogger.trace(`Fetching ${url}`); - return result; // Return the result if fetched successfully - } else { - chainLogger.warn( - `IPFS fetch failed for gateway ${gateway} for CID ${cid}` - ); - } - } + const res = await fetch(url, { + timeout: 2000, + onRetry(cause) { + chainLogger.debug({ + msg: "Retrying IPFS request", + url: url, + err: cause, + }); + }, + retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, + // IPFS data is immutable, we can rely entirely on the cache when present + cache: "force-cache", + cachePath: + config.cacheDir !== null + ? path.join(config.cacheDir, "ipfs") + : undefined, + }); - chainLogger.error( - `Failed to fetch IPFS data for CID ${cid} from all gateways.` - ); - return undefined; // Return undefined if all gateways fail + return (await res.json()) as T; }; chainLogger.info("DEBUG: catching up with blockchain events"); diff --git a/src/prices/provider.ts b/src/prices/provider.ts index 8279432f..01f75656 100644 --- a/src/prices/provider.ts +++ b/src/prices/provider.ts @@ -211,27 +211,10 @@ export function createPriceProvider( }); } - // Check if the price is already in the database - const existingPrice = await db.getTokenPriceByBlockNumber( - chainId, - newPrice.tokenAddress, - blockNumber - ); - - if (!existingPrice) { - try { - await db.applyChange({ - type: "InsertManyPrices", - prices: [newPrice], - }); - } catch (e) { - logger.error({ - msg: "Failed to insert price", - error: e, - price: newPrice, - }); - } - } + await db.applyChange({ + type: "InsertManyPrices", + prices: [newPrice], + }); return { ...newPrice, tokenDecimals: token.decimals }; }