Skip to content

Commit

Permalink
feat: space billing instruction implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Oct 23, 2023
1 parent c505faf commit 01af626
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 466 deletions.
8 changes: 4 additions & 4 deletions billing/data/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Link from 'multiformats/link'
import { DecodeFailure, asDID } from './lib.js'

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, import('../lib/api').Consumer>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, import('../lib/api').Consumer>}
*/
export const decode = input => {
try {
Expand All @@ -24,7 +24,7 @@ export const decode = input => {
}

/**
* @type {import('../lib/api').Encoder<import('../lib/api').ConsumerKey, import('../lib/api').InferStoreRecord<import('../lib/api').ConsumerKey>>}
* @type {import('../lib/api').Encoder<import('../lib/api').ConsumerKey, import('../types').InferStoreRecord<import('../lib/api').ConsumerKey>>}
*/
export const encodeKey = input => ({
ok: {
Expand All @@ -36,11 +36,11 @@ export const encodeKey = input => ({
/** Encoders/decoders for listings. */
export const lister = {
/**
* @type {import('../lib/api').Encoder<import('../lib/api').ConsumerListKey, import('../lib/api').InferStoreRecord<import('../lib/api').ConsumerListKey>>}
* @type {import('../lib/api').Encoder<import('../lib/api').ConsumerListKey, import('../types').InferStoreRecord<import('../lib/api').ConsumerListKey>>}
*/
encodeKey: input => ({ ok: { consumer: input.consumer } }),
/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, Pick<import('../lib/api').Consumer, 'consumer'|'provider'|'subscription'>>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, Pick<import('../lib/api').Consumer, 'consumer'|'provider'|'subscription'>>}
*/
decode: input => {
try {
Expand Down
2 changes: 1 addition & 1 deletion billing/data/customer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Link from 'multiformats/link'
import { DecodeFailure, asDIDMailto } from './lib.js'

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, import('../lib/api').Customer>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, import('../lib/api').Customer>}
*/
export const decode = input => {
try {
Expand Down
6 changes: 3 additions & 3 deletions billing/data/space-diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const validate = input => {
}

/**
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceDiff, import('../lib/api').InferStoreRecord<import('../lib/api').SpaceDiff>>}
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceDiff, import('../types').InferStoreRecord<import('../lib/api').SpaceDiff>>}
*/
export const encode = input => {
try {
Expand All @@ -55,12 +55,12 @@ export const encode = input => {
}

/**
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceDiffKey, import('../lib/api').InferStoreRecord<import('../lib/api').SpaceDiffKey>>}
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceDiffKey, import('../types').InferStoreRecord<import('../lib/api').SpaceDiffKey>>}
*/
export const encodeKey = input => ({ ok: { customer: input.customer } })

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, import('../lib/api').SpaceDiff>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, import('../lib/api').SpaceDiff>}
*/
export const decode = input => {
try {
Expand Down
6 changes: 3 additions & 3 deletions billing/data/space-snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const validate = input => {
return { ok: {} }
}

/** @type {import('../lib/api').Encoder<import('../lib/api').SpaceSnapshot, Omit<import('../lib/api').InferStoreRecord<import('../lib/api').SpaceSnapshot>, 'provider'>>} */
/** @type {import('../lib/api').Encoder<import('../lib/api').SpaceSnapshot, Omit<import('../types').InferStoreRecord<import('../lib/api').SpaceSnapshot>, 'provider'>>} */
export const encode = input => {
try {
return {
Expand All @@ -42,7 +42,7 @@ export const encode = input => {
}

/**
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceSnapshotKey, Omit<import('../lib/api').InferStoreRecord<import('../lib/api').SpaceSnapshotKey>, 'provider'>>}
* @type {import('../lib/api').Encoder<import('../lib/api').SpaceSnapshotKey, Omit<import('../types').InferStoreRecord<import('../lib/api').SpaceSnapshotKey>, 'provider'>>}
*/
export const encodeKey = input => ({
ok: {
Expand All @@ -52,7 +52,7 @@ export const encodeKey = input => ({
})

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, import('../lib/api').SpaceSnapshot>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, import('../lib/api').SpaceSnapshot>}
*/
export const decode = input => {
if (typeof input.space !== 'string') {
Expand Down
8 changes: 4 additions & 4 deletions billing/data/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Link from 'multiformats/link'
import { DecodeFailure, isDIDMailto, isDID } from './lib.js'

/**
* @type {import('../lib/api').Encoder<import('../lib/api').SubscriptionKey, import('../lib/api').InferStoreRecord<import('../lib/api').SubscriptionKey>>}
* @type {import('../lib/api').Encoder<import('../lib/api').SubscriptionKey, import('../types').InferStoreRecord<import('../lib/api').SubscriptionKey>>}
*/
export const encodeKey = input => ({
ok: {
Expand All @@ -12,7 +12,7 @@ export const encodeKey = input => ({
})

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, import('../lib/api').Subscription>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, import('../lib/api').Subscription>}
*/
export const decode = input => {
if (!isDIDMailto(input.customer)) {
Expand Down Expand Up @@ -48,12 +48,12 @@ export const decode = input => {
/** Encoders/decoders for listings. */
export const lister = {
/**
* @type {import('../lib/api').Encoder<import('../lib/api').SubscriptionListKey, import('../lib/api').InferStoreRecord<import('../lib/api').SubscriptionListKey>>}
* @type {import('../lib/api').Encoder<import('../lib/api').SubscriptionListKey, import('../types').InferStoreRecord<import('../lib/api').SubscriptionListKey>>}
*/
encodeKey: input => ({ ok: { customer: input.customer } }),

/**
* @type {import('../lib/api').Decoder<import('../lib/api').StoreRecord, Pick<import('../lib/api').Subscription, 'customer'|'provider'|'subscription'|'cause'>>}
* @type {import('../lib/api').Decoder<import('../types').StoreRecord, Pick<import('../lib/api').Subscription, 'customer'|'provider'|'subscription'|'cause'>>}
*/
decode: input => {
if (!isDIDMailto(input.customer)) {
Expand Down
2 changes: 1 addition & 1 deletion billing/data/usage.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const validate = input => {
return { ok: {} }
}

/** @type {import('../lib/api').Encoder<import('../lib/api').Usage, import('../lib/api').InferStoreRecord<import('../lib/api').Usage>>} */
/** @type {import('../lib/api').Encoder<import('../lib/api').Usage, import('../types').InferStoreRecord<import('../lib/api').Usage>>} */
export const encode = input => {
try {
return {
Expand Down
7 changes: 0 additions & 7 deletions billing/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,6 @@ export interface RecordNotFound<K> extends Failure {
key: K
}

export type InferStoreRecord<T> = {
[Property in keyof T]: T[Property] extends Number ? T[Property] : string
}

/** A record that is of suitable type to be put in storage. */
export type StoreRecord = Record<string, string|number>

export interface StorePutter<T> {
put: (rec: T) => Promise<Result<Unit, InvalidInput|EncodeFailure|StoreOperationFailure>>
}
Expand Down
14 changes: 5 additions & 9 deletions billing/lib/customer-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,29 @@
* subscriptionStore: import('./api').SubscriptionStore
* consumerStore: import('./api').ConsumerStore
* spaceBillingQueue: import('./api').SpaceBillingQueue
* }} stores
* }} ctx
* @returns {Promise<import('@ucanto/interface').Result>}
*/
export const handleCustomerBillingInstruction = async (instruction, {
subscriptionStore,
consumerStore,
spaceBillingQueue
}) => {
export const handleCustomerBillingInstruction = async (instruction, ctx) => {
console.log(`processing customer billing instruction for: ${instruction.customer}`)
console.log(`period: ${instruction.from.toISOString()} - ${instruction.to.toISOString()}`)

let cursor
while (true) {
const subsList = await subscriptionStore.list({ customer: instruction.customer }, { cursor })
const subsList = await ctx.subscriptionStore.list({ customer: instruction.customer }, { cursor })

Check failure on line 16 in billing/lib/customer-queue.js

View workflow job for this annotation

GitHub Actions / Test

'subsList' implicitly has type 'any' because it does not have a type annotation and is referenced directly or indirectly in its own initializer.
if (subsList.error) return subsList

// TODO: this is going to be inefficient for any client with many spaces
// and may eventually cause billing to fail.
for (const s of subsList.ok.results) {
const consumerGet = await consumerStore.get({
const consumerGet = await ctx.consumerStore.get({
subscription: s.subscription,
provider: s.provider
})
if (consumerGet.error) return consumerGet

console.log(`adding space billing instruction for: ${consumerGet.ok.consumer}`)
const queueAdd = await spaceBillingQueue.add({
const queueAdd = await ctx.spaceBillingQueue.add({
...instruction,
provider: s.provider,
space: consumerGet.ok.consumer
Expand Down
8 changes: 4 additions & 4 deletions billing/lib/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
* @param {{
* customerStore: import('./api').CustomerStore
* customerBillingQueue: import('./api').CustomerBillingQueue
* }} stores
* }} ctx
* @returns {Promise<import('@ucanto/interface').Result>}
*/
export const handleCronTick = async ({ customerStore, customerBillingQueue }) => {
export const handleCronTick = async ctx => {
const from = startOfMonth()
const to = startOfNextMonth()

let cursor
while (true) {
const customerList = await customerStore.list({}, { cursor, size: 1000 })
const customerList = await ctx.customerStore.list({}, { cursor, size: 1000 })

Check failure on line 14 in billing/lib/runner.js

View workflow job for this annotation

GitHub Actions / Test

'customerList' implicitly has type 'any' because it does not have a type annotation and is referenced directly or indirectly in its own initializer.
if (customerList.error) return customerList

for (const c of customerList.ok.results) {
console.log(`adding customer billing instruction for: ${c.customer}`)
const queueAdd = await customerBillingQueue.add({
const queueAdd = await ctx.customerBillingQueue.add({
customer: c.customer,
account: c.account,
product: c.product,
Expand Down
45 changes: 28 additions & 17 deletions billing/lib/space-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@
* spaceDiffStore: import('./api').SpaceDiffStore
* spaceSnapshotStore: import('./api').SpaceSnapshotStore
* usageStore: import('./api').UsageStore
* }} stores
* }} ctx
* @returns {Promise<import('@ucanto/interface').Result>}
*/
export const handleSpaceBillingInstruction = async (instruction, {
spaceDiffStore,
spaceSnapshotStore,
usageStore
}) => {
export const handleSpaceBillingInstruction = async (instruction, ctx) => {
console.log(`processing space billing instruction for: ${instruction.customer}`)
console.log(`period: ${instruction.from.toISOString()} - ${instruction.to.toISOString()}`)

const { ok: snap, error } = await spaceSnapshotStore.get({
const { ok: snap, error } = await ctx.spaceSnapshotStore.get({
space: instruction.space,
provider: instruction.provider,
recordedAt: instruction.from
Expand All @@ -24,27 +20,42 @@ export const handleSpaceBillingInstruction = async (instruction, {

console.log(`space ${snap.space} is ${snap.size} bytes @ ${snap.recordedAt.toISOString()}`)

/** @type {import('./api').SpaceDiff[]} */
const diffs = []
let size = snap.size
let usage = size * (instruction.to.getTime() - instruction.from.getTime())

let cursor
while (true) {
const { ok: listing, error: listErr } = await spaceDiffStore.listBetween(
const spaceDiffList = await ctx.spaceDiffStore.listBetween(

Check failure on line 28 in billing/lib/space-queue.js

View workflow job for this annotation

GitHub Actions / Test

'spaceDiffList' implicitly has type 'any' because it does not have a type annotation and is referenced directly or indirectly in its own initializer.
{ customer: instruction.customer },
instruction.from,
instruction.to,
{ cursor, size: 1000 }
)
if (listErr) return { error: listErr }
for (const diff of listing.results) {
if (spaceDiffList.error) return spaceDiffList
for (const diff of spaceDiffList.ok.results) {
if (diff.provider !== snap.provider) continue
diffs.push(diff)
console.log(`${diff.receiptAt.toISOString()}: ${diff.change} bytes`)
size += diff.change
usage += diff.change * (instruction.to.getTime() - diff.receiptAt.getTime())
}
if (!listing.cursor) break
cursor = listing.cursor
if (!spaceDiffList.ok.cursor) break
cursor = spaceDiffList.ok.cursor
}

console.log(`${diffs.length} space updates`)
console.log(`space ${snap.space} is ${size} bytes @ ${instruction.to.toISOString()}`)
const snapPut = await ctx.spaceSnapshotStore.put({
provider: instruction.provider,
space: instruction.space,
size,
recordedAt: instruction.to,
insertedAt: new Date()
})
if (snapPut.error) return snapPut

return { ok: {} }
console.log(`${usage} bytes consumed over ${instruction.to.getTime() - instruction.from.getTime()} ms`)
return await ctx.usageStore.put({
...instruction,
usage,
insertedAt: new Date()
})
}
10 changes: 5 additions & 5 deletions billing/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import * as StoreCaps from '@web3-storage/capabilities/store'
* spaceDiffStore: import('./api').SpaceDiffStore
* subscriptionStore: import('./api').SubscriptionStore
* consumerStore: import('./api').ConsumerStore
* }} stores
* }} ctx
* @returns {Promise<import('@ucanto/interface').Result>}
*/
export const handleUcanStreamMessage = async (message, { spaceDiffStore, subscriptionStore, consumerStore }) => {
export const handleUcanStreamMessage = async (message, ctx) => {
if (!isReceipt(message)) return { ok: {} }

/** @type {number|undefined} */
Expand All @@ -27,16 +27,16 @@ export const handleUcanStreamMessage = async (message, { spaceDiffStore, subscri
}

const space = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with)
const consumerList = await consumerStore.list({ consumer: space })
const consumerList = await ctx.consumerStore.list({ consumer: space })
if (consumerList.error) return consumerList

// There should only be one subscription per provider, but in theory you
// could have multiple providers for the same consumer (space).
for (const consumer of consumerList.ok.results) {
const subGet = await subscriptionStore.get({ provider: consumer.provider, subscription: consumer.subscription })
const subGet = await ctx.subscriptionStore.get({ provider: consumer.provider, subscription: consumer.subscription })
if (subGet.error) return subGet

const spaceDiffPut = await spaceDiffStore.put({
const spaceDiffPut = await ctx.spaceDiffStore.put({
customer: subGet.ok.customer,
provider: consumer.provider,
subscription: subGet.ok.subscription,
Expand Down
Loading

0 comments on commit 01af626

Please sign in to comment.