Skip to content

Commit

Permalink
feat: compute piece for uploaded cars
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Sep 14, 2023
1 parent 3756f02 commit 82ee974
Show file tree
Hide file tree
Showing 21 changed files with 979 additions and 9 deletions.
2 changes: 1 addition & 1 deletion carpark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
"nanoid": "^4.0.0",
"testcontainers": "^8.13.0"
}
}
}
66 changes: 66 additions & 0 deletions filecoin/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
export class Failure extends Error {
describe() {
return this.toString()
}
get message() {

Check failure on line 5 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return this.describe()
}
toJSON() {

Check failure on line 8 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
const { name, message, stack } = this
return { name, message, stack }
}
}

export const DatabaseOperationErrorName = /** @type {const} */ (
'DatabaseOperationFailed'
)
export class DatabaseOperationFailed extends Failure {
/**
* @param {string} message
*/
constructor(message) {

Check failure on line 21 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Useless constructor
super(message)
}
get reason() {

Check failure on line 24 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return this.message
}
get name() {

Check failure on line 27 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return DatabaseOperationErrorName
}
}

export const GetCarErrorName = /** @type {const} */ (
'GetCarFailed'
)
export class GetCarFailed extends Failure {
/**
* @param {string} message
*/
constructor(message) {

Check failure on line 39 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Useless constructor
super(message)
}
get reason() {

Check failure on line 42 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return this.message
}
get name() {

Check failure on line 45 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return GetCarErrorName
}
}

export const ComputePieceErrorName = /** @type {const} */ (
'ComputePieceFailed'
)
export class ComputePieceFailed extends Failure {
/**
* @param {string} message
*/
constructor(message) {

Check failure on line 57 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Useless constructor
super(message)
}
get reason() {

Check failure on line 60 in filecoin/errors.js

View workflow job for this annotation

GitHub Actions / Test

Expected blank line between class members
return this.message
}
get name() {
return ComputePieceErrorName
}
}
97 changes: 97 additions & 0 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'

import { computePieceCid } from '../index.js'
import { createPieceTable } from '../tables/piece.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* Get EventRecord from the SQS Event triggering the handler
*
* @param {import('aws-lambda').SQSEvent} event
*/
async function computeHandler (event) {
const {
pieceTableName,
} = getEnv()

const record = parseEvent(event)
if (!record) {
throw new Error('Unexpected sqs record format')
}

const s3Client = new S3Client({ region: record.bucketRegion })
const pieceTable = createPieceTable(AWS_REGION, pieceTableName)

const { error, ok } = await computePieceCid({
record,
s3Client,
pieceTable,
})

if (error) {
return {
statusCode: 500,
body: error.message
}
}

return {
statusCode: 200,
body: ok
}
}

export const handler = Sentry.AWSLambda.wrapHandler(computeHandler)

/**
* Get Env validating it is set.
*/
function getEnv () {
return {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
}
}

/**
* @param {string} name
* @returns {string}
*/
function mustGetEnv (name) {
const value = process.env[name]
if (!value) throw new Error(`Missing env var: ${name}`)
return value
}

/**
* Extract an EventRecord from the passed SQS Event
*
* @param {import('aws-lambda').SQSEvent} sqsEvent
* @returns {import('../index.js').EventRecord | undefined}
*/
function parseEvent (sqsEvent) {
if (sqsEvent.Records.length !== 1) {
throw new Error(
`Expected 1 CAR per invocation but received ${sqsEvent.Records.length} CARs`
)
}

const body = sqsEvent.Records[0].body
if (!body) {
return
}
const { key, bucketName, bucketRegion } = JSON.parse(body)

return {
bucketRegion,
bucketName,
key,
}
}
85 changes: 85 additions & 0 deletions filecoin/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {
GetObjectCommand,
} from '@aws-sdk/client-s3'
// @ts-expect-error needs final dep
// import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'

// import { GetCarFailed, ComputePieceFailed } from './errors.js'
import { GetCarFailed } from './errors.js'

/**
* @typedef {object} EventRecord
* @property {string} bucketRegion
* @property {string} bucketName
* @property {string} key
*
* @typedef {import('@aws-sdk/client-s3').S3Client} S3Client
* @typedef {import('@aws-sdk/client-dynamodb').DynamoDBClient} DynamoDBClient
*
* @param {DynamoDBClient} props.dynamoClient
* @param {string} props.pieceTableName
*/

