diff --git a/src/database/index.ts b/src/database/index.ts index 58e33d97..0226366d 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -116,36 +116,59 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; + // Helper function to force release a lock for a specific schema + const forceReleaseLockForSchema = async (lockId: number) => { + await client.query(` + SELECT pg_terminate_backend(pid) + FROM pg_locks + WHERE locktype = 'advisory' + AND objid = ${lockId} + AND pid != pg_backend_pid() + `); + }; + // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); const priceDataLockId = generateLockId(this.priceDataSchemaName); + // Lock acquisition status + let chainDataLockAcquired = false; + let ipfsDataLockAcquired = false; + let priceDataLockAcquired = false; + try { - const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - - if ( - chainDataLockAcquired && - ipfsDataLockAcquired && - priceDataLockAcquired - ) { - return { - release: async () => { + chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + + return { + release: async () => { + if (chainDataLockAcquired) { await releaseLockForSchema(chainDataLockId); - await releaseLockForSchema(ipfsDataLockId); - await releaseLockForSchema(priceDataLockId); - client.release(); - }, - client, - }; - } + } + if (ipfsDataLockAcquired) { + await forceReleaseLockForSchema(ipfsDataLockId); + // await releaseLockForSchema(ipfsDataLockId); + } + if (priceDataLockAcquired) { + await forceReleaseLockForSchema(priceDataLockId); + // await releaseLockForSchema(priceDataLockId); + } + client.release(); + }, + client, + }; } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); - } + } finally { + // Ensure any acquired locks are released if they were not all acquired + if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId); + if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId); + if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId); - client.release(); + client.release(); + } return null; }