From 587279997218568f26d9e2d8e4778f5d60262319 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Tue, 8 Oct 2024 11:08:27 -0300 Subject: [PATCH] egress table and queue --- billing/data/egress.js | 77 ++++++++++++++++++++++++++++++++++++++++ billing/lib/api.ts | 31 +++++++++++----- billing/queues/egress.js | 9 +++++ billing/tables/egress.js | 33 +++++++++++++++++ 4 files changed, 142 insertions(+), 8 deletions(-) create mode 100644 billing/data/egress.js create mode 100644 billing/queues/egress.js create mode 100644 billing/tables/egress.js diff --git a/billing/data/egress.js b/billing/data/egress.js new file mode 100644 index 00000000..87f5dde1 --- /dev/null +++ b/billing/data/egress.js @@ -0,0 +1,77 @@ +import { DecodeFailure, EncodeFailure, Schema } from './lib.js' + +/** + * @typedef {import('../lib/api').EgressEvent} EgressEvent + * @typedef {import('../types').InferStoreRecord & { pk: string, sk: string }} EgressEventStoreRecord + * @typedef {import('../types').StoreRecord} StoreRecord + * @typedef {import('../lib/api').EgressEventListKey} EgressEventListKey + * @typedef {{ pk: string, sk: string }} EgressEventListStoreRecord + */ + +export const egressSchema = Schema.struct({ + customerId: Schema.did({ method: 'mailto' }), + resourceId: Schema.text(), + timestamp: Schema.date(), +}) + +/** @type {import('../lib/api').Validator} */ +export const validate = input => egressSchema.read(input) + +/** @type {import('../lib/api').Encoder} */ +export const encode = input => { + try { + return { + ok: { + pk: `${input.timestamp.toISOString()}#${input.customerId}`, + sk: `${input.timestamp.toISOString()}#${input.customerId}#${input.resourceId}`, + customerId: input.customerId, + resourceId: input.resourceId, + timestamp: input.timestamp.toISOString(), + } + } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Encoder} */ +export const encodeStr = input => { + try { + const data = encode(input) + if (data.error) throw data.error + return { ok: JSON.stringify(data.ok) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decode = input => { + try { + return { + ok: { + customerId: Schema.did({ method: 'mailto' }).from(input.customerId), + resourceId: /** @type {string} */ (input.resourceId), + timestamp: new Date(input.timestamp), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) + } + } +} + +export const lister = { + /** @type {import('../lib/api').Encoder} */ + encodeKey: input => ({ + ok: { + pk: `${input.from.toISOString()}#${input.customerId}`, + sk: `${input.from.toISOString()}#${input.customerId}#${input.resourceId}` + } + }) +} \ No newline at end of file diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 22df39de..dfdb6167 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -90,7 +90,7 @@ export interface CustomerKey { customer: CustomerDID } -export interface CustomerListOptions extends Pageable {} +export interface CustomerListOptions extends Pageable { } export type CustomerStore = & StoreGetter @@ -133,6 +133,21 @@ export interface UsageListKey { customer: CustomerDID, from: Date } export type UsageStore = StorePutter + +/** + * The event that is emitted when egress traffic is detected. + */ +export interface EgressEvent { + customerId: string + resourceId: string + timestamp: Date +} + + +export interface EgressEventListKey { customerId: string, resourceId: string, from: Date } + +export type EgressEventStore = StorePutter & StoreLister + // Billing queues ///////////////////////////////////////////////////////////// /** @@ -188,7 +203,7 @@ export interface ConsumerListKey { consumer: ConsumerDID } export type ConsumerStore = & StoreGetter - & StoreLister> + & StoreLister> export interface Subscription { customer: CustomerDID @@ -205,7 +220,7 @@ export interface SubscriptionListKey { customer: CustomerDID } export type SubscriptionStore = & StoreGetter - & StoreLister> + & StoreLister> // UCAN invocation //////////////////////////////////////////////////////////// @@ -302,7 +317,7 @@ export interface InsufficientRecords extends Failure { /** StorePutter allows a single item to be put in the store by it's key. */ export interface StorePutter { /** Puts a single item into the store by it's key */ - put: (rec: T) => Promise> + put: (rec: T) => Promise> } /** @@ -316,23 +331,23 @@ export interface StoreBatchPutter { * not transactional. A failure may mean 1 or more records succeeded to * be written. */ - batchPut: (rec: Iterable) => Promise> + batchPut: (rec: Iterable) => Promise> } /** StoreGetter allows a single item to be retrieved by it's key. */ export interface StoreGetter { /** Gets a single item by it's key. */ - get: (key: K) => Promise|DecodeFailure|StoreOperationFailure>> + get: (key: K) => Promise | DecodeFailure | StoreOperationFailure>> } /** StoreLister allows items in the store to be listed page by page. */ export interface StoreLister { /** Lists items in the store. */ - list: (key: K, options?: Pageable) => Promise, EncodeFailure|DecodeFailure|StoreOperationFailure>> + list: (key: K, options?: Pageable) => Promise, EncodeFailure | DecodeFailure | StoreOperationFailure>> } /** QueueAdder allows messages to be added to the end of the queue. */ export interface QueueAdder { /** Adds a message to the end of the queue. */ - add: (message: T) => Promise> + add: (message: T) => Promise> } diff --git a/billing/queues/egress.js b/billing/queues/egress.js new file mode 100644 index 00000000..c2cad3a9 --- /dev/null +++ b/billing/queues/egress.js @@ -0,0 +1,9 @@ +import { createQueueAdderClient } from './client.js' +import { encodeStr, validate } from '../data/egress.js' + +/** + * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf + * @param {{ url: URL }} context + */ +export const createEgressEventQueue = (conf, { url }) => + createQueueAdderClient(conf, { url, encode:encodeStr, validate }) diff --git a/billing/tables/egress.js b/billing/tables/egress.js new file mode 100644 index 00000000..8e2882ee --- /dev/null +++ b/billing/tables/egress.js @@ -0,0 +1,33 @@ +import { createStorePutterClient, createStoreListerClient } from './client.js' +import { validate, encode, lister, decode } from '../data/egress.js' + +/** + * Stores egress events for tracking requests served to customers. + * + * @type {import('sst/constructs').TableProps} + */ +export const egressTableProps = { + fields: { + /** Composite key with format: "customerId" */ + pk: 'string', + /** Composite key with format: "timestamp#customerId#resourceId" */ + sk: 'string', + /** Customer DID (did:mailto:...). */ + customerId: 'string', + /** Resource CID. */ + resourceId: 'string', + /** ISO timestamp of the event. */ + timestamp: 'string', + }, + primaryIndex: { partitionKey: 'pk', sortKey: 'sk' } +} + +/** + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {{ tableName: string }} context + * @returns {import('../lib/api.js').EgressEventStore} + */ +export const createEgressEventStore = (conf, { tableName }) => ({ + ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) +})