Skip to content

Commit

Permalink
Log sync, rough draft of sync functionality, group check
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 committed Jun 11, 2024
1 parent ecf923d commit 5ccfd06
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 33 deletions.
5 changes: 5 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ kafka_acls
group **String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down
36 changes: 28 additions & 8 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export type KafkaACL = {
permissionType: PermissionType
group: string
prefixed: boolean
existsOnBroker: boolean
}

export type PermissionType = 'producer' | 'consumer'
Expand Down Expand Up @@ -185,9 +184,7 @@ export async function getAclsFromBrokers() {
})
await adminClient.disconnect()
const results: KafkaACL[] = []
for (const item of acls.resources.sort((a, b) =>
a.resourceName.localeCompare(b.resourceName)
)) {
for (const item of acls.resources) {
const topicName = item.resourceName
const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
const producerRules = producerOperations.every((op) =>
Expand Down Expand Up @@ -220,15 +217,13 @@ export async function getAclsFromBrokers() {
permissionType: 'producer',
group: producerGroup,
prefixed,
existsOnBroker: true,
})
if (consumerRules && consumerGroup)
results.push({
topicName,
permissionType: 'consumer',
group: consumerGroup,
prefixed,
existsOnBroker: true,
})
}
return results
Expand Down Expand Up @@ -287,8 +282,33 @@ export async function updateBrokersFromDb(user: User) {
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.disconnect()
}
export async function updateDbFromBrokers() {
export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all(kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)))
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
65 changes: 45 additions & 20 deletions app/routes/admin.kafka._index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,66 @@ import { useRef } from 'react'
import { getUser } from './_auth/user.server'
import HeadingWithAddButton from '~/components/HeadingWithAddButton'
import SegmentedCards from '~/components/SegmentedCards'
import { getGroups } from '~/lib/cognito.server'
import Spinner from '~/components/Spinner'
import TimeAgo from '~/components/TimeAgo'
import type { KafkaACL } from '~/lib/kafka.server'
import {
getAclsFromBrokers,
getKafkaACLsFromDynamoDB,
} from '~/lib/kafka.server'
import { getKafkaACLsFromDynamoDB, getLastSyncDate } from '~/lib/kafka.server'

export async function loader({ request }: LoaderFunctionArgs) {
const user = await getUser(request)
if (!user) throw new Response(null, { status: 403 })
const dynamoDbAclData = await getKafkaACLsFromDynamoDB(user)
const userGroups = (await getGroups())
.filter((group) => group.GroupName?.startsWith('gcn.nasa.gov/'))
.map((group) => group.GroupName)
const brokerAclData = await getAclsFromBrokers()
return { dynamoDbAclData, userGroups, brokerAclData }
const latestSync = await getLastSyncDate()
return { dynamoDbAclData, latestSync }
}

export default function Index() {
const { dynamoDbAclData } = useLoaderData<typeof loader>()
/** Use a fetcher or something to post? I want to click a button and
* trigger the update, then reload the page? maybe?
*/
const { dynamoDbAclData, latestSync } = useLoaderData<typeof loader>()
const aclFetcher = useFetcher()

return (
<>
<HeadingWithAddButton headingLevel={1}>Kafka Admin</HeadingWithAddButton>
<h2>Kafka ACLs</h2>
<h3>DynamoDB ACLs</h3>
<p>
Information about the Kafka ACLs listed here. Click the button to sync
the db to the kafka broker's current state.
</p>

<SegmentedCards>
{dynamoDbAclData.map((x, index) => (
<KafkaAclCard key={index} acl={x} />
))}
</SegmentedCards>
<aclFetcher.Form method="POST" action="/admin/kafka">
<Button
type="submit"
name="intent"
value="migrateFromBroker"
disabled={aclFetcher.state !== 'idle'}
>
Pull ACLs from Broker
</Button>
{aclFetcher.state !== 'idle' && (
<span className="text-middle">
<Spinner /> Saving...
</span>
)}
</aclFetcher.Form>
{latestSync && (
<p>
Last synced by {latestSync.syncedBy}{' '}
<TimeAgo time={latestSync.syncedOn} />
</p>
)}
<h3>DynamoDB ACLs</h3>
{dynamoDbAclData && (
<>
({dynamoDbAclData.length}) ACLs
<SegmentedCards>
{dynamoDbAclData
.sort((a, b) => a.topicName.localeCompare(b.topicName))
.map((x, index) => (
<KafkaAclCard key={index} acl={x} />
))}
</SegmentedCards>
</>
)}
</>
)
}
Expand Down
17 changes: 12 additions & 5 deletions app/routes/admin.kafka.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@ import { GridContainer, SideNav } from '@trussworks/react-uswds'

import { getUser } from './_auth/user.server'
import type { PermissionType } from '~/lib/kafka.server'
import { createKafkaACL, deleteKafkaACL } from '~/lib/kafka.server'
import {
adminGroup,
createKafkaACL,
deleteKafkaACL,
updateDbFromBrokers,
} from '~/lib/kafka.server'
import { getFormDataString } from '~/lib/utils'

export async function action({ request }: ActionFunctionArgs) {
const user = await getUser(request)
if (!user) throw new Response(null, { status: 403 })
if (!user?.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
const data = await request.formData()
const intent = getFormDataString(data, 'intent')
if (intent === 'migrateFromBroker') {
await updateDbFromBrokers(user)
return null
}
const topicName = getFormDataString(data, 'topicName')
const permissionType = getFormDataString(
data,
Expand All @@ -38,7 +48,6 @@ export async function action({ request }: ActionFunctionArgs) {
permissionType,
group,
prefixed: false,
existsOnBroker: false,
})
)
break
Expand All @@ -49,7 +58,6 @@ export async function action({ request }: ActionFunctionArgs) {
permissionType,
group,
prefixed: false,
existsOnBroker: false,
})
)

Expand All @@ -60,7 +68,6 @@ export async function action({ request }: ActionFunctionArgs) {
permissionType,
group,
prefixed: true,
existsOnBroker: false,
})
)
break
Expand Down

0 comments on commit 5ccfd06

Please sign in to comment.