/**
* Create CAR side index and write it to Satnav bucket if non existing.
*
* @param {object} props
* @param {EventRecord} props.record
* @param {S3Client} props.s3Client
* @param {import('./types').PieceTable} props.pieceTable
*/
export async function computePieceCid({
record,
s3Client,
pieceTable
}) {
const key = record.key
// CIDs in carpark are in format `${carCid}/${carCid}.car`
const cidString = key.split('/')[0]

const getCmd = new GetObjectCommand({
Bucket: record.bucketName,
Key: key,
})
const res = await s3Client.send(getCmd)
if (!res.Body) {
return {
error: new GetCarFailed(`failed to get CAR file with key ${key} in bucket ${record.bucketName}`)
}
}

// let piece
// try {
// const hasher = Hasher.create()
// const digestBytes = new Uint8Array(36)

// // @ts-expect-error aws Readable stream types are not good
// for await (const chunk of res.Body.transformToWebStream()) {
// hasher.write(chunk)
// }
// hasher.digestInto(digestBytes, 0, true)

// const digest = Digest.decode(digestBytes)
// // @ts-expect-error some properties from PieceDigest are not present in MultihashDigest
// piece = Piece.fromDigest(digest)
// } catch (/** @type {any} */ error) {
// return {
// error: new ComputePieceFailed(error.cause)
// }
// }
const piece = Piece.fromPayload(await res.Body.transformToByteArray())

// Write to table
const { ok, error } = await pieceTable.insert({
link: CID.parse(cidString),
piece: piece.link,
})

return {
ok,
error
}
}
23 changes: 23 additions & 0 deletions filecoin/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "@web3-storage/w3infra-filecoin",
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "ava --verbose --timeout=60s **/*.test.js"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.211.0",
"@aws-sdk/client-s3": "^3.211.0",
"@aws-sdk/client-sqs": "^3.226.0",
"@sentry/serverless": "^7.22.0",
"@web3-storage/data-segment": "^3.0.1",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "github:web3-storage/fr32-sha2-256-trunc254-padded-binary-tree-multihash",
"multiformats": "^12.1.1"
},
"devDependencies": {
"@serverless-stack/resources": "*",
"ava": "^4.3.3",
"nanoid": "^4.0.0",
"testcontainers": "^8.13.0"
}
}
16 changes: 16 additions & 0 deletions filecoin/tables/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/** @typedef {import('@serverless-stack/resources').TableProps} TableProps */

/** @type TableProps */
export const pieceTableProps = {
fields: {
piece: 'string', // `baga...1`
link: 'string', // `bagy...1`
aggregate: 'string', // `bagy...9`
inclusion: 'string', // TODO: Inclusion?
insertedAt: 'string', // `2022-12-24T...`
},
primaryIndex: { partitionKey: 'piece', sortKey: 'insertedAt' },
globalIndexes: {
link: { partitionKey: 'link', sortKey: 'aggregate', projection: ['piece', 'aggregate', 'inclusion', 'insertedAt'] }
}
}
64 changes: 64 additions & 0 deletions filecoin/tables/piece.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import {
DynamoDBClient,
PutItemCommand,
} from '@aws-sdk/client-dynamodb'
import { marshall } from '@aws-sdk/util-dynamodb'

import { DatabaseOperationFailed } from '../errors.js'

/**
* Abstraction layer to handle operations on Piece Table.
*
* @param {string} region
* @param {string} tableName
* @param {object} [options]
* @param {string} [options.endpoint]
* @returns {import('../types').PieceTable}
*/
export function createPieceTable (region, tableName, options = {}) {
const dynamoDb = new DynamoDBClient({
region,
endpoint: options.endpoint
})

return usePieceTable(dynamoDb, tableName)
}

/**
* @param {DynamoDBClient} dynamoDb
* @param {string} tableName
* @returns {import('../types').PieceTable}
*/
export function usePieceTable(dynamoDb, tableName) {
return {
/**
* Check if the given link CID is bound to the uploader account
*
* @param {import('../types').PieceInsertInput} input
*/
insert: async (input) => {
const insertedAt = new Date().toISOString()

const cmd = new PutItemCommand({
TableName: tableName,
Item: marshall({
link: input.link.toString(),
piece: input.piece.toString(),
insertedAt
}),
})

try {
await dynamoDb.send(cmd)
} catch (/** @type {any} */ error) {
return {
error: new DatabaseOperationFailed(error.cause)
}
}

return {
ok: {}
}
},
}
}
Loading

0 comments on commit 82ee974

Please sign in to comment.