From be2bd0510d4d01ca23396fbf5ebf9952e8f4c80a Mon Sep 17 00:00:00 2001 From: Will Baker Date: Tue, 1 Oct 2024 09:33:24 -0400 Subject: [PATCH] materialize-postgres: limit the absolute number of documents in a queued batch There is a crude limit on batch size based on queued document sizes already, but it seems that very small documents (particularly small load keys) have a high ratio of "overhead" to their actual data size, and that can result in excessive memory usage if there are lot of these documents to process. This adds an absolute limit on the number of queries that can be queued at once, to prevent tiny documents from blowing up the connector's memory. A limit on number alone is not enough since documents can be huge, and so it seems that a limit on queued bytes is not enough since documents can be tiny. --- materialize-postgres/driver.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index b267cc295..17be5c20c 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -31,6 +31,12 @@ const ( // example, if documents average 2kb then a 10mb batch size will allow for ~5000 documents per // batch. batchBytesLimit = 10 * 1024 * 1024 // Bytes + + // Limit the maximum number of documents in a buffered batch as well since + // very small documents (particularly keys to load) have a much higher ratio + // of "overhead" to actual data size, and the batchBytesLimit alone may + // result in excessive memory use. + batchSizeLimit = 5000 ) type sshForwarding struct { @@ -381,7 +387,7 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) batch.Queue(b.loadInsertSQL, converted...) } - if batchBytes >= batchBytesLimit { + if batchBytes >= batchBytesLimit || batch.Len() > batchSizeLimit { if err := sendBatch(ctx, txn, &batch); err != nil { return fmt.Errorf("sending load batch: %w", err) } @@ -470,7 +476,7 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error) } } - if batchBytes >= batchBytesLimit { + if batchBytes >= batchBytesLimit || batch.Len() > batchSizeLimit { if err := sendBatch(ctx, txn, &batch); err != nil { return nil, fmt.Errorf("sending store batch: %w", err) }