Skip to content

Commit

Permalink
fix: try reducing batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Jul 10, 2024
1 parent cb8fce6 commit c03c285
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/serverless'
import { toString } from 'uint8arrays/to-string'
import { fromString } from 'uint8arrays/from-string'
import * as DAGJson from '@ipld/dag-json'

Expand Down Expand Up @@ -64,5 +63,5 @@ export const consumer = Sentry.AWSLambda.wrapHandler(handler)
*/
function parseKinesisEvent (event) {
const batch = event.Records.map(r => fromString(r.kinesis.data, 'base64'))
return batch.map(b => DAGJson.parse(toString(b, 'utf8')))
return batch.map(b => DAGJson.decode(b))
}
4 changes: 1 addition & 3 deletions filecoin/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export async function updateAggregateOfferTotal (ucanInvocations, ctx) {
console.log(`${workflowsWithAggregateOffers.size} aggregate offer workflows`)

// From workflows that include aggregate offer receipts, try to get the block with Pieces included in Aggregate
/** @type {AggregateOfferGet[]} */
const aggregateOfferGets = (await Promise.all(
Array.from(workflowsWithAggregateOffers.entries()).map(async ([carCid, aggregateOfferInvocation]) => {
console.log(`getting agent message for task: ${aggregateOfferInvocation.invocationCid}`)
Expand All @@ -75,8 +74,7 @@ export async function updateAggregateOfferTotal (ucanInvocations, ctx) {
console.log('update aggregate/offer total for worflow', carCid, aggregateOfferInvocation.invocationCid, 'with pieces', aggregateOfferInvocation.capabilities.map(aggregateOfferCap => aggregateOfferCap.nb.pieces.toString()))
return Promise.all(aggregateOfferInvocation.capabilities.map(aggregateOfferCap => getOfferInfoBlock(aggregateOfferCap, agentMessage.ok)))
})
// @ts-expect-error error types
)).flatMap(get => get)
)).flat()
const aggregateOfferGetError = aggregateOfferGets.find(get => get.error)
if (aggregateOfferGetError) {
throw aggregateOfferGetError.error
Expand Down
1 change: 1 addition & 0 deletions stacks/filecoin-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ export function FilecoinStack({ stack, app }) {
cdk: {
eventSource: {
...(getEventSourceConfig(stack)),
batchSize: 1,
parallelizationFactor: 10
}
}
Expand Down

0 comments on commit c03c285

Please sign in to comment.