From 33de6de96259a3de1e00981ffc954ed65a49fe92 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Thu, 5 Sep 2024 21:59:00 +0530 Subject: [PATCH] feat: store ipfs data in db --- docs/reindexing.md | 8 +- src/config.ts | 20 ++++ src/database/changeset.ts | 5 + src/database/index.ts | 201 +++++++++++++++++++++++++++++++------- src/database/migrate.ts | 10 ++ src/database/schema.ts | 10 ++ src/http/api/v1/status.ts | 3 +- src/index.ts | 38 ++++++- 8 files changed, 254 insertions(+), 41 deletions(-) diff --git a/docs/reindexing.md b/docs/reindexing.md index 00f6fc67..5f114bcc 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,9 +12,13 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` in Development +### Using `--drop-db` | `--drop-chain-db` | `--drop-ipfs-db` in Development -- During development, you can use the `--drop-db` flag to ensure the indexer always deletes the existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. +- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. + +- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes chain schema and migrates from scratch. + +- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes ipfs schema and migrates from scratch. ### Important Notes diff --git a/src/config.ts b/src/config.ts index 0625a969..f81796df 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,7 @@ type CoingeckoSupportedChainId = | 1088; const CHAIN_DATA_VERSION = "81"; +const IPFS_DATA_VERSION = "1"; export type Token = { code: string; @@ -1829,11 +1830,15 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; + ipfsDataVersion: string; + ipfsDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; enableResourceMonitor: boolean; dropDb: boolean; + dropChainDb: boolean; + dropIpfsDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -1901,6 +1906,12 @@ export function getConfig(): Config { "drop-db": { type: "boolean", }, + "drop-chain-db": { + type: "boolean", + }, + "drop-ipfs-db": { + type: "boolean", + }, "rm-cache": { type: "boolean", }, @@ -1989,7 +2000,12 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; + const ipfsDataVersion = IPFS_DATA_VERSION; + const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); + const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); + const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2038,9 +2054,13 @@ export function getConfig(): Config { databaseUrl, readOnlyDatabaseUrl, dropDb, + dropChainDb, + dropIpfsDb, removeCache, dataVersion, databaseSchemaName, + ipfsDataVersion, + ipfsDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 5eb7f12e..c53d2a14 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,6 +16,7 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, + NewIpfsData, } from "./schema.js"; export type DataChange = @@ -140,4 +141,8 @@ export type DataChange = | { type: "InsertApplicationPayout"; payout: NewApplicationPayout; + } + | { + type: "InsertIpfsData"; + ipfs: NewIpfsData; }; diff --git a/src/database/index.ts b/src/database/index.ts index 2b749ff2..68a5c161 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,8 +14,9 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, + IpfsDataTable, } from "./schema.js"; -import { migrate } from "./migrate.js"; +import { migrate, migrateDataFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -37,6 +38,7 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; + ipfsData: IpfsDataTable; } type KyselyDb = Kysely; @@ -53,13 +55,15 @@ export class Database { #statsTimeout: ReturnType | null = null; #logger: Logger; - readonly databaseSchemaName: string; + readonly chainDataSchemaName: string; + readonly ipfsDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - schemaName: string; + chainDataSchemaName: string; + ipfsDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -72,10 +76,11 @@ export class Database { plugins: [new CamelCasePlugin()], }); - this.#db = this.#db.withSchema(options.schemaName); + // Initialize schema names + this.chainDataSchemaName = options.chainDataSchemaName; + this.ipfsDataSchemaName = options.ipfsDataSchemaName; this.#logger = options.logger; - this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -87,21 +92,40 @@ export class Database { async acquireWriteLock() { const client = await this.#connectionPool.connect(); - // generate lock id based on schema - const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { - return acc + char.charCodeAt(0); - }, 0); + // Helper function to generate lock ID based on schema name + const generateLockId = (schemaName: string): number => { + return schemaName.split("").reduce((acc, char) => { + return acc + char.charCodeAt(0); + }, 0); + }; - try { + // Helper function to acquire a lock for a specific schema + const acquireLockForSchema = async (lockId: number) => { const result = await client.query( `SELECT pg_try_advisory_lock(${lockId}) as lock` ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - if (result.rows[0].lock === true) { + return result.rows[0].lock === true; + }; + + // Helper function to release a lock for a specific schema + const releaseLockForSchema = async (lockId: number) => { + await client.query(`SELECT pg_advisory_unlock(${lockId})`); + }; + + // Acquire locks for both schemas + const chainDataLockId = generateLockId(this.chainDataSchemaName); + const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + + try { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + + if (chainDataLockAcquired && ipfsDataLockAcquired) { return { release: async () => { - await client.query(`SELECT pg_advisory_unlock(${lockId})`); + await releaseLockForSchema(chainDataLockId); + await releaseLockForSchema(ipfsDataLockId); client.release(); }, client, @@ -132,12 +156,12 @@ export class Database { } private async updateStats() { - const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; + const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."rounds" AS r + UPDATE "${this.chainDataSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -160,7 +184,7 @@ export class Database { await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."applications" AS a + UPDATE "${this.chainDataSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -223,38 +247,71 @@ export class Database { } } - async dropSchemaIfExists() { + async dropChainDataSchemaIfExists() { await this.#db.schema - .dropSchema(this.databaseSchemaName) + .withSchema(this.chainDataSchemaName) + .dropSchema(this.chainDataSchemaName) .ifExists() .cascade() .execute(); } - async createSchemaIfNotExists(logger: Logger) { + async dropIpfsDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.ipfsDataSchemaName) + .dropSchema(this.ipfsDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropAllSchemaIfExists() { + await this.dropChainDataSchemaIfExists(); + await this.dropIpfsDataSchemaIfExists(); + } + + async createSchemaIfNotExists( + schemaName: string, + migrateFn: (tx: any, schemaName: string) => Promise, + logger: Logger + ) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE schema_name = ${this.databaseSchemaName} - )`.execute(this.#db); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${schemaName} + )`.execute(this.#db.withSchema(schemaName)); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, + msg: `schema "${schemaName}" exists, skipping creation`, }); - return; } logger.info({ - msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, + msg: `schema "${schemaName}" does not exist, creating schema`, }); - await this.#db.transaction().execute(async (tx) => { - await tx.schema.createSchema(this.databaseSchemaName).execute(); + await this.#db + .withSchema(schemaName) + .transaction() + .execute(async (tx) => { + await tx.schema.createSchema(schemaName).execute(); + await migrateFn(tx, schemaName); + }); + } - await migrate(tx, this.databaseSchemaName); - }); + async createAllSchemas(logger: Logger) { + await this.createSchemaIfNotExists( + this.chainDataSchemaName, + migrate, + logger + ); + await this.createSchemaIfNotExists( + this.ipfsDataSchemaName, + migrateDataFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -267,6 +324,7 @@ export class Database { switch (change.type) { case "InsertPendingProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -275,6 +333,7 @@ export class Database { case "DeletePendingProjectRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -283,6 +342,7 @@ export class Database { case "InsertPendingRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -291,6 +351,7 @@ export class Database { case "DeletePendingRoundRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -298,12 +359,17 @@ export class Database { } case "InsertProject": { - await this.#db.insertInto("projects").values(change.project).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("projects") + .values(change.project) + .execute(); break; } case "UpdateProject": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -314,6 +380,7 @@ export class Database { case "InsertProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -322,6 +389,7 @@ export class Database { case "DeleteAllProjectRolesByRole": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -332,6 +400,7 @@ export class Database { case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -342,12 +411,17 @@ export class Database { } case "InsertRound": { - await this.#db.insertInto("rounds").values(change.round).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("rounds") + .values(change.round) + .execute(); break; } case "UpdateRound": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -358,6 +432,7 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -375,6 +450,7 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -385,6 +461,7 @@ export class Database { case "InsertRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -393,6 +470,7 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -411,7 +489,11 @@ export class Database { }; } - await this.#db.insertInto("applications").values(application).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("applications") + .values(application) + .execute(); break; } @@ -425,6 +507,7 @@ export class Database { } await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -441,6 +524,7 @@ export class Database { case "InsertManyDonations": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -449,12 +533,17 @@ export class Database { } case "InsertManyPrices": { - await this.#db.insertInto("prices").values(change.prices).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("prices") + .values(change.prices) + .execute(); break; } case "IncrementRoundDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -472,6 +561,7 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -484,6 +574,7 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -502,6 +593,7 @@ export class Database { case "NewLegacyProject": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -510,12 +602,22 @@ export class Database { case "InsertApplicationPayout": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } + case "InsertIpfsData": { + await this.#db + .withSchema(this.ipfsDataSchemaName) + .insertInto("ipfsData") + .values(change.ipfs) + .execute(); + break; + } + default: throw new Error(`Unknown changeset type`); } @@ -523,6 +625,7 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -534,6 +637,7 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -545,6 +649,7 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -556,6 +661,7 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -567,6 +673,7 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -578,6 +685,7 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -593,6 +701,7 @@ export class Database { roleValue: string ) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -615,6 +724,7 @@ export class Database { } const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -631,6 +741,7 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -641,6 +752,7 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -650,6 +762,7 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -663,6 +776,7 @@ export class Database { applicationId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -679,6 +793,7 @@ export class Database { projectId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -695,6 +810,7 @@ export class Database { anchorAddress: Address ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -707,6 +823,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -723,6 +840,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -741,6 +859,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -750,6 +869,7 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() @@ -761,6 +881,7 @@ export class Database { donorAddress: Address ) { const donations = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -784,6 +905,7 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -791,4 +913,15 @@ export class Database { return result ?? null; } + + async getDataByCid(cId: string) { + const metadata = await this.#db + .withSchema(this.ipfsDataSchemaName) + .selectFrom("ipfsData") + .where("cid", "=", cId) + .selectAll() + .executeTakeFirst(); + + return metadata ?? null; + } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c473d710..c59d6bd2 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -392,3 +392,13 @@ export async function migrate(db: Kysely, schemaName: string) { $$ language sql stable; `.execute(db); } + +export async function migrateDataFetcher(db: Kysely, schemaName: string) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("ipfs_data") + .addColumn("cid", "text") + .addColumn("data", "jsonb") + .execute(); +} diff --git a/src/database/schema.ts b/src/database/schema.ts index 06b9a906..8e50c133 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -125,6 +125,11 @@ export type ProjectTable = { projectType: ProjectType; }; +export type IpfsDataTable = { + cid: string; + data: unknown; +}; + export type Project = Selectable; export type NewProject = Insertable; export type PartialProject = Updateable; @@ -253,3 +258,8 @@ export type ApplicationPayout = { }; export type NewApplicationPayout = Insertable; + +export type NewIpfsData = { + cid: string; + data: unknown; +}; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index e4ad3c08..be6910b1 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,7 +9,8 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - databaseSchema: config.db.databaseSchemaName, + chainDataSchemaName: config.db.chainDataSchemaName, + ipfsDataSchema: config.db.ipfsDataSchemaName, }); }); diff --git a/src/index.ts b/src/index.ts index 4ad25e91..b17ba61d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -151,7 +151,8 @@ async function main(): Promise { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - schemaName: config.databaseSchemaName, + chainDataSchemaName: config.databaseSchemaName, + ipfsDataSchemaName: config.ipfsDatabaseSchemaName, }); baseLogger.info({ @@ -244,11 +245,17 @@ async function main(): Promise { if (isFirstRun) { if (config.dropDb) { - baseLogger.info("dropping schema"); - await db.dropSchemaIfExists(); + baseLogger.info("dropping all schemas"); + await db.dropAllSchemaIfExists(); + } else if (config.dropChainDb) { + baseLogger.info("resetting chain data schema"); + await db.dropChainDataSchemaIfExists(); + } else if (config.dropIpfsDb) { + baseLogger.info("resetting ipfs data schema"); + await db.dropIpfsDataSchemaIfExists(); } - await db.createSchemaIfNotExists(baseLogger); + await db.createAllSchemas(baseLogger); await subscriptionStore.init(); } @@ -465,6 +472,13 @@ async function catchupAndWatchChain( return undefined; } + // Check if data is already in the IPFS database + const ipfsData = await db.getDataByCid(cid); + if (ipfsData) { + chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + return Promise.resolve(ipfsData.data as string as T); + } + // Fetch from a single IPFS gateway const fetchFromGateway = async (url: string): Promise => { try { @@ -510,6 +524,22 @@ async function catchupAndWatchChain( chainLogger.info( `Fetch successful from gateway: ${gateway} for CID: ${cid}` ); + + // Save to IpfsData table + try { + await db.applyChange({ + type: "InsertIpfsData", + ipfs: { + cid, + data: result, // TODO: check is JSON.parse is needed + }, + }); + } catch (err) { + chainLogger.error( + `Error saving IPFS data to database: ${String(err)}` + ); + } + return result; // Return the result if fetched successfully } else { chainLogger.warn(