Skip to content

Commit

Permalink
Basic kafka admin stuff
Browse files Browse the repository at this point in the history
New routes and full form, successful creation of ACLs

Cleanup, and testing acl verification method

Simplify some functions, fix form
  • Loading branch information
dakota002 committed Jun 4, 2024
1 parent 3410efd commit 2f2ca44
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 1 deletion.
5 changes: 5 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
topicName *String
group **String
PointInTimeRecovery true

@tables-indexes
email_notification_subscription
topic *String
Expand Down
147 changes: 147 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
Expand Down Expand Up @@ -66,3 +77,139 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = {
topicName: string
permissionType: PermissionType
group: string
prefixed: boolean
}

export type PermissionType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDie('KAFKA_ADMIN_CLIENT_ID')
const admin_client_secret = getEnvOrDie('KAFKA_ADMIN_CLIENT_SECRET')
const adminClient = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain: 'dev.gcn.nasa.gov', // TODO: replace w/ useDomain
}).admin()

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

// Not sure if this is a useful method, but may be helpful if we
// want to verify that our table matches the defined kafka acls
export async function verifyKafkaACL(acl: KafkaACL) {
const operations =
acl.permissionType == 'producer' ? producerOperations : consumerOperations

const promises = operations.map((operation) =>
adminClient.describeAcls({
resourceName: acl.topicName,
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation,
resourcePatternType: ResourcePatternTypes.LITERAL,
})
)

const results = await Promise.all(promises)
console.log(results)
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)

// Add to Kafka
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLs(user: User) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan({ client }, { TableName })
const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({ topicName: acl.topicName, group: acl.group })

const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)

await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.group}`,
host: '*',
operation,
permissionType: AclPermissionTypes.ALLOW,
}
})
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -116,6 +117,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(group)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -126,6 +128,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -165,6 +168,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react'
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@ export function Header() {
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@ export function Header() {
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
134 changes: 134 additions & 0 deletions app/routes/admin.kafka._index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*!
* Copyright © 2023 United States Government as represented by the
* Administrator of the National Aeronautics and Space Administration.
* All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
import type { LoaderFunctionArgs } from '@remix-run/node'
import { useFetcher, useLoaderData } from '@remix-run/react'
import type { ModalRef } from '@trussworks/react-uswds'
import {
Button,
Grid,
Icon,
Modal,
ModalFooter,
ModalHeading,
ModalToggleButton,
} from '@trussworks/react-uswds'
import { useRef } from 'react'

import { getUser } from './_auth/user.server'
import HeadingWithAddButton from '~/components/HeadingWithAddButton'
import SegmentedCards from '~/components/SegmentedCards'
import { getGroups } from '~/lib/cognito.server'
import type { KafkaACL } from '~/lib/kafka.server'
import { getKafkaACLs } from '~/lib/kafka.server'

export async function loader({ request }: LoaderFunctionArgs) {
const user = await getUser(request)
if (!user) throw new Response(null, { status: 403 })
const aclData = await getKafkaACLs(user)
const userGroups = (await getGroups())
.filter((group) => group.GroupName?.startsWith('gcn.nasa.gov/'))
.map((group) => group.GroupName)

return { aclData, userGroups }
}

export default function Index() {
const { aclData } = useLoaderData<typeof loader>()

return (
<>
<HeadingWithAddButton headingLevel={1}>Kafka Admin</HeadingWithAddButton>
<h2>Kafka ACLs</h2>
<SegmentedCards>
{aclData.map((x, index) => (
<KafkaAclCard key={index} acl={x} />
))}
</SegmentedCards>
</>
)
}

function KafkaAclCard({ acl }: { acl: KafkaACL }) {
const ref = useRef<ModalRef>(null)
const fetcher = useFetcher()
const disabled = fetcher.state !== 'idle'

return (
<>
<Grid row style={disabled ? { opacity: '50%' } : undefined}>
<div className="tablet:grid-col flex-fill">
<div>
<small>
<strong>Topic:</strong> {acl.topicName}
</small>
</div>
<div>
<small>
<strong>Permission Type:</strong> {acl.permissionType}
</small>
</div>
<div>
<small>
<strong>Group:</strong> {acl.group}
</small>
</div>
</div>
<div className="tablet:grid-col flex-auto margin-y-auto">
<ModalToggleButton
opener
disabled={disabled}
modalRef={ref}
type="button"
className="usa-button--secondary"
>
<Icon.Delete
role="presentation"
className="bottom-aligned margin-right-05"
/>
Delete
</ModalToggleButton>
</div>
</Grid>
<Modal
id="modal-delete"
ref={ref}
aria-labelledby="modal-delete-heading"
aria-describedby="modal-delete-description"
renderToPortal={false}
>
<fetcher.Form method="POST" action="/admin/kafka">
<input type="hidden" name="topicName" value={acl.topicName} />
<input type="hidden" name="group" value={acl.group} />
<input
type="hidden"
name="permissionType"
value={acl.permissionType}
/>
<ModalHeading id="modal-delete-heading">
Delete Kafka ACL
</ModalHeading>
<p id="modal-delete-description">
This will delete the DynamoDB entry and{' '}
{acl.permissionType == 'consumer'
? '"read" and "describe"'
: '"create", "write", and "describe"'}{' '}
Kafka ACLs. Do you wish to continue?
</p>
<ModalFooter>
<ModalToggleButton modalRef={ref} closer outline>
Cancel
</ModalToggleButton>
<Button data-close-modal type="submit" name="intent" value="delete">
Delete
</Button>
</ModalFooter>
</fetcher.Form>
</Modal>
</>
)
}
Loading

0 comments on commit 2f2ca44

Please sign in to comment.