From b9075afff8339e843380abd1f160da9c1aa6640c Mon Sep 17 00:00:00 2001 From: dakota002 Date: Wed, 7 Aug 2024 15:03:01 -0400 Subject: [PATCH 1/4] Remove extra stuff --- app.arc | 15 ++ app/lib/kafka.server.ts | 275 ++++++++++++++++++++++++++++++ app/root.tsx | 8 + app/root/header/Header.tsx | 8 +- app/routes/admin.kafka._index.tsx | 264 ++++++++++++++++++++++++++++ app/routes/admin.kafka.edit.tsx | 95 +++++++++++ app/routes/admin.tsx | 3 + playwright.config.ts | 2 - sandbox-seed.json | 14 ++ 9 files changed, 681 insertions(+), 3 deletions(-) create mode 100644 app/routes/admin.kafka._index.tsx create mode 100644 app/routes/admin.kafka.edit.tsx diff --git a/app.arc b/app.arc index 28dd2106b..d6338aa1b 100644 --- a/app.arc +++ b/app.arc @@ -94,6 +94,16 @@ legacy_users email *String PointInTimeRecovery true +kafka_acls + topicName *String + cognitoGroup **String + PointInTimeRecovery true + +kafka_acl_log + partitionKey *Number + syncedOn **Number + PointInTimeRecovery ture + @tables-indexes email_notification_subscription topic *String @@ -143,6 +153,11 @@ synonyms synonymId *String name synonymsByUuid +kafka_acls + cognitoGroup *String + permissionType **String + name aclsByGroup + @aws runtime nodejs20.x region us-east-1 diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index eb85f2fa5..3f4ddf9de 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -5,10 +5,21 @@ * * SPDX-License-Identifier: Apache-2.0 */ +import { tables } from '@architect/functions' +import { paginateScan } from '@aws-sdk/lib-dynamodb' +import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { Kafka } from 'gcn-kafka' +import type { AclEntry } from 'kafkajs' +import { + AclOperationTypes, + AclPermissionTypes, + AclResourceTypes, + ResourcePatternTypes, +} from 'kafkajs' import memoizee from 'memoizee' import { domain, getEnvOrDieInProduction } from './env.server' +import type { User } from '~/routes/_auth/user.server' const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? '' const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET') @@ -68,3 +79,267 @@ if (process.env.ARC_SANDBOX) { await producer.send({ topic, messages: [{ value }] }) } } + +export type KafkaACL = { + topicName: string + permissionType: PermissionType + cognitoGroup: string + prefixed: boolean +} + +export type PermissionType = 'producer' | 'consumer' + +export const adminGroup = 'gcn.nasa.gov/gcn-admin' + +const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE] +const producerOperations = [ + AclOperationTypes.CREATE, + AclOperationTypes.WRITE, + AclOperationTypes.DESCRIBE, +] + +const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? '' +const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET') +const adminKafka = new Kafka({ + client_id: admin_client_id, + client_secret: admin_client_secret, + domain, +}) + +function validateUser(user: User) { + if (!user.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) +} + +export async function createKafkaACL(user: User, acl: KafkaACL) { + validateUser(user) + // Save to db + const db = await tables() + await db.kafka_acls.put(acl) + + // Add to Kafka + const adminClient = adminKafka.admin() + await adminClient.connect() + await adminClient.createTopics({ + topics: [ + { + topic: acl.topicName, + }, + ], + }) + const acls = + acl.permissionType == 'producer' + ? createProducerAcls(acl) + : createConsumerAcls(acl) + await adminClient.createAcls({ acl: acls }) + await adminClient.disconnect() +} + +export async function getKafkaACLByTopicName(user: User, topicName: string) { + validateUser(user) + const db = await tables() + return (await db.kafka_acls.get({ topicName })) as KafkaACL +} + +export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) { + validateUser(user) + const db = await tables() + const client = db._doc as unknown as DynamoDBDocument + const TableName = db.name('kafka_acls') + const pages = paginateScan( + { client }, + { + TableName, + FilterExpression: filter + ? 'contains(topicName, :filter) OR contains(cognitoGroup, :filter)' + : undefined, + ExpressionAttributeValues: filter + ? { + ':filter': filter, + } + : undefined, + } + ) + + const acls: KafkaACL[] = [] + for await (const page of pages) { + const newACL = page.Items as KafkaACL[] + if (newACL) acls.push(...newACL) + } + return acls +} + +export async function getKafkaTopicsForUser(user: User) { + validateUser(user) + const userGroups = user.groups.filter((x) => + x.startsWith('gcn.nasa.gov/kafka-') + ) + const db = await tables() + const items = ( + await Promise.all([ + ...userGroups.map((cognitoGroup) => + db.kafka_acls.query({ + IndexName: 'aclsByGroup', + KeyConditionExpression: + 'cognitoGroup = :group AND permissionType = :permission', + ProjectionExpression: 'topicName', + ExpressionAttributeValues: { + ':group': cognitoGroup, + ':permission': 'consumer', + }, + }) + ), + ]) + ) + .filter((x) => x.Count && x.Count > 0) + .flatMap((x) => x.Items) + .map((x) => x.topicName) + + return items +} +export async function getAclsFromBrokers() { + const adminClient = adminKafka.admin() + await adminClient.connect() + const acls = await adminClient.describeAcls({ + resourceType: AclResourceTypes.TOPIC, + host: '*', + permissionType: AclPermissionTypes.ALLOW, + operation: AclOperationTypes.ANY, + resourcePatternType: ResourcePatternTypes.ANY, + }) + await adminClient.disconnect() + const results: KafkaACL[] = [] + for (const item of acls.resources) { + const topicName = item.resourceName + const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED + const producerRules = producerOperations.every((op) => + item.acls.map((x) => x.operation).includes(op) + ) + const producerGroup = + producerRules && + [ + ...new Set( + item.acls + .filter((acl) => producerOperations.includes(acl.operation)) + .map((x) => x.principal) + ), + ][0]?.replace('User:', '') + const consumerRules = consumerOperations.every((op) => + item.acls.map((x) => x.operation).includes(op) + ) + const consumerGroup = + consumerRules && + [ + ...new Set( + item.acls + .filter((acl) => consumerOperations.includes(acl.operation)) + .map((x) => x.principal) + ), + ][0]?.replace('User:', '') + if (producerRules && producerGroup) + results.push({ + topicName, + permissionType: 'producer', + cognitoGroup: producerGroup, + prefixed, + }) + if (consumerRules && consumerGroup) + results.push({ + topicName, + permissionType: 'consumer', + cognitoGroup: consumerGroup, + prefixed, + }) + } + return results +} + +export async function deleteKafkaACL(user: User, acl: KafkaACL) { + validateUser(user) + const db = await tables() + await db.kafka_acls.delete({ + topicName: acl.topicName, + cognitoGroup: acl.cognitoGroup, + }) + + const acls = + acl.permissionType == 'producer' + ? createProducerAcls(acl) + : createConsumerAcls(acl) + + const adminClient = adminKafka.admin() + await adminClient.connect() + await adminClient.deleteAcls({ filters: acls }) + await adminClient.disconnect() +} + +function createProducerAcls(acl: KafkaACL): AclEntry[] { + // Create, Write, and Describe operations + return mapAclAndOperations(acl, producerOperations) +} + +function createConsumerAcls(acl: KafkaACL): AclEntry[] { + // Read and Describe operations + return mapAclAndOperations(acl, consumerOperations) +} + +function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) { + return operations.map((operation) => { + return { + resourceType: AclResourceTypes.TOPIC, + resourceName: acl.topicName, + resourcePatternType: acl.prefixed + ? ResourcePatternTypes.PREFIXED + : ResourcePatternTypes.LITERAL, + principal: `User:${acl.cognitoGroup}`, + host: '*', + operation, + permissionType: AclPermissionTypes.ALLOW, + } + }) +} + +export async function updateBrokersFromDb(user: User) { + const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user) + const mappedAcls = dbDefinedAcls.flatMap((x) => + x.permissionType === 'producer' + ? createProducerAcls(x) + : createConsumerAcls(x) + ) + + const adminClient = adminKafka.admin() + await adminClient.connect() + await adminClient.createAcls({ acl: mappedAcls }) + await adminClient.disconnect() +} + +export async function updateDbFromBrokers(user: User) { + const kafkaDefinedAcls = await getAclsFromBrokers() + const db = await tables() + await Promise.all([ + ...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)), + db.kafka_acl_log.put({ + partitionKey: 1, + syncedOn: Date.now(), + syncedBy: user.email, + }), + ]) +} + +type KafkaAclSyncLog = { + partitionKey: number + syncedOn: number + syncedBy: string +} + +export async function getLastSyncDate(): Promise { + const db = await tables() + return ( + await db.kafka_acl_log.query({ + KeyConditionExpression: 'partitionKey = :1', + ExpressionAttributeValues: { ':1': 1 }, + ScanIndexForward: false, + Limit: 1, + }) + ).Items.pop() as KafkaAclSyncLog +} diff --git a/app/root.tsx b/app/root.tsx index 10ac78d99..2ddbb06d4 100644 --- a/app/root.tsx +++ b/app/root.tsx @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay' import invariant from 'tiny-invariant' import { features, getEnvOrDieInProduction, origin } from './lib/env.server' +import { adminGroup } from './lib/kafka.server' import { DevBanner } from './root/DevBanner' import { Footer } from './root/Footer' import NewsBanner from './root/NewsBanner' @@ -119,6 +120,7 @@ export async function loader({ request }: LoaderFunctionArgs) { const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY') const userIsMod = user?.groups.includes(moderatorGroup) const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup) + const userIsAdmin = user?.groups.includes(adminGroup) return { origin, @@ -129,6 +131,7 @@ export async function loader({ request }: LoaderFunctionArgs) { idp, userIsMod, userIsVerifiedSubmitter, + userIsAdmin, } } @@ -168,6 +171,11 @@ export function useSubmitterStatus() { return userIsVerifiedSubmitter } +export function useAdminStatus() { + const { userIsAdmin } = useLoaderDataRoot() + return userIsAdmin +} + export function useRecaptchaSiteKey() { const { recaptchaSiteKey } = useLoaderDataRoot() return recaptchaSiteKey diff --git a/app/root/header/Header.tsx b/app/root/header/Header.tsx index dde3947ad..61e3e9fcd 100644 --- a/app/root/header/Header.tsx +++ b/app/root/header/Header.tsx @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react' import { useClickAnyWhere, useWindowSize } from 'usehooks-ts' import { Meatball } from '~/components/meatball/Meatball' -import { useEmail, useUserIdp } from '~/root' +import { useAdminStatus, useEmail, useUserIdp } from '~/root' import styles from './header.module.css' @@ -74,6 +74,7 @@ export function Header() { const [expanded, setExpanded] = useState(false) const [userMenuIsOpen, setUserMenuIsOpen] = useState(false) const isMobile = useWindowSize().width < 1024 + const userIsAdmin = useAdminStatus() function toggleMobileNav() { setExpanded((expanded) => !expanded) @@ -162,6 +163,11 @@ export function Header() { Profile , + userIsAdmin && ( + + Admin + + ), Peer Endorsements , diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx new file mode 100644 index 000000000..482799e43 --- /dev/null +++ b/app/routes/admin.kafka._index.tsx @@ -0,0 +1,264 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { ActionFunctionArgs, LoaderFunctionArgs } from '@remix-run/node' +import { useFetcher, useLoaderData } from '@remix-run/react' +import type { ModalRef } from '@trussworks/react-uswds' +import { + Button, + Grid, + Icon, + Label, + Modal, + ModalFooter, + ModalHeading, + ModalToggleButton, + TextInput, +} from '@trussworks/react-uswds' +import { useEffect, useRef, useState } from 'react' + +import { getUser } from './_auth/user.server' +import HeadingWithAddButton from '~/components/HeadingWithAddButton' +import SegmentedCards from '~/components/SegmentedCards' +import Spinner from '~/components/Spinner' +import TimeAgo from '~/components/TimeAgo' +import type { KafkaACL, PermissionType } from '~/lib/kafka.server' +import { + adminGroup, + createKafkaACL, + deleteKafkaACL, + getKafkaACLsFromDynamoDB, + getLastSyncDate, + updateDbFromBrokers, +} from '~/lib/kafka.server' +import { getFormDataString } from '~/lib/utils' + +export async function loader({ request }: LoaderFunctionArgs) { + const user = await getUser(request) + if (!user || !user.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) + const { aclFilter } = Object.fromEntries(new URL(request.url).searchParams) + const dynamoDbAclData = await getKafkaACLsFromDynamoDB(user, aclFilter) + const latestSync = await getLastSyncDate() + return { dynamoDbAclData, latestSync } +} + +export async function action({ request }: ActionFunctionArgs) { + const user = await getUser(request) + if (!user?.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) + const data = await request.formData() + const intent = getFormDataString(data, 'intent') + if (intent === 'migrateFromBroker') { + await updateDbFromBrokers(user) + return null + } + const topicName = getFormDataString(data, 'topicName') + const permissionType = getFormDataString( + data, + 'permissionType' + ) as PermissionType + const group = getFormDataString(data, 'group') + const includePrefixed = getFormDataString(data, 'includePrefixed') + if (!topicName || !permissionType || !group) + throw new Response(null, { status: 400 }) + const promises = [] + + switch (intent) { + case 'delete': + promises.push( + deleteKafkaACL(user, { + topicName, + permissionType, + cognitoGroup: group, + prefixed: false, + }) + ) + break + case 'create': + promises.push( + createKafkaACL(user, { + topicName, + permissionType, + cognitoGroup: group, + prefixed: false, + }) + ) + + if (includePrefixed) + promises.push( + createKafkaACL(user, { + topicName: `${topicName}.`, + permissionType, + cognitoGroup: group, + prefixed: true, + }) + ) + break + default: + break + } + await Promise.all(promises) + + return null +} + +export default function Index() { + const { dynamoDbAclData, latestSync } = useLoaderData() + const [aclData, setAclData] = useState(dynamoDbAclData) + const updateFetcher = useFetcher() + const aclFetcher = useFetcher() + + useEffect(() => { + setAclData(aclFetcher.data?.dynamoDbAclData ?? aclData) + }, [aclFetcher.data, aclData]) + + return ( + <> + Kafka +

Kafka ACLs

+

+ Kafka Access Control Lists (ACLs) are a security mechanism used to + control access to resources within a Kafka cluster. They define which + users or client applications have permissions to perform specific + operations on Kafka resources, such as topics, consumer groups, and + broker resources. ACLs specify who can produce (write) or consume (read) + data from topics, create or delete topics, manage consumer groups, and + perform administrative tasks. +

+ + + + {updateFetcher.state !== 'idle' && ( + + Updating... + + )} + + {latestSync && ( +

+ Last synced by {latestSync.syncedBy}{' '} + +

+ )} + {aclData && ( + <> + + + + + {aclFetcher.state !== 'idle' && ( + + Loading... + + )} + + + {aclData + .sort((a, b) => a.topicName.localeCompare(b.topicName)) + .map((x, index) => ( + + ))} + + + )} + + ) +} + +function KafkaAclCard({ acl }: { acl: KafkaACL }) { + const ref = useRef(null) + const fetcher = useFetcher() + const disabled = fetcher.state !== 'idle' + + return ( + <> + +
+
+ + Topic: {acl.topicName} + +
+
+ + Permission Type: {acl.permissionType} + +
+
+ + Group: {acl.cognitoGroup} + +
+
+
+ + + Delete + +
+
+ + + + + + + Delete Kafka ACL + + + + + Cancel + + + + + + + ) +} diff --git a/app/routes/admin.kafka.edit.tsx b/app/routes/admin.kafka.edit.tsx new file mode 100644 index 000000000..7d9af231a --- /dev/null +++ b/app/routes/admin.kafka.edit.tsx @@ -0,0 +1,95 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { LoaderFunctionArgs } from '@remix-run/node' +import { Form, useLoaderData } from '@remix-run/react' +import { + Button, + Checkbox, + Label, + Select, + TextInput, +} from '@trussworks/react-uswds' + +import { getUser } from './_auth/user.server' +import { getGroups } from '~/lib/cognito.server' + +export async function loader({ request }: LoaderFunctionArgs) { + const user = await getUser(request) + if (!user) throw new Response(null, { status: 403 }) + const userGroups = (await getGroups()) + .filter((group) => group.GroupName?.startsWith('gcn.nasa.gov/')) + .map((group) => group.GroupName) + + return { userGroups } +} + +export default function Kafka() { + const { userGroups } = useLoaderData() + return +} + +function KafkaAclForm({ groups }: { groups: string[] }) { + return ( + <> +

Create Kafka ACLs

+
+ + console.log(e.target.value)} + /> + + + + Producer will generate ACLs for the Create, Write, and Describe + operations. Consumer will generate ACLs for the Read and Describe + operations + + + + +
+ + If yes, submission will also trigger th generation of ACLs for the + provided topic name as a PREFIXED topic with a period included at + the end. For example, if checked, a topic of `gcn.notices.icecube` + will result in ACLs for both `gcn.notices.icecube` (literal) and + `gcn.notices.icecube.` (prefixed). + +
+ + + + ) +} diff --git a/app/routes/admin.tsx b/app/routes/admin.tsx index a38586307..0531f8c35 100644 --- a/app/routes/admin.tsx +++ b/app/routes/admin.tsx @@ -27,6 +27,9 @@ export default function () {
+ Kafka + , Users , diff --git a/playwright.config.ts b/playwright.config.ts index 5aa5401b4..76bf15d53 100644 --- a/playwright.config.ts +++ b/playwright.config.ts @@ -4,7 +4,6 @@ import { defineConfig, devices } from '@playwright/test' * Read environment variables from file. * https://github.com/motdotla/dotenv */ -// require('dotenv').config(); const deviceList = ['Desktop Firefox', 'Desktop Chrome', 'Desktop Safari'] @@ -69,7 +68,6 @@ export default defineConfig({ command: 'npm run dev', url: 'http://localhost:3333', reuseExistingServer: !process.env.CI, - stdout: 'pipe', timeout: 120 * 1000, // 120 Seconds timeout on webServer }, }) diff --git a/sandbox-seed.json b/sandbox-seed.json index 35534487e..448b71353 100644 --- a/sandbox-seed.json +++ b/sandbox-seed.json @@ -5155,5 +5155,19 @@ "requestorEmail": "example@example.com", "subject": "Optical Observations for GRB 971227" } + ], + "kafka_acls": [ + { + "topicName": "test_topic_created_from_website", + "permissionType": "consumer", + "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-consumer", + "prefixed": false + }, + { + "topicName": "test_topic_created_from_website", + "permissionType": "producer", + "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-producer", + "prefixed": false + } ] } From 80cb32ae75cee4b753c7d6e32ecade37f05e12b4 Mon Sep 17 00:00:00 2001 From: dakota002 Date: Wed, 14 Aug 2024 15:04:54 -0400 Subject: [PATCH 2/4] Fixes pulling data, name change on data type, Deny permissions --- app.arc | 2 +- app/lib/kafka.server.ts | 68 +++++++++---------------- app/routes/admin.kafka._index.tsx | 83 +++++++++++++++++++++++-------- app/routes/admin.kafka.edit.tsx | 10 ++-- sandbox-seed.json | 10 ++-- 5 files changed, 101 insertions(+), 72 deletions(-) diff --git a/app.arc b/app.arc index d6338aa1b..f4de4c93e 100644 --- a/app.arc +++ b/app.arc @@ -155,7 +155,7 @@ synonyms kafka_acls cognitoGroup *String - permissionType **String + userClientType **String name aclsByGroup @aws diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index 3f4ddf9de..5720ec7a6 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -82,12 +82,13 @@ if (process.env.ARC_SANDBOX) { export type KafkaACL = { topicName: string - permissionType: PermissionType + userClientType: UserClientType cognitoGroup: string prefixed: boolean + permissionType: number } -export type PermissionType = 'producer' | 'consumer' +export type UserClientType = 'producer' | 'consumer' export const adminGroup = 'gcn.nasa.gov/gcn-admin' @@ -128,7 +129,7 @@ export async function createKafkaACL(user: User, acl: KafkaACL) { ], }) const acls = - acl.permissionType == 'producer' + acl.userClientType == 'producer' ? createProducerAcls(acl) : createConsumerAcls(acl) await adminClient.createAcls({ acl: acls }) @@ -203,54 +204,33 @@ export async function getAclsFromBrokers() { const acls = await adminClient.describeAcls({ resourceType: AclResourceTypes.TOPIC, host: '*', - permissionType: AclPermissionTypes.ALLOW, + permissionType: AclPermissionTypes.ANY, operation: AclOperationTypes.ANY, resourcePatternType: ResourcePatternTypes.ANY, }) await adminClient.disconnect() const results: KafkaACL[] = [] for (const item of acls.resources) { + console.log('Item:', item) const topicName = item.resourceName + const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED - const producerRules = producerOperations.every((op) => - item.acls.map((x) => x.operation).includes(op) - ) - const producerGroup = - producerRules && - [ - ...new Set( - item.acls - .filter((acl) => producerOperations.includes(acl.operation)) - .map((x) => x.principal) - ), - ][0]?.replace('User:', '') - const consumerRules = consumerOperations.every((op) => - item.acls.map((x) => x.operation).includes(op) + results.push( + ...item.acls + .filter((acl) => acl.operation !== AclOperationTypes.DESCRIBE) + .map((acl) => { + const principal = acl.principal.split('-') + return { + topicName, + prefixed, + permissionType: acl.permissionType, + cognitoGroup: acl.principal.replace('User:', ''), + userClientType: principal[principal.length - 1] as UserClientType, + } + }) ) - const consumerGroup = - consumerRules && - [ - ...new Set( - item.acls - .filter((acl) => consumerOperations.includes(acl.operation)) - .map((x) => x.principal) - ), - ][0]?.replace('User:', '') - if (producerRules && producerGroup) - results.push({ - topicName, - permissionType: 'producer', - cognitoGroup: producerGroup, - prefixed, - }) - if (consumerRules && consumerGroup) - results.push({ - topicName, - permissionType: 'consumer', - cognitoGroup: consumerGroup, - prefixed, - }) } + return results } @@ -263,7 +243,7 @@ export async function deleteKafkaACL(user: User, acl: KafkaACL) { }) const acls = - acl.permissionType == 'producer' + acl.userClientType == 'producer' ? createProducerAcls(acl) : createConsumerAcls(acl) @@ -294,7 +274,7 @@ function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) { principal: `User:${acl.cognitoGroup}`, host: '*', operation, - permissionType: AclPermissionTypes.ALLOW, + permissionType: acl.permissionType, } }) } @@ -302,7 +282,7 @@ function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) { export async function updateBrokersFromDb(user: User) { const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user) const mappedAcls = dbDefinedAcls.flatMap((x) => - x.permissionType === 'producer' + x.userClientType === 'producer' ? createProducerAcls(x) : createConsumerAcls(x) ) diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx index 482799e43..66639e00c 100644 --- a/app/routes/admin.kafka._index.tsx +++ b/app/routes/admin.kafka._index.tsx @@ -26,13 +26,14 @@ import HeadingWithAddButton from '~/components/HeadingWithAddButton' import SegmentedCards from '~/components/SegmentedCards' import Spinner from '~/components/Spinner' import TimeAgo from '~/components/TimeAgo' -import type { KafkaACL, PermissionType } from '~/lib/kafka.server' +import type { KafkaACL, UserClientType } from '~/lib/kafka.server' import { adminGroup, createKafkaACL, deleteKafkaACL, getKafkaACLsFromDynamoDB, getLastSyncDate, + updateBrokersFromDb, updateDbFromBrokers, } from '~/lib/kafka.server' import { getFormDataString } from '~/lib/utils' @@ -57,15 +58,23 @@ export async function action({ request }: ActionFunctionArgs) { await updateDbFromBrokers(user) return null } + + if (intent === 'migrateFromDB') { + await updateBrokersFromDb(user) + return null + } + const topicName = getFormDataString(data, 'topicName') - const permissionType = getFormDataString( + const userClientType = getFormDataString( data, - 'permissionType' - ) as PermissionType + 'userClientType' + ) as UserClientType + const permissionTypeString = getFormDataString(data, 'permissionType') const group = getFormDataString(data, 'group') const includePrefixed = getFormDataString(data, 'includePrefixed') - if (!topicName || !permissionType || !group) + if (!topicName || !userClientType || !group || !permissionTypeString) throw new Response(null, { status: 400 }) + const permissionType = parseInt(permissionTypeString) const promises = [] switch (intent) { @@ -73,9 +82,10 @@ export async function action({ request }: ActionFunctionArgs) { promises.push( deleteKafkaACL(user, { topicName, - permissionType, + userClientType, cognitoGroup: group, - prefixed: false, + prefixed: topicName.endsWith('.'), + permissionType, }) ) break @@ -83,9 +93,10 @@ export async function action({ request }: ActionFunctionArgs) { promises.push( createKafkaACL(user, { topicName, - permissionType, + userClientType, cognitoGroup: group, prefixed: false, + permissionType, }) ) @@ -93,9 +104,10 @@ export async function action({ request }: ActionFunctionArgs) { promises.push( createKafkaACL(user, { topicName: `${topicName}.`, - permissionType, + userClientType, cognitoGroup: group, prefixed: true, + permissionType, }) ) break @@ -112,6 +124,7 @@ export default function Index() { const [aclData, setAclData] = useState(dynamoDbAclData) const updateFetcher = useFetcher() const aclFetcher = useFetcher() + const brokerFromDbFetcher = useFetcher() useEffect(() => { setAclData(aclFetcher.data?.dynamoDbAclData ?? aclData) @@ -136,7 +149,10 @@ export default function Index() { type="submit" name="intent" value="migrateFromBroker" - disabled={updateFetcher.state !== 'idle'} + disabled={ + updateFetcher.state !== 'idle' || + brokerFromDbFetcher.state !== 'idle' + } > Pull ACLs from Broker @@ -152,6 +168,26 @@ export default function Index() {

)} + + + + {brokerFromDbFetcher.state !== 'idle' && ( + + Updating... + + )} + + {aclData && ( <> @@ -188,24 +224,26 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { const fetcher = useFetcher() const disabled = fetcher.state !== 'idle' + const permissionMap: { [key: number]: string } = { + 2: 'Deny', + 3: 'Allow', + } + return ( <>
- - Topic: {acl.topicName} - + Group: {acl.cognitoGroup} +
+
+ Client Type: {acl.userClientType}
- - Permission Type: {acl.permissionType} - + Permission: {permissionMap[acl.permissionType]}
- - Group: {acl.cognitoGroup} - + Topic: {acl.topicName}
@@ -239,12 +277,17 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { name="permissionType" value={acl.permissionType} /> + Delete Kafka ACL
- Group: {acl.cognitoGroup} + Group: {acl.principal}
-
+ {/*
Client Type: {acl.userClientType} -
+
*/}
Permission: {permissionMap[acl.permissionType]}
- Topic: {acl.topicName} + Resource: {acl.resourceName}
@@ -270,27 +267,13 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { renderToPortal={false} > - - - - + Delete Kafka ACL diff --git a/sandbox-seed.json b/sandbox-seed.json index 4daab0ee7..9c9a94b3e 100644 --- a/sandbox-seed.json +++ b/sandbox-seed.json @@ -5158,15 +5158,15 @@ ], "kafka_acls": [ { - "topicName": "test_topic_created_from_website", - "userClientType": "consumer", + "aclId": "12345678-abcd-1234-abcd-1234abcd1234", + "resourceName": "test_topic_created_from_website", "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-consumer", "prefixed": false, "permissionType": "3" }, { - "topicName": "test_topic_created_from_website", - "userClientType": "producer", + "aclId": "62fe8590-42e4-4917-afea-db6a0a84079a", + "resourceName": "test_topic_created_from_website", "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-producer", "prefixed": false, "permissionType": "3" From bc8e81f751d9cf9e80389f675bf1b26bd778bfc3 Mon Sep 17 00:00:00 2001 From: dakota002 Date: Mon, 19 Aug 2024 14:58:08 -0400 Subject: [PATCH 4/4] Adds some checks to confirm actions, and include Update Broker from DB action --- app/lib/kafka.server.ts | 26 ++---- app/routes/admin.kafka._index.tsx | 147 ++++++++++++++++++++++-------- app/routes/admin.kafka.edit.tsx | 11 ++- sandbox-seed.json | 16 +++- 4 files changed, 138 insertions(+), 62 deletions(-) diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index 831aa0701..dbaa4c5b5 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -9,6 +9,7 @@ import { tables } from '@architect/functions' import { paginateScan } from '@aws-sdk/lib-dynamodb' import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import crypto from 'crypto' +import type { AclFilter } from 'gcn-kafka' import { Kafka } from 'gcn-kafka' import type { AclEntry } from 'kafkajs' import { @@ -81,17 +82,6 @@ if (process.env.ARC_SANDBOX) { } } -/** - * AclEntry already contains definitions for the following: - * - * principal: string --> 'User:{cognito_group_name}' - * host: string --> '*' - * operation: AclOperationTypes --> Read,Write, etc from enum - * permissionType: AclPermissionTypes --> Allow, Deny, etc from enum - * resourceType: AclResourceTypes --> TOPIC, etc - * resourceName: string --> name of topic: 'gcn.notices.burstcube' - * resourcePatternType: ResourcePatternTypes --> PREFIXED or LITERAL - */ export type KafkaACL = AclEntry & { aclId?: string } @@ -139,9 +129,9 @@ export async function createKafkaACL( resourceName, principal: `User:${group}`, host: '*', - operation, // Read, write, etc - permissionType, // Allow, deny etc - resourcePatternType: 3, // LITERAL | PREFIXED + operation, + permissionType, + resourcePatternType: 3, resourceType, } }) @@ -150,9 +140,9 @@ export async function createKafkaACL( resourceName, principal: `User:${group}`, host: '*', - operation, // Read, write, etc + operation, permissionType, - resourcePatternType: 3, // LITERAL | PREFIX + resourcePatternType: 3, resourceType, } }) @@ -280,13 +270,13 @@ export async function getAclsFromBrokers() { export async function deleteKafkaACL(user: User, aclIds: string[]) { validateUser(user) const db = await tables() - const acls = await Promise.all( + const acls: KafkaACL[] = await Promise.all( aclIds.map((aclId) => db.kafka_acls.get({ aclId })) ) const adminClient = adminKafka.admin() await adminClient.connect() - await adminClient.deleteAcls({ filters: acls }) + await adminClient.deleteAcls({ filters: acls as AclFilter[] }) await adminClient.disconnect() await Promise.all( diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx index b2b877679..0685fcd23 100644 --- a/app/routes/admin.kafka._index.tsx +++ b/app/routes/admin.kafka._index.tsx @@ -19,6 +19,7 @@ import { ModalToggleButton, TextInput, } from '@trussworks/react-uswds' +import { groupBy, sortBy } from 'lodash' import { useEffect, useRef, useState } from 'react' import { getUser } from './_auth/user.server' @@ -43,7 +44,13 @@ export async function loader({ request }: LoaderFunctionArgs) { if (!user || !user.groups.includes(adminGroup)) throw new Response(null, { status: 403 }) const { aclFilter } = Object.fromEntries(new URL(request.url).searchParams) - const dynamoDbAclData = await getKafkaACLsFromDynamoDB(user, aclFilter) + const dynamoDbAclData = groupBy( + sortBy(await getKafkaACLsFromDynamoDB(user, aclFilter), [ + 'resourceName', + 'principal', + ]), + 'resourceName' + ) const latestSync = await getLastSyncDate() return { dynamoDbAclData, latestSync } } @@ -54,6 +61,7 @@ export async function action({ request }: ActionFunctionArgs) { throw new Response(null, { status: 403 }) const data = await request.formData() const intent = getFormDataString(data, 'intent') + if (intent === 'migrateFromBroker') { await updateDbFromBrokers(user) return null @@ -64,11 +72,11 @@ export async function action({ request }: ActionFunctionArgs) { return null } - const aclId = getFormDataString(data, 'aclId') const promises = [] switch (intent) { case 'delete': + const aclId = getFormDataString(data, 'aclId') if (!aclId) throw new Response(null, { status: 400 }) promises.push(deleteKafkaACL(user, [aclId])) break @@ -78,8 +86,8 @@ export async function action({ request }: ActionFunctionArgs) { data, 'userClientType' ) as UserClientType - const permissionTypeString = getFormDataString(data, 'permissionType') const group = getFormDataString(data, 'group') + const permissionTypeString = getFormDataString(data, 'permissionType') const includePrefixed = getFormDataString(data, 'includePrefixed') const resourceTypeString = getFormDataString(data, 'resourceType') @@ -122,6 +130,7 @@ export default function Index() { const updateFetcher = useFetcher() const aclFetcher = useFetcher() const brokerFromDbFetcher = useFetcher() + const ref = useRef(null) useEffect(() => { setAclData(aclFetcher.data?.dynamoDbAclData ?? aclData) @@ -140,7 +149,6 @@ export default function Index() { data from topics, create or delete topics, manage consumer groups, and perform administrative tasks.

- - {brokerFromDbFetcher.state !== 'idle' && ( - - Updating... - - )} - - {aclData && ( <> - + +
+ + ) } @@ -221,26 +264,56 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { const fetcher = useFetcher() const disabled = fetcher.state !== 'idle' + // TODO: These maps can probably be refactored, since they are + // just inverting the enum from kafka, but importing them + // directly here causes some errors. Same for mapping them to + // dropdowns const permissionMap: { [key: number]: string } = { 2: 'Deny', 3: 'Allow', } + const operationMap: { [key: number]: string } = { + 0: 'Unknown', + 1: 'Any', + 2: 'All', + 3: 'Read', + 4: 'Write', + 5: 'Create', + 6: 'Delete', + 7: 'Alter', + 8: 'Describe', + 9: 'Cluster Action', + 10: 'Describe Configs', + 11: 'Alter Configs', + 12: 'Idempotent Write', + } + + const resourceTypeMap: { [key: number]: string } = { + 0: 'Unknown', + 1: 'Any', + 2: 'Topic', + 3: 'Group', + 4: 'Cluster', + 5: 'Transactional Id', + 6: 'Delegation Token', + } + return ( <> -
+
+
+ Type: {resourceTypeMap[acl.resourceType]} +
Group: {acl.principal}
- {/*
- Client Type: {acl.userClientType} -
*/}
Permission: {permissionMap[acl.permissionType]}
- Resource: {acl.resourceName} + Operation: {operationMap[acl.operation]}
@@ -272,8 +345,8 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { Delete Kafka ACL diff --git a/app/routes/admin.kafka.edit.tsx b/app/routes/admin.kafka.edit.tsx index af8aad19a..f3f3c6887 100644 --- a/app/routes/admin.kafka.edit.tsx +++ b/app/routes/admin.kafka.edit.tsx @@ -39,20 +39,25 @@ function KafkaAclForm({ groups }: { groups: string[] }) {

Create Kafka ACLs

+ +