Skip to content

Commit

Permalink
egress table and queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 8, 2024
1 parent 744ca61 commit 5872799
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 8 deletions.
77 changes: 77 additions & 0 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

/**
* @typedef {import('../lib/api').EgressEvent} EgressEvent
* @typedef {import('../types').InferStoreRecord<EgressEvent> & { pk: string, sk: string }} EgressEventStoreRecord
* @typedef {import('../types').StoreRecord} StoreRecord
* @typedef {import('../lib/api').EgressEventListKey} EgressEventListKey
* @typedef {{ pk: string, sk: string }} EgressEventListStoreRecord
*/

export const egressSchema = Schema.struct({
customerId: Schema.did({ method: 'mailto' }),
resourceId: Schema.text(),
timestamp: Schema.date(),
})

/** @type {import('../lib/api').Validator<EgressEvent>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<EgressEvent, EgressEventStoreRecord>} */
export const encode = input => {
try {
return {
ok: {
pk: `${input.timestamp.toISOString()}#${input.customerId}`,
sk: `${input.timestamp.toISOString()}#${input.customerId}#${input.resourceId}`,
customerId: input.customerId,
resourceId: input.resourceId,
timestamp: input.timestamp.toISOString(),
}
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Encoder<EgressEvent, string>} */
export const encodeStr = input => {
try {
const data = encode(input)
if (data.error) throw data.error
return { ok: JSON.stringify(data.ok) }
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<StoreRecord, EgressEvent>} */
export const decode = input => {
try {
return {
ok: {
customerId: Schema.did({ method: 'mailto' }).from(input.customerId),
resourceId: /** @type {string} */ (input.resourceId),
timestamp: new Date(input.timestamp),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}

export const lister = {
/** @type {import('../lib/api').Encoder<EgressEventListKey, EgressEventListStoreRecord>} */
encodeKey: input => ({
ok: {
pk: `${input.from.toISOString()}#${input.customerId}`,
sk: `${input.from.toISOString()}#${input.customerId}#${input.resourceId}`
}
})
}
31 changes: 23 additions & 8 deletions billing/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export interface CustomerKey {
customer: CustomerDID
}

export interface CustomerListOptions extends Pageable {}
export interface CustomerListOptions extends Pageable { }

export type CustomerStore =
& StoreGetter<CustomerKey, Customer>
Expand Down Expand Up @@ -133,6 +133,21 @@ export interface UsageListKey { customer: CustomerDID, from: Date }

export type UsageStore = StorePutter<Usage>


/**
* The event that is emitted when egress traffic is detected.
*/
export interface EgressEvent {
customerId: string
resourceId: string
timestamp: Date
}


export interface EgressEventListKey { customerId: string, resourceId: string, from: Date }

export type EgressEventStore = StorePutter<EgressEvent> & StoreLister<EgressEventListKey, EgressEvent>

// Billing queues /////////////////////////////////////////////////////////////

/**
Expand Down Expand Up @@ -188,7 +203,7 @@ export interface ConsumerListKey { consumer: ConsumerDID }

export type ConsumerStore =
& StoreGetter<ConsumerKey, Consumer>
& StoreLister<ConsumerListKey, Pick<Consumer, 'consumer'|'provider'|'subscription'>>
& StoreLister<ConsumerListKey, Pick<Consumer, 'consumer' | 'provider' | 'subscription'>>

export interface Subscription {
customer: CustomerDID
Expand All @@ -205,7 +220,7 @@ export interface SubscriptionListKey { customer: CustomerDID }

export type SubscriptionStore =
& StoreGetter<SubscriptionKey, Subscription>
& StoreLister<SubscriptionListKey, Pick<Subscription, 'customer'|'provider'|'subscription'|'cause'>>
& StoreLister<SubscriptionListKey, Pick<Subscription, 'customer' | 'provider' | 'subscription' | 'cause'>>

// UCAN invocation ////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -302,7 +317,7 @@ export interface InsufficientRecords extends Failure {
/** StorePutter allows a single item to be put in the store by it's key. */
export interface StorePutter<T> {
/** Puts a single item into the store by it's key */
put: (rec: T) => Promise<Result<Unit, EncodeFailure|StoreOperationFailure|Failure>>
put: (rec: T) => Promise<Result<Unit, EncodeFailure | StoreOperationFailure | Failure>>
}

/**
Expand All @@ -316,23 +331,23 @@ export interface StoreBatchPutter<T> {
* not transactional. A failure may mean 1 or more records succeeded to
* be written.
*/
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords|EncodeFailure|StoreOperationFailure|Failure>>
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords | EncodeFailure | StoreOperationFailure | Failure>>
}

/** StoreGetter allows a single item to be retrieved by it's key. */
export interface StoreGetter<K extends {}, V> {
/** Gets a single item by it's key. */
get: (key: K) => Promise<Result<V, EncodeFailure|RecordNotFound<K>|DecodeFailure|StoreOperationFailure>>
get: (key: K) => Promise<Result<V, EncodeFailure | RecordNotFound<K> | DecodeFailure | StoreOperationFailure>>
}

/** StoreLister allows items in the store to be listed page by page. */
export interface StoreLister<K extends {}, V> {
/** Lists items in the store. */
list: (key: K, options?: Pageable) => Promise<Result<ListSuccess<V>, EncodeFailure|DecodeFailure|StoreOperationFailure>>
list: (key: K, options?: Pageable) => Promise<Result<ListSuccess<V>, EncodeFailure | DecodeFailure | StoreOperationFailure>>
}

/** QueueAdder allows messages to be added to the end of the queue. */
export interface QueueAdder<T> {
/** Adds a message to the end of the queue. */
add: (message: T) => Promise<Result<Unit, EncodeFailure|QueueOperationFailure|Failure>>
add: (message: T) => Promise<Result<Unit, EncodeFailure | QueueOperationFailure | Failure>>
}
9 changes: 9 additions & 0 deletions billing/queues/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { createQueueAdderClient } from './client.js'
import { encodeStr, validate } from '../data/egress.js'

/**
* @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf
* @param {{ url: URL }} context
*/
export const createEgressEventQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode:encodeStr, validate })
33 changes: 33 additions & 0 deletions billing/tables/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { createStorePutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/egress.js'

/**
* Stores egress events for tracking requests served to customers.
*
* @type {import('sst/constructs').TableProps}
*/
export const egressTableProps = {
fields: {
/** Composite key with format: "customerId" */
pk: 'string',
/** Composite key with format: "timestamp#customerId#resourceId" */
sk: 'string',
/** Customer DID (did:mailto:...). */
customerId: 'string',
/** Resource CID. */
resourceId: 'string',
/** ISO timestamp of the event. */
timestamp: 'string',
},
primaryIndex: { partitionKey: 'pk', sortKey: 'sk' }
}

/**
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {{ tableName: string }} context
* @returns {import('../lib/api.js').EgressEventStore}
*/
export const createEgressEventStore = (conf, { tableName }) => ({
...createStorePutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
})

0 comments on commit 5872799

Please sign in to comment.