Skip to content

Commit

Permalink
feat: trigger aggregator on computed piece
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 15, 2023
1 parent 2429663 commit e7df15f
Show file tree
Hide file tree
Showing 20 changed files with 635 additions and 94 deletions.
3 changes: 3 additions & 0 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ EIPFS_INDEXER_SQS_URL = 'https://sqs.us-west-2.amazonaws.com/505595374361/stagin
ACCESS_SERVICE_DID = ''
UPLOAD_API_DID = ''
ACCESS_SERVICE_URL = ''
AGGREGATOR_DID = ''
AGGREGATOR_URL = ''

POSTMARK_TOKEN = ''
R2_ACCESS_KEY_ID = ''
R2_CARPARK_BUCKET_NAME = ''
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ DID of the w3access service.

URL of the w3access service.

#### `AGGREGATOR_SERVICE_DID`

DID of the filecoin aggregator service.

#### `AGGREGATOR_SERVICE_URL`

URL of the filecoin aggregator service.

#### `UPLOAD_API_DID`

[DID](https://www.w3.org/TR/did-core/) of the upload-api ucanto server. e.g. `did:web:up.web3.storage`. Optional: if omitted, a `did:key` will be derrived from `PRIVATE_KEY`
Expand Down
11 changes: 1 addition & 10 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'

import { computePieceCid } from '../index.js'
import { mustGetEnv } from './utils.js'
import { createPieceTable } from '../tables/piece.js'

Sentry.AWSLambda.init({
Expand Down Expand Up @@ -60,16 +61,6 @@ function getEnv () {
}
}

/**
* @param {string} name
* @returns {string}
*/
function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}

/**
* Extract an EventRecord from the passed SQS Event
*
Expand Down
87 changes: 87 additions & 0 deletions filecoin/functions/piece-cid-report.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import * as Sentry from '@sentry/serverless'
import { Config } from '@serverless-stack/node/config/index.js'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Piece } from '@web3-storage/data-segment'

import { reportPieceCid } from '../index.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function pieceCidReport (event) {
const { aggregatorDid, aggregatorUrl } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one ferry to update')
}

// @ts-expect-error can't figure out type of new
const pieceRecord = unmarshall(records[0].new)
const piece = Piece.fromString(pieceRecord.piece).link

const aggregateServiceConnection = getServiceConnection({
did: aggregatorDid,
url: aggregatorUrl
})
const issuer = getServiceSigner({
privateKey
})
const audience = aggregateServiceConnection.id
/** @type {import('@web3-storage/filecoin-client/types').InvocationConfig} */
const invocationConfig = {
issuer,
audience,
with: issuer.did(),
}

const { ok, error } = await reportPieceCid({
piece,
group: issuer.did(),
aggregateServiceConnection,
invocationConfig
})

if (error) {
return {
statusCode: 500,
body: error.message || 'failed to add aggregate'
}
}

return {
statusCode: 200,
body: ok
}
}

export const handler = Sentry.AWSLambda.wrapHandler(pieceCidReport)

/**
* Get Env validating it is set.
*/
function getEnv() {
return {
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
aggregatorUrl: mustGetEnv('AGGREGATOR_URL'),
}
}

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
function parseDynamoDbEvent (event) {
return event.Records.map(r => ({
new: r.dynamodb?.NewImage,
old: r.dynamodb?.OldImage
}))
}
9 changes: 9 additions & 0 deletions filecoin/functions/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* @param {string} name
* @returns {string}
*/
export function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}
32 changes: 32 additions & 0 deletions filecoin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import { Aggregator } from '@web3-storage/filecoin-client'

import { GetCarFailed, ComputePieceFailed } from './errors.js'

Expand Down Expand Up @@ -78,3 +79,34 @@ export async function computePieceCid({
error
}
}

/**
* @param {object} props
* @param {import('@web3-storage/data-segment').PieceLink} props.piece
* @param {string} props.group
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} props.invocationConfig
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.aggregateServiceConnection
*/
export async function reportPieceCid ({
piece,
group,
invocationConfig,
aggregateServiceConnection
}) {
// Add piece for aggregation
const aggregateQueue = await Aggregator.aggregateQueue(
invocationConfig,
piece,
group,
{ connection: aggregateServiceConnection }
)

if (aggregateQueue.out.error) {
return {
error: aggregateQueue.out.error
}
}
return {
ok: {},
}
}
5 changes: 5 additions & 0 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@ucanto/client": "^8.0.1",
"@ucanto/principal": "^8.1.0",
"@ucanto/transport": "^8.0.0",
"@web3-storage/data-segment": "^3.0.1",
"@web3-storage/filecoin-client": "^1.3.0",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash",
"multiformats": "^12.1.1"
},
"devDependencies": {
"@serverless-stack/resources": "*",
"ava": "^4.3.3",
"nanoid": "^4.0.0",
"p-defer": "^4.0.0",
"testcontainers": "^8.13.0"
}
}
36 changes: 36 additions & 0 deletions filecoin/service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as ed25519 from '@ucanto/principal/ed25519'
import * as DID from '@ipld/dag-ucan/did'
import { CAR, HTTP } from '@ucanto/transport'
import { connect } from '@ucanto/client'

