Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle offline entities with baseVersion and trunkVersion #1154

Merged
merged 13 commits into from
Jun 24, 2024
Merged
35 changes: 33 additions & 2 deletions lib/data/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ const odataToColumnMap = new Map([
['__system/conflict', 'entities.conflict']
]);

const _uuidPattern = /^(uuid:)?([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$/i;

////////////////////////////////////////////////////////////////////////////
// ENTITY PARSING

const normalizeUuid = (id) => {
if (!id || id.trim() === '')
throw Problem.user.missingParameter({ field: 'uuid' });

const uuidPattern = /^(uuid:)?([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$/i;
const matches = uuidPattern.exec(id);
const matches = _uuidPattern.exec(id);
if (matches == null) throw Problem.user.invalidDataTypeOfParameter({ field: 'uuid', expected: 'valid version 4 UUID' });
return matches[2].toLowerCase();
};
Expand Down Expand Up @@ -66,6 +67,32 @@ const extractBaseVersionFromSubmission = (entity) => {
}
};

const extractBranchIdFromSubmission = (entity) => {
const { branchId } = entity.system;
if (branchId === '' || branchId == null)
return null;

const matches = _uuidPattern.exec(branchId);
if (matches == null) throw Problem.user.invalidDataTypeOfParameter({ field: 'branchId', expected: 'valid version 4 UUID' });
return matches[2].toLowerCase();
};

const extractTrunkVersionFromSubmission = (entity) => {
const { trunkVersion } = entity.system;
if (trunkVersion) {
// branchId must be present with trunk version
const branchId = extractBranchIdFromSubmission(entity);
if (!branchId)
throw Problem.user.missingParameter({ field: 'branchId' });

if (!/^\d+$/.test(trunkVersion))
throw Problem.user.invalidDataTypeOfParameter({ field: 'trunkVersion', expected: 'integer' });

return parseInt(entity.system.trunkVersion, 10);
}
return null;
};

// This works similarly to processing submissions for export, but also note:
// 1. this is expecting the entityFields to be filled in with propertyName attributes
// 2. the "meta/entity" structural field should be included to get necessary
Expand All @@ -81,6 +108,8 @@ const parseSubmissionXml = (entityFields, xml) => new Promise((resolve, reject)
entity.system.create = field.attrs.create;
entity.system.update = field.attrs.update;
entity.system.baseVersion = field.attrs.baseVersion;
entity.system.trunkVersion = field.attrs.trunkVersion;
entity.system.branchId = field.attrs.branchId;
} else if (field.path.indexOf('/meta/entity') === 0)
entity.system[field.name] = text;
else if (field.propertyName != null)
Expand Down Expand Up @@ -451,6 +480,8 @@ module.exports = {
normalizeUuid,
extractLabelFromSubmission,
extractBaseVersionFromSubmission,
extractTrunkVersionFromSubmission,
extractBranchIdFromSubmission,
extractBulkSource,
streamEntityCsv, streamEntityCsvAttachment,
streamEntityOdata, odataToColumnMap,
Expand Down
16 changes: 13 additions & 3 deletions lib/model/frames/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
/* eslint-disable no-multi-spaces */

const { embedded, fieldTypes, Frame, readable, table } = require('../frame');
const { extractEntity, normalizeUuid, extractLabelFromSubmission, extractBaseVersionFromSubmission } = require('../../data/entity');
const { extractEntity, normalizeUuid,
extractLabelFromSubmission, extractBaseVersionFromSubmission,
extractTrunkVersionFromSubmission, extractBranchIdFromSubmission } = require('../../data/entity');

class Entity extends Frame.define(
table('entities', 'entity'),
Expand Down Expand Up @@ -38,13 +40,17 @@ class Entity extends Frame.define(
const uuid = normalizeUuid(entityData.system.id);
const label = extractLabelFromSubmission(entityData, options);
const baseVersion = extractBaseVersionFromSubmission(entityData);
const branchId = extractBranchIdFromSubmission(entityData);
const trunkVersion = extractTrunkVersionFromSubmission(entityData);
const dataReceived = { ...data, ...(label && { label }) };
return new Entity.Partial({ uuid }, {
def: new Entity.Def({
data,
dataReceived,
...(label && { label }), // add label only if it's there
...(baseVersion && { baseVersion }), // add baseVersion only if it's there
...(label && { label }) // add label only if it's there
...(trunkVersion && { trunkVersion }), // add trunkVersion only if it's there
...(branchId && { branchId }), // add branchId only if it's there
}),
dataset
});
Expand Down Expand Up @@ -88,6 +94,9 @@ Entity.Def = Frame.define(
'version', readable, 'baseVersion', readable,
'dataReceived', readable, 'conflictingProperties', readable,
'createdAt', readable,
'branchId', readable,
'trunkVersion', readable,
'branchBaseVersion', readable,
embedded('creator'),
embedded('source'),
fieldTypes([
Expand All @@ -98,7 +107,8 @@ Entity.Def = Frame.define(
'jsonb', 'bool',
'int4', 'int4',
'jsonb', 'jsonb',
'timestamptz'
'timestamptz',
'uuid', 'int4', 'int4',
])
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const up = async (db) => {
await db.raw(`ALTER TABLE entity_defs
ADD COLUMN "branchId" UUID,
ADD COLUMN "trunkVersion" INT4,
ADD COLUMN "branchBaseVersion" INT4`);
};
ktuite marked this conversation as resolved.
Show resolved Hide resolved

const down = (db) => db.raw(`ALTER TABLE entity_defs
DROP COLUMN "branchId",
DROP COLUMN "trunkVersion",
DROP COLUMN "branchBaseVersion"
`);

module.exports = { up, down };
31 changes: 31 additions & 0 deletions lib/model/migrations/20240607-02-add-submission-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const up = async (db) => {
await db.raw(`CREATE TABLE entity_submission_backlog (
"submissionId" INT4 NOT NULL,
"submissionDefId" INT4 NOT NULL,
"branchId" UUID NOT NULL,
"branchBaseVersion" INT4 NOT NULL,
"loggedAt" TIMESTAMPTZ(3) NOT NULL,
CONSTRAINT fk_submission_defs
FOREIGN KEY("submissionDefId")
REFERENCES submission_defs(id)
ON DELETE CASCADE,
CONSTRAINT fk_submissions
FOREIGN KEY("submissionId")
REFERENCES submissions(id)
ON DELETE CASCADE,
UNIQUE ("branchId", "branchBaseVersion")
)`);
};

const down = (db) => db.raw('DROP TABLE entity_submission_backlog');

module.exports = { up, down };
117 changes: 101 additions & 16 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ const _defInsert = (id, root, creatorId, userAgent, partial, version, sourceId =
"sourceId", "creatorId", "userAgent",
"label", "data", "dataReceived",
"version", "baseVersion",
"trunkVersion", "branchId", "branchBaseVersion",
"conflictingProperties")
values (${id}, clock_timestamp(),
${root}, true,
${sourceId}, ${creatorId}, ${userAgent},
${partial.def.label}, ${json}, ${dataReceived},
${version}, ${baseVersion},
${partial.def.trunkVersion ?? null}, ${partial.def.branchId ?? null}, ${partial.def.branchBaseVersion ?? null},
${conflictingProperties})
returning *`;
};
Expand Down Expand Up @@ -80,11 +82,12 @@ ins as (insert into entities (id, "datasetId", "uuid", "createdAt", "creatorId")
select def."entityId", ${dataset.id}, ${partial.uuid}, def."createdAt", ${creatorId} from def
returning entities.*)
select ins.*, def.id as "entityDefId" from ins, def;`)
.then(({ entityDefId, ...entityData }) => // TODO/HACK: reassemble just enough to log audit event
.then(({ entityDefId, ...entityData }) => // TODO/HACK: starting to need more reassembling
new Entity(entityData, {
currentVersion: new Entity.Def({
id: entityDefId,
entityId: entityData.id
entityId: entityData.id,
branchId: partial.def.branchId
})
}));
};
Expand Down Expand Up @@ -193,6 +196,39 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);

const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql`
INSERT INTO entity_submission_backlog ("submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);

// Used by _updateVerison below to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submissionDef) => {
if (!clientEntity.def.trunkVersion || clientEntity.def.baseVersion === clientEntity.def.trunkVersion) {
// trunk and client baseVersion are the same, indicating the start of a batch
return clientEntity.def.baseVersion;
} else {
const condition = { datasetId: dataset.id, uuid: clientEntity.uuid,
branchId: clientEntity.def.branchId,
branchBaseVersion: clientEntity.def.baseVersion - 1 };

// eslint-disable-next-line no-use-before-define
const previousInBranch = (await _getDef(maybeOne, new QueryOptions({ condition })));
if (!previousInBranch.isDefined()) {
// not ready to process this submission. eventually hold it for later.
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
} else {
return previousInBranch.get().version;
}
}
};

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
Expand All @@ -206,36 +242,64 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id);
const entity = await Entities.createNew(dataset, partial, submissionDef, sourceId);

return Audits.log({ id: event.actorId }, 'entity.create', { acteeId: dataset.acteeId },
await Audits.log({ id: event.actorId }, 'entity.create', { acteeId: dataset.acteeId },
{
entityId: entity.id, // Added in v2023.3 and backfilled
entityDefId: entity.aux.currentVersion.id, // Added in v2023.3 and backfilled
entity: { uuid: entity.uuid, dataset: dataset.name },
submissionId,
submissionDefId
});
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne }) => {
if (!(event.action === 'submission.create' || event.action === 'submission.update.version'))
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne, run }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
return null;

// Get client version of entity
const clientEntity = await Entity.fromParseEntityData(entityData, { update: true }); // validation happens here

// Get version of entity on the server
const serverEntity = (await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate))
.orThrow(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name }));
// If the entity doesn't exist, check branchId - maybe this is an update for an entity created offline
let serverEntity = await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate);
if (!serverEntity.isDefined()) {
if (clientEntity.def.branchId == null) {
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
} else {
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
} else {
serverEntity = serverEntity.get();
}

// If the trunk version exists but is higher than current server version,
// that is a weird case that should not be processed OR held, and should log an error.
if (clientEntity.def.trunkVersion && clientEntity.def.trunkVersion > serverEntity.aux.currentVersion.version) {
throw Problem.user.entityVersionNotFound({ baseVersion: `trunkVersion=${clientEntity.def.trunkVersion}`, entityUuid: clientEntity.uuid, datasetName: dataset.name });
}

let { conflict } = serverEntity;
let conflictingProperties; // Maybe we don't need to persist this??? just compute at the read time

if (clientEntity.def.baseVersion !== serverEntity.aux.currentVersion.version) {
// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that
// offline context and we need to translate it to the correct base version within Central.
const baseVersion = await _computeBaseVersion(maybeOne, run, dataset, clientEntity, submissionDef);

// If baseVersion is null, we held a submission and will stop processing now.
if (baseVersion == null)
return null;

const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, version: clientEntity.def.baseVersion };
if (baseVersion !== serverEntity.aux.currentVersion.version) {

const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, version: baseVersion };
// eslint-disable-next-line no-use-before-define
const baseEntityVersion = (await _getDef(maybeOne, new QueryOptions({ condition })))
.orThrow(Problem.user.entityVersionNotFound({ baseVersion: clientEntity.def.baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name }));
.orThrow(Problem.user.entityVersionNotFound({ baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name }));

// we need to find what changed between baseVersion and lastVersion
// it is not the data we received in lastVersion
Expand Down Expand Up @@ -265,23 +329,30 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
data: mergedData,
label: mergedLabel,
dataReceived: clientEntity.def.dataReceived,
branchId: clientEntity.def.branchId,
trunkVersion: clientEntity.def.trunkVersion,
branchBaseVersion: (clientEntity.def.branchId != null) ? clientEntity.def.baseVersion : null,
conflictingProperties
})
});

const entity = await Entities.createVersion(dataset, partial, submissionDef, serverEntity.aux.currentVersion.version + 1, sourceId, clientEntity.def.baseVersion);
return Audits.log({ id: event.actorId }, 'entity.update.version', { acteeId: dataset.acteeId },
// Assign new version (increment latest server version)
const version = serverEntity.aux.currentVersion.version + 1;

const entity = await Entities.createVersion(dataset, partial, submissionDef, version, sourceId, baseVersion);
await Audits.log({ id: event.actorId }, 'entity.update.version', { acteeId: dataset.acteeId },
{
entityId: entity.id,
entityDefId: entity.aux.currentVersion.id,
entity: { uuid: entity.uuid, dataset: dataset.name },
submissionId,
submissionDefId
});
return entity;
};

// Entrypoint to where submissions (a specific version) become entities
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, maybeOne, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;

const form = await Forms.getByActeeId(event.acteeId);
Expand Down Expand Up @@ -334,19 +405,33 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entit
throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions });
}

let maybeEntity = null;
ktuite marked this conversation as resolved.
Show resolved Hide resolved
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
} catch (err) {
if ((err.problemCode === 404.8) && (entityData.system.create === '1' || entityData.system.create === 'true')) {
await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
} else {
throw (err);
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
return Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);

// Check for held submissions that follow this one in the same branch
if (maybeEntity != null && maybeEntity.aux.currentVersion.branchId != null) {
const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion;
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
ktuite marked this conversation as resolved.
Show resolved Hide resolved
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
}
}

return null;
};
Expand Down
1 change: 1 addition & 0 deletions lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],
ktuite marked this conversation as resolved.
Show resolved Hide resolved

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
Loading