Skip to content

Commit

Permalink
index single chain (#657)
Browse files Browse the repository at this point in the history
* update schema name

* updates

* fix

* Revert "Refactor lock acquisition and release logic (#661)"

This reverts commit 9a2ffe1.
  • Loading branch information
0xKurt authored Sep 9, 2024
1 parent 9a2ffe1 commit 4dd2915
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer
# METIS_ANDROMEDA_RPC_URL

#COINGECKO_API_KEY=
#IPFS_GATEWAY=
#IPFS_GATEWAYs=[]
#WHITELISTED_ADDRESSES=["0x123..","0x456.."]

# optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro
#GRAPHILE_LICENSE
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type CoingeckoSupportedChainId =
| 42220
| 1088;

const CHAIN_DATA_VERSION = "81";
const CHAIN_DATA_VERSION = "83";
const IPFS_DATA_VERSION = "1";
const PRICE_DATA_VERSION = "1";

Expand Down
150 changes: 107 additions & 43 deletions src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,60 +116,37 @@ export class Database {
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};

// Helper function to force release a lock for a specific schema
const forceReleaseLockForSchema = async (lockId: number) => {
await client.query(`
SELECT pg_terminate_backend(pid)
FROM pg_locks
WHERE locktype = 'advisory'
AND objid = ${lockId}
AND pid != pg_backend_pid()
`);
};

// Acquire locks for all schemas
const chainDataLockId = generateLockId(this.chainDataSchemaName);
const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName);
const priceDataLockId = generateLockId(this.priceDataSchemaName);

// Lock acquisition status
let chainDataLockAcquired = false;
let ipfsDataLockAcquired = false;
let priceDataLockAcquired = false;

try {
chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

return {
release: async () => {
if (chainDataLockAcquired) {
const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

if (
chainDataLockAcquired &&
ipfsDataLockAcquired &&
priceDataLockAcquired
) {
return {
release: async () => {
await releaseLockForSchema(chainDataLockId);
}
if (ipfsDataLockAcquired) {
await forceReleaseLockForSchema(ipfsDataLockId);
// await releaseLockForSchema(ipfsDataLockId);
}
if (priceDataLockAcquired) {
await forceReleaseLockForSchema(priceDataLockId);
// await releaseLockForSchema(priceDataLockId);
}
client.release();
},
client,
};
await releaseLockForSchema(ipfsDataLockId);
await releaseLockForSchema(priceDataLockId);
client.release();
},
client,
};
}
} catch (error) {
this.#logger.error({ error }, "Failed to acquire write lock");
} finally {
// Ensure any acquired locks are released if they were not all acquired
if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId);
if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId);
if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId);

client.release();
}

client.release();

return null;
}

Expand Down Expand Up @@ -962,6 +939,93 @@ export class Database {
return result ?? null;
}

async deleteChainData(chainId: ChainId) {
this.#logger.info("Deleting chain data for chainId:", chainId);

await this.#db.transaction().execute(async (trx) => {
this.#logger.info("Deleting pending round roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("pendingRoundRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting round roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("roundRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting pending project roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("pendingProjectRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting project roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("projectRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting applications");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("applications")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting applications donations");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("donations")
.where("chainId", "=", chainId)
.execute();

// this.#logger.info("Deleting donation prices");
// await trx
// .withSchema(this.priceDataSchemaName)
// .deleteFrom("prices")
// .where("chainId", "=", chainId)
// .execute();

this.#logger.info("Deleting applications");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("applications")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting rounds");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("rounds")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting projects");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("projects")
.where("chainId", "=", chainId)
.execute();
});

this.#logger.info("Updating subscriptions indexed_to_block");
const sqlQuery = `
UPDATE ${this.chainDataSchemaName}.subscriptions
SET indexed_to_block = 0::bigint
WHERE chain_id = ${chainId}
`;

await sql.raw(sqlQuery).execute(this.#db);

this.#logger.info("Deleted chain data for chainId:", chainId);
}