/**
* Given a config, return a ucanto Signer object representing the service
*
* @param {object} config
* @param {string} config.privateKey - multiformats private key of primary signing key
* @returns {import('@ucanto/principal/ed25519').Signer.Signer}
*/
export function getServiceSigner(config) {
return ed25519.parse(config.privateKey)
}

/**
*
* @param {{ did: string, url: string }} config
* @returns
*/
export function getServiceConnection (config) {
const servicePrincipal = DID.parse(config.did) // 'did:web:filecoin.web3.storage'
const serviceURL = new URL(config.url) // 'https://filecoin.web3.storage'

const serviceConnection = connect({
id: servicePrincipal,
codec: CAR.outbound,
channel: HTTP.open({
url: serviceURL,
method: 'POST',
}),
})

return serviceConnection
}
41 changes: 1 addition & 40 deletions filecoin/test/compute-piece-cid.test.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
import { test } from './helpers/context.js'

import { PutObjectCommand } from '@aws-sdk/client-s3'
import { encode } from 'multiformats/block'
import { identity } from 'multiformats/hashes/identity'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import * as pb from '@ipld/dag-pb'
import { CarBufferWriter } from '@ipld/car'
import { toString } from 'uint8arrays'
import { Piece } from '@web3-storage/data-segment'

import { createS3, createBucket, createDynamodDb } from './helpers/resources.js'
import { createDynamoTable, getItemsFromTable } from './helpers/tables.js'
import { createCar } from './helpers/car.js'

import { computePieceCid } from '../index.js'
import { pieceTableProps } from '../tables/index.js'
Expand Down Expand Up @@ -77,39 +71,6 @@ test('computes piece cid from a CAR file in the bucket', async t => {
t.is(storedItems?.[0].piece, piece.toString())
})

async function createCar () {
const id = await encode({
value: pb.prepare({ Data: 'a red car on the street!' }),
codec: pb,
hasher: identity,
})

const parent = await encode({
value: pb.prepare({ Links: [id.cid] }),
codec: pb,
hasher,
})
const car = CarBufferWriter.createWriter(Buffer.alloc(1000), {
roots: [parent.cid],
})
car.write(parent)

const body = car.close()
const digest = await hasher.digest(body)
const checksum = toString(digest.digest, 'base64pad')

const key = `${parent.cid.toString()}/${parent.cid.toString()}`
const piece = Piece.fromPayload(body)

return {
body,
checksum,
key,
link: parent.cid,
piece: piece.link
}
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
* @param {import("@aws-sdk/client-s3").S3Client} s3Client
Expand Down
40 changes: 40 additions & 0 deletions filecoin/test/helpers/car.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { encode } from 'multiformats/block'
import { identity } from 'multiformats/hashes/identity'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import * as pb from '@ipld/dag-pb'
import { CarBufferWriter } from '@ipld/car'
import { toString } from 'uint8arrays'
import { Piece } from '@web3-storage/data-segment'

export async function createCar () {
const id = await encode({
value: pb.prepare({ Data: 'a red car on the street!' }),
codec: pb,
hasher: identity,
})

const parent = await encode({
value: pb.prepare({ Links: [id.cid] }),
codec: pb,
hasher,
})
const car = CarBufferWriter.createWriter(Buffer.alloc(1000), {
roots: [parent.cid],
})
car.write(parent)

const body = car.close()
const digest = await hasher.digest(body)
const checksum = toString(digest.digest, 'base64pad')

const key = `${parent.cid.toString()}/${parent.cid.toString()}`
const piece = Piece.fromPayload(body)

return {
body,
checksum,
key,
link: parent.cid,
piece: piece.link
}
}
21 changes: 21 additions & 0 deletions filecoin/test/helpers/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as Server from '@ucanto/server'

export const OperationErrorName = /** @type {const} */ ('OperationFailed')
export class OperationFailed extends Server.Failure {
/**
* @param {string} message
* @param {import('@web3-storage/data-segment').PieceLink} piece
*/
constructor(message, piece) {
super(message)
this.piece = piece
}

get reason() {
return this.message
}

get name() {
return OperationErrorName
}
}
Loading

0 comments on commit e7df15f

Please sign in to comment.