Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka ACL admin to website #2301

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
aclId *String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down Expand Up @@ -143,6 +152,10 @@ synonyms
synonymId *String
name synonymsByUuid

kafka_acls
resourceName *String
name aclsByResourceName

@aws
runtime nodejs20.x
region us-east-1
Expand Down
260 changes: 260 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'

Check warning on line 9 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L8-L9

Added lines #L8 - L9 were not covered by tests
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import crypto from 'crypto'

Check warning on line 11 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L11

Added line #L11 was not covered by tests
import type { AclFilter } from 'gcn-kafka'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {

Check warning on line 15 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L15

Added line #L15 was not covered by tests
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
Expand Down Expand Up @@ -68,3 +81,250 @@
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = AclEntry & {
aclId?: string
}

export type UserClientType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

Check warning on line 91 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L91

Added line #L91 was not covered by tests

export const consumerOperations = [

Check warning on line 93 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L93

Added line #L93 was not covered by tests
AclOperationTypes.READ,
AclOperationTypes.DESCRIBE,
]
export const producerOperations = [

Check warning on line 97 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L97

Added line #L97 was not covered by tests
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({

Check warning on line 105 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L104-L105

Added lines #L104 - L105 were not covered by tests
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {

Check warning on line 111 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L111

Added line #L111 was not covered by tests
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })

Check warning on line 113 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L113

Added line #L113 was not covered by tests
}

export async function createKafkaACL(

Check warning on line 116 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L116

Added line #L116 was not covered by tests
user: User,
userClientType: UserClientType,
resourceName: string,
group: string,
permissionType: number,
resourceType: number,
includePrefixed: boolean
) {
const acls: KafkaACL[] =
userClientType == 'consumer'
? consumerOperations.map((operation) => {
return {

Check warning on line 128 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L127-L128

Added lines #L127 - L128 were not covered by tests
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 3,
resourceType,
}
})
: producerOperations.map((operation) => {
return {

Check warning on line 139 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L138-L139

Added lines #L138 - L139 were not covered by tests
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 3,
resourceType,
}
})

if (includePrefixed) {
const prefixedAcls =
userClientType === 'consumer'
? consumerOperations.map((operation) => {
return {

Check warning on line 154 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L153-L154

Added lines #L153 - L154 were not covered by tests
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
: producerOperations.map((operation) => {
return {

Check warning on line 165 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L164-L165

Added lines #L164 - L165 were not covered by tests
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
acls.push(...prefixedAcls)

Check warning on line 175 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L175

Added line #L175 was not covered by tests
}

await createKafkaACLInternal(user, acls)

Check warning on line 178 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L178

Added line #L178 was not covered by tests
}

async function createKafkaACLInternal(user: User, acls: KafkaACL[]) {
validateUser(user)

Check warning on line 182 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L181-L182

Added lines #L181 - L182 were not covered by tests
// Save to db
const db = await tables()
await Promise.all(
acls.map((acl) => db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() }))

Check warning on line 186 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L184-L186

Added lines #L184 - L186 were not covered by tests
)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()

Check warning on line 191 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L190-L191

Added lines #L190 - L191 were not covered by tests
if (acls.some((acl) => acl.resourceType === AclResourceTypes.TOPIC))
await Promise.all(

Check warning on line 193 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L193

Added line #L193 was not covered by tests
acls
.filter((acl) => acl.resourceType === AclResourceTypes.TOPIC)
.map((acl) =>
adminClient.createTopics({

Check warning on line 197 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L195-L197

Added lines #L195 - L197 were not covered by tests
topics: [
{
topic: acl.resourceName,
},
],
})
)
)

await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()

Check warning on line 208 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L207-L208

Added lines #L207 - L208 were not covered by tests
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan(

Check warning on line 216 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L211-L216

Added lines #L211 - L216 were not covered by tests
{ client },
{
TableName,
FilterExpression: filter
? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
':filter': filter,
}
: undefined,
}
)

const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]

Check warning on line 233 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L231-L233

Added lines #L231 - L233 were not covered by tests
if (newACL) acls.push(...newACL)
}
return acls

Check warning on line 236 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L236

Added line #L236 was not covered by tests
}

export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({

Check warning on line 242 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L239-L242

Added lines #L239 - L242 were not covered by tests
resourceType: AclResourceTypes.ANY,
host: '*',
permissionType: AclPermissionTypes.ANY,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()

Check warning on line 249 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L249

Added line #L249 was not covered by tests

const results: KafkaACL[] = []
for (const item of acls.resources) {
console.log('Item:', item)

Check warning on line 253 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L251-L253

Added lines #L251 - L253 were not covered by tests

results.push(
...item.acls.map((acl) => {
return {

Check warning on line 257 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L255-L257

Added lines #L255 - L257 were not covered by tests
...acl,
resourceName: item.resourceName,
resourceType: item.resourceType,
resourcePatternType: item.resourcePatternType,
}
})
)
}

return results

Check warning on line 267 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L267

Added line #L267 was not covered by tests
}

export async function deleteKafkaACL(user: User, aclIds: string[]) {
validateUser(user)
const db = await tables()
const acls: KafkaACL[] = await Promise.all(
aclIds.map((aclId) => db.kafka_acls.get({ aclId }))

Check warning on line 274 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L270-L274

Added lines #L270 - L274 were not covered by tests
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls as AclFilter[] })
await adminClient.disconnect()

Check warning on line 280 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L277-L280

Added lines #L277 - L280 were not covered by tests

await Promise.all(
acls.map((acl) =>
db.kafka_acls.delete({

Check warning on line 284 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L282-L284

Added lines #L282 - L284 were not covered by tests
aclId: acl.aclId,
})
)
)
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: dbDefinedAcls })
await adminClient.disconnect()

Check warning on line 296 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L291-L296

Added lines #L291 - L296 were not covered by tests
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) =>
db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() })

Check warning on line 304 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L299-L304

Added lines #L299 - L304 were not covered by tests
),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (

Check warning on line 322 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L320-L322

Added lines #L320 - L322 were not covered by tests
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'

Check warning on line 49 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L49

Added line #L49 was not covered by tests
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -119,6 +120,7 @@
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup)
const userIsAdmin = user?.groups.includes(adminGroup)

Check warning on line 123 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L123

Added line #L123 was not covered by tests

return {
origin,
Expand All @@ -129,6 +131,7 @@
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -168,6 +171,11 @@
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin

Check warning on line 176 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L174-L176

Added lines #L174 - L176 were not covered by tests
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

Check warning on line 20 in app/root/header/Header.tsx

View check run for this annotation

Codecov / codecov/patch

app/root/header/Header.tsx#L20

Added line #L20 was not covered by tests

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

Check warning on line 77 in app/root/header/Header.tsx

View check run for this annotation

Codecov / codecov/patch

app/root/header/Header.tsx#L77

Added line #L77 was not covered by tests

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin/kafka">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading
Loading