Skip to content

Commit

Permalink
fix: datalake memory limit issue (#7018)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov authored Oct 23, 2024
1 parent 0bfb79b commit 498b390
Showing 1 changed file with 37 additions and 21 deletions.
58 changes: 37 additions & 21 deletions workers/datalake/src/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { copyVideo, deleteVideo } from './video'
const expires = 86400
const cacheControl = `public,max-age=${expires}`

// 64MB hash limit
const HASH_LIMIT = 64 * 1024 * 1024
// 1MB hash limit
const HASH_LIMIT = 1 * 1024 * 1024

interface BlobMetadata {
lastModified: number
Expand Down Expand Up @@ -121,6 +121,12 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
}

export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
const contentType = request.headers.get('Content-Type')
if (contentType === null || !contentType.includes('multipart/form-data')) {
console.error({ error: 'expected multipart/form-data' })
return error(400, 'Expected multipart/form-data')
}

const sql = postgres(env.HYPERDRIVE.connectionString)
const formData = await request.formData()

Expand Down Expand Up @@ -168,14 +174,11 @@ async function saveBlob (
const httpMetadata = { contentType: type, cacheControl }
const filename = getUniqueFilename()

const sha256hash = await getSha256(file)

if (sha256hash !== null) {
// Lucky boy, nothing to upload, use existing blob
const hash = sha256hash

if (file.size <= HASH_LIMIT) {
const hash = await getSha256(file)
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// Lucky boy, nothing to upload, use existing blob
await db.createBlob(sql, { workspace, name, hash, location })
} else {
await bucket.put(filename, file, { httpMetadata })
Expand All @@ -189,11 +192,7 @@ async function saveBlob (
} else {
// For large files we cannot calculate the checksum beforehand:
// upload the file with a unique filename and then obtain the checksum
const object = await bucket.put(filename, file, { httpMetadata })

const hash =
object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)

const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// We found an existing blob with the same hash
Expand All @@ -220,7 +219,7 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
throw Error('blob not found')
}

const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)

const data = await db.getData(sql, { hash, location })
if (data !== null) {
Expand All @@ -234,23 +233,40 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
}
}

/**
 * Uploads a file to the bucket while computing its SHA-256 digest in the
 * same pass, so the content is streamed rather than hashed up front.
 *
 * The file stream is split in two with `tee()`: one branch is piped into a
 * `crypto.DigestStream`, the other is handed to `bucket.put`.
 *
 * @param bucket - R2 bucket that receives the object
 * @param file - file whose content is uploaded and hashed
 * @param filename - object key to store the file under
 * @param options - put options forwarded unchanged to `bucket.put`
 * @returns the SHA-256 digest of the content converted with `digestToUUID`
 */
async function uploadLargeFile (
  bucket: R2Bucket,
  file: File,
  filename: string,
  options: R2PutOptions
): Promise<{ hash: UUID }> {
  const hasher = new crypto.DigestStream('SHA-256')
  const [hashBranch, uploadBranch] = file.stream().tee()

  // Both branches must be consumed concurrently; wait for hashing and upload together
  await Promise.all([
    hashBranch.pipeTo(hasher),
    bucket.put(filename, uploadBranch, options)
  ])

  const digest = await hasher.digest
  return { hash: digestToUUID(digest) }
}

/**
 * Generates a fresh object name for an upload.
 *
 * @returns a value from `crypto.randomUUID()`, typed as `UUID`
 */
function getUniqueFilename (): UUID {
  const generated = crypto.randomUUID()
  return generated as UUID
}

async function getSha256 (file: File): Promise<UUID | null> {
if (file.size > HASH_LIMIT) {
return null
}

async function getSha256 (file: File): Promise<UUID> {
const digestStream = new crypto.DigestStream('SHA-256')
await file.stream().pipeTo(digestStream)
const digest = await digestStream.digest

return toUUID(new Uint8Array(digest))
return digestToUUID(digest)
}

function getMd5Checksum (digest: ArrayBuffer): UUID {
function digestToUUID (digest: ArrayBuffer): UUID {
return toUUID(new Uint8Array(digest))
}

Expand Down

0 comments on commit 498b390

Please sign in to comment.