Skip to content

Commit

Permalink
Lock Entity while it's being updated
Browse files Browse the repository at this point in the history
  • Loading branch information
sadiqkhoja committed Oct 26, 2023
1 parent 9d1bbd7 commit 2e3e09c
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 5 deletions.
16 changes: 13 additions & 3 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
const clientEntity = await Entity.fromParseEntityData(entityData); // validation happens here

// Get version of entity on the server
const serverEntity = (await Entities.getById(dataset.id, clientEntity.uuid))
const serverEntity = (await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate))
.orThrow(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name }));

let { conflict } = serverEntity;
Expand Down Expand Up @@ -347,14 +347,24 @@ const _get = (includeSource) => {
`);
};

// This is Postgresql Advisory Lock
// We can't use `FOR UPDATE` clause because of "Read Committed Isolation Level",
// i.e. blocked transaction gets the row version that was at the start of the command,
// (after lock is released by the first transaction), even if transaction with lock has updated that row.
const _lockEntity = (exec, uuid) => exec(sql`SELECT pg_advisory_xact_lock(id) FROM entities WHERE uuid = ${uuid};`);

const assignCurrentVersionCreator = (entity) => {
const currentVersion = new Entity.Def(entity.aux.currentVersion, { creator: entity.aux.currentVersionCreator });
return new Entity(entity, { currentVersion, creator: entity.aux.creator });
};

const getById = (datasetId, uuid, options = QueryOptions.none) => ({ maybeOne }) =>
_get(true)(maybeOne, options.withCondition({ datasetId, uuid }), isTrue(options.argData?.deleted))
const getById = (datasetId, uuid, options = QueryOptions.none) => async ({ maybeOne }) => {
if (options.forUpdate) {
await _lockEntity(maybeOne, uuid);
}
return _get(true)(maybeOne, options.withCondition({ datasetId, uuid }), isTrue(options.argData?.deleted))
.then(map(assignCurrentVersionCreator));
};

const getAll = (datasetId, options = QueryOptions.none) => ({ all }) =>
_get(false)(all, options.withCondition({ datasetId }), isTrue(options.argData.deleted))
Expand Down
3 changes: 2 additions & 1 deletion lib/resources/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const { isTrue, success, withEtag } = require('../util/http');
const { Entity } = require('../model/frames');
const Problem = require('../util/problem');
const { diffEntityData, getWithConflictDetails } = require('../data/entity');
const { QueryOptions } = require('../util/db');

module.exports = (service, endpoint) => {

Expand Down Expand Up @@ -102,7 +103,7 @@ module.exports = (service, endpoint) => {

await auth.canOrReject('entity.update', dataset);

const entity = await Entities.getById(dataset.id, params.uuid).then(getOrNotFound);
const entity = await Entities.getById(dataset.id, params.uuid, QueryOptions.forUpdate).then(getOrNotFound);

if (isTrue(query.resolve) && !entity.conflict) return reject(Problem.user.noConflictEntity());

Expand Down
1 change: 1 addition & 0 deletions lib/util/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class QueryOptions {
}
QueryOptions.none = new QueryOptions();
QueryOptions.extended = new QueryOptions({ extended: true });
QueryOptions.forUpdate = new QueryOptions({ forUpdate: true });


////////////////////////////////////////////////////////////////////////////////
Expand Down
124 changes: 123 additions & 1 deletion test/integration/api/entities.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
const appRoot = require('app-root-path');
const { testService } = require('../setup');
const { testService, testServiceFullTrx } = require('../setup');
const testData = require('../../data/xml');
const { sql } = require('slonik');
const should = require('should');
const { QueryOptions, queryFuncs } = require('../../../lib/util/db');
const { getById, createVersion } = require('../../../lib/model/query/entities');
const Option = require('../../../lib/util/option');
const { Entity } = require('../../../lib/model/frames');
const { getOrNotFound } = require('../../../lib/util/promise');
const { get } = require('../../../lib/model/query/datasets');

const { exhaust } = require(appRoot + '/lib/worker/worker');

Expand Down Expand Up @@ -1457,6 +1463,122 @@ describe('Entities API', () => {

});

/* eslint-disable no-console */
// This is explanatory test where two transaction tries to update the same Entity.
// `getById` creates an advisory lock which blocks other transactions to do the same.
// Once first transaction updates the Entity, only then second transaction is able
// to get the Entity.
it('should not allow parallel updates to the same Entity', testServiceFullTrx(async (service, container) => {

const asAlice = await service.login('alice');

await asAlice.post('/v1/projects/1/forms?publish=true')
.send(testData.forms.simpleEntity)
.expect(200);

await asAlice.post('/v1/projects/1/forms/simpleEntity/submissions')
.send(testData.instances.simpleEntity.one)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

const dataset = await get(1, 'people', true)(container).then(getOrNotFound);
const actorId = await container.oneFirst(sql`SELECT id FROM actors WHERE "displayName" = 'Alice'`);

let secondTxWaiting = false;
let entityLocked = false;

const transaction1 = container.db.connect(connection => connection.transaction(async tx1 => {
const containerTx1 = { context: { auth: { actor: Option.of({ id: actorId }) } } };
queryFuncs(tx1, containerTx1);

const entity = await getById(dataset.id, '12345678-1234-4123-8234-123456789abc', QueryOptions.forUpdate)(containerTx1).then(getOrNotFound);

entityLocked = true;
console.log('Tx1: entity fetched');

console.log('Tx1: waiting for 2nd tx to get started');
await new Promise(resolve => {
const intervalId = setInterval(async () => {
if (secondTxWaiting) {
clearInterval(intervalId);
resolve();
}
}, 1);
});

// Assert that other transaction is blocked
await tx1.any(sql`SELECT 1 FROM pg_stat_activity WHERE state = 'active' AND wait_event_type ='Lock'`)
.then(r => {
r.should.not.be.null();
});

const updatedEntity = new Entity.Partial(entity, {
def: entity.aux.currentVersion.with({ label: 'Jane', data: { first_name: 'Jane' }, dataReceived: { first_name: 'Jane' } })
});

await createVersion({ id: dataset.id }, updatedEntity, null, entity.aux.currentVersion.version + 1, null, 1)(containerTx1)
.then(() => {
console.log('Tx1: entity updated');
});
}));

const transaction2 = container.db.connect(connection => connection.transaction(async tx2 => {
const containerTx2 = { context: { auth: { actor: Option.of({ id: actorId }) } } };
queryFuncs(tx2, containerTx2);

console.log('Tx2: waiting for 1st Tx to lock the row');

await new Promise(resolve => {
const intervalId = setInterval(() => {
if (entityLocked) {
clearInterval(intervalId);
resolve();
}
}, 1);
});

console.log('Tx2: looks like 1st tx has locked the row');

const promise = getById(dataset.id, '12345678-1234-4123-8234-123456789abc', QueryOptions.forUpdate)(containerTx2).then(getOrNotFound)
.then(async (entity) => {
console.log('Tx2: entity fetched');

entity.aux.currentVersion.version.should.be.eql(2);
const updatedEntity = new Entity.Partial(entity, {
def: entity.aux.currentVersion.with({ label: 'Robert', data: { first_name: 'Robert' }, dataReceived: { first_name: 'Robert' } })
});

await createVersion({ id: dataset.id }, updatedEntity, null, entity.aux.currentVersion.version + 1, null, 1)(containerTx2)
.then(() => {
console.log('Tx2: entity updated');
});
});

secondTxWaiting = true;

return promise;
}));

await Promise.all([transaction1, transaction2]);

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions[0].data.first_name.should.eql('Alice');
versions[0].version.should.eql(1);

// Created by Tx1
versions[1].data.first_name.should.eql('Jane');
versions[1].version.should.eql(2);

// Created by Tx2
versions[2].data.first_name.should.eql('Robert');
versions[2].version.should.eql(3);
});

}));
/* eslint-enable no-console */
});

describe('DELETE /datasets/:name/entities/:uuid', () => {
Expand Down

0 comments on commit 2e3e09c

Please sign in to comment.