diff --git a/docs/reindexing.md b/docs/reindexing.md index 5f114bcc..8ea7689e 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,7 +12,7 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` | `--drop-chain-db` | `--drop-ipfs-db` in Development +### Dropping Schemas in Development - During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. @@ -20,6 +20,8 @@ When deploying changes to the indexer, it's important to clarify the results you - During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes ipfs schema and migrates from scratch. +- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes price schema and migrates from scratch. + ### Important Notes - **Reindexing Time**: Deployments involving reindexing will take significantly longer. Plan accordingly to minimize downtime or performance impact. diff --git a/src/config.ts b/src/config.ts index db3451de..9811cf1f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,6 +22,7 @@ type CoingeckoSupportedChainId = const CHAIN_DATA_VERSION = "81"; const IPFS_DATA_VERSION = "1"; +const PRICE_DATA_VERSION = "1"; export type Token = { code: string; @@ -1832,6 +1833,8 @@ export type Config = { databaseSchemaName: string; ipfsDataVersion: string; ipfsDatabaseSchemaName: string; + priceDataVersion: string; + priceDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; @@ -1839,6 +1842,7 @@ export type Config = { dropDb: boolean; dropChainDb: boolean; dropIpfsDb: boolean; + dropPriceDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -2010,9 +2014,13 @@ export function getConfig(): Config { const ipfsDataVersion = IPFS_DATA_VERSION; const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + const priceDataVersion = PRICE_DATA_VERSION; + const priceDatabaseSchemaName = `price_data_${priceDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); + const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2063,11 +2071,14 @@ export function getConfig(): Config { dropDb, dropChainDb, dropIpfsDb, + dropPriceDb, removeCache, dataVersion, databaseSchemaName, ipfsDataVersion, ipfsDatabaseSchemaName, + priceDataVersion, + priceDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/index.ts b/src/database/index.ts index 68a5c161..58e33d97 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -16,7 +16,7 @@ import { ApplicationPayout, IpfsDataTable, } from "./schema.js"; -import { migrate, migrateDataFetcher } from "./migrate.js"; +import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -57,6 +57,7 @@ export class Database { readonly chainDataSchemaName: string; readonly ipfsDataSchemaName: string; + readonly priceDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; @@ -64,6 +65,7 @@ export class Database { connectionPool: Pool; chainDataSchemaName: string; ipfsDataSchemaName: string; + priceDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -79,6 +81,7 @@ export class Database { // Initialize schema names this.chainDataSchemaName = options.chainDataSchemaName; this.ipfsDataSchemaName = options.ipfsDataSchemaName; + this.priceDataSchemaName = options.priceDataSchemaName; this.#logger = options.logger; @@ -113,19 +116,26 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; - // Acquire locks for both schemas + // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + const priceDataLockId = generateLockId(this.priceDataSchemaName); try { const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - if (chainDataLockAcquired && ipfsDataLockAcquired) { + if ( + chainDataLockAcquired && + ipfsDataLockAcquired && + priceDataLockAcquired + ) { return { release: async () => { await releaseLockForSchema(chainDataLockId); await releaseLockForSchema(ipfsDataLockId); + await releaseLockForSchema(priceDataLockId); client.release(); }, client, @@ -265,9 +275,19 @@ export class Database { .execute(); } + async dropPriceDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.priceDataSchemaName) + .dropSchema(this.priceDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + async dropAllSchemaIfExists() { await this.dropChainDataSchemaIfExists(); await this.dropIpfsDataSchemaIfExists(); + await this.dropPriceDataSchemaIfExists(); } async createSchemaIfNotExists( @@ -312,6 +332,11 @@ export class Database { migrateDataFetcher, logger ); + await this.createSchemaIfNotExists( + this.priceDataSchemaName, + migratePriceFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -534,7 +559,7 @@ export class Database { case "InsertManyPrices": { await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .insertInto("prices") .values(change.prices) .execute(); @@ -823,7 +848,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -840,7 +865,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -859,7 +884,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c59d6bd2..bedfb571 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -297,22 +297,6 @@ export async function migrate(db: Kysely, schemaName: string) { .columns(["chainId", "roundId", "applicationId"]) .execute(); - await schema - .createTable("prices") - .addColumn("id", "serial", (cb) => cb.primaryKey()) - .addColumn("chainId", CHAIN_ID_TYPE) - .addColumn("tokenAddress", ADDRESS_TYPE) - .addColumn("priceInUSD", "real") - .addColumn("timestamp", "timestamptz") - .addColumn("blockNumber", BIGINT_TYPE) - .execute(); - - await db.schema - .createIndex("idx_prices_chain_token_block") - .on("prices") - .expression(sql`chain_id, token_address, block_number DESC`) - .execute(); - await schema .createTable("legacy_projects") .addColumn("id", "serial", (col) => col.primaryKey()) @@ -402,3 +386,26 @@ export async function migrateDataFetcher(db: Kysely, schemaName: string) { .addColumn("data", "jsonb") .execute(); } + +export async function migratePriceFetcher( + db: Kysely, + schemaName: string +) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("prices") + .addColumn("id", "serial", (cb) => cb.primaryKey()) + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("tokenAddress", ADDRESS_TYPE) + .addColumn("priceInUSD", "real") + .addColumn("timestamp", "timestamptz") + .addColumn("blockNumber", BIGINT_TYPE) + .execute(); + + await db.schema + .createIndex("idx_prices_chain_token_block") + .on("prices") + .expression(sql`chain_id, token_address, block_number DESC`) + .execute(); +} diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index be6910b1..f29c33a9 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -11,6 +11,7 @@ export const createHandler = (config: HttpApiConfig): express.Router => { buildTag: config.buildTag, chainDataSchemaName: config.db.chainDataSchemaName, ipfsDataSchema: config.db.ipfsDataSchemaName, + priceDataSchema: config.db.priceDataSchemaName, }); }); diff --git a/src/index.ts b/src/index.ts index 205c9923..d57fa9d5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -155,6 +155,7 @@ async function main(): Promise { connectionPool: databaseConnectionPool, chainDataSchemaName: config.databaseSchemaName, ipfsDataSchemaName: config.ipfsDatabaseSchemaName, + priceDataSchemaName: config.priceDatabaseSchemaName, }); baseLogger.info({ @@ -255,6 +256,9 @@ async function main(): Promise { } else if (config.dropIpfsDb) { baseLogger.info("resetting ipfs data schema"); await db.dropIpfsDataSchemaIfExists(); + } else if (config.dropPriceDb) { + baseLogger.info("resetting price data schema"); + await db.dropPriceDataSchemaIfExists(); } await db.createAllSchemas(baseLogger); @@ -333,7 +337,11 @@ async function main(): Promise { const graphqlHandler = postgraphile( readOnlyDatabaseConnectionPool, - config.databaseSchemaName, + [ + config.databaseSchemaName, + config.ipfsDatabaseSchemaName, + config.priceDatabaseSchemaName, + ], { watchPg: false, graphqlRoute: "/graphql",