async getDataByCid(cId: string) {
const metadata = await this.#db
.withSchema(this.ipfsDataSchemaName)
Expand Down
157 changes: 157 additions & 0 deletions src/http/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { PassportProvider } from "../passport/index.js";
import { DataProvider } from "../calculator/dataProvider/index.js";
import { Chain } from "../config.js";
import { Database } from "../database/index.js";
import { Indexer } from "chainsauce";
import { recoverMessageAddress } from "viem";

type AsyncRequestHandler = (
req: express.Request,
Expand All @@ -38,6 +40,7 @@ export interface HttpApiConfig {
| { type: "in-thread" }
| { type: "worker-pool"; workerPoolSize: number };
};
indexedChains?: Indexer<any, any>[] | null;
}

interface HttpApi {
Expand Down Expand Up @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => {
res.send(config.dataVersion);
});

app.get("/config", (_req, res) => {
res.send(config);
});

app.post("/index", (req, res) => {
try {
const { chainId, address, timestamp, signature } = req.body as {
chainId: string;
address: string;
timestamp: number;
signature: `0x${string}`;
};

const reindex = async () => {
if (!chainId || !config.indexedChains) {
return res.status(400).send("chainId is required");
}

try {
const isAuthenticated = await recoverEthereumAddress({
address,
timestamp,
signature,
});

config.logger.info(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp}`
);

if (isAuthenticated) {
await config.db.deleteChainData(Number(chainId));

const filteredIndexedChains = config.indexedChains.filter(
(chain) =>
(chain as { context: { chainId: number } }).context.chainId ===
Number(chainId)
);

if (filteredIndexedChains.length === 0) {
config.logger.error(`Chain ${chainId} not found`);
return res.status(400).send("chain not found");
}

const filteredChains = config.chains.filter(
(chain) => chain.id === Number(chainId)
);

if (filteredChains.length === 0) {
config.logger.error(`Chain ${chainId} not found`);
return res.status(400).send("chain not found");
}

const chain = filteredChains[0];
const indexedChain = filteredIndexedChains[0];

chain.subscriptions.forEach((subscription) => {
indexedChain.unsubscribeFromContract({
address: subscription.address,
});

const contractName = subscription.contractName;
const subscriptionFromBlock =
subscription.fromBlock === undefined
? undefined
: BigInt(subscription.fromBlock);

indexedChain.subscribeToContract({
contract: contractName,
address: subscription.address,
fromBlock: subscriptionFromBlock || BigInt(0),
});
});
} else {
config.logger.error(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication`
);
return res.status(401).send("Authentication failed");
}
} catch {
config.logger.error(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error`
);
return res.status(500).send("An error occurred");
}
};

reindex()
.then(() => {
config.logger.info(`Reindexing of chain ${chainId} finished`);
res.send("Reindexing finished");
})
.catch(() => {
config.logger.error(
`Reindexing of chain ${chainId} failed with error`
);
res.status(500).send("An error occurred");
});
} catch {
config.logger.error(`Reindexing failed with error`);
res.status(500).send("An error occurred");
}
});

app.use("/api/v1", api);

if (config.graphqlHandler !== undefined) {
Expand Down Expand Up @@ -149,3 +255,54 @@ function staticJsonDataHandler(dataProvider: DataProvider) {
}
};
}

const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period

const recoverEthereumAddress = async ({
address,
timestamp,
signature,
}: {
address: string;
timestamp: number;
signature: `0x${string}`;
}) => {
if (!address || !timestamp || !signature) {
return false;
}
const whitelistedAddresses: string[] = JSON.parse(
process.env.WHITELISTED_ADDRESSES!
);

// Check timestamp validity
const currentTime = Date.now();
if (currentTime - timestamp > VALIDITY_PERIOD) {
return false;
}

// Construct the expected message to be signed
const expectedMessage = `Authenticate with timestamp: ${timestamp}`;
try {
// Recover address from signature and expected message
const recoveredAddress = await recoverMessageAddress({
message: expectedMessage,
signature,
});

const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) =>
addr.toLowerCase()
);

if (
recoveredAddress.toLowerCase() === address.toLowerCase() &&
whitelistedAddressesLowercase.includes(address.toLowerCase())
) {
return true;
} else {
return false;
}
} catch (error) {
console.error("Error verifying signature:", error);
return false;
}
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ async function main(): Promise<void> {
workerPoolSize: config.estimatesLinearQfWorkerPoolSize,
},
},
indexedChains: await indexChainsPromise,
});

await httpApi.start();
Expand Down

0 comments on commit 4dd2915

Please sign in to comment.