Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

index single chain #657

Merged
merged 4 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer
# METIS_ANDROMEDA_RPC_URL

#COINGECKO_API_KEY=
#IPFS_GATEWAY=
#IPFS_GATEWAYs=[]
#WHITELISTED_ADDRESSES=["0x123..","0x456.."]

# optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro
#GRAPHILE_LICENSE
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type CoingeckoSupportedChainId =
| 42220
| 1088;

const CHAIN_DATA_VERSION = "81";
const CHAIN_DATA_VERSION = "83";
const IPFS_DATA_VERSION = "1";
const PRICE_DATA_VERSION = "1";

Expand Down
150 changes: 107 additions & 43 deletions src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,60 +116,37 @@ export class Database {
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};

// Helper function to force release a lock for a specific schema
const forceReleaseLockForSchema = async (lockId: number) => {
await client.query(`
SELECT pg_terminate_backend(pid)
FROM pg_locks
WHERE locktype = 'advisory'
AND objid = ${lockId}
AND pid != pg_backend_pid()
`);
};

// Acquire locks for all schemas
const chainDataLockId = generateLockId(this.chainDataSchemaName);
const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName);
const priceDataLockId = generateLockId(this.priceDataSchemaName);

// Lock acquisition status
let chainDataLockAcquired = false;
let ipfsDataLockAcquired = false;
let priceDataLockAcquired = false;

try {
chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

return {
release: async () => {
if (chainDataLockAcquired) {
const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

if (
chainDataLockAcquired &&
ipfsDataLockAcquired &&
priceDataLockAcquired
) {
return {
release: async () => {
await releaseLockForSchema(chainDataLockId);
}
if (ipfsDataLockAcquired) {
await forceReleaseLockForSchema(ipfsDataLockId);
// await releaseLockForSchema(ipfsDataLockId);
}
if (priceDataLockAcquired) {
await forceReleaseLockForSchema(priceDataLockId);
// await releaseLockForSchema(priceDataLockId);
}
client.release();
},
client,
};
await releaseLockForSchema(ipfsDataLockId);
await releaseLockForSchema(priceDataLockId);
client.release();
},
client,
};
}
} catch (error) {
this.#logger.error({ error }, "Failed to acquire write lock");
} finally {
// Ensure any acquired locks are released if they were not all acquired
if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId);
if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId);
if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId);

client.release();
}

client.release();

return null;
}

Expand Down Expand Up @@ -962,6 +939,93 @@ export class Database {
return result ?? null;
}

async deleteChainData(chainId: ChainId) {
this.#logger.info("Deleting chain data for chainId:", chainId);

await this.#db.transaction().execute(async (trx) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • rename to tx ?
  • Also we should likely do this.#db.withSchema("schemaName").trans... cause else this will break once we add the IPFS schema

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thelostone-mc will update this after your PR was merged, ao i can rebase and have the new schema names available

this.#logger.info("Deleting pending round roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("pendingRoundRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting round roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("roundRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting pending project roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("pendingProjectRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting project roles");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("projectRoles")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting applications");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("applications")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting applications donations");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("donations")
.where("chainId", "=", chainId)
.execute();

// this.#logger.info("Deleting donation prices");
// await trx
// .withSchema(this.priceDataSchemaName)
// .deleteFrom("prices")
// .where("chainId", "=", chainId)
// .execute();

this.#logger.info("Deleting applications");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("applications")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting rounds");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("rounds")
.where("chainId", "=", chainId)
.execute();

this.#logger.info("Deleting projects");
await trx
.withSchema(this.chainDataSchemaName)
.deleteFrom("projects")
.where("chainId", "=", chainId)
.execute();
});

this.#logger.info("Updating subscriptions indexed_to_block");
const sqlQuery = `
UPDATE ${this.chainDataSchemaName}.subscriptions
SET indexed_to_block = 0::bigint
WHERE chain_id = ${chainId}
`;

await sql.raw(sqlQuery).execute(this.#db);

this.#logger.info("Deleted chain data for chainId:", chainId);
}

