Skip to content

Commit

Permalink
Basic kafka admin stuff
Browse files Browse the repository at this point in the history
New routes and full form, successful creation of ACLs

Working on syncronizing between brokers and dynamo

Trying a fix for slow tests and needing to wait for the circulars to actually load

Log sync, rough draft of sync functionality, group check

Minor changes so tests will work across browser in a single run

Undo some config changes, undo removal of slow

better indicator of the site being ready

Better getter for env variables

I think this will fix the failing tests

another try

getEnvOrDieInProd

remove unrelated changes
  • Loading branch information
dakota002 committed Aug 1, 2024
1 parent a39edad commit fb82cb5
Show file tree
Hide file tree
Showing 9 changed files with 635 additions and 6 deletions.
10 changes: 10 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
topicName *String
group **String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down
236 changes: 233 additions & 3 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'
import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
const kafka = new Kafka({
client_id,
client_secret,
Expand Down Expand Up @@ -68,3 +79,222 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = {
topicName: string
permissionType: PermissionType
group: string
prefixed: boolean
}

export type PermissionType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLsFromDynamoDB(user: User) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan({ client }, { TableName })
const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()
const results: KafkaACL[] = []
for (const item of acls.resources) {
const topicName = item.resourceName
const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
const producerRules = producerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const producerGroup =
producerRules &&
[
...new Set(
item.acls
.filter((acl) => producerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
const consumerRules = consumerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const consumerGroup =
consumerRules &&
[
...new Set(
item.acls
.filter((acl) => consumerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
if (producerRules && producerGroup)
results.push({
topicName,
permissionType: 'producer',
group: producerGroup,
prefixed,
})
if (consumerRules && consumerGroup)
results.push({
topicName,
permissionType: 'consumer',
group: consumerGroup,
prefixed,
})
}
return results
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({ topicName: acl.topicName, group: acl.group })

const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.group}`,
host: '*',
operation,
permissionType: AclPermissionTypes.ALLOW,
}
})
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const mappedAcls = dbDefinedAcls.flatMap((x) =>
x.permissionType === 'producer'
? createProducerAcls(x)
: createConsumerAcls(x)
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.disconnect()
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -116,6 +117,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(group)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -126,6 +128,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -165,6 +168,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react'
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@ export function Header() {
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@ export function Header() {
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin/kafka">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading

0 comments on commit fb82cb5

Please sign in to comment.