From 85cf83a94593771a0da143655ba5efe6434e9f42 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Wed, 30 Oct 2024 15:36:46 -0300 Subject: [PATCH] feat: Egress Traffic Tracking + Stripe Billing Meters (#430) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Egress Traffic Tracking and Stripe Billing Meters API Integration This PR introduces an asynchronous solution for tracking egress traffic in the Freeway Gateway and reporting events to Stripe’s Billing Meters API. ### RFC Reference Storacha Network is implementing a scalable, automated mechanism for tracking egress traffic and updating Stripe’s API with relevant data to ensure accurate customer billing. This [RFC](https://github.com/storacha/RFC/blob/rfc/egress-tracking/rfc/egress-traffic.md) outlines the proposed approaches and their trade-offs. ### Implementation The selected approach for this implementation is **Alternative 3**: - [Freeway Worker invoking `usage/record`, SQS, Lambda with Stripe Integration](https://github.com/storacha/RFC/blob/rfc/egress-tracking/rfc/egress-traffic.md#alternative-3-freeway-worker-invoking-usagerecord-sqs-lambda-with-stripe-integration) ### Flow 1. To avoid blocking, the Freeway Gateway asynchronously invokes the `usage/record` capability using `ctx.waitUntil`. 2. A new handler added to `w3infra/upload-api/stores/usage.js` receives the events from the Kinesis stream and places the egress data into an Egress Traffic SQS queue. 3. A new Lambda function consumes the events from the SQS queue and publishes the egress data to the Stripe Billing Meters API. ```mermaid graph TD CF[Freeway Gateway] --> w3infra[w3infra/upload-api] w3infra --> Kinesis[Kinesis Stream] Kinesis --> UsageRecord[Usage Record Handler] UsageRecord --> SQS[Egress Traffic Queue] SQS --> Lambda[Lambda Function] Lambda --> DynamoDB[Egress Traffic DynamoDB Table] Lambda --> Stripe[Stripe Billing Meters API] UsageRecord --> Logs[S3 Bucket /Receipts] ``` ### Key Advantages - **Asynchronous execution:** `ctx.waitUntil` ensures the Freeway Worker won’t block, allowing seamless egress traffic handling. - **Decoupled architecture:** By using SQS and Lambda, the architecture is decoupled and includes a buffer before publishing events to Stripe. - **Signed receipts:** Each egress event generates a signed receipt, providing auditability and traceability. The receipts are stored in the general bucket in S3 where we already store other receipts. ### Summary of Changes - Introduces the `usage/record` capability handler in `w3infra/upload-api/stores/usage.js` to handle egress data from the Kinesis stream and place them into the SQS queue. - Implements an event-driven architecture using SQS and Lambda to process egress events asynchronously and publish to Stripe. - An integration test that publishes fake egress traffic events on Stripe’s Test API. ### Next Steps - Deployment (Staging) - Validate the solution’s performance and scalability. - Monitor for potential rate limit issues when publishing to Stripe. --- .github/workflows/test.yaml | 2 + billing/data/egress.js | 110 +++++++++++++ billing/functions/egress-traffic-queue.js | 95 +++++++++++ billing/lib/api.ts | 35 +++- billing/package.json | 3 +- billing/queues/egress-traffic.js | 9 + billing/tables/egress-traffic.js | 42 +++++ billing/test/helpers/context.js | 84 ++++++++++ billing/test/helpers/did.js | 2 +- billing/test/helpers/egress.js | 16 ++ billing/test/lib.egress-traffic.spec.js | 4 + billing/test/lib/api.ts | 25 ++- billing/test/lib/egress-traffic.js | 109 +++++++++++++ billing/test/utils/stripe.js | 17 +- billing/utils/stripe.js | 61 +++++++ filecoin/test/filecoin-events.test.js | 13 +- filecoin/test/filecoin-service.test.js | 13 +- package-lock.json | 154 ++++++------------ package.json | 6 +- stacks/billing-db-stack.js | 7 +- stacks/billing-stack.js | 34 +++- stacks/upload-api-stack.js | 7 +- .../functions/ucan-invocation-router.js | 6 +- upload-api/package.json | 4 +- upload-api/stores/provisions.js | 3 +- upload-api/stores/usage.js | 32 +++- upload-api/tables/consumer.js | 3 +- upload-api/types.ts | 2 +- 28 files changed, 764 insertions(+), 134 deletions(-) create mode 100644 billing/data/egress.js create mode 100644 billing/functions/egress-traffic-queue.js create mode 100644 billing/queues/egress-traffic.js create mode 100644 billing/tables/egress-traffic.js create mode 100644 billing/test/helpers/egress.js create mode 100644 billing/test/lib.egress-traffic.spec.js create mode 100644 billing/test/lib/egress-traffic.js diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index b968740b..773a4ec4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -25,3 +25,5 @@ jobs: AWS_ACCESS_KEY_ID: 'NOSUCH' AWS_SECRET_ACCESS_KEY: 'NOSUCH' STRIPE_TEST_SECRET_KEY: ${{ secrets.STRIPE_TEST_SECRET_KEY }} + STRIPE_BILLING_METER_ID: ${{ vars.STRIPE_BILLING_METER_ID }} + STRIPE_BILLING_METER_EVENT_NAME: ${{ vars.STRIPE_BILLING_METER_EVENT_NAME }} diff --git a/billing/data/egress.js b/billing/data/egress.js new file mode 100644 index 00000000..3e7b630f --- /dev/null +++ b/billing/data/egress.js @@ -0,0 +1,110 @@ +import { Link } from '@ucanto/server' +import { DecodeFailure, EncodeFailure, Schema } from './lib.js' + +/** + * @typedef { import('../types').InferStoreRecord } EgressTrafficStoreRecord + * @typedef { import('../types').InferStoreRecord } EgressTrafficKeyStoreRecord + */ + +export const egressSchema = Schema.struct({ + space: Schema.did({ method: 'key' }), + customer: Schema.did({ method: 'mailto' }), + resource: Schema.link(), + bytes: Schema.number(), + servedAt: Schema.date(), + cause: Schema.link(), +}) + +/** @type {import('../lib/api').Validator} */ +export const validate = input => egressSchema.read(input) + +/** @type {import('../lib/api').Encoder} */ +export const encode = input => { + try { + return { + ok: { + space: input.space.toString(), + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: Number(input.bytes), + servedAt: input.servedAt.toISOString(), + cause: input.cause.toString(), + } + } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Encoder} */ +export const encodeStr = input => { + try { + const data = encode(input) + if (data.error) throw data.error + return { ok: JSON.stringify(data.ok) } + } catch (/** @type {any} */ err) { + return { + error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decode = input => { + try { + return { + ok: { + space: Schema.did({ method: 'key' }).from(input.space), + customer: Schema.did({ method: 'mailto' }).from(input.customer), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: Number(input.bytes), + servedAt: new Date(input.servedAt), + cause: Link.parse(/** @type {string} */(input.cause)), + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err }) + } + } +} + +/** @type {import('../lib/api').Decoder} */ +export const decodeStr = input => { + try { + return decode(JSON.parse(input)) + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err }) + } + } +} + +export const lister = { + /** @type {import('../lib/api').Encoder} */ + encodeKey: input => ({ + ok: { + space: input.space.toString(), + customer: input.customer.toString(), + from: input.from.toISOString() + } + }), + /** @type {import('../lib/api').Decoder} */ + decodeKey: input => { + try { + return { + ok: { + space: Schema.did({ method: 'key' }).from(input.space), + customer: Schema.did({ method: 'mailto' }).from(input.customer), + from: new Date(input.from) + } + } + } catch (/** @type {any} */ err) { + return { + error: new DecodeFailure(`decoding egress traffic event list key: ${err.message}`, { cause: err }) + } + } + } +} \ No newline at end of file diff --git a/billing/functions/egress-traffic-queue.js b/billing/functions/egress-traffic-queue.js new file mode 100644 index 00000000..34a565f0 --- /dev/null +++ b/billing/functions/egress-traffic-queue.js @@ -0,0 +1,95 @@ +import * as Sentry from '@sentry/serverless' +import { expect } from './lib.js' +import { decodeStr } from '../data/egress.js' +import { mustGetEnv } from '../../lib/env.js' +import { createCustomerStore } from '../tables/customer.js' +import Stripe from 'stripe' +import { Config } from 'sst/node/config' +import { recordBillingMeterEvent } from '../utils/stripe.js' +import { createEgressTrafficEventStore } from '../tables/egress-traffic.js' + + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0 +}) + +/** + * @typedef {{ + * region?: 'us-west-2'|'us-east-2' + * egressTrafficQueueUrl?: string + * customerTable?: string + * billingMeterName?: string + * stripeSecretKey?: string + * customerStore?: import('../lib/api.js').CustomerStore + * egressTrafficTable?: string + * egressTrafficEventStore?: import('../lib/api.js').EgressTrafficEventStore + * }} CustomHandlerContext + */ + +/** + * AWS Lambda handler to process egress events from the egress traffic queue. + * Each event is a JSON object with `customer`, `resource`, `bytes` and `servedAt`. + * The message is then deleted from the queue when successful. + */ +export const handler = Sentry.AWSLambda.wrapHandler( + /** + * @param {import('aws-lambda').SQSEvent} event + * @param {import('aws-lambda').Context} context + */ + async (event, context) => { + /** @type {CustomHandlerContext|undefined} */ + const customContext = context?.clientContext?.Custom + const region = customContext?.region ?? mustGetEnv('AWS_REGION') + const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) + const egressTrafficTable = customContext?.egressTrafficTable ?? mustGetEnv('EGRESS_TRAFFIC_TABLE_NAME') + const egressTrafficEventStore = customContext?.egressTrafficEventStore ?? createEgressTrafficEventStore({ region }, { tableName: egressTrafficTable }) + + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) + const batchItemFailures = [] + for (const record of event.Records) { + try { + const decoded = decodeStr(record.body) + const egressData = expect(decoded, 'Failed to decode egress event') + + const putResult = await egressTrafficEventStore.put(egressData) + if (putResult.error) throw putResult.error + + const response = await customerStore.get({ customer: egressData.customer }) + if (response.error) { + return { + error: { + name: 'CustomerNotFound', + message: `Error getting customer ${egressData.customer}`, + cause: response.error + } + } + } + const customerAccount = response.ok.account + + expect( + await recordBillingMeterEvent(stripe, billingMeterName, egressData, customerAccount), + `Failed to record egress event in Stripe API for customer: ${egressData.customer}, account: ${customerAccount}, bytes: ${egressData.bytes}, servedAt: ${egressData.servedAt.toISOString()}, resource: ${egressData.resource}` + ) + } catch (error) { + console.error('Error processing egress event:', error) + batchItemFailures.push({ itemIdentifier: record.messageId }) + } + } + + return { + statusCode: 200, + body: 'Egress events processed successfully', + // Return the failed records so they can be retried + batchItemFailures + } + }, +) \ No newline at end of file diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 22df39de..1eafd920 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -1,4 +1,4 @@ -import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure } from '@ucanto/interface' +import { DID, Link, URI, LinkJSON, Result, Capabilities, Unit, Failure, UnknownLink } from '@ucanto/interface' // Billing stores ///////////////////////////////////////////////////////////// @@ -133,6 +133,11 @@ export interface UsageListKey { customer: CustomerDID, from: Date } export type UsageStore = StorePutter +/** + * Store for egress traffic data. + */ +export type EgressTrafficEventStore = StorePutter & StoreLister + // Billing queues ///////////////////////////////////////////////////////////// /** @@ -158,6 +163,34 @@ export interface CustomerBillingInstruction { export type CustomerBillingQueue = QueueAdder +/** + * Captures details about egress traffic that should be billed for a given period + */ +export interface EgressTrafficData { + /** Space DID (did:key:...). */ + space: ConsumerDID + /** Customer DID (did:mailto:...). */ + customer: CustomerDID + /** Resource that was served. */ + resource: UnknownLink + /** Number of bytes that were served. */ + bytes: number + /** Time the egress traffic was served at. */ + servedAt: Date + /** UCAN invocation IDthat caused the egress traffic. */ + cause: UnknownLink +} + +/** + * Queue for egress traffic data. + */ +export type EgressTrafficQueue = QueueAdder + +/** + * List key for egress traffic data. + */ +export interface EgressTrafficEventListKey { space: ConsumerDID, customer: CustomerDID, from: Date } + /** * Captures details about a space that should be billed for a given customer in * the given period of usage. diff --git a/billing/package.json b/billing/package.json index b0a96027..6bef96da 100644 --- a/billing/package.json +++ b/billing/package.json @@ -4,6 +4,7 @@ "type": "module", "scripts": { "test": "entail '**/*.spec.js'", + "test-only": "entail", "coverage": "c8 -r text -r html npm test" }, "dependencies": { @@ -13,7 +14,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", diff --git a/billing/queues/egress-traffic.js b/billing/queues/egress-traffic.js new file mode 100644 index 00000000..173b79d2 --- /dev/null +++ b/billing/queues/egress-traffic.js @@ -0,0 +1,9 @@ +import { createQueueAdderClient } from './client.js' +import { encodeStr, validate } from '../data/egress.js' + +/** + * @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf + * @param {{ url: URL }} context + */ +export const createEgressTrafficQueue = (conf, { url }) => + createQueueAdderClient(conf, { url, encode: encodeStr, validate }) diff --git a/billing/tables/egress-traffic.js b/billing/tables/egress-traffic.js new file mode 100644 index 00000000..fadd160c --- /dev/null +++ b/billing/tables/egress-traffic.js @@ -0,0 +1,42 @@ +import { createStorePutterClient, createStoreListerClient } from './client.js' +import { validate, encode, lister, decode } from '../data/egress.js' + +/** + * Source of truth for egress traffic data. + * + * @type {import('sst/constructs').TableProps} + */ +export const egressTrafficTableProps = { + fields: { + /** Space DID (did:key:...). */ + space: 'string', + /** Customer DID (did:mailto:...). */ + customer: 'string', + /** Resource CID. */ + resource: 'string', + /** ISO timestamp of the event. */ + servedAt: 'string', + /** Bytes served. */ + bytes: 'number', + /** UCAN invocation ID that caused the egress traffic. */ + cause: 'string', + }, + primaryIndex: { partitionKey: 'space', sortKey: 'servedAt' }, + globalIndexes: { + customer: { + partitionKey: 'customer', + sortKey: 'servedAt', + projection: ['space', 'resource', 'bytes', 'cause', 'servedAt'] + } + } +} + +/** + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {{ tableName: string }} context + * @returns {import('../lib/api.js').EgressTrafficEventStore} + */ +export const createEgressTrafficEventStore = (conf, { tableName }) => ({ + ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) +}) \ No newline at end of file diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 478c5d1f..ea321a73 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -1,3 +1,5 @@ +import dotenv from 'dotenv' +import path from 'node:path' import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js' import { createCustomerStore, customerTableProps } from '../../tables/customer.js' import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js' @@ -6,6 +8,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js' import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js' import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js' +import { decodeStr as decodeEgressTrafficEvent, validate as validateEgressTrafficEvent, encode as encodeEgressTrafficEvent } from '../../data/egress.js' import { createCustomerBillingQueue } from '../../queues/customer.js' import { createSpaceBillingQueue } from '../../queues/space.js' import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js' @@ -16,6 +19,12 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js' import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' +import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' +import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-queue.js' +import Stripe from 'stripe' +import { createEgressTrafficEventStore, egressTrafficTableProps } from '../../tables/egress-traffic.js' + +dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) /** * @typedef {{ @@ -33,6 +42,26 @@ const createAWSServices = async () => { } } +/** + * @returns {{ stripe: Stripe, stripeSecretKey: string, billingMeterEventName: string, billingMeterId: string }} + */ +const createStripeService = () => { + const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY + if (!stripeSecretKey) { + throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') + } + const billingMeterEventName = process.env.STRIPE_BILLING_METER_EVENT_NAME + if (!billingMeterEventName) { + throw new Error('STRIPE_BILLING_METER_EVENT_NAME environment variable is not set') + } + const billingMeterId = process.env.STRIPE_BILLING_METER_ID + if (!billingMeterId) { + throw new Error('STRIPE_BILLING_METER_ID environment variable is not set') + } + const stripe = new Stripe(stripeSecretKey, { apiVersion: "2023-10-16" }) + return { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } +} + export const createBillingCronTestContext = async () => { await createAWSServices() @@ -137,6 +166,61 @@ export const createUCANStreamTestContext = async () => { return { consumerStore, spaceDiffStore } } +/** + * @returns {Promise} + */ +export const createEgressTrafficTestContext = async () => { + await createAWSServices() + + const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-')) + const egressTrafficQueue = { + add: createEgressTrafficQueue(awsServices.sqs.client, { url: egressQueueURL }).add, + remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressTrafficEvent }).remove, + } + + const accountId = (await awsServices.sqs.client.config.credentials()).accountId + const region = 'us-west-2' + + const customerTable = await createTable(awsServices.dynamo.client, customerTableProps, 'customer-') + const customerStore = { + ...createCustomerStore(awsServices.dynamo.client, { tableName: customerTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: customerTable, + validate: validateCustomer, // assume test data is valid + encode: encodeCustomer + }) + } + + const egressTrafficTable = await createTable(awsServices.dynamo.client, egressTrafficTableProps, 'egress-traffic-') + const egressTrafficEventStore = { + ...createEgressTrafficEventStore(awsServices.dynamo.client, { tableName: egressTrafficTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: egressTrafficTable, + validate: validateEgressTrafficEvent, // assume test data is valid + encode: encodeEgressTrafficEvent + }) + } + + const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService() + + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + return { + egressTrafficQueue, + egressTrafficQueueUrl: egressQueueURL.toString(), + egressTrafficHandler: createEgressTrafficHandler, + accountId: accountId ?? '', + region: region ?? '', + customerTable, + customerStore, + egressTrafficTable, + egressTrafficEventStore, + billingMeterEventName, + billingMeterId, + stripeSecretKey, + stripe, + } +} + /** * @template C * @param {import('../lib/api').TestSuite} suite diff --git a/billing/test/helpers/did.js b/billing/test/helpers/did.js index c58fb9e7..3f0bc759 100644 --- a/billing/test/helpers/did.js +++ b/billing/test/helpers/did.js @@ -8,7 +8,7 @@ const randomDomain = () => `${randomAlphas(randomInteger(1, 32))}.${tlds[randomInteger(0, tlds.length)]}` /** @returns {import("@ucanto/interface").DID<'mailto'>} */ -export const randomDIDMailto = () => +export const randomDIDMailto = () => `did:mailto:${randomDomain()}:${randomAlphas(randomInteger(1, 16))}` /** @returns {Promise} */ diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js new file mode 100644 index 00000000..f6f5cc9d --- /dev/null +++ b/billing/test/helpers/egress.js @@ -0,0 +1,16 @@ +import { randomLink } from './dag.js' +import { randomDID } from './did.js' + +/** + * @param {import('../../lib/api').Customer} customer + * @returns {Promise} + */ +export const randomEgressEvent = async (customer) => ({ + space: await randomDID(), + customer: customer.customer, + resource: randomLink(), + bytes: Math.floor(Math.random() * 1000000), + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), + cause: randomLink() +}) diff --git a/billing/test/lib.egress-traffic.spec.js b/billing/test/lib.egress-traffic.spec.js new file mode 100644 index 00000000..306b57a5 --- /dev/null +++ b/billing/test/lib.egress-traffic.spec.js @@ -0,0 +1,4 @@ +import * as EgressTrafficSuite from './lib/egress-traffic.js' +import { bindTestContext, createEgressTrafficTestContext } from './helpers/context.js' + +export const test = bindTestContext(EgressTrafficSuite.test, createEgressTrafficTestContext) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 68814ca9..317a8834 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -18,8 +18,13 @@ import { SpaceSnapshotStore, UsageStore, UsageListKey, - Usage + Usage, + EgressTrafficQueue, + EgressTrafficData, + EgressTrafficEventStore } from '../../lib/api.js' +import { Context, Handler, SQSEvent } from 'aws-lambda' +import Stripe from 'stripe' export interface BillingCronTestContext { customerStore: CustomerStore & StorePutter @@ -47,12 +52,30 @@ export interface UCANStreamTestContext { consumerStore: ConsumerStore & StorePutter } + +export interface EgressTrafficTestContext extends Context { + egressTrafficQueue: EgressTrafficQueue & QueueRemover + egressTrafficQueueUrl: string + egressTrafficHandler: Handler + accountId: string + region: string + customerTable: string + customerStore: CustomerStore + egressTrafficTable: string + egressTrafficEventStore: EgressTrafficEventStore + billingMeterEventName: string + billingMeterId: string + stripeSecretKey: string + stripe: Stripe +} + export type TestContext = & BillingCronTestContext & CustomerBillingQueueTestContext & SpaceBillingQueueTestContext & StripeTestContext & UCANStreamTestContext + & EgressTrafficTestContext /** QueueRemover can remove items from the head of the queue. */ export interface QueueRemover { diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js new file mode 100644 index 00000000..a5e27009 --- /dev/null +++ b/billing/test/lib/egress-traffic.js @@ -0,0 +1,109 @@ +import { encodeStr } from '../../data/egress.js' +import { randomCustomer } from '../helpers/customer.js' +import { randomDIDMailto } from '../helpers/did.js' +import { randomEgressEvent } from '../helpers/egress.js' +import * as DidMailto from '@web3-storage/did-mailto' + +/** @type {import('./api').TestSuite} */ +export const test = { + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id + + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') + + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + async () => await randomEgressEvent(customer) + ) + ) + + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } + + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encodeStr(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) + } + + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) + + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), + } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } + } + } +} \ No newline at end of file diff --git a/billing/test/utils/stripe.js b/billing/test/utils/stripe.js index 72828296..2b7c6206 100644 --- a/billing/test/utils/stripe.js +++ b/billing/test/utils/stripe.js @@ -1,4 +1,7 @@ -import { handleCustomerSubscriptionCreated } from '../../utils/stripe.js' +import { + accountIDToStripeCustomerID, + stripeIDToAccountID, + handleCustomerSubscriptionCreated } from '../../utils/stripe.js' import * as DidMailto from '@web3-storage/did-mailto' @@ -44,5 +47,17 @@ export const test = { assert.ok(result.ok) const customerRecord = await ctx.customerStore.get({ customer }) assert.equal(customerRecord.ok?.product, product) + }, + + 'should convert an account ID to a stripe customer ID': (/** @type {import('entail').assert} */ assert) => { + const accountID = 'stripe:cus_1234567890' + const stripeCustomerId = accountIDToStripeCustomerID(accountID) + assert.equal(stripeCustomerId, 'cus_1234567890') + }, + + 'should convert a stripe customer ID to an account ID': (/** @type {import('entail').assert} */ assert) => { + const stripeCustomerId = 'cus_1234567890' + const accountID = stripeIDToAccountID(stripeCustomerId) + assert.equal(accountID, 'stripe:cus_1234567890') } } diff --git a/billing/utils/stripe.js b/billing/utils/stripe.js index b47b3ded..5f05934d 100644 --- a/billing/utils/stripe.js +++ b/billing/utils/stripe.js @@ -7,6 +7,9 @@ import * as DidMailto from '@web3-storage/did-mailto' */ /** + * Converts a Stripe customer ID to an Account ID. + * e.g: + * cus_1234567890 -> stripe:cus_1234567890 * * @param {string} stripeID * @returns {AccountID} @@ -15,6 +18,17 @@ export function stripeIDToAccountID(stripeID) { return /** @type {AccountID} */(`stripe:${stripeID}`) } +/** + * Converts an Account ID to a Stripe customer ID. + * e.g: + * stripe:cus_1234567890 -> cus_1234567890 + * + * @param {AccountID} accountID + * @returns {string} + */ +export const accountIDToStripeCustomerID = (accountID) => accountID.slice('stripe:'.length) + + /** * * @param {Stripe} stripe @@ -50,3 +64,50 @@ export async function handleCustomerSubscriptionCreated(stripe, event, customerS }) } } + +/** + * Records an egress traffic event in the Stripe Billing Meter API for the given customer account. + * + * @param {import('stripe').Stripe} stripe + * @param {string} billingMeterEventName + * @param {import('../lib/api.js').EgressTrafficData} egressData + * @param {AccountID} customerAccount + */ +export async function recordBillingMeterEvent(stripe, billingMeterEventName, egressData, customerAccount) { + const stripeCustomerId = accountIDToStripeCustomerID(customerAccount) + /** @type {import('stripe').Stripe.Customer | import('stripe').Stripe.DeletedCustomer} */ + const stripeCustomer = await stripe.customers.retrieve(stripeCustomerId) + if (stripeCustomer.deleted) { + return { + error: { + name: 'StripeCustomerNotFound', + message: `Customer ${stripeCustomerId} has been deleted from Stripe`, + } + } + } + + /** @type {import('stripe').Stripe.Billing.MeterEvent} */ + const meterEvent = await stripe.billing.meterEvents.create({ + event_name: billingMeterEventName, + payload: { + stripe_customer_id: stripeCustomerId, + value: egressData.bytes.toString(), + }, + timestamp: Math.floor(egressData.servedAt.getTime() / 1000), + }, + { + idempotencyKey: `${egressData.servedAt.toISOString()}-${egressData.space}-${egressData.customer}-${egressData.resource}` + } + ) + + // Identifier is only set if the event was successfully created + if (meterEvent.identifier) { + return { ok: { meterEvent } } + } + return { + error: { + name: 'StripeBillingMeterEventCreationFailed', + message: `Error creating meter event for egress traffic in Stripe for customer ${egressData.customer} @ ${egressData.servedAt.toISOString()}`, + } + } +} \ No newline at end of file diff --git a/filecoin/test/filecoin-events.test.js b/filecoin/test/filecoin-events.test.js index dfd5663a..5added75 100644 --- a/filecoin/test/filecoin-events.test.js +++ b/filecoin/test/filecoin-events.test.js @@ -90,12 +90,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) diff --git a/filecoin/test/filecoin-service.test.js b/filecoin/test/filecoin-service.test.js index 7fc853e0..54d60ea1 100644 --- a/filecoin/test/filecoin-service.test.js +++ b/filecoin/test/filecoin-service.test.js @@ -91,12 +91,15 @@ test.after(async t => { }) for (const [title, unit] of Object.entries(filecoinApiTest.service.storefront)) { - const define = title.startsWith('only ') + let define; + if (title.startsWith('only ')) { // eslint-disable-next-line no-only-tests/no-only-tests - ? test.only - : title.startsWith('skip ') - ? test.skip - : test + define = test.only; + } else if (title.startsWith('skip ')) { + define = test.skip; + } else { + define = test; + } define(title, async (t) => { const queues = getQueues(t.context) diff --git a/package-lock.json b/package-lock.json index 281a02e1..5ef5964b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,12 +47,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", @@ -79,7 +79,7 @@ "@sentry/serverless": "^7.74.1", "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.4.0", "big.js": "^6.2.1", "lru-cache": "^11.0.0", "multiformats": "^13.1.0", @@ -7947,7 +7947,6 @@ "version": "9.2.1", "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-9.2.1.tgz", "integrity": "sha512-nyY48yE7r3dnJVlxrdaimrbloh4RokQaNRdI//btfTkcTEZbpmSrbYcBQ4VKTf8ZxXAOUJy4VsRpkJo+y9RTnA==", - "license": "Apache-2.0 OR MIT", "dependencies": { "cborg": "^4.0.0", "multiformats": "^13.1.0" @@ -11427,8 +11426,7 @@ "node_modules/@storacha/one-webcrypto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@storacha/one-webcrypto/-/one-webcrypto-1.0.1.tgz", - "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==", - "license": "MIT" + "integrity": "sha512-bD+vWmcgsEBqU0Dz04BR43SA03bBoLTAY29vaKasY9Oe8cb6XIP0/vkm0OS2UwKC13c8uRgFW4rjJUgDCNLejQ==" }, "node_modules/@trpc/server": { "version": "9.16.0", @@ -12204,7 +12202,6 @@ "version": "20.1.0", "resolved": "https://registry.npmjs.org/@web3-storage/access/-/access-20.1.0.tgz", "integrity": "sha512-IY6ICPRWE8++2jxvy+LzAiFvwAOIHR8cu9eNt+VT5sAFE796o4ma7GSU0eXRCiShmV2n6iSWAwWRT6XD5zIqPA==", - "license": "(Apache-2.0 OR MIT)", "dependencies": { "@ipld/car": "^5.1.1", "@ipld/dag-ucan": "^3.4.0", @@ -12236,14 +12233,15 @@ } }, "node_modules/@web3-storage/blob-index": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.3.tgz", - "integrity": "sha512-VjGLhf6Gf4ZmzjJXS6wU4aRvnM+HLcuRCJHegjQ36ka52sR2WWOcqDNNVvabtlpnYjGtVFQCPUzaCcs18wpqHQ==", + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@web3-storage/blob-index/-/blob-index-1.0.4.tgz", + "integrity": "sha512-04+PrmVHFT+xzRhyIPdcvGc8Y2NDffUe8R1gJOyErVzEVz5N1I9Q/BrlFHYt/A4HrjM5JBsxqSrZgTIkjfPmLA==", "dependencies": { "@ipld/dag-cbor": "^9.0.6", + "@storacha/one-webcrypto": "^1.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", - "@web3-storage/capabilities": "^17.1.1", + "@web3-storage/capabilities": "^17.2.0", "carstream": "^2.1.0", "multiformats": "^13.0.1", "uint8arrays": "^5.0.3" @@ -12261,10 +12259,9 @@ } }, "node_modules/@web3-storage/capabilities": { - "version": "17.3.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.3.0.tgz", - "integrity": "sha512-9415OPNVYO5gXDVf1vzZywkjndKTVA9IPnU04lQXxUaYfYZ5S5kzV2PI1SvySMOsCNE7u7uSCTiclblx5gPYAg==", - "license": "(Apache-2.0 OR MIT)", + "version": "17.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-17.4.0.tgz", + "integrity": "sha512-2VNLTTvv9qVewtXiek2Fb6W7WTQgOonq+FcNV9PyXAEgcXsQWsm8dOmbeB83W9bAuiwe9uIOzC1rftDHY3+uYA==", "dependencies": { "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", @@ -12279,7 +12276,6 @@ "version": "5.3.0", "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", - "license": "(Apache-2.0 AND MIT)", "dependencies": { "@ipld/dag-cbor": "^9.2.1", "multiformats": "^13.3.0", @@ -12357,9 +12353,9 @@ } }, "node_modules/@web3-storage/filecoin-api": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.2.0.tgz", - "integrity": "sha512-0tr+vlLXQn4vbHZR2Sxxr62fKW60TejQyH3ZG1CNCFLhLkBg4pXTYSu5rxijYg3ob8DHkejKp7hXMgPQhFzOHw==", + "version": "7.3.2", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-7.3.2.tgz", + "integrity": "sha512-DIhi6uheibt+JluLdfrCqH0xn/yZEIt+Nupf4OqhX9LBkaT30ySDraHz8yIolyFV7/8hxkSsglH1QQdM7M1F4A==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", @@ -12367,9 +12363,9 @@ "@ucanto/interface": "^10.0.1", "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.3.0", "@web3-storage/content-claims": "^5.0.0", - "@web3-storage/data-segment": "^4.0.0", + "@web3-storage/data-segment": "^5.2.0", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0", "p-map": "^6.0.0" }, @@ -12378,76 +12374,26 @@ } }, "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-4.0.0.tgz", - "integrity": "sha512-AnNyJp3wHMa7LBzguQzm4rmXSi8vQBz4uFs+jiXnSNtLR5dAqHfhMvi9XdWonWPYvxNvT5ZhYCSF0mpDjymqKg==", + "version": "5.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-5.3.0.tgz", + "integrity": "sha512-zFJ4m+pEKqtKatJNsFrk/2lHeFSbkXZ6KKXjBe7/2ayA9wAar7T/unewnOcZrrZTnCWmaxKsXWqdMFy9bXK9dw==", "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", + "@ipld/dag-cbor": "^9.2.1", + "multiformats": "^13.3.0", "sync-multihash-sha2": "^1.0.0" } }, - "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, "node_modules/@web3-storage/filecoin-client": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.3.tgz", - "integrity": "sha512-xFL8odr5PpTjQvpfw/4jphcm7ZvcBRMSKHn3ReEaVcFjxQL45Rojjleuq/QEdMwrNfsLCqqAxC54jk55o5/ERQ==", + "version": "3.3.4", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-3.3.4.tgz", + "integrity": "sha512-T2xur1NPvuH09yajyjCWEl7MBH712nqHERj51w4nDp6f8libMCKY6lca0frCrm4OC5s8oy0ZtoRFhsRYxgTzSg==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.1", "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/capabilities": "^16.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/capabilities": { - "version": "16.0.0", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-16.0.0.tgz", - "integrity": "sha512-wCjLpYc6t8tFRZrF2k2vBteJDWzHkmQjoJG0Yy/fjA04IjNN48iVZaCMQIANHXZxDGlYRGxhwzDwl4dovAdSTQ==", - "dependencies": { - "@ucanto/core": "^10.0.1", - "@ucanto/interface": "^10.0.1", - "@ucanto/principal": "^9.0.1", - "@ucanto/transport": "^9.1.1", - "@ucanto/validator": "^9.0.2", - "@web3-storage/data-segment": "^3.2.0", - "uint8arrays": "^5.0.3" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment": { - "version": "3.2.0", - "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-3.2.0.tgz", - "integrity": "sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==", - "dependencies": { - "@ipld/dag-cbor": "^9.0.5", - "multiformats": "^11.0.2", - "sync-multihash-sha2": "^1.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/@web3-storage/data-segment/node_modules/multiformats": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-11.0.2.tgz", - "integrity": "sha512-b5mYMkOkARIuVZCpvijFj9a6m5wMVLC7cf/jIPd5D/ARDOfLC5+IFkbgDXQgcU2goIsTD/O9NY4DI/Mt4OGvlg==", - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, - "node_modules/@web3-storage/filecoin-client/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" + "@web3-storage/capabilities": "^17.3.0" } }, "node_modules/@web3-storage/multipart-parser": { @@ -12466,9 +12412,9 @@ } }, "node_modules/@web3-storage/upload-api": { - "version": "18.0.2", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.0.2.tgz", - "integrity": "sha512-IxV95h8/kb2OpwPSv8/Rew1xBfSnW9s8DL4y1r3cjN57n35XXLnsP0to4YU+FuFo6MKFNx9Yu0UrW+7GowC2vw==", + "version": "18.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-18.1.0.tgz", + "integrity": "sha512-+28NInlExKJ7ltOrN6o6jeHT4p7fX4TJFRkgCWIcImvVTfIQQPE8z8YKOshokAXPNZebQFRo7a1UrtAIbEg4mA==", "dependencies": { "@ucanto/client": "^9.0.1", "@ucanto/interface": "^10.0.1", @@ -12476,12 +12422,12 @@ "@ucanto/server": "^10.0.0", "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-api": "^7.1.1", + "@web3-storage/filecoin-api": "^7.3.2", "multiformats": "^12.1.2", "uint8arrays": "^5.0.3" }, @@ -12512,9 +12458,9 @@ "integrity": "sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==" }, "node_modules/@web3-storage/upload-client": { - "version": "16.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-16.1.1.tgz", - "integrity": "sha512-aVGpcqnLxRk4u3uZAum1jwC5BbEbjuqAZZBrGNZ3UZVFnnJJXPm3DE+r1WC8FV2mbgC/yKKqDMu1FP1uB9wJkA==", + "version": "17.1.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-client/-/upload-client-17.1.0.tgz", + "integrity": "sha512-0tUMe4Ez9gmUZjgn1Nrl6HYdGEsYyeLa6JrpoXcCGTQDBW2FehALc+GZZeoIjYQexRpw+qt9JstuJNN9dUNETw==", "dev": true, "dependencies": { "@ipld/car": "^5.2.2", @@ -12525,10 +12471,10 @@ "@ucanto/core": "^10.0.1", "@ucanto/interface": "^10.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/data-segment": "^5.1.0", - "@web3-storage/filecoin-client": "^3.3.3", + "@web3-storage/filecoin-client": "^3.3.4", "ipfs-utils": "^9.0.14", "multiformats": "^12.1.2", "p-retry": "^5.1.2", @@ -12600,9 +12546,9 @@ "link": true }, "node_modules/@web3-storage/w3up-client": { - "version": "14.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-14.1.1.tgz", - "integrity": "sha512-brBmN1aCJpjtNLwWpz0Jg1OkfsYs7r27m5VMOET9x+j4jy3DCtSqlUkHFIumLYrNhUn5oZrqh0kbWokcaUvrLg==", + "version": "16.4.0", + "resolved": "https://registry.npmjs.org/@web3-storage/w3up-client/-/w3up-client-16.4.0.tgz", + "integrity": "sha512-ndHXVufBt6bVyZHHxpe+bgS9bnOtpREsYFtL/C2h/AL8O5LZ1QBM91wFHVvLhFOJcKVcbW2WagyivCr+NOp9WA==", "dev": true, "dependencies": { "@ipld/dag-ucan": "^3.4.0", @@ -12611,12 +12557,12 @@ "@ucanto/interface": "^10.0.1", "@ucanto/principal": "^9.0.1", "@ucanto/transport": "^9.1.1", - "@web3-storage/access": "^20.0.0", - "@web3-storage/blob-index": "^1.0.3", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/access": "^20.1.0", + "@web3-storage/blob-index": "^1.0.4", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.1" + "@web3-storage/filecoin-client": "^3.3.4", + "@web3-storage/upload-client": "^17.1.0" }, "engines": { "node": ">=18" @@ -30353,9 +30299,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", diff --git a/package.json b/package.json index 3395c5a5..77d06ac8 100644 --- a/package.json +++ b/package.json @@ -40,12 +40,12 @@ "@web-std/fetch": "^4.1.0", "@web3-storage/access": "^20.1.0", "@web3-storage/blob-index": "^1.0.2", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/content-claims": "^5.1.0", "@web3-storage/data-segment": "5.1.0", "@web3-storage/filecoin-client": "^3.3.3", - "@web3-storage/upload-client": "^16.1.0", - "@web3-storage/w3up-client": "^14.1.1", + "@web3-storage/upload-client": "^17.1.0", + "@web3-storage/w3up-client": "^16.4.0", "ava": "^4.3.3", "chalk": "4.1.2", "constructs": "10.3.0", diff --git a/stacks/billing-db-stack.js b/stacks/billing-db-stack.js index f731a423..42712a2b 100644 --- a/stacks/billing-db-stack.js +++ b/stacks/billing-db-stack.js @@ -3,6 +3,7 @@ import { customerTableProps } from '../billing/tables/customer.js' import { spaceDiffTableProps } from '../billing/tables/space-diff.js' import { spaceSnapshotTableProps } from '../billing/tables/space-snapshot.js' import { usageTableProps } from '../billing/tables/usage.js' +import { egressTrafficTableProps } from '../billing/tables/egress-traffic.js' /** * @param {import('sst/constructs').StackContext} properties @@ -15,15 +16,17 @@ export const BillingDbStack = ({ stack }) => { ...usageTableProps, stream: 'new_image' }) + const egressTrafficTable = new Table(stack, 'egress-traffic', egressTrafficTableProps) stack.addOutputs({ customerTableName: customerTable.tableName, spaceSnapshotTableName: spaceSnapshotTable.tableName, spaceDiffTableName: spaceDiffTable.tableName, - usageTable: usageTable.tableName + usageTable: usageTable.tableName, + egressTrafficTableName: egressTrafficTable.tableName }) const stripeSecretKey = new Config.Secret(stack, 'STRIPE_SECRET_KEY') - return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, stripeSecretKey } + return { customerTable, spaceSnapshotTable, spaceDiffTable, usageTable, egressTrafficTable, stripeSecretKey } } diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 13011b88..f8b158f3 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -17,6 +17,7 @@ export function BillingStack ({ stack, app }) { spaceSnapshotTable, spaceDiffTable, usageTable, + egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { subscriptionTable, consumerTable } = use(UploadDbStack) @@ -179,5 +180,36 @@ export function BillingStack ({ stack, app }) { CustomDomain: customDomain ? `https://${customDomain.domainName}` : 'Set BILLING_HOSTED_ZONE in env to deploy to a custom domain' }) - return { billingCron } + // Lambda that handles egress traffic tracking + const egressTrafficQueueHandler = new Function(stack, 'egress-traffic-queue-handler', { + permissions: [customerTable, egressTrafficTable], + handler: 'billing/functions/egress-traffic-queue.handler', + timeout: '15 minutes', + bind: [stripeSecretKey], + environment: { + CUSTOMER_TABLE_NAME: customerTable.tableName, + EGRESS_TRAFFIC_TABLE_NAME: egressTrafficTable.tableName, + // Billing Meter Event Name for Stripe Test and Production APIs + STRIPE_BILLING_METER_EVENT_NAME: 'gateway-egress-traffic' + } + }) + + // Queue for egress traffic tracking + const egressTrafficDLQ = new Queue(stack, 'egress-traffic-dlq', { + cdk: { queue: { retentionPeriod: Duration.days(14) } } + }) + const egressTrafficQueue = new Queue(stack, 'egress-traffic-queue', { + consumer: { + function: egressTrafficQueueHandler, + deadLetterQueue: egressTrafficDLQ.cdk.queue, + cdk: { eventSource: { batchSize: 1 } } + }, + cdk: { queue: { visibilityTimeout: Duration.minutes(15) } } + }) + + stack.addOutputs({ + EgressTrafficQueueURL: egressTrafficQueue.queueUrl + }) + + return { billingCron, egressTrafficQueue } } diff --git a/stacks/upload-api-stack.js b/stacks/upload-api-stack.js index de4f9cd5..74d6e789 100644 --- a/stacks/upload-api-stack.js +++ b/stacks/upload-api-stack.js @@ -9,6 +9,7 @@ import { import { StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda' import { UploadDbStack } from './upload-db-stack.js' import { BillingDbStack } from './billing-db-stack.js' +import { BillingStack } from './billing-stack.js' import { CarparkStack } from './carpark-stack.js' import { FilecoinStack } from './filecoin-stack.js' import { UcanInvocationStack } from './ucan-invocation-stack.js' @@ -45,9 +46,10 @@ export function UploadApiStack({ stack, app }) { const { carparkBucket } = use(CarparkStack) const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey, contentClaimsPrivateKey } = use(UploadDbStack) const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack) - const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack) + const { customerTable, spaceDiffTable, spaceSnapshotTable, egressTrafficTable, stripeSecretKey } = use(BillingDbStack) const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack) const { blockAdvertPublisherQueue, blockIndexWriterQueue } = use(IndexerStack) + const { egressTrafficQueue } = use(BillingStack) // Setup API const customDomains = process.env.HOSTED_ZONES?.split(',').map(zone => getCustomDomain(stack.stage, zone)) @@ -82,6 +84,7 @@ export function UploadApiStack({ stack, app }) { pieceTable, spaceDiffTable, spaceSnapshotTable, + egressTrafficTable, carparkBucket, invocationBucket, taskBucket, @@ -91,6 +94,7 @@ export function UploadApiStack({ stack, app }) { filecoinSubmitQueue, blockAdvertPublisherQueue, blockIndexWriterQueue, + egressTrafficQueue, ], environment: { DID: process.env.UPLOAD_API_DID ?? '', @@ -119,6 +123,7 @@ export function UploadApiStack({ stack, app }) { FILECOIN_SUBMIT_QUEUE_URL: filecoinSubmitQueue.queueUrl, BLOCK_ADVERT_PUBLISHER_QUEUE_URL: blockAdvertPublisherQueue.queueUrl, BLOCK_INDEX_WRITER_QUEUE_URL: blockIndexWriterQueue.queueUrl, + EGRESS_TRAFFIC_QUEUE_URL: egressTrafficQueue.queueUrl, NAME: pkg.name, VERSION: pkg.version, COMMIT: git.commmit, diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index 16a3a6f6..0d0735a1 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -41,6 +41,7 @@ import { createStripeBillingProvider } from '../billing.js' import { createIPNIService } from '../external-services/ipni-service.js' import * as UploadAPI from '@web3-storage/upload-api' import { mustGetEnv } from '../../lib/env.js' +import { createEgressTrafficQueue } from '@web3-storage/w3infra-billing/queues/egress-traffic.js' Sentry.AWSLambda.init({ environment: process.env.SST_STAGE, @@ -120,6 +121,7 @@ export async function ucanInvocationRouter(request) { dealTrackerUrl, pieceOfferQueueUrl, filecoinSubmitQueueUrl, + egressTrafficQueueUrl, requirePaymentPlan, // set for testing dbEndpoint, @@ -201,7 +203,8 @@ export async function ucanInvocationRouter(request) { const revocationsStorage = createRevocationsTable(AWS_REGION, revocationTableName) const spaceDiffStore = createSpaceDiffStore({ region: AWS_REGION }, { tableName: spaceDiffTableName }) const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName }) - const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore }) + const egressTrafficQueue = createEgressTrafficQueue({ region: AWS_REGION }, { url: new URL(egressTrafficQueueUrl) }) + const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore, egressTrafficQueue }) const dealTrackerConnection = getServiceConnection({ did: dealTrackerDid, @@ -346,6 +349,7 @@ function getLambdaEnv () { spaceSnapshotTableName: mustGetEnv('SPACE_SNAPSHOT_TABLE_NAME'), pieceOfferQueueUrl: mustGetEnv('PIECE_OFFER_QUEUE_URL'), filecoinSubmitQueueUrl: mustGetEnv('FILECOIN_SUBMIT_QUEUE_URL'), + egressTrafficQueueUrl: mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL'), r2DelegationBucketEndpoint: mustGetEnv('R2_ENDPOINT'), r2DelegationBucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'), r2DelegationBucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'), diff --git a/upload-api/package.json b/upload-api/package.json index fd2f523e..4d4058dd 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -23,9 +23,9 @@ "@ucanto/transport": "^9.1.1", "@ucanto/validator": "^9.0.2", "@web3-storage/access": "^20.0.0", - "@web3-storage/capabilities": "^17.2.0", + "@web3-storage/capabilities": "^17.4.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^18.0.2", + "@web3-storage/upload-api": "^18.1.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "p-map": "^7.0.2", diff --git a/upload-api/stores/provisions.js b/upload-api/stores/provisions.js index bff7987d..19630e4e 100644 --- a/upload-api/stores/provisions.js +++ b/upload-api/stores/provisions.js @@ -109,7 +109,8 @@ export function useProvisionStore (subscriptionTable, consumerTable, spaceMetric did: consumer, allocated, limit: 1_000_000_000, // set to an arbitrarily high number because we currently don't enforce any limits - subscription: consumerRecord.subscription + subscription: consumerRecord.subscription, + customer: consumerRecord.customer } }) : ( { error: { name: 'ConsumerNotFound', message: `could not find ${consumer}` } } diff --git a/upload-api/stores/usage.js b/upload-api/stores/usage.js index 45720254..89c01a0f 100644 --- a/upload-api/stores/usage.js +++ b/upload-api/stores/usage.js @@ -4,15 +4,16 @@ import { iterateSpaceDiffs } from '@web3-storage/w3infra-billing/lib/space-billi * @param {object} conf * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceSnapshotStore} conf.spaceSnapshotStore * @param {import('@web3-storage/w3infra-billing/lib/api').SpaceDiffStore} conf.spaceDiffStore + * @param {import('@web3-storage/w3infra-billing/lib/api').EgressTrafficQueue} conf.egressTrafficQueue */ -export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { +export function useUsageStore({ spaceSnapshotStore, spaceDiffStore, egressTrafficQueue }) { return { /** * @param {import('@web3-storage/upload-api').ProviderDID} provider * @param {import('@web3-storage/upload-api').SpaceDID} space * @param {{ from: Date, to: Date }} period */ - async report (provider, space, period) { + async report(provider, space, period) { const snapResult = await spaceSnapshotStore.get({ provider, space, @@ -57,6 +58,33 @@ export function useUsageStore ({ spaceSnapshotStore, spaceDiffStore }) { events, } return { ok: report } + }, + + /** + * Handle egress traffic data and enqueues it, so the billing system can process it and update the Stripe Billing Meter API. + * + * @param {import('@web3-storage/upload-api').SpaceDID} space - The space that the egress traffic is associated with. + * @param {import('@web3-storage/upload-api').AccountDID} customer - The customer that will be billed for the egress traffic. + * @param {import('@web3-storage/upload-api').UnknownLink} resource - The resource that was served. + * @param {number} bytes - The number of bytes that were served. + * @param {Date} servedAt - The date and time when the egress traffic was served. + * @param {import('@web3-storage/upload-api').UnknownLink} cause - The UCAN invocation ID that caused the egress traffic. + * @returns {Promise>} + */ + async record(space, customer, resource, bytes, servedAt, cause) { + const record = { + space, + customer, + resource, + bytes, + servedAt, + cause + } + + const result = await egressTrafficQueue.add(record) + if (result.error) return result + + return { ok: { ...record, servedAt: servedAt.toISOString() } } } } } diff --git a/upload-api/tables/consumer.js b/upload-api/tables/consumer.js index 0cf48b8d..b2036c5d 100644 --- a/upload-api/tables/consumer.js +++ b/upload-api/tables/consumer.js @@ -98,7 +98,8 @@ export function useConsumerTable (dynamoDb, tableName) { // provider/consumer pair, but I suspect we'll never get there const record = response.Items?.map(i => unmarshall(i)).find(i => i.provider === provider) return record ? { - subscription: record.subscription + subscription: record.subscription, + customer: record.customer } : null }, diff --git a/upload-api/types.ts b/upload-api/types.ts index 20eab517..f7a5e5b7 100644 --- a/upload-api/types.ts +++ b/upload-api/types.ts @@ -184,7 +184,7 @@ export interface ConsumerListRecord { export interface ConsumerTable { /** get a consumer record for a given provider */ - get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string } | null> + get: (provider: ProviderDID, consumer: DIDKey) => Promise<{ subscription: string, customer: AccountDID } | null> /** get a consumer record for a given subscription */ getBySubscription: (provider: ProviderDID, subscription: string) => Promise<{ consumer: DID } | null> /** add a consumer - a relationship between a provider, subscription and consumer */