async getDataByCid(cId: string) {
const metadata = await this.#db
.withSchema(this.ipfsDataSchemaName)
Expand Down
157 changes: 157 additions & 0 deletions src/http/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { PassportProvider } from "../passport/index.js";
import { DataProvider } from "../calculator/dataProvider/index.js";
import { Chain } from "../config.js";
import { Database } from "../database/index.js";
import { Indexer } from "chainsauce";
import { recoverMessageAddress } from "viem";

type AsyncRequestHandler = (
req: express.Request,
Expand All @@ -38,6 +40,7 @@ export interface HttpApiConfig {
| { type: "in-thread" }
| { type: "worker-pool"; workerPoolSize: number };
};
indexedChains?: Indexer<any, any>[] | null;
}

interface HttpApi {
Expand Down Expand Up @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => {
res.send(config.dataVersion);
});

app.get("/config", (_req, res) => {
res.send(config);
});

app.post("/index", (req, res) => {
try {
const { chainId, address, timestamp, signature } = req.body as {
chainId: string;
address: string;
timestamp: number;
signature: `0x${string}`;
};

const reindex = async () => {
if (!chainId || !config.indexedChains) {
return res.status(400).send("chainId is required");
}

try {
const isAuthenticated = await recoverEthereumAddress({
address,
timestamp,
signature,
});

config.logger.info(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp}`
);

if (isAuthenticated) {
await config.db.deleteChainData(Number(chainId));

const filteredIndexedChains = config.indexedChains.filter(
(chain) =>
(chain as { context: { chainId: number } }).context.chainId ===
Number(chainId)
);

if (filteredIndexedChains.length === 0) {
config.logger.error(`Chain ${chainId} not found`);
return res.status(400).send("chain not found");
}

const filteredChains = config.chains.filter(
(chain) => chain.id === Number(chainId)
);

if (filteredChains.length === 0) {
config.logger.error(`Chain ${chainId} not found`);
return res.status(400).send("chain not found");
}

const chain = filteredChains[0];
const indexedChain = filteredIndexedChains[0];

chain.subscriptions.forEach((subscription) => {
indexedChain.unsubscribeFromContract({
address: subscription.address,
});

const contractName = subscription.contractName;
const subscriptionFromBlock =
subscription.fromBlock === undefined
? undefined
: BigInt(subscription.fromBlock);

indexedChain.subscribeToContract({
contract: contractName,
address: subscription.address,
fromBlock: subscriptionFromBlock || BigInt(0),
});
});
} else {
config.logger.error(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication`
);
return res.status(401).send("Authentication failed");
}
} catch {
config.logger.error(
`Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error`
);
return res.status(500).send("An error occurred");
}
};

reindex()
.then(() => {
config.logger.info(`Reindexing of chain ${chainId} finished`);
res.send("Reindexing finished");
})
.catch(() => {
config.logger.error(
`Reindexing of chain ${chainId} failed with error`
);
res.status(500).send("An error occurred");
});
} catch {
config.logger.error(`Reindexing failed with error`);
res.status(500).send("An error occurred");
}
});

app.use("/api/v1", api);

if (config.graphqlHandler !== undefined) {
Expand Down Expand Up @@ -149,3 +255,54 @@ function staticJsonDataHandler(dataProvider: DataProvider) {
}
};
}

const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period

const recoverEthereumAddress = async ({
address,
timestamp,
signature,
}: {
address: string;
timestamp: number;
signature: `0x${string}`;
}) => {
if (!address || !timestamp || !signature) {
return false;
}
const whitelistedAddresses: string[] = JSON.parse(
process.env.WHITELISTED_ADDRESSES!
);

// Check timestamp validity
const currentTime = Date.now();
if (currentTime - timestamp > VALIDITY_PERIOD) {
return false;
}

// Construct the expected message to be signed
const expectedMessage = `Authenticate with timestamp: ${timestamp}`;
try {
// Recover address from signature and expected message
const recoveredAddress = await recoverMessageAddress({
message: expectedMessage,
signature,
});

const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) =>
addr.toLowerCase()
);

if (
recoveredAddress.toLowerCase() === address.toLowerCase() &&
whitelistedAddressesLowercase.includes(address.toLowerCase())
) {
return true;
} else {
return false;
}
} catch (error) {
console.error("Error verifying signature:", error);
return false;
}
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ async function main(): Promise<void> {
workerPoolSize: config.estimatesLinearQfWorkerPoolSize,
},
},
indexedChains: await indexChainsPromise,
});

await httpApi.start();
Expand Down
Loading