From e7df15fc080db8c288767b0204602c33eb8b97e4 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 15 Sep 2023 09:51:54 +0200 Subject: [PATCH] feat: trigger aggregator on computed piece --- .env.tpl | 3 + README.md | 8 ++ filecoin/functions/piece-cid-compute.js | 11 +- filecoin/functions/piece-cid-report.js | 87 ++++++++++++++++ filecoin/functions/utils.js | 9 ++ filecoin/index.js | 32 ++++++ filecoin/package.json | 5 + filecoin/service.js | 36 +++++++ filecoin/test/compute-piece-cid.test.js | 41 +------- filecoin/test/helpers/car.js | 40 ++++++++ filecoin/test/helpers/errors.js | 21 ++++ filecoin/test/helpers/mocks.js | 35 +++++++ filecoin/test/helpers/ucanto.js | 131 ++++++++++++++++++++++++ filecoin/test/report-piece-cid.test.js | 81 +++++++++++++++ package-lock.json | 129 +++++++++++++++++------ stacks/config.js | 5 +- stacks/filecoin-stack.js | 37 ++++++- stacks/upload-api-stack.js | 3 +- stacks/upload-db-stack.js | 13 ++- test/helpers/table.js | 2 - 20 files changed, 635 insertions(+), 94 deletions(-) create mode 100644 filecoin/functions/piece-cid-report.js create mode 100644 filecoin/functions/utils.js create mode 100644 filecoin/service.js create mode 100644 filecoin/test/helpers/car.js create mode 100644 filecoin/test/helpers/errors.js create mode 100644 filecoin/test/helpers/mocks.js create mode 100644 filecoin/test/helpers/ucanto.js create mode 100644 filecoin/test/report-piece-cid.test.js diff --git a/.env.tpl b/.env.tpl index 14dead88..2b13f798 100644 --- a/.env.tpl +++ b/.env.tpl @@ -17,6 +17,9 @@ EIPFS_INDEXER_SQS_URL = 'https://sqs.us-west-2.amazonaws.com/505595374361/stagin ACCESS_SERVICE_DID = '' UPLOAD_API_DID = '' ACCESS_SERVICE_URL = '' +AGGREGATOR_DID = '' +AGGREGATOR_URL = '' + POSTMARK_TOKEN = '' R2_ACCESS_KEY_ID = '' R2_CARPARK_BUCKET_NAME = '' diff --git a/README.md b/README.md index e0f1b2ce..9ada0d9c 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,14 @@ DID of the w3access service. URL of the w3access service. +#### `AGGREGATOR_SERVICE_DID` + +DID of the filecoin aggregator service. + +#### `AGGREGATOR_SERVICE_URL` + +URL of the filecoin aggregator service. + #### `UPLOAD_API_DID` [DID](https://www.w3.org/TR/did-core/) of the upload-api ucanto server. e.g. `did:web:up.web3.storage`. Optional: if omitted, a `did:key` will be derrived from `PRIVATE_KEY` diff --git a/filecoin/functions/piece-cid-compute.js b/filecoin/functions/piece-cid-compute.js index 2639e270..38b2b59b 100644 --- a/filecoin/functions/piece-cid-compute.js +++ b/filecoin/functions/piece-cid-compute.js @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3' import * as Sentry from '@sentry/serverless' import { computePieceCid } from '../index.js' +import { mustGetEnv } from './utils.js' import { createPieceTable } from '../tables/piece.js' Sentry.AWSLambda.init({ @@ -60,16 +61,6 @@ function getEnv () { } } -/** - * @param {string} name - * @returns {string} - */ -function mustGetEnv (name) { - const value = process.env[name] - if (!value) throw new Error(`Missing env var: ${name}`) - return value -} - /** * Extract an EventRecord from the passed SQS Event * diff --git a/filecoin/functions/piece-cid-report.js b/filecoin/functions/piece-cid-report.js new file mode 100644 index 00000000..f50c12a8 --- /dev/null +++ b/filecoin/functions/piece-cid-report.js @@ -0,0 +1,87 @@ +import * as Sentry from '@sentry/serverless' +import { Config } from '@serverless-stack/node/config/index.js' +import { unmarshall } from '@aws-sdk/util-dynamodb' +import { Piece } from '@web3-storage/data-segment' + +import { reportPieceCid } from '../index.js' +import { getServiceConnection, getServiceSigner } from '../service.js' +import { mustGetEnv } from './utils.js' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +/** + * @param {import('aws-lambda').DynamoDBStreamEvent} event + */ +async function pieceCidReport (event) { + const { aggregatorDid, aggregatorUrl } = getEnv() + const { PRIVATE_KEY: privateKey } = Config + + const records = parseDynamoDbEvent(event) + if (records.length > 1) { + throw new Error('Should only receive one ferry to update') + } + + // @ts-expect-error can't figure out type of new + const pieceRecord = unmarshall(records[0].new) + const piece = Piece.fromString(pieceRecord.piece).link + + const aggregateServiceConnection = getServiceConnection({ + did: aggregatorDid, + url: aggregatorUrl + }) + const issuer = getServiceSigner({ + privateKey + }) + const audience = aggregateServiceConnection.id + /** @type {import('@web3-storage/filecoin-client/types').InvocationConfig} */ + const invocationConfig = { + issuer, + audience, + with: issuer.did(), + } + + const { ok, error } = await reportPieceCid({ + piece, + group: issuer.did(), + aggregateServiceConnection, + invocationConfig + }) + + if (error) { + return { + statusCode: 500, + body: error.message || 'failed to add aggregate' + } + } + + return { + statusCode: 200, + body: ok + } +} + +export const handler = Sentry.AWSLambda.wrapHandler(pieceCidReport) + +/** + * Get Env validating it is set. + */ +function getEnv() { + return { + aggregatorDid: mustGetEnv('AGGREGATOR_DID'), + aggregatorUrl: mustGetEnv('AGGREGATOR_URL'), + } +} + +/** + * @param {import('aws-lambda').DynamoDBStreamEvent} event + */ +function parseDynamoDbEvent (event) { + return event.Records.map(r => ({ + new: r.dynamodb?.NewImage, + old: r.dynamodb?.OldImage + })) +} \ No newline at end of file diff --git a/filecoin/functions/utils.js b/filecoin/functions/utils.js new file mode 100644 index 00000000..8396f485 --- /dev/null +++ b/filecoin/functions/utils.js @@ -0,0 +1,9 @@ +/** + * @param {string} name + * @returns {string} + */ +export function mustGetEnv (name) { + const value = process.env[name] + if (!value) throw new Error(`Missing env var: ${name}`) + return value +} \ No newline at end of file diff --git a/filecoin/index.js b/filecoin/index.js index cca4b86a..3c24d34f 100644 --- a/filecoin/index.js +++ b/filecoin/index.js @@ -4,6 +4,7 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' import * as Digest from 'multiformats/hashes/digest' import { Piece } from '@web3-storage/data-segment' import { CID } from 'multiformats/cid' +import { Aggregator } from '@web3-storage/filecoin-client' import { GetCarFailed, ComputePieceFailed } from './errors.js' @@ -78,3 +79,34 @@ export async function computePieceCid({ error } } + +/** + * @param {object} props + * @param {import('@web3-storage/data-segment').PieceLink} props.piece + * @param {string} props.group + * @param {import('@web3-storage/filecoin-client/types').InvocationConfig} props.invocationConfig + * @param {import('@ucanto/principal/ed25519').ConnectionView} props.aggregateServiceConnection + */ +export async function reportPieceCid ({ + piece, + group, + invocationConfig, + aggregateServiceConnection +}) { + // Add piece for aggregation + const aggregateQueue = await Aggregator.aggregateQueue( + invocationConfig, + piece, + group, + { connection: aggregateServiceConnection } + ) + + if (aggregateQueue.out.error) { + return { + error: aggregateQueue.out.error + } + } + return { + ok: {}, + } +} diff --git a/filecoin/package.json b/filecoin/package.json index c02725b2..6294aae9 100644 --- a/filecoin/package.json +++ b/filecoin/package.json @@ -10,7 +10,11 @@ "@aws-sdk/client-s3": "^3.211.0", "@aws-sdk/client-sqs": "^3.226.0", "@sentry/serverless": "^7.22.0", + "@ucanto/client": "^8.0.1", + "@ucanto/principal": "^8.1.0", + "@ucanto/transport": "^8.0.0", "@web3-storage/data-segment": "^3.0.1", + "@web3-storage/filecoin-client": "^1.3.0", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", "multiformats": "^12.1.1" }, @@ -18,6 +22,7 @@ "@serverless-stack/resources": "*", "ava": "^4.3.3", "nanoid": "^4.0.0", + "p-defer": "^4.0.0", "testcontainers": "^8.13.0" } } diff --git a/filecoin/service.js b/filecoin/service.js new file mode 100644 index 00000000..1d7adc2d --- /dev/null +++ b/filecoin/service.js @@ -0,0 +1,36 @@ +import * as ed25519 from '@ucanto/principal/ed25519' +import * as DID from '@ipld/dag-ucan/did' +import { CAR, HTTP } from '@ucanto/transport' +import { connect } from '@ucanto/client' + +/** + * Given a config, return a ucanto Signer object representing the service + * + * @param {object} config + * @param {string} config.privateKey - multiformats private key of primary signing key + * @returns {import('@ucanto/principal/ed25519').Signer.Signer} + */ +export function getServiceSigner(config) { + return ed25519.parse(config.privateKey) +} + +/** + * + * @param {{ did: string, url: string }} config + * @returns + */ +export function getServiceConnection (config) { + const servicePrincipal = DID.parse(config.did) // 'did:web:filecoin.web3.storage' + const serviceURL = new URL(config.url) // 'https://filecoin.web3.storage' + + const serviceConnection = connect({ + id: servicePrincipal, + codec: CAR.outbound, + channel: HTTP.open({ + url: serviceURL, + method: 'POST', + }), + }) + + return serviceConnection +} diff --git a/filecoin/test/compute-piece-cid.test.js b/filecoin/test/compute-piece-cid.test.js index b9f21964..e8152155 100644 --- a/filecoin/test/compute-piece-cid.test.js +++ b/filecoin/test/compute-piece-cid.test.js @@ -1,16 +1,10 @@ import { test } from './helpers/context.js' import { PutObjectCommand } from '@aws-sdk/client-s3' -import { encode } from 'multiformats/block' -import { identity } from 'multiformats/hashes/identity' -import { sha256 as hasher } from 'multiformats/hashes/sha2' -import * as pb from '@ipld/dag-pb' -import { CarBufferWriter } from '@ipld/car' -import { toString } from 'uint8arrays' -import { Piece } from '@web3-storage/data-segment' import { createS3, createBucket, createDynamodDb } from './helpers/resources.js' import { createDynamoTable, getItemsFromTable } from './helpers/tables.js' +import { createCar } from './helpers/car.js' import { computePieceCid } from '../index.js' import { pieceTableProps } from '../tables/index.js' @@ -77,39 +71,6 @@ test('computes piece cid from a CAR file in the bucket', async t => { t.is(storedItems?.[0].piece, piece.toString()) }) -async function createCar () { - const id = await encode({ - value: pb.prepare({ Data: 'a red car on the street!' }), - codec: pb, - hasher: identity, - }) - - const parent = await encode({ - value: pb.prepare({ Links: [id.cid] }), - codec: pb, - hasher, - }) - const car = CarBufferWriter.createWriter(Buffer.alloc(1000), { - roots: [parent.cid], - }) - car.write(parent) - - const body = car.close() - const digest = await hasher.digest(body) - const checksum = toString(digest.digest, 'base64pad') - - const key = `${parent.cid.toString()}/${parent.cid.toString()}` - const piece = Piece.fromPayload(body) - - return { - body, - checksum, - key, - link: parent.cid, - piece: piece.link - } -} - /** * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient * @param {import("@aws-sdk/client-s3").S3Client} s3Client diff --git a/filecoin/test/helpers/car.js b/filecoin/test/helpers/car.js new file mode 100644 index 00000000..e13538d9 --- /dev/null +++ b/filecoin/test/helpers/car.js @@ -0,0 +1,40 @@ +import { encode } from 'multiformats/block' +import { identity } from 'multiformats/hashes/identity' +import { sha256 as hasher } from 'multiformats/hashes/sha2' +import * as pb from '@ipld/dag-pb' +import { CarBufferWriter } from '@ipld/car' +import { toString } from 'uint8arrays' +import { Piece } from '@web3-storage/data-segment' + +export async function createCar () { + const id = await encode({ + value: pb.prepare({ Data: 'a red car on the street!' }), + codec: pb, + hasher: identity, + }) + + const parent = await encode({ + value: pb.prepare({ Links: [id.cid] }), + codec: pb, + hasher, + }) + const car = CarBufferWriter.createWriter(Buffer.alloc(1000), { + roots: [parent.cid], + }) + car.write(parent) + + const body = car.close() + const digest = await hasher.digest(body) + const checksum = toString(digest.digest, 'base64pad') + + const key = `${parent.cid.toString()}/${parent.cid.toString()}` + const piece = Piece.fromPayload(body) + + return { + body, + checksum, + key, + link: parent.cid, + piece: piece.link + } +} diff --git a/filecoin/test/helpers/errors.js b/filecoin/test/helpers/errors.js new file mode 100644 index 00000000..eac43607 --- /dev/null +++ b/filecoin/test/helpers/errors.js @@ -0,0 +1,21 @@ +import * as Server from '@ucanto/server' + +export const OperationErrorName = /** @type {const} */ ('OperationFailed') +export class OperationFailed extends Server.Failure { + /** + * @param {string} message + * @param {import('@web3-storage/data-segment').PieceLink} piece + */ + constructor(message, piece) { + super(message) + this.piece = piece + } + + get reason() { + return this.message + } + + get name() { + return OperationErrorName + } +} diff --git a/filecoin/test/helpers/mocks.js b/filecoin/test/helpers/mocks.js new file mode 100644 index 00000000..d6dff7b2 --- /dev/null +++ b/filecoin/test/helpers/mocks.js @@ -0,0 +1,35 @@ +import * as Server from '@ucanto/server' + +const notImplemented = () => { + throw new Server.Failure('not implemented') +} + +/** + * @param {Partial< + * import('@web3-storage/filecoin-client/types').AggregatorService + * >} impl + */ +export function mockService(impl) { + return { + aggregate: { + add: withCallCount(impl.aggregate?.add ?? notImplemented), + queue: withCallCount(impl.aggregate?.queue ?? notImplemented), + }, + } +} + +/** + * @template {Function} T + * @param {T} fn + */ +function withCallCount(fn) { + /** @param {T extends (...args: infer A) => any ? A : never} args */ + const countedFn = (...args) => { + countedFn.called = true + countedFn.callCount++ + return fn(...args) + } + countedFn.called = false + countedFn.callCount = 0 + return countedFn +} diff --git a/filecoin/test/helpers/ucanto.js b/filecoin/test/helpers/ucanto.js new file mode 100644 index 00000000..c34dd151 --- /dev/null +++ b/filecoin/test/helpers/ucanto.js @@ -0,0 +1,131 @@ +import * as Signer from '@ucanto/principal/ed25519' +import * as Server from '@ucanto/server' +import * as Client from '@ucanto/client' +import * as CAR from '@ucanto/transport/car' +import * as FilecoinCapabilities from '@web3-storage/capabilities/filecoin' + +import { OperationFailed } from './errors.js' +import { mockService } from './mocks.js' + +const nop = (/** @type {any} */ invCap) => {} + +/** + * @param {any} serviceProvider + * @param {object} [options] + * @param {(inCap: any) => void} [options.onCall] + * @param {boolean} [options.mustFail] + */ +export async function getAggregatorServiceServer (serviceProvider, options = {}) { + const onCall = options.onCall || nop + + const service = mockService({ + aggregate: { + queue: Server.provideAdvanced({ + capability: FilecoinCapabilities.aggregateQueue, + handler: async ({ invocation, context }) => { + const invCap = invocation.capabilities[0] + + if (!invCap.nb) { + throw new Error('no nb field received in invocation') + } + + if (options.mustFail) { + return { + error: new OperationFailed( + 'failed to add to aggregate', + // @ts-ignore wrong dep + invCap.nb.aggregate + ) + } + } + + /** @type {import('@web3-storage/capabilities/types').AggregateAddSuccess} */ + const pieceAddResponse = { + piece: invCap.nb.piece, + } + + // Create effect for receipt with self signed queued operation + const fx = await FilecoinCapabilities.aggregateAdd + .invoke({ + issuer: context.id, + audience: context.id, + with: context.id.did(), + nb: { + ...invCap.nb, + // add storefront + storefront: invCap.with, + }, + }) + .delegate() + + onCall(invCap) + + return Server.ok(pieceAddResponse).join(fx.link()) + } + }), + add: Server.provideAdvanced({ + capability: FilecoinCapabilities.aggregateAdd, + handler: async ({ invocation }) => { + const invCap = invocation.capabilities[0] + + if (!invCap.nb) { + throw new Error('no nb field received in invocation') + } + + if (options.mustFail) { + return { + error: new OperationFailed( + 'failed to add to aggregate', + // @ts-ignore wrong dep + invCap.nb.aggregate + ) + } + } + + /** @type {import('@web3-storage/capabilities/types').AggregateAddSuccess} */ + const pieceAddResponse = { + piece: invCap.nb.piece, + } + + onCall(invCap) + + return Server.ok(pieceAddResponse) + } + }) + } + }) + + const server = Server.create({ + id: serviceProvider, + service, + codec: CAR.inbound, + }) + const connection = Client.connect({ + id: serviceProvider, + codec: CAR.outbound, + channel: server, + }) + + return { + service, + connection + } +} + +export async function getAggregatorServiceCtx () { + const storefront = await Signer.generate() + const aggregator = await Signer.generate() + + return { + storefront: { + did: storefront.did(), + privateKey: Signer.format(storefront), + raw: storefront + }, + aggregator: { + did: aggregator.did(), + privateKey: Signer.format(aggregator), + raw: aggregator + } + } +} diff --git a/filecoin/test/report-piece-cid.test.js b/filecoin/test/report-piece-cid.test.js new file mode 100644 index 00000000..7d4a50e2 --- /dev/null +++ b/filecoin/test/report-piece-cid.test.js @@ -0,0 +1,81 @@ +import { test } from './helpers/context.js' + +import pDefer from 'p-defer' + +import { reportPieceCid } from '../index.js' +import { getServiceSigner } from '../service.js' + +import { getAggregatorServiceServer, getAggregatorServiceCtx } from './helpers/ucanto.js' +import { createCar } from './helpers/car.js' + +test('reports piece cid from a piece written to the piece table', async t => { + const { piece } = await createCar() + const aggregatorQueueCall = pDefer() + const { invocationConfig, aggregatorService } = await getService({ + onCall: aggregatorQueueCall + }) + + const reportPieceCidResponse = await reportPieceCid({ + piece, + group: invocationConfig.issuer.did(), + invocationConfig, + aggregateServiceConnection: aggregatorService.connection + }) + + t.truthy(reportPieceCidResponse.ok) + t.falsy(reportPieceCidResponse.error) + + // Validate ucanto server call + t.is(aggregatorService.service.aggregate.queue.callCount, 1) + const invCap = await aggregatorQueueCall.promise + t.is(invCap.can, 'aggregate/queue') +}) + +test('fails reporting piece cid if fails to queue to aggregator', async t => { + const { piece } = await createCar() + const aggregatorQueueCall = pDefer() + const { invocationConfig, aggregatorService } = await getService({ + onCall: aggregatorQueueCall, + mustFail: true + }) + + const reportPieceCidResponse = await reportPieceCid({ + piece, + group: invocationConfig.issuer.did(), + invocationConfig, + aggregateServiceConnection: aggregatorService.connection + }) + + t.falsy(reportPieceCidResponse.ok) + t.truthy(reportPieceCidResponse.error) + + t.is(aggregatorService.service.aggregate.queue.callCount, 1) +}) + +/** + * @param {object} options + * @param {import('p-defer').DeferredPromise} options.onCall + * @param {boolean} [options.mustFail] + */ +async function getService (options) { + const { storefront, aggregator } = await getAggregatorServiceCtx() + const aggregatorService = await getAggregatorServiceServer(aggregator.raw, { + onCall: (invCap) => { + options.onCall.resolve(invCap) + }, + mustFail: options.mustFail + }) + const issuer = getServiceSigner(storefront) + const audience = aggregatorService.connection.id + /** @type {import('@web3-storage/filecoin-client/types').InvocationConfig} */ + const invocationConfig = { + issuer, + audience, + with: issuer.did(), + } + + return { + invocationConfig, + aggregatorService + } +} diff --git a/package-lock.json b/package-lock.json index 5a84a420..904f35ff 100644 --- a/package-lock.json +++ b/package-lock.json @@ -105,7 +105,11 @@ "@aws-sdk/client-s3": "^3.211.0", "@aws-sdk/client-sqs": "^3.226.0", "@sentry/serverless": "^7.22.0", + "@ucanto/client": "^8.0.1", + "@ucanto/principal": "^8.1.0", + "@ucanto/transport": "^8.0.0", "@web3-storage/data-segment": "^3.0.1", + "@web3-storage/filecoin-client": "^1.3.0", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", "multiformats": "^12.1.1" }, @@ -113,6 +117,7 @@ "@serverless-stack/resources": "*", "ava": "^4.3.3", "nanoid": "^4.0.0", + "p-defer": "^4.0.0", "testcontainers": "^8.13.0" } }, @@ -3110,9 +3115,9 @@ } }, "node_modules/@ipld/dag-ucan": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/@ipld/dag-ucan/-/dag-ucan-3.3.2.tgz", - "integrity": "sha512-EhuOrAfnudsVYIbzEIgi3itHAEo3WZNOt1VNPsYhxKBhOzDMeoTXh6/IHc7ZKBW1T2vDQHdgj4m1r64z6MssGA==", + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/@ipld/dag-ucan/-/dag-ucan-3.4.0.tgz", + "integrity": "sha512-sW4R43w3DbEdoGWWJZCwsblwXa600HCanG9p2w1MJPVBNTNjhvqc3XI0uEqKhT2oqKWrND7uInVtcPmZme7hhA==", "dependencies": { "@ipld/dag-cbor": "^9.0.0", "@ipld/dag-json": "^10.0.0", @@ -3176,6 +3181,17 @@ "npm": ">=7.0.0" } }, + "node_modules/@noble/curves": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@noble/curves/-/curves-1.2.0.tgz", + "integrity": "sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==", + "dependencies": { + "@noble/hashes": "1.3.2" + }, + "funding": { + "url": "https://paulmillr.com/funding/" + } + }, "node_modules/@noble/ed25519": { "version": "1.7.3", "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.3.tgz", @@ -3188,9 +3204,9 @@ ] }, "node_modules/@noble/hashes": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.1.tgz", - "integrity": "sha512-EbqwksQwz9xDRGfDST86whPBgM65E0OH/pCgqW0GBVzO22bNE+NuIbeTb714+IfSjU3aRk47EUvXIb5bTsenKA==", + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.2.tgz", + "integrity": "sha512-MVC8EAQp7MvEcm30KWENFjgR+Mkmf+D189XJTkFIlwohU5hcBbn1ZkKq7KVTi2Hme3PMGF390DaL52beVrIihQ==", "engines": { "node": ">= 16" }, @@ -4668,23 +4684,25 @@ } }, "node_modules/@ucanto/interface": { - "version": "8.0.0", - "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-8.0.0.tgz", - "integrity": "sha512-xeJJYdGAPKOYbCiG8BsGmyoBovZDtVya+42Gtd8fViZeNSS3h0f2BPDBS91YFOxSGswqCd2fqvrfrlg3TTMmZw==", + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-8.1.0.tgz", + "integrity": "sha512-n6WL9miVcN1PUq+e41hKUgZR0+Xn5sHHMQfXnt4YuLnGbh93tIgQkeGWmfUBJI+Y6C0vAFfaSCZnM6Z+kedskA==", "dependencies": { - "@ipld/dag-ucan": "^3.3.2", - "multiformats": "^11.0.0" + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" } }, "node_modules/@ucanto/principal": { - "version": "8.0.0", - "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-8.0.0.tgz", - "integrity": "sha512-85IXfp8P3FGbQ5rQbLtAA2DkIgjOaqdTPXZHA2W+/UdEsJxfb1jf2TqOjHUt3PWcCbP2hqbRZWBYAKJszkW2uA==", + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-8.1.0.tgz", + "integrity": "sha512-tSkqpxRXP/M+GXNKqQLCmMAP+7zX7l/tKb3uygAaQwTnev4nRauklXgWx6EYDK+2d8tiOyPdL3SlG54GQPFcLQ==", "dependencies": { - "@ipld/dag-ucan": "^3.3.2", + "@ipld/dag-ucan": "^3.4.0", + "@noble/curves": "^1.2.0", "@noble/ed25519": "^1.7.3", - "@ucanto/interface": "^8", - "multiformats": "^11.0.0", + "@noble/hashes": "^1.3.2", + "@ucanto/interface": "^8.1.0", + "multiformats": "^11.0.2", "one-webcrypto": "^1.0.3" } }, @@ -4853,6 +4871,20 @@ "node": ">=16.15" } }, + "node_modules/@web3-storage/filecoin-client": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-1.3.0.tgz", + "integrity": "sha512-391lAc4OaTLlzZbN5jH83V/YZk39gb32niF/ikeNuVQeTNuIonEPwQBvypWAzSrZB0WsllhvXsRCl2Ec+NsjGQ==", + "dependencies": { + "@ipld/dag-cbor": "^9.0.0", + "@ipld/dag-ucan": "^3.3.2", + "@ucanto/client": "^8.0.0", + "@ucanto/core": "^8.0.0", + "@ucanto/interface": "^8.0.0", + "@ucanto/transport": "^8.0.0", + "@web3-storage/capabilities": "^9.2.1" + } + }, "node_modules/@web3-storage/multipart-parser": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/@web3-storage/multipart-parser/-/multipart-parser-1.0.0.tgz", @@ -18335,9 +18367,9 @@ } }, "@ipld/dag-ucan": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/@ipld/dag-ucan/-/dag-ucan-3.3.2.tgz", - "integrity": "sha512-EhuOrAfnudsVYIbzEIgi3itHAEo3WZNOt1VNPsYhxKBhOzDMeoTXh6/IHc7ZKBW1T2vDQHdgj4m1r64z6MssGA==", + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/@ipld/dag-ucan/-/dag-ucan-3.4.0.tgz", + "integrity": "sha512-sW4R43w3DbEdoGWWJZCwsblwXa600HCanG9p2w1MJPVBNTNjhvqc3XI0uEqKhT2oqKWrND7uInVtcPmZme7hhA==", "requires": { "@ipld/dag-cbor": "^9.0.0", "@ipld/dag-json": "^10.0.0", @@ -18394,15 +18426,23 @@ } } }, + "@noble/curves": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@noble/curves/-/curves-1.2.0.tgz", + "integrity": "sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==", + "requires": { + "@noble/hashes": "1.3.2" + } + }, "@noble/ed25519": { "version": "1.7.3", "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.3.tgz", "integrity": "sha512-iR8GBkDt0Q3GyaVcIu7mSsVIqnFbkbRzGLWlvhwunacoLwt4J3swfKhfaM6rN6WY+TBGoYT1GtT1mIh2/jGbRQ==" }, "@noble/hashes": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.1.tgz", - "integrity": "sha512-EbqwksQwz9xDRGfDST86whPBgM65E0OH/pCgqW0GBVzO22bNE+NuIbeTb714+IfSjU3aRk47EUvXIb5bTsenKA==" + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.2.tgz", + "integrity": "sha512-MVC8EAQp7MvEcm30KWENFjgR+Mkmf+D189XJTkFIlwohU5hcBbn1ZkKq7KVTi2Hme3PMGF390DaL52beVrIihQ==" }, "@nodelib/fs.scandir": { "version": "2.1.5", @@ -19600,23 +19640,25 @@ } }, "@ucanto/interface": { - "version": "8.0.0", - "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-8.0.0.tgz", - "integrity": "sha512-xeJJYdGAPKOYbCiG8BsGmyoBovZDtVya+42Gtd8fViZeNSS3h0f2BPDBS91YFOxSGswqCd2fqvrfrlg3TTMmZw==", + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/@ucanto/interface/-/interface-8.1.0.tgz", + "integrity": "sha512-n6WL9miVcN1PUq+e41hKUgZR0+Xn5sHHMQfXnt4YuLnGbh93tIgQkeGWmfUBJI+Y6C0vAFfaSCZnM6Z+kedskA==", "requires": { - "@ipld/dag-ucan": "^3.3.2", - "multiformats": "^11.0.0" + "@ipld/dag-ucan": "^3.4.0", + "multiformats": "^11.0.2" } }, "@ucanto/principal": { - "version": "8.0.0", - "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-8.0.0.tgz", - "integrity": "sha512-85IXfp8P3FGbQ5rQbLtAA2DkIgjOaqdTPXZHA2W+/UdEsJxfb1jf2TqOjHUt3PWcCbP2hqbRZWBYAKJszkW2uA==", + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/@ucanto/principal/-/principal-8.1.0.tgz", + "integrity": "sha512-tSkqpxRXP/M+GXNKqQLCmMAP+7zX7l/tKb3uygAaQwTnev4nRauklXgWx6EYDK+2d8tiOyPdL3SlG54GQPFcLQ==", "requires": { - "@ipld/dag-ucan": "^3.3.2", + "@ipld/dag-ucan": "^3.4.0", + "@noble/curves": "^1.2.0", "@noble/ed25519": "^1.7.3", - "@ucanto/interface": "^8", - "multiformats": "^11.0.0", + "@noble/hashes": "^1.3.2", + "@ucanto/interface": "^8.1.0", + "multiformats": "^11.0.2", "one-webcrypto": "^1.0.3" } }, @@ -19777,6 +19819,20 @@ "resolved": "https://registry.npmjs.org/@web3-storage/did-mailto/-/did-mailto-2.0.0.tgz", "integrity": "sha512-y0uWnAG6V0PmKCPQdiSc8eR+yOpj3kyRqlm4ByNZcYd/HUT5t9UzUFMBO7hks14JOTUbV2bphHMToBS9u+f1GQ==" }, + "@web3-storage/filecoin-client": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-client/-/filecoin-client-1.3.0.tgz", + "integrity": "sha512-391lAc4OaTLlzZbN5jH83V/YZk39gb32niF/ikeNuVQeTNuIonEPwQBvypWAzSrZB0WsllhvXsRCl2Ec+NsjGQ==", + "requires": { + "@ipld/dag-cbor": "^9.0.0", + "@ipld/dag-ucan": "^3.3.2", + "@ucanto/client": "^8.0.0", + "@ucanto/core": "^8.0.0", + "@ucanto/interface": "^8.0.0", + "@ucanto/transport": "^8.0.0", + "@web3-storage/capabilities": "^9.2.1" + } + }, "@web3-storage/multipart-parser": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/@web3-storage/multipart-parser/-/multipart-parser-1.0.0.tgz", @@ -19902,11 +19958,16 @@ "@aws-sdk/client-sqs": "^3.226.0", "@sentry/serverless": "^7.22.0", "@serverless-stack/resources": "*", + "@ucanto/client": "^8.0.1", + "@ucanto/principal": "^8.1.0", + "@ucanto/transport": "^8.0.0", "@web3-storage/data-segment": "^3.0.1", + "@web3-storage/filecoin-client": "^1.3.0", "ava": "^4.3.3", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash", "multiformats": "^12.1.1", "nanoid": "^4.0.0", + "p-defer": "^4.0.0", "testcontainers": "^8.13.0" }, "dependencies": { diff --git a/stacks/config.js b/stacks/config.js index 49424328..3a7c9755 100644 --- a/stacks/config.js +++ b/stacks/config.js @@ -122,9 +122,12 @@ export function setupSentry (app, stack) { /** * Get Env validating it is set. */ - function getEnv() { +export function getEnv() { return { SENTRY_DSN: mustGetEnv('SENTRY_DSN'), + UPLOAD_API_DID: mustGetEnv('UPLOAD_API_DID'), + AGGREGATOR_DID: mustGetEnv('AGGREGATOR_DID'), + AGGREGATOR_URL: mustGetEnv('AGGREGATOR_URL'), } } diff --git a/stacks/filecoin-stack.js b/stacks/filecoin-stack.js index f0b9598e..277671ec 100644 --- a/stacks/filecoin-stack.js +++ b/stacks/filecoin-stack.js @@ -4,11 +4,12 @@ import { use, } from '@serverless-stack/resources' import { Duration, aws_events as awsEvents } from 'aws-cdk-lib' +import { StartingPosition } from 'aws-cdk-lib/aws-lambda' import { BusStack } from './bus-stack.js' import { CarparkStack } from './carpark-stack.js' import { UploadDbStack } from './upload-db-stack.js' -import { setupSentry } from './config.js' +import { setupSentry, getEnv } from './config.js' import { CARPARK_EVENT_BRIDGE_SOURCE_EVENT } from '../carpark/event-bus/source.js' /** @@ -19,6 +20,8 @@ export function FilecoinStack({ stack, app }) { srcPath: 'filecoin' }) + const { AGGREGATOR_DID, AGGREGATOR_URL } = getEnv() + // Setup app monitoring with Sentry setupSentry(app, stack) @@ -27,7 +30,37 @@ export function FilecoinStack({ stack, app }) { // Get eventBus reference const { eventBus } = use(BusStack) // Get store table reference - const { pieceTable } = use(UploadDbStack) + const { pieceTable, privateKey } = use(UploadDbStack) + + // piece-cid reporting + pieceTable.addConsumers(stack, { + handleNewPiece: { + function: { + handler: 'functions/piece-cid-report.handler', + environment: { + AGGREGATOR_DID, + AGGREGATOR_URL, + }, + timeout: 3 * 60, + bind: [ + privateKey, + ] + }, + cdk: { + // https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.DynamoEventSourceProps.html#filters + eventSource: { + batchSize: 1, + // Start reading at the last untrimmed record in the shard in the system. + startingPosition: StartingPosition.TRIM_HORIZON, + }, + }, + filters: [ + { + eventName: ['INSERT'] + } + ] + } + }) // piece-cid compute const pieceCidComputeHandler = new Function( diff --git a/stacks/upload-api-stack.js b/stacks/upload-api-stack.js index b22544e9..b8d23a20 100644 --- a/stacks/upload-api-stack.js +++ b/stacks/upload-api-stack.js @@ -22,14 +22,13 @@ export function UploadApiStack({ stack, app }) { // Get references to constructs created in other stacks const { carparkBucket } = use(CarparkStack) - const { storeTable, uploadTable, delegationBucket, delegationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable } = use(UploadDbStack) + const { storeTable, uploadTable, delegationBucket, delegationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, privateKey } = use(UploadDbStack) const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack) // Setup API const customDomain = getCustomDomain(stack.stage, process.env.HOSTED_ZONE) const pkg = getApiPackageJson() const git = getGitInfo() - const privateKey = new Config.Secret(stack, 'PRIVATE_KEY') const ucanInvocationPostbasicAuth = new Config.Secret(stack, 'UCAN_INVOCATION_POST_BASIC_AUTH') const api = new Api(stack, 'http-gateway', { diff --git a/stacks/upload-db-stack.js b/stacks/upload-db-stack.js index 5d1d7285..a92a6d31 100644 --- a/stacks/upload-db-stack.js +++ b/stacks/upload-db-stack.js @@ -1,4 +1,4 @@ -import { Table, Bucket } from '@serverless-stack/resources' +import { Table, Bucket, Config } from '@serverless-stack/resources' import { storeTableProps, @@ -25,6 +25,8 @@ export function UploadDbStack({ stack, app }) { // Setup app monitoring with Sentry setupSentry(app, stack) + const privateKey = new Config.Secret(stack, 'PRIVATE_KEY') + /** * This table takes a stored CAR and makes an entry in the store table * Used by the store/* service capabilities. @@ -41,7 +43,11 @@ export function UploadDbStack({ stack, app }) { * This table takes a stored CAR and makes an entry in the piece table * Used by the filecoin/* service capabilities. // TODO */ - const pieceTable = new Table(stack, 'piece', pieceTableProps) + const pieceTable = new Table(stack, 'piece', { + ...pieceTableProps, + // information that will be written to the stream + stream: 'new_image', + }) /** * This table tracks the relationship between customers and providers. @@ -93,6 +99,7 @@ export function UploadDbStack({ stack, app }) { delegationBucket, delegationTable, adminMetricsTable, - spaceMetricsTable + spaceMetricsTable, + privateKey } } diff --git a/test/helpers/table.js b/test/helpers/table.js index 1d350c90..315a9ab1 100644 --- a/test/helpers/table.js +++ b/test/helpers/table.js @@ -46,8 +46,6 @@ export async function pollQueryTable (dynamo, tableName, keyConditions, options }) } catch {} - console.log('items', response?.Items) - return response?.Items && response?.Items.map(i => unmarshall(i)) }