diff --git a/__tests__/table-streams/synonyms.ts b/__tests__/table-streams/synonyms.ts new file mode 100644 index 000000000..4a43451c8 --- /dev/null +++ b/__tests__/table-streams/synonyms.ts @@ -0,0 +1,151 @@ +import { search } from '@nasa-gcn/architect-functions-search' +import type { DynamoDBRecord } from 'aws-lambda' + +import { handler } from '~/table-streams/synonyms/index' + +const synonymId = 'abcde-abcde-abcde-abcde-abcde' +const eventId = 'GRB 123' +const existingEventId = 'GRB 99999999' + +const putData = { + index: 'synonym-groups', + id: synonymId, + body: { + synonymId, + eventIds: [] as string[], + }, +} + +jest.mock('@nasa-gcn/architect-functions-search', () => ({ + search: jest.fn(), +})) + +const mockIndex = jest.fn() +const mockDelete = jest.fn() +const mockGet = jest.fn() + +const mockStreamEvent = { + Records: [ + { + eventID: '1', + eventName: 'INSERT', + eventVersion: '1.0', + eventSource: 'aws:dynamodb', + awsRegion: 'us-west-2', + dynamodb: { + Keys: { + synonymId: { + S: synonymId, + }, + eventId: { + S: eventId, + }, + }, + NewImage: { + synonymId: { + S: synonymId, + }, + eventId: { + S: eventId, + }, + }, + SequenceNumber: '111', + SizeBytes: 26, + StreamViewType: 'NEW_IMAGE', + }, + eventSourceARN: + 'arn:aws:dynamodb:us-west-2:123456789012:table/synonym-groups/stream/2020-01-01T00:00:00.000', + } as DynamoDBRecord, + ], +} + +afterEach(() => { + jest.clearAllMocks() +}) + +describe('testing put synonymGroup table-stream', () => { + test('insert new synonym group', async () => { + putData.body.eventIds = [eventId] + mockGet.mockResolvedValue(null) + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + get: mockGet, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('insert into existing synonym group', async () => { + putData.body.eventIds = [existingEventId, eventId] + mockGet.mockResolvedValue({ + body: { _source: { synonymId, eventIds: [existingEventId] } }, + }) + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + get: mockGet, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('insert only once', async () => { + putData.body.eventIds = [existingEventId, eventId] + mockGet.mockResolvedValue({ + body: { _source: { synonymId, eventIds: [existingEventId, eventId] } }, + }) + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + get: mockGet, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) +}) + +describe('testing delete synonymGroup table-stream', () => { + test('remove one eventId while leaving others', async () => { + mockStreamEvent.Records[0].eventName = 'REMOVE' + putData.body.eventIds = [existingEventId] + mockGet.mockResolvedValue({ + body: { _source: { synonymId, eventIds: [existingEventId, eventId] } }, + }) + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + get: mockGet, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('remove final synonym and delete synonym group', async () => { + mockStreamEvent.Records[0].eventName = 'REMOVE' + const deleteData = { + index: 'synonym-groups', + id: synonymId, + } + mockGet.mockResolvedValue({ + body: { _source: { synonymId, eventIds: [eventId] } }, + }) + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + get: mockGet, + }) + + await handler(mockStreamEvent) + + expect(mockDelete).toHaveBeenCalledWith(deleteData) + }) +}) diff --git a/app/routes/synonyms/synonyms.lib.ts b/app/routes/synonyms/synonyms.lib.ts index 638fe925b..a0e3a40ac 100644 --- a/app/routes/synonyms/synonyms.lib.ts +++ b/app/routes/synonyms/synonyms.lib.ts @@ -5,7 +5,15 @@ * * SPDX-License-Identifier: Apache-2.0 */ + +/* Data structure in DynamoDB */ export interface Synonym { eventId: string synonymId: string } + +/* Layout of materialized view in OpenSearch */ +export interface SynonymGroup { + synonymId: string + eventIds: string[] +} diff --git a/app/routes/synonyms/synonyms.server.ts b/app/routes/synonyms/synonyms.server.ts index 1fbc57e98..684d30893 100644 --- a/app/routes/synonyms/synonyms.server.ts +++ b/app/routes/synonyms/synonyms.server.ts @@ -6,7 +6,7 @@ * SPDX-License-Identifier: Apache-2.0 */ import { tables } from '@architect/functions' -import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb/dist-types/DynamoDBDocument' +import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { search as getSearchClient } from '@nasa-gcn/architect-functions-search' import crypto from 'crypto' @@ -54,7 +54,7 @@ export async function searchSynonymsByEventId({ if (eventId) { query.bool.should.push({ match: { - eventId: { + eventIds: { query: eventId, fuzziness: 'AUTO', }, @@ -70,7 +70,7 @@ export async function searchSynonymsByEventId({ }, }, } = await client.search({ - index: 'synonyms', + index: 'synonym-groups', from: page && limit && (page - 1) * limit, size: limit, body: { diff --git a/app/table-streams/synonyms/index.ts b/app/table-streams/synonyms/index.ts index c118bdac1..637f6b5b9 100644 --- a/app/table-streams/synonyms/index.ts +++ b/app/table-streams/synonyms/index.ts @@ -15,9 +15,9 @@ import type { } from 'aws-lambda' import { createTriggerHandler } from '~/lib/lambdaTrigger.server' -import type { Synonym } from '~/routes/synonyms/synonyms.lib' +import type { Synonym, SynonymGroup } from '~/routes/synonyms/synonyms.lib' -const index = 'synonyms' +const index = 'synonym-groups' function unmarshallTrigger(item?: Record) { return unmarshall(item as Record) @@ -34,26 +34,59 @@ async function removeIndex(id: string) { } } -async function putIndex(synonym: Synonym) { +async function putIndex(synonymGroup: SynonymGroup) { const client = await getSearchClient() await client.index({ index, - body: synonym, + id: synonymGroup.synonymId, + body: synonymGroup, }) } +async function getIndex(synonym: Synonym) { + const client = await getSearchClient() + try { + const group = await client.get({ + index: 'synonym-groups', + id: synonym.synonymId, + }) + + return group.body._source as SynonymGroup + } catch (e) { + return null + } +} + export const handler = createTriggerHandler( async ({ eventName, dynamodb }: DynamoDBRecord) => { - const id = unmarshallTrigger(dynamodb!.Keys).id as string + if (!eventName || !dynamodb) return const promises = [] - - if (eventName === 'REMOVE') { + const id = unmarshallTrigger(dynamodb!.Keys).synonymId + const synonym = unmarshallTrigger(dynamodb!.NewImage) as Synonym + const existingGroup = await getIndex(synonym) + const group = + eventName === 'INSERT' && !existingGroup + ? { synonymId: synonym.synonymId, eventIds: [synonym.eventId] } + : existingGroup + if (!group) return + if ( + eventName === 'REMOVE' && + group.eventIds.length === 1 && + group.eventIds.includes(synonym.eventId) + ) { promises.push(removeIndex(id)) + } else if (eventName === 'REMOVE' && group.eventIds.length > 1) { + const modifiedGroup = { + synonymId: synonym.synonymId, + eventIds: group.eventIds.filter((id) => id !== synonym.eventId), + } as SynonymGroup + promises.push(putIndex(modifiedGroup)) } /* (eventName === 'INSERT' || eventName === 'MODIFY') */ else { - const synonym = unmarshallTrigger(dynamodb!.NewImage) as Synonym - promises.push(putIndex(synonym)) + if (!group.eventIds.includes(synonym.eventId)) { + group.eventIds.push(synonym.eventId) + } + promises.push(putIndex(group as SynonymGroup)) } - await Promise.all(promises) } ) diff --git a/sandbox-invoke-mocks.js b/sandbox-invoke-mocks.js index fdf001dda..bfe93f55f 100644 --- a/sandbox-invoke-mocks.js +++ b/sandbox-invoke-mocks.js @@ -217,6 +217,20 @@ export default { body: 'This is a test', }, }, + synonyms: { + INSERT: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + REMOVE: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + MODIFY: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + }, 'circulars-kafka-distribution': { INSERT: { circularId: 40000, diff --git a/sandbox-search.js b/sandbox-search.js index bfae389dc..f07a4be80 100644 --- a/sandbox-search.js +++ b/sandbox-search.js @@ -11,13 +11,28 @@ export default async function () { const text = await readFile('sandbox-seed.json', { encoding: 'utf-8' }) const { circulars, synonyms } = JSON.parse(text) + const groupedSynonyms = {} + synonyms.forEach((synonym) => { + if (groupedSynonyms[synonym.synonymId]) { + groupedSynonyms[synonym.synonymId].push(synonym.eventId) + } else { + groupedSynonyms[synonym.synonymId] = [synonym.eventId] + } + }) + const groups = Object.keys(groupedSynonyms).map((id) => { + return { + synonymId: id, + eventIds: groupedSynonyms[id], + } + }) + return [ ...circulars.flatMap((item) => [ { index: { _index: 'circulars', _id: item.circularId.toString() } }, item, ]), - ...synonyms.flatMap((item) => [ - { index: { _index: 'synonyms', _id: item.id } }, + ...groups.flatMap((item) => [ + { index: { _index: 'synonym-groups', _id: item.synonymId.toString() } }, item, ]), ]