Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add PSA stack #424

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filecoin/store/invocation.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { getS3Client } from '../../lib/aws/s3.js'
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {Partial<import('../../lib/aws/s3.js').Address>} [options]
*/
export function createInvocationStore(region, bucketName, options = {}) {
const s3client = getS3Client({
Expand Down
2 changes: 1 addition & 1 deletion filecoin/store/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getS3Client } from '../../lib/aws/s3.js'
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {Partial<import('../../lib/aws/s3.js').Address>} [options]
*/
export function createWorkflowStore(region, bucketName, options = {}) {
const s3client = getS3Client({
Expand Down
2 changes: 1 addition & 1 deletion filecoin/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import anyTest from 'ava'
*
* @typedef {object} S3Context
* @property {import('@aws-sdk/client-s3').S3Client} s3Client
* @property {import('@aws-sdk/client-s3').ServiceInputTypes} s3Opts
* @property {import('../../../lib/aws/s3.js').Address} s3Opts
*
* @typedef {object} DynamoContext
* @property {string} dbEndpoint
Expand Down
13,518 changes: 7,646 additions & 5,872 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"unicorn/no-array-reduce": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/no-for-loop": "off",
"unicorn/numeric-separators-style": "off",
"unicorn/prefer-export-from": "off",
"unicorn/prefer-object-from-entries": "off",
"unicorn/prefer-set-has": "off",
Expand Down Expand Up @@ -129,6 +130,7 @@
"carpark",
"filecoin",
"indexer",
"psa",
"replicator",
"roundabout",
"tools",
Expand Down
53 changes: 53 additions & 0 deletions psa/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { base32 } from 'multiformats/bases/base32'
import { S3Client } from '@aws-sdk/client-s3'
import { createDudeWhereLocator, createHashEncodedInKeyHasher, createObjectHasher, createObjectLocator } from './lib.js'
import { mustGetEnv } from '../lib/env.js'

/** @type {import('./lib.js').Bucket[]} */
export const buckets = [
{
locator: createObjectLocator(
new S3Client({ region: mustGetEnv('S3_DOTSTORAGE_0_BUCKET_REGION') }),
mustGetEnv('S3_DOTSTORAGE_0_BUCKET_NAME'),
root => {
const s = root.toV1().toString(base32)
return `complete/${s}/${s}.car`
}
),
hasher: createObjectHasher()
},
{
locator: createObjectLocator(
new S3Client({ region: mustGetEnv('S3_DOTSTORAGE_1_BUCKET_REGION') }),
mustGetEnv('S3_DOTSTORAGE_1_BUCKET_NAME'),
root => {
const s = root.toV1().toString(base32)
return `complete/${s}/${s}.car`
}
),
hasher: createObjectHasher()
},
{
locator: createObjectLocator(
new S3Client({ region: mustGetEnv('S3_PICKUP_BUCKET_REGION') }),
mustGetEnv('S3_PICKUP_BUCKET_NAME'),
r => `pickup/${r}/${r}.root.car`
),
hasher: createObjectHasher()
},
{
locator: createDudeWhereLocator(
new S3Client({
endpoint: mustGetEnv('R2_ENDPOINT'),
credentials: {
accessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
secretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
},
region: mustGetEnv('R2_REGION')
}),
mustGetEnv('R2_DUDEWHERE_BUCKET_NAME'),
mustGetEnv('R2_CARPARK_BUCKET_NAME')
),
hasher: createHashEncodedInKeyHasher()
}
]
23 changes: 23 additions & 0 deletions psa/functions/download.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ApiHandler } from 'sst/node/api'
import * as Link from 'multiformats/link'
import { getDownloadURL, NotFound } from '../lib.js'
import * as Config from '../config.js'
import { errorResponse, okResponse } from '../util.js'

export const handler = ApiHandler(async event => {
const { searchParams } = new URL(`http://localhost/?${event.rawQueryString}`)

let root
try {
root = Link.parse(searchParams.get('root') ?? '')
} catch {
return errorResponse('Invalid "root" search parameter', 400)
}

try {
const url = await getDownloadURL(Config.buckets, root)
return okResponse({ root, url })
} catch (/** @type {any} */ err) {
return errorResponse(err.message, err instanceof NotFound ? 404 : 500)
}
})
23 changes: 23 additions & 0 deletions psa/functions/hash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ApiHandler } from 'sst/node/api'
import * as Link from 'multiformats/link'
import { getHash, NotFound } from '../lib.js'
import * as Config from '../config.js'
import { okResponse, errorResponse } from '../util.js'

export const handler = ApiHandler(async event => {
const { searchParams } = new URL(`http://localhost/?${event.rawQueryString}`)

let root
try {
root = Link.parse(searchParams.get('root') ?? '')
} catch {
return errorResponse('Invalid "root" search parameter', 400)
}

try {
const { link, size } = await getHash(Config.buckets, root)
return okResponse({ root, link, size })
} catch (/** @type {any} */ err) {
return errorResponse(err.message, err instanceof NotFound ? 404 : 500)
}
})
202 changes: 202 additions & 0 deletions psa/lib.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import crypto from 'node:crypto'
import { HeadObjectCommand, GetObjectCommand, ListObjectsV2Command } from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import * as Link from 'multiformats/link'
import * as Digest from 'multiformats/hashes/digest'
import { sha256 } from 'multiformats/hashes/sha2'

/**
* @typedef {import('@aws-sdk/client-s3').S3Client} S3Client
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {{ locator: Locator, hasher: Hasher }} Bucket
* @typedef {{ root: UnknownLink, client: S3Client, bucket: string, key: string, size: number }} Location
* @typedef {{ locate: (root: UnknownLink) => Promise<Location|undefined> }} Locator
* @typedef {{ digest: (location: Location) => Promise<import('multiformats').Link> }} Hasher
*/

const CAR_CODEC = 0x0202

/**
* Get the hash of a CAR file stored in one of the passed buckets that contains
* the complete DAG for the given root CID.
*
* @param {Bucket[]} buckets
* @param {UnknownLink} root
* @throws {NotFound}
*/
export const getHash = async (buckets, root) => {
for (const bucket of buckets) {
const location = await bucket.locator.locate(root)
if (!location) continue

const link = await bucket.hasher.digest(location)
return { link, size: location.size }
}
throw new NotFound(`not found: ${root}`)
}

/**
* Create a locator that can find a key in any S3 compatible bucket.
*
* @param {S3Client} client
* @param {string} bucketName
* @param {(root: UnknownLink) => string} encodeKey
* @returns {Locator}
*/
export const createObjectLocator = (client, bucketName, encodeKey) =>
new S3ObjectLocator(client, bucketName, encodeKey)

/** @implements {Locator} */
class S3ObjectLocator {
/**
* @param {S3Client} client
* @param {string} bucketName
* @param {(root: UnknownLink) => string} encodeKey
*/
constructor (client, bucketName, encodeKey) {
this.client = client
this.bucketName = bucketName
this.encodeKey = encodeKey
}

/** @param {UnknownLink} root */
async locate (root) {
const key = this.encodeKey(root)
const cmd = new HeadObjectCommand({ Bucket: this.bucketName, Key: key })
try {
const res = await this.client.send(cmd)
const size = res.ContentLength
if (size == null) throw new Error(`missing ContentLength: ${root}`)
return { root, client: this.client, bucket: this.bucketName, key, size }
} catch (/** @type {any} */ err) {
if (err?.$metadata.httpStatusCode !== 404) {
throw err
}
}
}
}

/**
* Creates a client that knows how to locate an object by looking in the legacy
* DUDEWHERE index bucket to find the key.
*
* @param {S3Client} client
* @param {string} indexBucketName Name of the DUDEWHERE bucket.
* @param {string} dataBucketName Name of the CARPARK bucket.
*/
export const createDudeWhereLocator = (client, indexBucketName, dataBucketName) =>
new DudeWhereLocator(client, indexBucketName, dataBucketName)

/** @implements {Locator} */
class DudeWhereLocator {
/**
* @param {S3Client} client
* @param {string} indexBucketName Name of the DUDEWHERE bucket.
* @param {string} dataBucketName Name of the CARPARK bucket.
*/
constructor (client, indexBucketName, dataBucketName) {
this.client = client
this.indexBucketName = indexBucketName
this.dataBucketName = dataBucketName
}

/** @param {UnknownLink} root */
async locate (root) {
const cmd = new ListObjectsV2Command({
Bucket: this.indexBucketName,
MaxKeys: 2,
Prefix: `${root}/`
})
const res = await this.client.send(cmd)
const contents = res.Contents

// if there's no items then it simply not found
if (!contents?.length) return
// if there's more than one item, then someone else has stored this root,
// as multiple shards, or with a different block ordering. There's no way
// to know which subset of shards contains the entire DAG.
if (contents.length > 1) return
// if no key then this is a weird situation
if (!contents[0].Key) return

const key = contents[0].Key
const locator = createObjectLocator(this.client, this.dataBucketName, () => {
const link = Link.parse(key.split('/').pop() ?? '')
return `${link}/${link}.car`
})
return locator.locate(root)
}
}

/**
* A hasher that reads data from a location and hashes it.
*
* @returns {Hasher}
*/
export const createObjectHasher = () => new ObjectHasher()

/** @implements {Hasher} */
class ObjectHasher {
/** @param {Location} location */
async digest (location) {
const cmd = new GetObjectCommand({ Bucket: location.bucket, Key: location.key })

const res = await location.client.send(cmd)
if (!res.Body) {
throw new NotFound(`Object not found: ${location.root}`) // shouldn't happen
}

const hash = crypto.createHash('sha256')
await res.Body.transformToWebStream()
.pipeTo(new WritableStream({ write: chunk => { hash.update(chunk) } }))

const digest = Digest.create(sha256.code, hash.digest())
return Link.create(CAR_CODEC, digest)
}
}

/**
* A hasher that extracts the CAR hash from the key.
*
* @returns {Hasher}
*/
export const createHashEncodedInKeyHasher = () => new HashEncodedInKeyHasher()

/** @implements {Hasher} */
class HashEncodedInKeyHasher {
/** @param {Location} location */
async digest (location) {
const filename = location.key.split('/').pop()
if (!filename || !filename.endsWith('.car')) {
throw new Error('unexpected key format')
}
const hash =
/** @type {import('multiformats').Link<unknown, number, number, 1>} */
(Link.parse(filename.replace('.car', '')))
return hash
}
}

export const DownloadURLExpiration = 1000 * 60 * 60 * 24 // 1 day in seconds

/**
* Get a signed download URL for the CAR file stored in one of the passed
* buckets that contains the complete DAG for the given root CID.
*
* @param {Bucket[]} buckets
* @param {UnknownLink} root
* @throws {NotFound}
*/
export const getDownloadURL = async (buckets, root) => {
for (const bucket of buckets) {
const location = await bucket.locator.locate(root)
if (!location) continue

const cmd = new GetObjectCommand({ Bucket: location.bucket, Key: location.key })
const url = await getSignedUrl(location.client, cmd, { expiresIn: DownloadURLExpiration })
return new URL(url)
}
throw new NotFound(`not found: ${root}`)
}

export class NotFound extends Error {}
19 changes: 19 additions & 0 deletions psa/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@web3-storage/w3infra-psa",
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "entail test/*.spec.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.658.1",
"@aws-sdk/s3-request-presigner": "^3.658.1",
"multiformats": "^13.3.0",
"sst": "^2.43.7"
},
"devDependencies": {
"@ipld/car": "^5.3.2",
"entail": "^2.1.2",
"nanoid": "^5.0.7"
}
}
14 changes: 14 additions & 0 deletions psa/test/helpers/bytes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { webcrypto } from 'node:crypto'

/** @param {number} size */
export const randomBytes = size => {
const bytes = new Uint8Array(size)
while (size) {
const chunk = new Uint8Array(Math.min(size, 65_536))
webcrypto.getRandomValues(chunk)

size -= bytes.length
bytes.set(chunk, size)
}
return bytes
}
Loading
Loading