Skip to content

Commit

Permalink
Basic kafka admin stuff
Browse files Browse the repository at this point in the history
New routes and full form, successful creation of ACLs

Cleanup, and testing acl verification method

Simplify some functions, fix form

Working on syncronizing between brokers and dynamo

Trying a fix for slow tests and needing to wait for the circulars to actually load

Log sync, rough draft of sync functionality, group check
  • Loading branch information
dakota002 committed Jun 25, 2024
1 parent e6080cb commit c6fa53f
Show file tree
Hide file tree
Showing 8 changed files with 649 additions and 2 deletions.
10 changes: 10 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
topicName *String
group **String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down
246 changes: 246 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
Expand Down Expand Up @@ -68,3 +79,238 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = {
topicName: string
permissionType: PermissionType
group: string
prefixed: boolean
}

export type PermissionType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDie('KAFKA_ADMIN_CLIENT_ID')
const admin_client_secret = getEnvOrDie('KAFKA_ADMIN_CLIENT_SECRET')
const adminClient = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain: 'dev.gcn.nasa.gov', // TODO: replace w/ useDomain
}).admin()

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

// Not sure if this is a useful method, but may be helpful if we
// want to verify that our table matches the defined kafka acls
export async function verifyKafkaACL(acl: KafkaACL) {
const operations =
acl.permissionType == 'producer' ? producerOperations : consumerOperations

const promises = operations.map((operation) =>
adminClient.describeAcls({
resourceName: acl.topicName,
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation,
resourcePatternType: ResourcePatternTypes.LITERAL,
})
)

const results = await Promise.all(promises)
console.log(results)
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)

// Add to Kafka
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLsFromDynamoDB(user: User) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan({ client }, { TableName })
const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function getAclsFromBrokers() {
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()
const results: KafkaACL[] = []
for (const item of acls.resources) {
const topicName = item.resourceName
const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
const producerRules = producerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const producerGroup =
producerRules &&
[
...new Set(
item.acls
.filter((acl) => producerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
const consumerRules = consumerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const consumerGroup =
consumerRules &&
[
...new Set(
item.acls
.filter((acl) => consumerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
if (producerRules && producerGroup)
results.push({
topicName,
permissionType: 'producer',
group: producerGroup,
prefixed,
})
if (consumerRules && consumerGroup)
results.push({
topicName,
permissionType: 'consumer',
group: consumerGroup,
prefixed,
})
}
return results
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({ topicName: acl.topicName, group: acl.group })

const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)

await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.group}`,
host: '*',
operation,
permissionType: AclPermissionTypes.ALLOW,
}
})
}

// TODO: Write these next
export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const mappedAcls = dbDefinedAcls.flatMap((x) =>
x.permissionType === 'producer'
? createProducerAcls(x)
: createConsumerAcls(x)
)
await adminClient.connect()
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.disconnect()
}
export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
10 changes: 9 additions & 1 deletion app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -116,6 +117,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(group)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -126,6 +128,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -165,6 +168,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down Expand Up @@ -274,7 +282,7 @@ export function Layout({ children }: { children?: ReactNode }) {
function ErrorUnexpected({ children }: { children?: ReactNode }) {
return (
<GridContainer className="usa-section">
<h1>Unexpected error {children}</h1>
<h1 id="unexpectedError">Unexpected error {children}</h1>
<p className="usa-intro">An unexpected error occurred.</p>
<FormGroup>
<ButtonGroup>
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react'
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@ export function Header() {
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@ export function Header() {
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading

0 comments on commit c6fa53f

Please sign in to comment.