Skip to content

Commit

Permalink
Refactor lock acquisition and release logic (#661)
Browse files Browse the repository at this point in the history
* update lock logic

* forceRelease of schema
  • Loading branch information
thelostone-mc authored and hussedev committed Sep 11, 2024
1 parent eaaaad4 commit d3747ee
Showing 1 changed file with 43 additions and 20 deletions.
63 changes: 43 additions & 20 deletions src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,59 @@ export class Database {
await client.query(`SELECT pg_advisory_unlock(${lockId})`);
};

// Helper function to force release a lock for a specific schema
const forceReleaseLockForSchema = async (lockId: number) => {
await client.query(`
SELECT pg_terminate_backend(pid)
FROM pg_locks
WHERE locktype = 'advisory'
AND objid = ${lockId}
AND pid != pg_backend_pid()
`);
};

// Acquire locks for all schemas
const chainDataLockId = generateLockId(this.chainDataSchemaName);
const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName);
const priceDataLockId = generateLockId(this.priceDataSchemaName);

// Lock acquisition status
let chainDataLockAcquired = false;
let ipfsDataLockAcquired = false;
let priceDataLockAcquired = false;

try {
const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

if (
chainDataLockAcquired &&
ipfsDataLockAcquired &&
priceDataLockAcquired
) {
return {
release: async () => {
chainDataLockAcquired = await acquireLockForSchema(chainDataLockId);
ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId);
priceDataLockAcquired = await acquireLockForSchema(priceDataLockId);

return {
release: async () => {
if (chainDataLockAcquired) {
await releaseLockForSchema(chainDataLockId);
await releaseLockForSchema(ipfsDataLockId);
await releaseLockForSchema(priceDataLockId);
client.release();
},
client,
};
}
}
if (ipfsDataLockAcquired) {
await forceReleaseLockForSchema(ipfsDataLockId);
// await releaseLockForSchema(ipfsDataLockId);
}
if (priceDataLockAcquired) {
await forceReleaseLockForSchema(priceDataLockId);
// await releaseLockForSchema(priceDataLockId);
}
client.release();
},
client,
};
} catch (error) {
this.#logger.error({ error }, "Failed to acquire write lock");
}
} finally {
// Ensure any acquired locks are released if they were not all acquired
if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId);
if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId);
if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId);

client.release();
client.release();
}

return null;
}
Expand Down

0 comments on commit d3747ee

Please sign in to comment.