feat: Egress Traffic Tracking + Stripe Billing Meters (#430)
### Egress Traffic Tracking and Stripe Billing Meters API Integration

This PR introduces an asynchronous solution for tracking egress traffic
in the Freeway Gateway and reporting events to Stripe’s Billing Meters
API.

### RFC Reference

Storacha Network is implementing a scalable, automated mechanism for
tracking egress traffic and updating Stripe’s API with relevant data to
ensure accurate customer billing. This
[RFC](https://github.com/storacha/RFC/blob/rfc/egress-tracking/rfc/egress-traffic.md)
outlines the proposed approaches and their trade-offs.

### Implementation

The selected approach for this implementation is **Alternative 3**:  
- [Freeway Worker invoking `usage/record`, SQS, Lambda with Stripe
Integration](https://github.com/storacha/RFC/blob/rfc/egress-tracking/rfc/egress-traffic.md#alternative-3-freeway-worker-invoking-usagerecord-sqs-lambda-with-stripe-integration)

### Flow

1. To avoid blocking the response, the Freeway Gateway invokes the
`usage/record` capability asynchronously using `ctx.waitUntil`.
2. A new handler added to `w3infra/upload-api/stores/usage.js` receives
the events from the Kinesis stream and places the egress data into an
Egress Traffic SQS queue.
3. A new Lambda function consumes the events from the SQS queue and
publishes the egress data to the Stripe Billing Meters API.

```mermaid
graph TD
    CF[Freeway Gateway] --> w3infra[w3infra/upload-api]
    w3infra --> Kinesis[Kinesis Stream]
    Kinesis --> UsageRecord[Usage Record Handler]
    UsageRecord --> SQS[Egress Traffic Queue]
    SQS --> Lambda[Lambda Function]
    Lambda --> DynamoDB[Egress Traffic DynamoDB Table]
    Lambda --> Stripe[Stripe Billing Meters API]
    UsageRecord --> Logs[S3 Bucket /Receipts]
```
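
Step 1 of the flow can be sketched as follows. This is a minimal illustration of the `ctx.waitUntil` pattern, not the Freeway implementation: `withEgressTracking`, `serveContent` and `recordEgress` are hypothetical names, and reading the byte count from the `content-length` header is an assumption made to keep the example short.

```javascript
/**
 * Wraps a content handler so egress is reported in the background.
 * `serveContent` and `recordEgress` are hypothetical stand-ins for the
 * gateway's content handler and its `usage/record` invocation.
 */
function withEgressTracking(serveContent, recordEgress) {
  return async (request, env, ctx) => {
    const response = await serveContent(request, env, ctx)
    // Assumption: the byte count is taken from the content-length header.
    const bytes = Number(response.headers.get('content-length') ?? 0)
    if (response.ok && bytes > 0) {
      // waitUntil keeps the Worker alive until the promise settles,
      // but the response below is returned without awaiting it.
      ctx.waitUntil(
        recordEgress({ bytes, servedAt: new Date() }).catch(err =>
          console.error('failed to record egress:', err)
        )
      )
    }
    return response
  }
}
```

The `.catch` keeps a failed report from surfacing as an unhandled rejection, so a reporting error never affects the response already sent to the client.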

### Key Advantages

- **Asynchronous execution:** `ctx.waitUntil` lets the Freeway Worker return
the response without waiting for the `usage/record` invocation to finish.
- **Decoupled architecture:** SQS and Lambda decouple the gateway from
Stripe, and the queue buffers events before they are published.
- **Signed receipts:** Each egress event generates a signed receipt,
providing auditability and traceability. The receipts are stored in the
general bucket in S3 where we already store other receipts.
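
The buffering only helps if a failed event goes back to the queue while the rest of its batch is acknowledged. A minimal sketch of that pattern, using the SQS partial batch response shape (`batchItemFailures` with `itemIdentifier`), is shown below. `processBatch` and `processRecord` are hypothetical names, and the sketch assumes the Lambda event source mapping has `ReportBatchItemFailures` enabled.

```javascript
/**
 * Hypothetical helper: runs `processRecord` on every SQS record and collects
 * the IDs of the records that failed, so only those are redelivered.
 */
async function processBatch(records, processRecord) {
  const batchItemFailures = []
  for (const record of records) {
    try {
      await processRecord(record)
    } catch (err) {
      console.error('Error processing record', record.messageId, err)
      // Failures are collected instead of rethrown so the loop continues.
      batchItemFailures.push({ itemIdentifier: record.messageId })
    }
  }
  return { batchItemFailures }
}
```

Any early `return` inside the loop would skip the remaining records and report them as successful, so errors are signalled by throwing.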

### Summary of Changes

- Introduces the `usage/record` capability handler in
`w3infra/upload-api/stores/usage.js`, which takes egress data from the
Kinesis stream and places it into the SQS queue.
- Implements an event-driven architecture using SQS and Lambda to
process egress events asynchronously and publish them to Stripe.
- Adds an integration test that publishes fake egress traffic events to
Stripe’s Test API.
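
The `recordBillingMeterEvent` helper is not part of this excerpt. As a rough sketch of what publishing to Stripe involves, the function below maps an egress record onto the parameters of a Stripe meter event. `toMeterEventParams` is a hypothetical name, and the payload keys `stripe_customer_id` and `value` are Stripe's default meter keys, which is an assumption about how the meter is configured.

```javascript
/**
 * Hypothetical mapping from an egress record to Stripe meter event params.
 * Assumes the meter uses Stripe's default payload keys.
 */
function toMeterEventParams(eventName, egress, stripeCustomerId) {
  return {
    event_name: eventName,
    payload: {
      stripe_customer_id: stripeCustomerId,
      // Stripe payload values are strings.
      value: String(egress.bytes)
    },
    // Stripe expects a Unix timestamp in seconds, not milliseconds.
    timestamp: Math.floor(egress.servedAt.getTime() / 1000)
  }
}

// Usage with the Stripe Node SDK (not executed here):
// await stripe.billing.meterEvents.create(
//   toMeterEventParams(billingMeterName, egressData, stripeCustomerId)
// )
```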

### Next Steps

- Deploy to staging.
- Validate the solution’s performance and scalability.
- Monitor for potential rate limit issues when publishing to Stripe.
fforbeck authored Oct 30, 2024
1 parent fb2e99d commit 85cf83a
Showing 28 changed files with 764 additions and 134 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
@@ -25,3 +25,5 @@ jobs:
AWS_ACCESS_KEY_ID: 'NOSUCH'
AWS_SECRET_ACCESS_KEY: 'NOSUCH'
STRIPE_TEST_SECRET_KEY: ${{ secrets.STRIPE_TEST_SECRET_KEY }}
STRIPE_BILLING_METER_ID: ${{ vars.STRIPE_BILLING_METER_ID }}
STRIPE_BILLING_METER_EVENT_NAME: ${{ vars.STRIPE_BILLING_METER_EVENT_NAME }}
110 changes: 110 additions & 0 deletions billing/data/egress.js
@@ -0,0 +1,110 @@
import { Link } from '@ucanto/server'
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

/**
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficData> } EgressTrafficStoreRecord
* @typedef { import('../types').InferStoreRecord<import('../lib/api').EgressTrafficEventListKey> } EgressTrafficKeyStoreRecord
*/

export const egressSchema = Schema.struct({
space: Schema.did({ method: 'key' }),
customer: Schema.did({ method: 'mailto' }),
resource: Schema.link(),
bytes: Schema.number(),
servedAt: Schema.date(),
cause: Schema.link(),
})

/** @type {import('../lib/api').Validator<import('../lib/api').EgressTrafficData>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, EgressTrafficStoreRecord>} */
export const encode = input => {
try {
return {
ok: {
space: input.space.toString(),
customer: input.customer.toString(),
resource: input.resource.toString(),
bytes: Number(input.bytes),
servedAt: input.servedAt.toISOString(),
cause: input.cause.toString(),
}
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, string>} */
export const encodeStr = input => {
try {
const data = encode(input)
if (data.error) throw data.error
return { ok: JSON.stringify(data.ok) }
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<import('../types.js').StoreRecord, import('../lib/api').EgressTrafficData>} */
export const decode = input => {
try {
return {
ok: {
space: Schema.did({ method: 'key' }).from(input.space),
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Link.parse(/** @type {string} */(input.resource)),
bytes: Number(input.bytes),
servedAt: new Date(input.servedAt),
cause: Link.parse(/** @type {string} */(input.cause)),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<string, import('../lib/api').EgressTrafficData>} */
export const decodeStr = input => {
try {
return decode(JSON.parse(input))
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err })
}
}
}

export const lister = {
/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficEventListKey, EgressTrafficKeyStoreRecord>} */
encodeKey: input => ({
ok: {
space: input.space.toString(),
customer: input.customer.toString(),
from: input.from.toISOString()
}
}),
/** @type {import('../lib/api').Decoder<EgressTrafficKeyStoreRecord, import('../lib/api').EgressTrafficEventListKey>} */
decodeKey: input => {
try {
return {
ok: {
space: Schema.did({ method: 'key' }).from(input.space),
customer: Schema.did({ method: 'mailto' }).from(input.customer),
from: new Date(input.from)
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress traffic event list key: ${err.message}`, { cause: err })
}
}
}
}
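
The string encoder and decoder above are what carry an egress event through the SQS message body. The simplified sketch below shows the round trip under stated assumptions: DIDs and links are kept as plain strings instead of being parsed with the `@ucanto` schemas, `encodeEgress` and `decodeEgress` are hypothetical names, and the explicit date check is an addition, since `new Date(...)` yields an invalid date rather than throwing on bad input.

```javascript
// Simplified stand-ins for encodeStr/decodeStr; all values are illustrative.
function encodeEgress(event) {
  return JSON.stringify({
    space: event.space,
    customer: event.customer,
    resource: event.resource,
    bytes: Number(event.bytes),
    // Dates are stored as ISO 8601 strings.
    servedAt: event.servedAt.toISOString(),
    cause: event.cause
  })
}

function decodeEgress(body) {
  const raw = JSON.parse(body)
  const servedAt = new Date(raw.servedAt)
  // new Date() does not throw on bad input, so check explicitly.
  if (Number.isNaN(servedAt.getTime())) {
    throw new Error(`invalid servedAt: ${raw.servedAt}`)
  }
  return { ...raw, bytes: Number(raw.bytes), servedAt }
}
```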
95 changes: 95 additions & 0 deletions billing/functions/egress-traffic-queue.js
@@ -0,0 +1,95 @@
import * as Sentry from '@sentry/serverless'
import { expect } from './lib.js'
import { decodeStr } from '../data/egress.js'
import { mustGetEnv } from '../../lib/env.js'
import { createCustomerStore } from '../tables/customer.js'
import Stripe from 'stripe'
import { Config } from 'sst/node/config'
import { recordBillingMeterEvent } from '../utils/stripe.js'
import { createEgressTrafficEventStore } from '../tables/egress-traffic.js'


Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
})

/**
* @typedef {{
* region?: 'us-west-2'|'us-east-2'
* egressTrafficQueueUrl?: string
* customerTable?: string
* billingMeterName?: string
* stripeSecretKey?: string
* customerStore?: import('../lib/api.js').CustomerStore
* egressTrafficTable?: string
* egressTrafficEventStore?: import('../lib/api.js').EgressTrafficEventStore
* }} CustomHandlerContext
*/

/**
* AWS Lambda handler to process egress events from the egress traffic queue.
* Each event is a JSON object with `space`, `customer`, `resource`, `bytes`, `servedAt` and `cause`.
* The message is then deleted from the queue when successful.
*/
export const handler = Sentry.AWSLambda.wrapHandler(
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/
async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable })
const egressTrafficTable = customContext?.egressTrafficTable ?? mustGetEnv('EGRESS_TRAFFIC_TABLE_NAME')
const egressTrafficEventStore = customContext?.egressTrafficEventStore ?? createEgressTrafficEventStore({ region }, { tableName: egressTrafficTable })

const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')

const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME')
if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME')

const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })
const batchItemFailures = []
for (const record of event.Records) {
try {
const decoded = decodeStr(record.body)
const egressData = expect(decoded, 'Failed to decode egress event')

const putResult = await egressTrafficEventStore.put(egressData)
if (putResult.error) throw putResult.error

const response = await customerStore.get({ customer: egressData.customer })
if (response.error) {
// Throw instead of returning: a return here would exit the handler,
// skip the remaining records and report no batch item failures.
throw new Error(`Error getting customer ${egressData.customer}`, { cause: response.error })
}
const customerAccount = response.ok.account

expect(
await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount),
`Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}`
)
} catch (error) {
console.error('Error processing egress event:', error)
batchItemFailures.push({ itemIdentifier: record.messageId })
}
}

return {
statusCode: 200,
body: 'Egress events processed successfully',
// Return the failed records so they can be retried
batchItemFailures
}
},
)
35 changes: 34 additions & 1 deletion billing/lib/api.ts
@@ -1,4 +1,4 @@
-import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface'
+import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface'

// Billing stores /////////////////////////////////////////////////////////////

@@ -133,6 +133,11 @@ export interface UsageListKey { customer: CustomerDID, from: Date }

export type UsageStore = StorePutter<Usage>

/**
* Store for egress traffic data.
*/
export type EgressTrafficEventStore = StorePutter<EgressTrafficData> & StoreLister<EgressTrafficEventListKey, EgressTrafficData>

// Billing queues /////////////////////////////////////////////////////////////

/**
@@ -158,6 +163,34 @@ export interface CustomerBillingInstruction {

export type CustomerBillingQueue = QueueAdder<CustomerBillingInstruction>

/**
* Captures details about egress traffic that should be billed for a given period
*/
export interface EgressTrafficData {
/** Space DID (did:key:...). */
space: ConsumerDID
/** Customer DID (did:mailto:...). */
customer: CustomerDID
/** Resource that was served. */
resource: UnknownLink
/** Number of bytes that were served. */
bytes: number
/** Time the egress traffic was served at. */
servedAt: Date
/** UCAN invocation ID that caused the egress traffic. */
cause: UnknownLink
}

/**
* Queue for egress traffic data.
*/
export type EgressTrafficQueue = QueueAdder<EgressTrafficData>

/**
* List key for egress traffic data.
*/
export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date }

/**
* Captures details about a space that should be billed for a given customer in
* the given period of usage.
3 changes: 2 additions & 1 deletion billing/package.json
@@ -4,6 +4,7 @@
"type": "module",
"scripts": {
"test": "entail '**/*.spec.js'",
"test-only": "entail",
"coverage": "c8 -r text -r html npm test"
},
"dependencies": {
@@ -13,7 +14,7 @@
"@sentry/serverless": "^7.74.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
-"@web3-storage/capabilities": "^17.1.1",
+"@web3-storage/capabilities": "^17.4.0",
"big.js": "^6.2.1",
"lru-cache": "^11.0.0",
"multiformats": "^13.1.0",
9 changes: 9 additions & 0 deletions billing/queues/egress-traffic.js
@@ -0,0 +1,9 @@
import { createQueueAdderClient } from './client.js'
import { encodeStr, validate } from '../data/egress.js'

/**
* @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf
* @param {{ url: URL }} context
*/
export const createEgressTrafficQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode: encodeStr, validate })
42 changes: 42 additions & 0 deletions billing/tables/egress-traffic.js
@@ -0,0 +1,42 @@
import { createStorePutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/egress.js'

/**
* Source of truth for egress traffic data.
*
* @type {import('sst/constructs').TableProps}
*/
export const egressTrafficTableProps = {
fields: {
/** Space DID (did:key:...). */
space: 'string',
/** Customer DID (did:mailto:...). */
customer: 'string',
/** Resource CID. */
resource: 'string',
/** ISO timestamp of the event. */
servedAt: 'string',
/** Bytes served. */
bytes: 'number',
/** UCAN invocation ID that caused the egress traffic. */
cause: 'string',
},
primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' },
globalIndexes: {
customer: {
partitionKey: 'customer',
sortKey: 'servedAt',
projection: ['space', 'resource', 'bytes', 'cause', 'servedAt']
}
}
}

/**
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {{ tableName: string }} context
* @returns {import('../lib/api.js').EgressTrafficEventStore}
*/
export const createEgressTrafficEventStore = (conf, { tableName }) => ({
...createStorePutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
})