Skip to content

Commit

Permalink
feat: add PSA stack (#424)
Browse files Browse the repository at this point in the history
Adds a temporary stack for the old Pinning Service API (PSA) data.

The stack creates 2 functions:

- `hash` given a root CID, find a CAR file in one of the configured
buckets and calculate its hash.
- `download` given a root CID, find a CAR file in one of the configured
buckets and return a signed URL allowing temporary access to the data.

This is for the PSA migration tool in console.

1. Users list pinned root CIDs.
2. Users get hash of CAR file containing pinned data by calling `hash`.
3. Users `blob/add` CAR file.
4. (Maybe) users need to upload the CAR, so they call `download` to get
a signed URL and then download the data, and upload it to Storacha.
  • Loading branch information
Alan Shaw authored Oct 1, 2024
1 parent 12fdbe9 commit 120908a
Show file tree
Hide file tree
Showing 24 changed files with 8,254 additions and 5,881 deletions.
2 changes: 1 addition & 1 deletion filecoin/store/invocation.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { getS3Client } from '../../lib/aws/s3.js'
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {Partial<import('../../lib/aws/s3.js').Address>} [options]
*/
export function createInvocationStore(region, bucketName, options = {}) {
const s3client = getS3Client({
Expand Down
2 changes: 1 addition & 1 deletion filecoin/store/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getS3Client } from '../../lib/aws/s3.js'
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
* @param {Partial<import('../../lib/aws/s3.js').Address>} [options]
*/
export function createWorkflowStore(region, bucketName, options = {}) {
const s3client = getS3Client({
Expand Down
2 changes: 1 addition & 1 deletion filecoin/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import anyTest from 'ava'
*
* @typedef {object} S3Context
* @property {import('@aws-sdk/client-s3').S3Client} s3Client
* @property {import('@aws-sdk/client-s3').ServiceInputTypes} s3Opts
* @property {import('../../../lib/aws/s3.js').Address} s3Opts
*
* @typedef {object} DynamoContext
* @property {string} dbEndpoint
Expand Down
13,518 changes: 7,646 additions & 5,872 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"unicorn/no-array-reduce": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/no-for-loop": "off",
"unicorn/numeric-separators-style": "off",
"unicorn/prefer-export-from": "off",
"unicorn/prefer-object-from-entries": "off",
"unicorn/prefer-set-has": "off",
Expand Down Expand Up @@ -129,6 +130,7 @@
"carpark",
"filecoin",
"indexer",
"psa",
"replicator",
"roundabout",
"tools",
Expand Down
53 changes: 53 additions & 0 deletions psa/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { base32 } from 'multiformats/bases/base32'
import { S3Client } from '@aws-sdk/client-s3'
import { createDudeWhereLocator, createHashEncodedInKeyHasher, createObjectHasher, createObjectLocator } from './lib.js'
import { mustGetEnv } from '../lib/env.js'

/**
 * Encodes the object key used by the legacy dotstorage buckets for a
 * complete CAR: `complete/<base32 v1 root>/<base32 v1 root>.car`.
 *
 * @param {import('multiformats').UnknownLink} root
 */
const completeCarKey = root => {
  const cid = root.toV1().toString(base32)
  return `complete/${cid}/${cid}.car`
}

/**
 * Buckets searched, in order, when locating a CAR file for a root CID.
 *
 * @type {import('./lib.js').Bucket[]}
 */
export const buckets = [
  {
    locator: createObjectLocator(
      new S3Client({ region: mustGetEnv('S3_DOTSTORAGE_0_BUCKET_REGION') }),
      mustGetEnv('S3_DOTSTORAGE_0_BUCKET_NAME'),
      completeCarKey
    ),
    hasher: createObjectHasher()
  },
  {
    locator: createObjectLocator(
      new S3Client({ region: mustGetEnv('S3_DOTSTORAGE_1_BUCKET_REGION') }),
      mustGetEnv('S3_DOTSTORAGE_1_BUCKET_NAME'),
      completeCarKey
    ),
    hasher: createObjectHasher()
  },
  {
    locator: createObjectLocator(
      new S3Client({ region: mustGetEnv('S3_PICKUP_BUCKET_REGION') }),
      mustGetEnv('S3_PICKUP_BUCKET_NAME'),
      root => `pickup/${root}/${root}.root.car`
    ),
    hasher: createObjectHasher()
  },
  {
    // R2 bucket indexed by the legacy DUDEWHERE mapping; the CAR hash is
    // recoverable from the key itself, so no data read is needed to hash.
    locator: createDudeWhereLocator(
      new S3Client({
        endpoint: mustGetEnv('R2_ENDPOINT'),
        credentials: {
          accessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
          secretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
        },
        region: mustGetEnv('R2_REGION')
      }),
      mustGetEnv('R2_DUDEWHERE_BUCKET_NAME'),
      mustGetEnv('R2_CARPARK_BUCKET_NAME')
    ),
    hasher: createHashEncodedInKeyHasher()
  }
]
23 changes: 23 additions & 0 deletions psa/functions/download.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ApiHandler } from 'sst/node/api'
import * as Link from 'multiformats/link'
import { getDownloadURL, NotFound } from '../lib.js'
import * as Config from '../config.js'
import { errorResponse, okResponse } from '../util.js'

/**
 * Lambda handler: given a `root` search parameter (a CID), locate the CAR
 * containing its complete DAG and respond with a signed download URL.
 */
export const handler = ApiHandler(async event => {
  // Rebuild the query string on a dummy base URL to get parsed parameters.
  const params = new URL(`http://localhost/?${event.rawQueryString}`).searchParams

  let root
  try {
    root = Link.parse(params.get('root') ?? '')
  } catch {
    return errorResponse('Invalid "root" search parameter', 400)
  }

  try {
    const url = await getDownloadURL(Config.buckets, root)
    return okResponse({ root, url })
  } catch (/** @type {any} */ err) {
    const status = err instanceof NotFound ? 404 : 500
    return errorResponse(err.message, status)
  }
})
23 changes: 23 additions & 0 deletions psa/functions/hash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ApiHandler } from 'sst/node/api'
import * as Link from 'multiformats/link'
import { getHash, NotFound } from '../lib.js'
import * as Config from '../config.js'
import { okResponse, errorResponse } from '../util.js'

/**
 * Lambda handler: given a `root` search parameter (a CID), locate the CAR
 * containing its complete DAG and respond with the CAR's hash and size.
 */
export const handler = ApiHandler(async event => {
  // Rebuild the query string on a dummy base URL to get parsed parameters.
  const params = new URL(`http://localhost/?${event.rawQueryString}`).searchParams

  let root
  try {
    root = Link.parse(params.get('root') ?? '')
  } catch {
    return errorResponse('Invalid "root" search parameter', 400)
  }

  try {
    const { link, size } = await getHash(Config.buckets, root)
    return okResponse({ root, link, size })
  } catch (/** @type {any} */ err) {
    const status = err instanceof NotFound ? 404 : 500
    return errorResponse(err.message, status)
  }
})
202 changes: 202 additions & 0 deletions psa/lib.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import crypto from 'node:crypto'
import { HeadObjectCommand, GetObjectCommand, ListObjectsV2Command } from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import * as Link from 'multiformats/link'
import * as Digest from 'multiformats/hashes/digest'
import { sha256 } from 'multiformats/hashes/sha2'

/**
 * Shared types for locating CAR files in legacy buckets and hashing them.
 *
 * @typedef {import('@aws-sdk/client-s3').S3Client} S3Client
 * @typedef {import('multiformats').UnknownLink} UnknownLink
 * @typedef {{ locator: Locator, hasher: Hasher }} Bucket
 * @typedef {{ root: UnknownLink, client: S3Client, bucket: string, key: string, size: number }} Location
 * @typedef {{ locate: (root: UnknownLink) => Promise<Location|undefined> }} Locator
 * @typedef {{ digest: (location: Location) => Promise<import('multiformats').Link> }} Hasher
 */

/** Multicodec code identifying CAR (Content Addressable aRchive) data. */
const CAR_CODEC = 0x0202

/**
 * Get the hash of a CAR file stored in one of the passed buckets that contains
 * the complete DAG for the given root CID. Buckets are tried sequentially, in
 * the order given; the first bucket with a matching object wins.
 *
 * @param {Bucket[]} buckets
 * @param {UnknownLink} root
 * @returns {Promise<{ link: import('multiformats').Link, size: number }>}
 * @throws {NotFound} When no bucket contains a CAR for the root.
 */
export const getHash = async (buckets, root) => {
  for (const { locator, hasher } of buckets) {
    const location = await locator.locate(root)
    if (location) {
      return { link: await hasher.digest(location), size: location.size }
    }
  }
  throw new NotFound(`not found: ${root}`)
}

/**
 * Create a locator that can find a key in any S3 compatible bucket.
 *
 * @param {S3Client} client
 * @param {string} bucketName
 * @param {(root: UnknownLink) => string} encodeKey Maps a root CID to an object key.
 * @returns {Locator}
 */
export function createObjectLocator (client, bucketName, encodeKey) {
  return new S3ObjectLocator(client, bucketName, encodeKey)
}

/**
 * Locates a CAR by issuing a HEAD request for a deterministically encoded key.
 *
 * @implements {Locator}
 */
class S3ObjectLocator {
  /**
   * @param {S3Client} client
   * @param {string} bucketName
   * @param {(root: UnknownLink) => string} encodeKey Maps a root CID to an object key.
   */
  constructor (client, bucketName, encodeKey) {
    this.client = client
    this.bucketName = bucketName
    this.encodeKey = encodeKey
  }

  /**
   * @param {UnknownLink} root
   * @returns {Promise<Location|undefined>} The located object, or `undefined`
   *   when the key does not exist in this bucket (HTTP 404).
   */
  async locate (root) {
    const key = this.encodeKey(root)
    const cmd = new HeadObjectCommand({ Bucket: this.bucketName, Key: key })
    try {
      const res = await this.client.send(cmd)
      const size = res.ContentLength
      if (size == null) throw new Error(`missing ContentLength: ${root}`)
      return { root, client: this.client, bucket: this.bucketName, key, size }
    } catch (/** @type {any} */ err) {
      // Optional-chain `$metadata` too: network-level failures may carry no
      // response metadata, and `err.$metadata.httpStatusCode` would then throw
      // a TypeError here instead of rethrowing the original error.
      if (err?.$metadata?.httpStatusCode !== 404) {
        throw err
      }
      // 404 means "not in this bucket" — signal by returning undefined.
    }
  }
}

/**
 * Creates a client that knows how to locate an object by looking in the legacy
 * DUDEWHERE index bucket to find the key.
 *
 * @param {S3Client} client
 * @param {string} indexBucketName Name of the DUDEWHERE bucket.
 * @param {string} dataBucketName Name of the CARPARK bucket.
 * @returns {Locator}
 */
export function createDudeWhereLocator (client, indexBucketName, dataBucketName) {
  return new DudeWhereLocator(client, indexBucketName, dataBucketName)
}

/**
 * Locates a CAR via the legacy DUDEWHERE index bucket. Index keys are listed
 * under the prefix `<root CID>/` and the portion after the final `/` is parsed
 * as the CAR's CID, which in turn determines the key in the data (CARPARK)
 * bucket.
 *
 * @implements {Locator}
 */
class DudeWhereLocator {
  /**
   * @param {S3Client} client
   * @param {string} indexBucketName Name of the DUDEWHERE bucket.
   * @param {string} dataBucketName Name of the CARPARK bucket.
   */
  constructor (client, indexBucketName, dataBucketName) {
    this.client = client
    this.indexBucketName = indexBucketName
    this.dataBucketName = dataBucketName
  }

  /**
   * @param {UnknownLink} root
   * @returns {Promise<Location|undefined>} The location in the data bucket, or
   *   `undefined` when the DAG cannot be located as a single CAR.
   */
  async locate (root) {
    // MaxKeys: 2 — we only need to know "none", "exactly one" or "more than
    // one"; never the full listing.
    const cmd = new ListObjectsV2Command({
      Bucket: this.indexBucketName,
      MaxKeys: 2,
      Prefix: `${root}/`
    })
    const res = await this.client.send(cmd)
    const contents = res.Contents

    // No index entries: this root was never stored here.
    if (!contents?.length) return
    // if there's more than one item, then someone else has stored this root,
    // as multiple shards, or with a different block ordering. There's no way
    // to know which subset of shards contains the entire DAG.
    if (contents.length > 1) return
    // A listed object without a Key shouldn't happen; bail out defensively.
    if (!contents[0].Key) return

    // Key format is `<root CID>/<shard CID>`; the shard CID names the CAR in
    // the data bucket. Delegate the existence/size check to an object locator.
    const key = contents[0].Key
    const locator = createObjectLocator(this.client, this.dataBucketName, () => {
      const link = Link.parse(key.split('/').pop() ?? '')
      return `${link}/${link}.car`
    })
    return locator.locate(root)
  }
}

/**
 * A hasher that reads data from a location and hashes it.
 *
 * @returns {Hasher}
 */
export function createObjectHasher () {
  return new ObjectHasher()
}

/**
 * Hashes a located object by downloading its bytes and computing a SHA-256,
 * returning the digest wrapped as a CAR CID.
 *
 * @implements {Hasher}
 */
class ObjectHasher {
  /**
   * @param {Location} location
   * @returns {Promise<import('multiformats').Link>}
   */
  async digest (location) {
    const res = await location.client.send(
      new GetObjectCommand({ Bucket: location.bucket, Key: location.key })
    )
    if (!res.Body) {
      throw new NotFound(`Object not found: ${location.root}`) // shouldn't happen
    }

    // Stream the body through the hash rather than buffering it in memory.
    const sha = crypto.createHash('sha256')
    const sink = new WritableStream({ write: bytes => { sha.update(bytes) } })
    await res.Body.transformToWebStream().pipeTo(sink)

    return Link.create(CAR_CODEC, Digest.create(sha256.code, sha.digest()))
  }
}

/**
 * A hasher that extracts the CAR hash from the key.
 *
 * @returns {Hasher}
 */
export function createHashEncodedInKeyHasher () {
  return new HashEncodedInKeyHasher()
}

/**
 * Extracts the CAR hash from the object key itself (no data read needed).
 * Expects the final path segment of the key to be `<CAR CID>.car`.
 *
 * @implements {Hasher}
 */
class HashEncodedInKeyHasher {
  /**
   * @param {Location} location
   * @returns {Promise<import('multiformats').Link>}
   * @throws {Error} If the key's filename does not end in `.car`.
   */
  async digest (location) {
    const filename = location.key.split('/').pop()
    if (!filename || !filename.endsWith('.car')) {
      throw new Error('unexpected key format')
    }
    // Strip the suffix only — `.replace('.car', '')` would remove the first
    // occurrence of ".car" anywhere in the name, not necessarily the extension.
    const hash =
      /** @type {import('multiformats').Link<unknown, number, number, 1>} */
      (Link.parse(filename.slice(0, -'.car'.length)))
    return hash
  }
}

/**
 * Expiry for signed download URLs, in seconds (1 day). The `expiresIn` option
 * of `getSignedUrl` is measured in SECONDS — the previous value of
 * `1000 * 60 * 60 * 24` (a milliseconds-style constant) meant ~1000 days,
 * which exceeds the 7-day (604,800 s) maximum S3 accepts for presigned URLs.
 */
export const DownloadURLExpiration = 60 * 60 * 24 // 1 day in seconds

/**
 * Get a signed download URL for the CAR file stored in one of the passed
 * buckets that contains the complete DAG for the given root CID. Buckets are
 * tried sequentially in the order given.
 *
 * @param {Bucket[]} buckets
 * @param {UnknownLink} root
 * @returns {Promise<URL>}
 * @throws {NotFound} When no bucket contains a CAR for the root.
 */
export const getDownloadURL = async (buckets, root) => {
  for (const { locator } of buckets) {
    const location = await locator.locate(root)
    if (!location) continue

    const command = new GetObjectCommand({ Bucket: location.bucket, Key: location.key })
    const signed = await getSignedUrl(location.client, command, { expiresIn: DownloadURLExpiration })
    return new URL(signed)
  }
  throw new NotFound(`not found: ${root}`)
}

/**
 * Thrown when no configured bucket contains a CAR for the requested root CID.
 * Sets `name` so logs and stacks read "NotFound" rather than generic "Error".
 */
export class NotFound extends Error {
  name = 'NotFound'
}
19 changes: 19 additions & 0 deletions psa/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@web3-storage/w3infra-psa",
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "entail test/*.spec.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.658.1",
"@aws-sdk/s3-request-presigner": "^3.658.1",
"multiformats": "^13.3.0",
"sst": "^2.43.7"
},
"devDependencies": {
"@ipld/car": "^5.3.2",
"entail": "^2.1.2",
"nanoid": "^5.0.7"
}
}
14 changes: 14 additions & 0 deletions psa/test/helpers/bytes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { webcrypto } from 'node:crypto'

/**
 * Generates `size` cryptographically random bytes. `getRandomValues` refuses
 * requests larger than 65,536 bytes per call, so bigger buffers are filled in
 * chunks, back to front.
 *
 * @param {number} size Number of bytes to generate.
 * @returns {Uint8Array}
 */
export const randomBytes = size => {
  const bytes = new Uint8Array(size)
  while (size) {
    const chunk = new Uint8Array(Math.min(size, 65_536))
    webcrypto.getRandomValues(chunk)

    // Decrement by the bytes generated THIS round. The original subtracted
    // `bytes.length` (the total), which zeroed `size` after one iteration and
    // left everything past the first 65,536 bytes zero-filled for large sizes.
    size -= chunk.length
    bytes.set(chunk, size)
  }
  return bytes
}
Loading

0 comments on commit 120908a

Please sign in to comment.