Skip to content

Commit

Permalink
materialize-postgres: limit the absolute number of documents in a que…
Browse files Browse the repository at this point in the history
…ued batch

There is a crude limit on batch size based on queued document sizes already, but
it seems that very small documents (particularly small load keys) have a high
ratio of "overhead" to their actual data size, and that can result in excessive
memory usage if there are lot of these documents to process.

This adds an absolute limit on the number of queries that can be queued at once,
to prevent tiny documents from blowing up the connector's memory. A limit on
number alone is not enough since documents can be huge, and so it seems that a
limit on queued bytes is not enough since documents can be tiny.
  • Loading branch information
williamhbaker committed Oct 1, 2024
1 parent 9a05f53 commit ccca69c
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions materialize-postgres/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const (
// example, if documents average 2kb then a 10mb batch size will allow for ~5000 documents per
// batch.
batchBytesLimit = 10 * 1024 * 1024 // Bytes

// Limit the maximum number of documents in a buffered batch as well since
// very small documents (particularly keys to load) have a much higher ratio
// of "overhead" to actual data size, and the batchBytesLimit alone may
// result in excessive memory use.
batchSizeLimit = 5000
)

type sshForwarding struct {
Expand Down Expand Up @@ -381,7 +387,7 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
batch.Queue(b.loadInsertSQL, converted...)
}

if batchBytes >= batchBytesLimit {
if batchBytes >= batchBytesLimit || batch.Len() > batchSizeLimit {
if err := sendBatch(ctx, txn, &batch); err != nil {
return fmt.Errorf("sending load batch: %w", err)
}
Expand Down Expand Up @@ -470,7 +476,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
}
}

if batchBytes >= batchBytesLimit {
if batchBytes >= batchBytesLimit || batch.Len() > batchSizeLimit {
if err := sendBatch(ctx, txn, &batch); err != nil {
return nil, fmt.Errorf("sending store batch: %w", err)
}
Expand Down

0 comments on commit ccca69c

Please sign in to comment.