Skip to content

Commit

Permalink
feat: trigger aggregator on computed piece
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 15, 2023
1 parent 2429663 commit 39fdd12
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 54 deletions.
3 changes: 3 additions & 0 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ EIPFS_INDEXER_SQS_URL = 'https://sqs.us-west-2.amazonaws.com/505595374361/stagin
ACCESS_SERVICE_DID = ''
UPLOAD_API_DID = ''
ACCESS_SERVICE_URL = ''
AGGREGATOR_DID = ''
AGGREGATOR_URL = ''

POSTMARK_TOKEN = ''
R2_ACCESS_KEY_ID = ''
R2_CARPARK_BUCKET_NAME = ''
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ DID of the w3access service.

URL of the w3access service.

#### `AGGREGATOR_SERVICE_DID`

DID of the filecoin aggregator service.

#### `AGGREGATOR_SERVICE_URL`

URL of the filecoin aggregator service.

#### `UPLOAD_API_DID`

[DID](https://www.w3.org/TR/did-core/) of the upload-api ucanto server. e.g. `did:web:up.web3.storage`. Optional: if omitted, a `did:key` will be derrived from `PRIVATE_KEY`
Expand Down
11 changes: 1 addition & 10 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'

import { computePieceCid } from '../index.js'
import { mustGetEnv } from './utils.js'
import { createPieceTable } from '../tables/piece.js'

Sentry.AWSLambda.init({
Expand Down Expand Up @@ -60,16 +61,6 @@ function getEnv () {
}
}

/**
* @param {string} name
* @returns {string}
*/
function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}

/**
* Extract an EventRecord from the passed SQS Event
*
Expand Down
87 changes: 87 additions & 0 deletions filecoin/functions/piece-cid-report.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import * as Sentry from '@sentry/serverless'
import { Config } from '@serverless-stack/node/config/index.js'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Piece } from '@web3-storage/data-segment'

import { reportPieceCid } from '../index.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function pieceCidReport (event) {
const { aggregatorDid, aggregatorUrl } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one ferry to update')
}

// @ts-expect-error can't figure out type of new
const pieceRecord = unmarshall(records[0].new)
const piece = Piece.fromString(pieceRecord.piece).link

const aggregateServiceConnection = getServiceConnection({
did: aggregatorDid,
url: aggregatorUrl
})
const issuer = getServiceSigner({
privateKey
})
const audience = aggregateServiceConnection.id
/** @type {import('@web3-storage/filecoin-client/types').InvocationConfig} */
const invocationConfig = {
issuer,
audience,
with: issuer.did(),
}

const { ok, error } = await reportPieceCid({
piece,
group: issuer.did(),
aggregateServiceConnection,
invocationConfig
})

if (error) {
return {
statusCode: 500,
body: error.message || 'failed to add aggregate'
}
}

return {
statusCode: 200,
body: ok
}
}

export const handler = Sentry.AWSLambda.wrapHandler(pieceCidReport)

/**
* Get Env validating it is set.
*/
function getEnv() {
return {
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
aggregatorUrl: mustGetEnv('AGGREGATOR_URL'),
}
}

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
function parseDynamoDbEvent (event) {
return event.Records.map(r => ({
new: r.dynamodb?.NewImage,
old: r.dynamodb?.OldImage
}))
}
9 changes: 9 additions & 0 deletions filecoin/functions/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* @param {string} name
* @returns {string}
*/
export function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}
33 changes: 33 additions & 0 deletions filecoin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import { Aggregator } from '@web3-storage/filecoin-client'

import { GetCarFailed, ComputePieceFailed } from './errors.js'

Expand Down Expand Up @@ -78,3 +79,35 @@ export async function computePieceCid({
error
}
}

/**
* @param {object} props
* @param {import('@web3-storage/data-segment').PieceLink} props.piece
* @param {string} props.group
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} props.invocationConfig
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.aggregateServiceConnection
*/
export async function reportPieceCid ({
piece,
group,
invocationConfig,
aggregateServiceConnection
}) {
// Add piece for aggregation
const aggregateQueue = await Aggregator.aggregateQueue(
invocationConfig,
piece,
group,
{ connection: aggregateServiceConnection }
)
console.log('aggregate queue', piece, group, aggregateQueue.out)

if (aggregateQueue.out.error) {
return {
error: aggregateQueue.out.error
}
}
return {
ok: {},
}
}
4 changes: 4 additions & 0 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@ucanto/client": "^8.0.1",
"@ucanto/principal": "^8.1.0",
"@ucanto/transport": "^8.0.0",
"@web3-storage/data-segment": "^3.0.1",
"@web3-storage/filecoin-client": "^1.3.0",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash",
"multiformats": "^12.1.1"
},
Expand Down
36 changes: 36 additions & 0 deletions filecoin/service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as ed25519 from '@ucanto/principal/ed25519'
import * as DID from '@ipld/dag-ucan/did'
import { CAR, HTTP } from '@ucanto/transport'
import { connect } from '@ucanto/client'

/**
* Given a config, return a ucanto Signer object representing the service
*
* @param {object} config
* @param {string} config.privateKey - multiformats private key of primary signing key
* @returns {import('@ucanto/principal/ed25519').Signer.Signer}
*/
export function getServiceSigner(config) {
return ed25519.parse(config.privateKey)
}

/**
*
* @param {{ did: string, url: string }} config
* @returns
*/
export function getServiceConnection (config) {
const servicePrincipal = DID.parse(config.did) // 'did:web:filecoin.web3.storage'
const serviceURL = new URL(config.url) // 'https://filecoin.web3.storage'

const serviceConnection = connect({
id: servicePrincipal,
codec: CAR.outbound,
channel: HTTP.open({
url: serviceURL,
method: 'POST',
}),
})

return serviceConnection
}
Loading

0 comments on commit 39fdd12

Please sign in to comment.