Skip to content

Commit

Permalink
Revert "Refactor lock acquisition and release logic (#661)"
Browse files Browse the repository at this point in the history
This reverts commit 9a2ffe1.
  • Loading branch information
0xKurt committed Sep 9, 2024
1 parent a20f9ca commit c19cdad
Showing 1 changed file with 20 additions and 43 deletions.
63 changes: 20 additions & 43 deletions src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,60 +116,37 @@ export class Database {
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};

// Helper function to force release a lock for a specific schema
const forceReleaseLockForSchema = async (lockId: number) => {
await client.query(`
SELECT pg_terminate_backend(pid)
FROM pg_locks
WHERE locktype = 'advisory'
AND objid = ${lockId}
AND pid != pg_backend_pid()
`);
};

// Acquire locks for all schemas
const chainDataLockId = generateLockId(this.chainDataSchemaName);
const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName);
const priceDataLockId = generateLockId(this.priceDataSchemaName);

// Lock acquisition status
let chainDataLockAcquired = false;
let ipfsDataLockAcquired = false;
let priceDataLockAcquired = false;

try {
chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

return {
release: async () => {
if (chainDataLockAcquired) {
const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

if (
chainDataLockAcquired &&
ipfsDataLockAcquired &&
priceDataLockAcquired
) {
return {
release: async () => {
await releaseLockForSchema(chainDataLockId);
}
if (ipfsDataLockAcquired) {
await forceReleaseLockForSchema(ipfsDataLockId);
// await releaseLockForSchema(ipfsDataLockId);
}
if (priceDataLockAcquired) {
await forceReleaseLockForSchema(priceDataLockId);
// await releaseLockForSchema(priceDataLockId);
}
client.release();
},
client,
};
await releaseLockForSchema(ipfsDataLockId);
await releaseLockForSchema(priceDataLockId);
client.release();
},
client,
};
}
} catch (error) {
this.#logger.error({ error }, "Failed to acquire write lock");
} finally {
// Ensure any acquired locks are released if they were not all acquired
if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId);
if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId);
if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId);

client.release();
}

client.release();

return null;
}

Expand Down

0 comments on commit c19cdad

Please sign in to comment.