diff --git a/__tests__/table-streams/synonyms.ts b/__tests__/table-streams/synonyms.ts new file mode 100644 index 000000000..9eeedd17f --- /dev/null +++ b/__tests__/table-streams/synonyms.ts @@ -0,0 +1,185 @@ +import { tables } from '@architect/functions' +import { search } from '@nasa-gcn/architect-functions-search' +import type { DynamoDBRecord } from 'aws-lambda' + +import type { Synonym } from '~/routes/synonyms/synonyms.lib' +import { handler } from '~/table-streams/synonyms/index' + +const synonymId = 'abcde-abcde-abcde-abcde-abcde' +const eventId = 'GRB 123' +const existingEventId = 'GRB 99999999' + +const putData = { + index: 'synonym-groups', + id: synonymId, + body: { + synonymId, + eventIds: [] as string[], + }, +} + +jest.mock('@nasa-gcn/architect-functions-search', () => ({ + search: jest.fn(), +})) + +jest.mock('@architect/functions', () => ({ + tables: jest.fn(), +})) + +const mockIndex = jest.fn() +const mockDelete = jest.fn() +const mockQuery = jest.fn() + +const mockStreamEvent = { + Records: [ + { + eventID: '1', + eventName: 'INSERT', + eventVersion: '1.0', + eventSource: 'aws:dynamodb', + awsRegion: 'us-west-2', + dynamodb: { + Keys: { + synonymId: { + S: synonymId, + }, + eventId: { + S: eventId, + }, + }, + NewImage: { + synonymId: { + S: synonymId, + }, + eventId: { + S: eventId, + }, + }, + SequenceNumber: '111', + SizeBytes: 26, + StreamViewType: 'NEW_IMAGE', + }, + eventSourceARN: + 'arn:aws:dynamodb:us-west-2:123456789012:table/synonym-groups/stream/2020-01-01T00:00:00.000', + } as DynamoDBRecord, + ], +} + +afterEach(() => { + jest.clearAllMocks() +}) + +describe('testing put synonymGroup table-stream', () => { + test('insert new synonym group', async () => { + const mockItems = [{ synonymId, eventId }] + const mockClient = { + synonyms: { + query: mockQuery, + }, + } + mockQuery.mockResolvedValue({ Items: mockItems }) + ;(tables as unknown as jest.Mock).mockResolvedValue(mockClient) + putData.body.eventIds = [eventId] + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('insert into existing synonym group', async () => { + const mockItems = [ + { synonymId, eventId: existingEventId }, + { synonymId, eventId }, + ] + const mockClient = { + synonyms: { + query: mockQuery, + }, + } + mockQuery.mockResolvedValue({ Items: mockItems }) + ;(tables as unknown as jest.Mock).mockResolvedValue(mockClient) + putData.body.eventIds = [existingEventId, eventId] + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('insert only once', async () => { + const mockItems = [ + { synonymId, eventId: existingEventId }, + { synonymId, eventId }, + ] + const mockClient = { + synonyms: { + query: mockQuery, + }, + } + mockQuery.mockResolvedValue({ Items: mockItems }) + ;(tables as unknown as jest.Mock).mockResolvedValue(mockClient) + putData.body.eventIds = [existingEventId, eventId] + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) +}) + +describe('testing delete synonymGroup table-stream', () => { + test('remove one eventId while leaving others', async () => { + const mockItems = [{ synonymId, eventId: existingEventId }] + const mockClient = { + synonyms: { + query: mockQuery, + }, + } + mockQuery.mockResolvedValue({ Items: mockItems }) + ;(tables as unknown as jest.Mock).mockResolvedValue(mockClient) + mockStreamEvent.Records[0].eventName = 'REMOVE' + putData.body.eventIds = [existingEventId] + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + }) + + await handler(mockStreamEvent) + + expect(mockIndex).toHaveBeenCalledWith(putData) + }) + + test('remove final synonym and delete synonym group', async () => { + const mockItems = [] as Synonym[] + const mockClient = { + synonyms: { + query: mockQuery, + }, + } + mockQuery.mockResolvedValue({ Items: mockItems }) + ;(tables as unknown as jest.Mock).mockResolvedValue(mockClient) + mockStreamEvent.Records[0].eventName = 'REMOVE' + const deleteData = { + index: 'synonym-groups', + id: synonymId, + } + ;(search as unknown as jest.Mock).mockReturnValue({ + index: mockIndex, + delete: mockDelete, + }) + + await handler(mockStreamEvent) + + expect(mockDelete).toHaveBeenCalledWith(deleteData) + }) +}) diff --git a/app/routes/synonyms/synonyms.lib.ts b/app/routes/synonyms/synonyms.lib.ts index 638fe925b..a0e3a40ac 100644 --- a/app/routes/synonyms/synonyms.lib.ts +++ b/app/routes/synonyms/synonyms.lib.ts @@ -5,7 +5,15 @@ * * SPDX-License-Identifier: Apache-2.0 */ + +/* Data structure in DynamoDB */ export interface Synonym { eventId: string synonymId: string } + +/* Layout of materialized view in OpenSearch */ +export interface SynonymGroup { + synonymId: string + eventIds: string[] +} diff --git a/app/routes/synonyms/synonyms.server.ts b/app/routes/synonyms/synonyms.server.ts index 1fbc57e98..684d30893 100644 --- a/app/routes/synonyms/synonyms.server.ts +++ b/app/routes/synonyms/synonyms.server.ts @@ -6,7 +6,7 @@ * SPDX-License-Identifier: Apache-2.0 */ import { tables } from '@architect/functions' -import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb/dist-types/DynamoDBDocument' +import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { search as getSearchClient } from '@nasa-gcn/architect-functions-search' import crypto from 'crypto' @@ -54,7 +54,7 @@ export async function searchSynonymsByEventId({ if (eventId) { query.bool.should.push({ match: { - eventId: { + eventIds: { query: eventId, fuzziness: 'AUTO', }, @@ -70,7 +70,7 @@ export async function searchSynonymsByEventId({ }, }, } = await client.search({ - index: 'synonyms', + index: 'synonym-groups', from: page && limit && (page - 1) * limit, size: limit, body: { diff --git a/app/table-streams/synonyms/index.ts b/app/table-streams/synonyms/index.ts index e8bb96973..98b38f81e 100644 --- a/app/table-streams/synonyms/index.ts +++ b/app/table-streams/synonyms/index.ts @@ -11,9 +11,10 @@ import type { DynamoDBRecord } from 'aws-lambda' import { unmarshallTrigger } from '../utils' import { createTriggerHandler } from '~/lib/lambdaTrigger.server' -import type { Synonym } from '~/routes/synonyms/synonyms.lib' +import type { Synonym, SynonymGroup } from '~/routes/synonyms/synonyms.lib' +import { getSynonymsByUuid } from '~/routes/synonyms/synonyms.server' -const index = 'synonyms' +const index = 'synonym-groups' async function removeIndex(id: string) { const client = await getSearchClient() @@ -26,26 +27,27 @@ async function removeIndex(id: string) { } } -async function putIndex(synonym: Synonym) { +async function putIndex(synonymGroup: SynonymGroup) { const client = await getSearchClient() await client.index({ index, - body: synonym, + id: synonymGroup.synonymId, + body: synonymGroup, }) } export const handler = createTriggerHandler( async ({ eventName, dynamodb }: DynamoDBRecord) => { - const id = unmarshallTrigger(dynamodb!.Keys).id as string - const promises = [] - - if (eventName === 'REMOVE') { - promises.push(removeIndex(id)) - } /* (eventName === 'INSERT' || eventName === 'MODIFY') */ else { - const synonym = unmarshallTrigger(dynamodb!.NewImage) as Synonym - promises.push(putIndex(synonym)) + if (!eventName || !dynamodb) return + const { synonymId } = unmarshallTrigger(dynamodb!.NewImage) as Synonym + const dynamoSynonyms = await getSynonymsByUuid(synonymId) + if (dynamoSynonyms.length > 0) { + await putIndex({ + synonymId, + eventIds: dynamoSynonyms.map((synonym) => synonym.eventId), + }) + } else { + await removeIndex(synonymId) } - - await Promise.all(promises) } ) diff --git a/sandbox-invoke-mocks.js b/sandbox-invoke-mocks.js index fdf001dda..bfe93f55f 100644 --- a/sandbox-invoke-mocks.js +++ b/sandbox-invoke-mocks.js @@ -217,6 +217,20 @@ export default { body: 'This is a test', }, }, + synonyms: { + INSERT: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + REMOVE: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + MODIFY: { + eventId: 'GRB 230727A', + synonymId: 'fe00636d-ae9a-45fe-9941-9cbc6e84da04', + }, + }, 'circulars-kafka-distribution': { INSERT: { circularId: 40000, diff --git a/sandbox-search.js b/sandbox-search.js index bfae389dc..02a89017a 100644 --- a/sandbox-search.js +++ b/sandbox-search.js @@ -6,18 +6,23 @@ * SPDX-License-Identifier: Apache-2.0 */ import { readFile } from 'fs/promises' +import groupBy from 'lodash/groupBy.js' export default async function () { const text = await readFile('sandbox-seed.json', { encoding: 'utf-8' }) const { circulars, synonyms } = JSON.parse(text) - + const groups = Object.entries(groupBy(synonyms, 'synonymId')).flatMap( + ([synonymId, values]) => [ + { synonymId, eventIds: values.map(({ eventId }) => eventId) }, + ] + ) return [ ...circulars.flatMap((item) => [ { index: { _index: 'circulars', _id: item.circularId.toString() } }, item, ]), - ...synonyms.flatMap((item) => [ - { index: { _index: 'synonyms', _id: item.id } }, + ...groups.flatMap((item) => [ + { index: { _index: 'synonym-groups', _id: item.synonymId.toString() } }, item, ]), ]