diff --git a/app.arc b/app.arc index 10fe316f5..5cf095b13 100644 --- a/app.arc +++ b/app.arc @@ -99,6 +99,11 @@ kafka_acls group **String PointInTimeRecovery true +kafka_acl_log + partitionKey *Number + syncedOn **Number + PointInTimeRecovery ture + @tables-indexes email_notification_subscription topic *String diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index 0bfbe95e7..3e646a41f 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -83,7 +83,6 @@ export type KafkaACL = { permissionType: PermissionType group: string prefixed: boolean - existsOnBroker: boolean } export type PermissionType = 'producer' | 'consumer' @@ -185,9 +184,7 @@ export async function getAclsFromBrokers() { }) await adminClient.disconnect() const results: KafkaACL[] = [] - for (const item of acls.resources.sort((a, b) => - a.resourceName.localeCompare(b.resourceName) - )) { + for (const item of acls.resources) { const topicName = item.resourceName const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED const producerRules = producerOperations.every((op) => @@ -220,7 +217,6 @@ export async function getAclsFromBrokers() { permissionType: 'producer', group: producerGroup, prefixed, - existsOnBroker: true, }) if (consumerRules && consumerGroup) results.push({ @@ -228,7 +224,6 @@ export async function getAclsFromBrokers() { permissionType: 'consumer', group: consumerGroup, prefixed, - existsOnBroker: true, }) } return results @@ -287,8 +282,33 @@ export async function updateBrokersFromDb(user: User) { await adminClient.createAcls({ acl: mappedAcls }) await adminClient.disconnect() } -export async function updateDbFromBrokers() { +export async function updateDbFromBrokers(user: User) { const kafkaDefinedAcls = await getAclsFromBrokers() const db = await tables() - await Promise.all(kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl))) + await Promise.all([ + ...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)), + db.kafka_acl_log.put({ + partitionKey: 1, + syncedOn: Date.now(), + syncedBy: user.email, + }), + ]) +} + +type KafkaAclSyncLog = { + partitionKey: number + syncedOn: number + syncedBy: string +} + +export async function getLastSyncDate(): Promise { + const db = await tables() + return ( + await db.kafka_acl_log.query({ + KeyConditionExpression: 'partitionKey = :1', + ExpressionAttributeValues: { ':1': 1 }, + ScanIndexForward: false, + Limit: 1, + }) + ).Items.pop() as KafkaAclSyncLog } diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx index d1e560abc..0df4d7600 100644 --- a/app/routes/admin.kafka._index.tsx +++ b/app/routes/admin.kafka._index.tsx @@ -22,41 +22,66 @@ import { useRef } from 'react' import { getUser } from './_auth/user.server' import HeadingWithAddButton from '~/components/HeadingWithAddButton' import SegmentedCards from '~/components/SegmentedCards' -import { getGroups } from '~/lib/cognito.server' +import Spinner from '~/components/Spinner' +import TimeAgo from '~/components/TimeAgo' import type { KafkaACL } from '~/lib/kafka.server' -import { - getAclsFromBrokers, - getKafkaACLsFromDynamoDB, -} from '~/lib/kafka.server' +import { getKafkaACLsFromDynamoDB, getLastSyncDate } from '~/lib/kafka.server' export async function loader({ request }: LoaderFunctionArgs) { const user = await getUser(request) if (!user) throw new Response(null, { status: 403 }) const dynamoDbAclData = await getKafkaACLsFromDynamoDB(user) - const userGroups = (await getGroups()) - .filter((group) => group.GroupName?.startsWith('gcn.nasa.gov/')) - .map((group) => group.GroupName) - const brokerAclData = await getAclsFromBrokers() - return { dynamoDbAclData, userGroups, brokerAclData } + const latestSync = await getLastSyncDate() + return { dynamoDbAclData, latestSync } } export default function Index() { - const { dynamoDbAclData } = useLoaderData() - /** Use a fetcher or something to post? I want to click a button and - * trigger the update, then reload the page? maybe? - */ + const { dynamoDbAclData, latestSync } = useLoaderData() + const aclFetcher = useFetcher() return ( <> Kafka Admin

Kafka ACLs

-

DynamoDB ACLs

+

+ Information about the Kafka ACLs listed here. Click the button to sync + the db to the kafka broker's current state. +

- - {dynamoDbAclData.map((x, index) => ( - - ))} - + + + {aclFetcher.state !== 'idle' && ( + + Saving... + + )} + + {latestSync && ( +

+ Last synced by {latestSync.syncedBy}{' '} + +

+ )} +

DynamoDB ACLs

+ {dynamoDbAclData && ( + <> + ({dynamoDbAclData.length}) ACLs + + {dynamoDbAclData + .sort((a, b) => a.topicName.localeCompare(b.topicName)) + .map((x, index) => ( + + ))} + + + )} ) } diff --git a/app/routes/admin.kafka.tsx b/app/routes/admin.kafka.tsx index 94ce9be02..f42376550 100644 --- a/app/routes/admin.kafka.tsx +++ b/app/routes/admin.kafka.tsx @@ -11,14 +11,24 @@ import { GridContainer, SideNav } from '@trussworks/react-uswds' import { getUser } from './_auth/user.server' import type { PermissionType } from '~/lib/kafka.server' -import { createKafkaACL, deleteKafkaACL } from '~/lib/kafka.server' +import { + adminGroup, + createKafkaACL, + deleteKafkaACL, + updateDbFromBrokers, +} from '~/lib/kafka.server' import { getFormDataString } from '~/lib/utils' export async function action({ request }: ActionFunctionArgs) { const user = await getUser(request) - if (!user) throw new Response(null, { status: 403 }) + if (!user?.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) const data = await request.formData() const intent = getFormDataString(data, 'intent') + if (intent === 'migrateFromBroker') { + await updateDbFromBrokers(user) + return null + } const topicName = getFormDataString(data, 'topicName') const permissionType = getFormDataString( data, @@ -38,7 +48,6 @@ export async function action({ request }: ActionFunctionArgs) { permissionType, group, prefixed: false, - existsOnBroker: false, }) ) break @@ -49,7 +58,6 @@ export async function action({ request }: ActionFunctionArgs) { permissionType, group, prefixed: false, - existsOnBroker: false, }) ) @@ -60,7 +68,6 @@ export async function action({ request }: ActionFunctionArgs) { permissionType, group, prefixed: true, - existsOnBroker: false, }) ) break