Skip to content

Commit

Permalink
materialize-bigquery: flush bindings when stores are complete
Browse files Browse the repository at this point in the history
Following #1913 and the earlier updates to other materializations, this brings
in the memory optimization of flushing a binding's staged files during the
Store phase as soon as the store iterator moves on to the next binding.
  • Loading branch information
williamhbaker committed Oct 16, 2024
1 parent 1df91cf commit 5c2a4cb
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 15 deletions.
1 change: 1 addition & 0 deletions materialize-bigquery/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type binding struct {
loadFile *stagedFile
storeFile *stagedFile
tempTableName string
hasData bool
}

// bindingDocument is used by the load operation to fetch binding flow_document values
Expand Down
67 changes: 52 additions & 15 deletions materialize-bigquery/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,31 @@ func (t *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
var ctx = it.Context()

cleanupFiles := []func(context.Context){}

var lastBinding = -1
for it.Next() {
if lastBinding == -1 {
lastBinding = it.Binding
}

if lastBinding != it.Binding {
// Flush the staged file(s) for the binding now that its stores are
// fully processed.
var b = t.bindings[lastBinding]
// There may be no staged file if the binding has received nothing
// but hard deletion requests for keys that aren't in the
// destination table.
if b.storeFile.started {
cleanupFn, err := b.storeFile.flush()
if err != nil {
return nil, fmt.Errorf("flushing staged files for collection %q: %w", b.target.Source.String(), err)
}
cleanupFiles = append(cleanupFiles, cleanupFn)
}
lastBinding = it.Binding
}

var b = t.bindings[it.Binding]

var flowDocument = it.RawJSON
Expand All @@ -263,15 +287,25 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
}

b.storeFile.start()

converted, err := b.target.ConvertAll(it.Key, it.Values, flowDocument)
if err != nil {
b.hasData = true
if converted, err := b.target.ConvertAll(it.Key, it.Values, flowDocument); err != nil {
return nil, fmt.Errorf("converting store parameters: %w", err)
} else if err = b.storeFile.encodeRow(ctx, converted); err != nil {
return nil, fmt.Errorf("encoding Store to scratch file: %w", err)
}
}
if it.Err() != nil {
return nil, it.Err()
}

if err = b.storeFile.encodeRow(ctx, converted); err != nil {
return nil, fmt.Errorf("encoding Store to scratch file: %w", err)
// Flush the final binding.
if lastBinding != -1 {
var b = t.bindings[lastBinding]
cleanupFn, err := b.storeFile.flush()
if err != nil {
return nil, fmt.Errorf("final binding flushing staged files for collection %q: %w", b.target.Source.String(), err)
}
cleanupFiles = append(cleanupFiles, cleanupFn)
}

return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint) (*pf.ConnectorState, m.OpFuture) {
Expand All @@ -280,11 +314,17 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err))
}

return nil, pf.RunAsyncOperation(func() error { return t.commit(ctx) })
return nil, pf.RunAsyncOperation(func() error { return t.commit(ctx, cleanupFiles) })
}, nil
}

func (t *transactor) commit(ctx context.Context) error {
func (t *transactor) commit(ctx context.Context, cleanupFiles []func(context.Context)) error {
defer func() {
for _, f := range cleanupFiles {
f(ctx)
}
}()

// Build the slice of transactions required for a commit.
var subqueries []string

Expand All @@ -303,25 +343,22 @@ func (t *transactor) commit(ctx context.Context) error {
// This is the map of external table references we will populate. Loop through the bindings and
// append the SQL for that table.
var edcTableDefs = make(map[string]bigquery.ExternalData)
for idx, b := range t.bindings {
if !b.storeFile.started {
for _, b := range t.bindings {
if !b.hasData {
// No stores for this binding.
continue
}

delete, err := b.storeFile.flush()
if err != nil {
return fmt.Errorf("flushing store file for binding[%d]: %w", idx, err)
}
defer delete(ctx)

edcTableDefs[b.tempTableName] = b.storeFile.edc()

if b.target.DeltaUpdates {
subqueries = append(subqueries, b.storeInsertSQL)
} else {
subqueries = append(subqueries, b.storeUpdateSQL)
}

// Reset for the next round.
b.hasData = false
}

// Complete the transaction and return the appropriate error.
Expand Down

0 comments on commit 5c2a4cb

Please sign in to comment.