chore: move prices to its own schema
thelostone-mc authored and hussedev committed Sep 11, 2024
1 parent b389c5d commit eaaaad4
Showing 6 changed files with 79 additions and 25 deletions.
4 changes: 3 additions & 1 deletion docs/reindexing.md
@@ -12,14 +12,16 @@ When deploying changes to the indexer, it's important to clarify the results you
- The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch.
- If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh.

### Using `--drop-db` | `--drop-chain-db` | `--drop-ipfs-db` in Development
### Dropping Schemas in Development

- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schemas and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data.

- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes the chain data schema and re-runs its migrations from scratch.

- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes the IPFS data schema and re-runs its migrations from scratch.

- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes the price data schema and re-runs its migrations from scratch (a condensed sketch of how these flags are handled at startup follows below).

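For reference, a condensed sketch of how these flags map to schema drops at startup. It mirrors the flag handling in `src/config.ts` and `src/index.ts` from this commit, but only the `--drop-ipfs-db` and `--drop-price-db` branches are visible verbatim in the diff; the other branches and their log messages are inferred, so treat the structure as illustrative rather than exact:

```ts
// Condensed, illustrative startup logic. `config` is the parsed CLI config and
// `db` the Database instance created during bootstrap; only the branches for
// --drop-ipfs-db and --drop-price-db appear verbatim in this commit.
if (config.dropDb) {
  baseLogger.info("resetting all schemas"); // assumed log message
  await db.dropAllSchemaIfExists(); // drops the chain, IPFS, and price schemas
} else if (config.dropChainDb) {
  baseLogger.info("resetting chain data schema"); // assumed log message
  await db.dropChainDataSchemaIfExists();
} else if (config.dropIpfsDb) {
  baseLogger.info("resetting ipfs data schema");
  await db.dropIpfsDataSchemaIfExists();
} else if (config.dropPriceDb) {
  baseLogger.info("resetting price data schema");
  await db.dropPriceDataSchemaIfExists();
}

// Any schemas that are still missing are then (re)created and migrated.
await db.createAllSchemas(baseLogger);
```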
### Important Notes

- **Reindexing Time**: Deployments involving reindexing will take significantly longer. Plan accordingly to minimize downtime or performance impact.
11 changes: 11 additions & 0 deletions src/config.ts
@@ -22,6 +22,7 @@ type CoingeckoSupportedChainId =

const CHAIN_DATA_VERSION = "81";
const IPFS_DATA_VERSION = "1";
const PRICE_DATA_VERSION = "1";

export type Token = {
code: string;
@@ -1832,13 +1833,16 @@ export type Config = {
databaseSchemaName: string;
ipfsDataVersion: string;
ipfsDatabaseSchemaName: string;
priceDataVersion: string;
priceDatabaseSchemaName: string;
hostname: string;
pinoPretty: boolean;
deploymentEnvironment: "local" | "development" | "staging" | "production";
enableResourceMonitor: boolean;
dropDb: boolean;
dropChainDb: boolean;
dropIpfsDb: boolean;
dropPriceDb: boolean;
removeCache: boolean;
estimatesLinearQfWorkerPoolSize: number | null;
};
@@ -2010,9 +2014,13 @@ export function getConfig(): Config {
const ipfsDataVersion = IPFS_DATA_VERSION;
const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`;

const priceDataVersion = PRICE_DATA_VERSION;
const priceDatabaseSchemaName = `price_data_${priceDataVersion}`;

const dropDb = z.boolean().default(false).parse(args["drop-db"]);
const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]);
const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]);
const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]);

const removeCache = z.boolean().default(false).parse(args["rm-cache"]);

@@ -2063,11 +2071,14 @@ export function getConfig(): Config {
dropDb,
dropChainDb,
dropIpfsDb,
dropPriceDb,
removeCache,
dataVersion,
databaseSchemaName,
ipfsDataVersion,
ipfsDatabaseSchemaName,
priceDataVersion,
priceDatabaseSchemaName,
httpServerWaitForSync,
httpServerEnabled,
indexerEnabled,
39 changes: 32 additions & 7 deletions src/database/index.ts
@@ -16,7 +16,7 @@ import {
ApplicationPayout,
IpfsDataTable,
} from "./schema.js";
import { migrate, migrateDataFetcher } from "./migrate.js";
import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js";
import { encodeJsonWithBigInts } from "../utils/index.js";
import type { DataChange } from "./changeset.js";
import { Logger } from "pino";
@@ -57,13 +57,15 @@ export class Database {

readonly chainDataSchemaName: string;
readonly ipfsDataSchemaName: string;
readonly priceDataSchemaName: string;

constructor(options: {
statsUpdaterEnabled: boolean;
logger: Logger;
connectionPool: Pool;
chainDataSchemaName: string;
ipfsDataSchemaName: string;
priceDataSchemaName: string;
}) {
const dialect = new PostgresDialect({
pool: options.connectionPool,
@@ -79,6 +81,7 @@
// Initialize schema names
this.chainDataSchemaName = options.chainDataSchemaName;
this.ipfsDataSchemaName = options.ipfsDataSchemaName;
this.priceDataSchemaName = options.priceDataSchemaName;

this.#logger = options.logger;

@@ -113,19 +116,26 @@
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};

// Acquire locks for both schemas
// Acquire locks for all schemas
const chainDataLockId = generateLockId(this.chainDataSchemaName);
const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName);
const priceDataLockId = generateLockId(this.priceDataSchemaName);

try {
const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

if (chainDataLockAcquired && ipfsDataLockAcquired) {
if (
chainDataLockAcquired &&
ipfsDataLockAcquired &&
priceDataLockAcquired
) {
return {
release: async () => {
await releaseLockForSchema(chainDataLockId);
await releaseLockForSchema(ipfsDataLockId);
await releaseLockForSchema(priceDataLockId);
client.release();
},
client,
Expand Down Expand Up @@ -265,9 +275,19 @@ export class Database {
.execute();
}

async dropPriceDataSchemaIfExists() {
await this.#db.schema
.withSchema(this.priceDataSchemaName)
.dropSchema(this.priceDataSchemaName)
.ifExists()
.cascade()
.execute();
}

async dropAllSchemaIfExists() {
await this.dropChainDataSchemaIfExists();
await this.dropIpfsDataSchemaIfExists();
await this.dropPriceDataSchemaIfExists();
}

async createSchemaIfNotExists(
@@ -312,6 +332,11 @@
migrateDataFetcher,
logger
);
await this.createSchemaIfNotExists(
this.priceDataSchemaName,
migratePriceFetcher,
logger
);
}

async applyChanges(changes: DataChange[]): Promise<void> {
@@ -534,7 +559,7 @@

case "InsertManyPrices": {
await this.#db
.withSchema(this.chainDataSchemaName)
.withSchema(this.priceDataSchemaName)
.insertInto("prices")
.values(change.prices)
.execute();
@@ -823,7 +848,7 @@

async getLatestPriceTimestampForChain(chainId: ChainId) {
const latestPriceTimestamp = await this.#db
.withSchema(this.chainDataSchemaName)
.withSchema(this.priceDataSchemaName)
.selectFrom("prices")
.where("chainId", "=", chainId)
.orderBy("timestamp", "desc")
@@ -840,7 +865,7 @@
blockNumber: bigint | "latest"
) {
let priceQuery = this.#db
.withSchema(this.chainDataSchemaName)
.withSchema(this.priceDataSchemaName)
.selectFrom("prices")
.where("chainId", "=", chainId)
.where("tokenAddress", "=", tokenAddress)
@@ -859,7 +884,7 @@

async getAllChainPrices(chainId: ChainId) {
return await this.#db
.withSchema(this.chainDataSchemaName)
.withSchema(this.priceDataSchemaName)
.selectFrom("prices")
.where("chainId", "=", chainId)
.orderBy("blockNumber", "asc")
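The bodies of `generateLockId`, `acquireLockForSchema`, and `releaseLockForSchema` sit outside the hunks above, so only their call sites (and the `pg_advisory_unlock` call) are visible. A minimal sketch of what such helpers could look like, assuming the lock id is a hash of the schema name and session-level Postgres advisory locks; the helper names match the diff, everything else is an assumption rather than the committed implementation:

```ts
import { createHash } from "node:crypto";
import type { PoolClient } from "pg";

// Derive a stable 32-bit advisory-lock key from a schema name (assumed; any
// stable hash that maps each schema to a distinct integer would do).
const generateLockId = (schemaName: string): number =>
  createHash("sha256").update(schemaName).digest().readInt32BE(0);

// `client` is the pg PoolClient checked out for migration, as in the diff.
declare const client: PoolClient;

// Try to take a session-level advisory lock; resolves to true if acquired.
const acquireLockForSchema = async (lockId: number): Promise<boolean> => {
  const result = await client.query(
    `SELECT pg_try_advisory_lock(${lockId}) AS locked`
  );
  return result.rows[0].locked === true;
};

// Release the lock once schema creation/migration for that schema is done
// (the release side is what appears in the diff above).
const releaseLockForSchema = async (lockId: number) => {
  await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};
```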
39 changes: 23 additions & 16 deletions src/database/migrate.ts
@@ -297,22 +297,6 @@ export async function migrate<T>(db: Kysely<T>, schemaName: string) {
.columns(["chainId", "roundId", "applicationId"])
.execute();

await schema
.createTable("prices")
.addColumn("id", "serial", (cb) => cb.primaryKey())
.addColumn("chainId", CHAIN_ID_TYPE)
.addColumn("tokenAddress", ADDRESS_TYPE)
.addColumn("priceInUSD", "real")
.addColumn("timestamp", "timestamptz")
.addColumn("blockNumber", BIGINT_TYPE)
.execute();

await db.schema
.createIndex("idx_prices_chain_token_block")
.on("prices")
.expression(sql`chain_id, token_address, block_number DESC`)
.execute();

await schema
.createTable("legacy_projects")
.addColumn("id", "serial", (col) => col.primaryKey())
@@ -402,3 +386,26 @@ export async function migrateDataFetcher<T>(db: Kysely<T>, schemaName: string) {
.addColumn("data", "jsonb")
.execute();
}

export async function migratePriceFetcher<T>(
db: Kysely<T>,
schemaName: string
) {
const schema = db.withSchema(schemaName).schema;

await schema
.createTable("prices")
.addColumn("id", "serial", (cb) => cb.primaryKey())
.addColumn("chainId", CHAIN_ID_TYPE)
.addColumn("tokenAddress", ADDRESS_TYPE)
.addColumn("priceInUSD", "real")
.addColumn("timestamp", "timestamptz")
.addColumn("blockNumber", BIGINT_TYPE)
.execute();

await db.schema
.createIndex("idx_prices_chain_token_block")
.on("prices")
.expression(sql`chain_id, token_address, block_number DESC`)
.execute();
}
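The `idx_prices_chain_token_block` index covers `chain_id, token_address, block_number DESC`, which matches the lookup pattern of the price queries in `src/database/index.ts` (filter by chain and token, then take the most recent row at or below a block). A rough sketch of such a lookup with Kysely, assuming the query shape of `getTokenPriceByBlock`, which is only partially visible in the diff; the table types here are simplified stand-ins for the real ones in `src/database/schema.ts`:

```ts
import { Kysely } from "kysely";

// Simplified table shape for illustration only.
interface PricesTable {
  chainId: number;
  tokenAddress: string;
  priceInUSD: number;
  timestamp: Date;
  blockNumber: bigint;
}
interface PriceDb {
  prices: PricesTable;
}

// Look up the latest known price at or before `blockNumber` for a token.
// The trailing conditions (<= block, order desc, limit 1) are assumptions.
async function latestPriceAtBlock(
  db: Kysely<PriceDb>,
  priceDataSchemaName: string,
  chainId: number,
  tokenAddress: string,
  blockNumber: bigint
) {
  return await db
    .withSchema(priceDataSchemaName)
    .selectFrom("prices")
    .selectAll()
    .where("chainId", "=", chainId)
    .where("tokenAddress", "=", tokenAddress)
    .where("blockNumber", "<=", blockNumber)
    .orderBy("blockNumber", "desc")
    .limit(1)
    .executeTakeFirst();
}
```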
1 change: 1 addition & 0 deletions src/http/api/v1/status.ts
@@ -11,6 +11,7 @@ export const createHandler = (config: HttpApiConfig): express.Router => {
buildTag: config.buildTag,
chainDataSchemaName: config.db.chainDataSchemaName,
ipfsDataSchema: config.db.ipfsDataSchemaName,
priceDataSchema: config.db.priceDataSchemaName,
});
});

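With this change the status endpoint reports the price schema alongside the existing ones. An illustrative payload shape follows; the `buildTag` value is a placeholder, while the schema names are derived from the `chain_data_${version}`, `ipfs_data_${version}`, and `price_data_${version}` patterns and the versions defined in `src/config.ts`:

```ts
// Illustrative status payload after this change (buildTag is hypothetical;
// field names follow the handler above).
const exampleStatusResponse = {
  buildTag: "local-dev",
  chainDataSchemaName: "chain_data_81",
  ipfsDataSchema: "ipfs_data_1",
  priceDataSchema: "price_data_1",
};
```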
10 changes: 9 additions & 1 deletion src/index.ts
@@ -155,6 +155,7 @@ async function main(): Promise<void> {
connectionPool: databaseConnectionPool,
chainDataSchemaName: config.databaseSchemaName,
ipfsDataSchemaName: config.ipfsDatabaseSchemaName,
priceDataSchemaName: config.priceDatabaseSchemaName,
});

baseLogger.info({
@@ -255,6 +256,9 @@
} else if (config.dropIpfsDb) {
baseLogger.info("resetting ipfs data schema");
await db.dropIpfsDataSchemaIfExists();
} else if (config.dropPriceDb) {
baseLogger.info("resetting price data schema");
await db.dropPriceDataSchemaIfExists();
}

await db.createAllSchemas(baseLogger);
@@ -333,7 +337,11 @@

const graphqlHandler = postgraphile(
readOnlyDatabaseConnectionPool,
config.databaseSchemaName,
[
config.databaseSchemaName,
config.ipfsDatabaseSchemaName,
config.priceDatabaseSchemaName,
],
{
watchPg: false,
graphqlRoute: "/graphql",
