Skip to content

Commit

Permalink
materialize-redshift: compress checkpoints
Browse files Browse the repository at this point in the history
Adds compression for checkpoints that are stored in the checkpoints table, which
should significantly increase the maximum size of checkpoint that the 1 MiB
VARBYTE column can hold.

VARBYTE columns can also be made larger (with added query overhead), so if this
still ends up not being enough we can add automatic enlargement of the VARBYTE
columns, similar to what we have for VARCHAR columns in Redshift already.

The changes in this commit really lean into Redshift handling metadata in its
own weird way. Previously the general "install fence" routine was modified to
allow a decoding callback which was only used by Redshift. Rather than add an
additional encoding callback that only Redshift would use, we'll just have
Redshift do its own thing entirely and simplify the general case for all other
materializations. Having more direct control over the query construction for
Redshift has the added benefit of not needing to do a separate Hex decode within
the connector, since we can query the known encoding directly.
  • Loading branch information
williamhbaker committed Oct 1, 2024
1 parent 206e4d9 commit 9882d72
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 98 deletions.
3 changes: 1 addition & 2 deletions materialize-motherduck/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
}

func (c *client) Close() {
Expand Down
3 changes: 1 addition & 2 deletions materialize-mysql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -174,7 +173,7 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
}

func (c *client) ExecStatements(ctx context.Context, statements []string) error {
Expand Down
3 changes: 1 addition & 2 deletions materialize-postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -179,7 +178,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
}

func (c *client) Close() {
Expand Down
9 changes: 0 additions & 9 deletions materialize-redshift/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,6 @@ WHERE "a-schema".key_value.key1 = r.key1 AND "a-schema".key_value.key2 = r.key2

--- End "a-schema".delta_updates deleteQuery ---

--- Begin Fence Update ---
UPDATE path."to".checkpoints
SET checkpoint = 'AAECAwQFBgcICQ=='
WHERE materialization = 'some/Materialization'
AND key_begin = 1122867
AND key_end = 4293844428
AND fence = 123;
--- End Fence Update ---

--- Begin "a-schema".key_value createLoadTable (no varchar length) ---
CREATE TEMPORARY TABLE flow_temp_table_0 (
key1 BIGINT,
Expand Down
232 changes: 189 additions & 43 deletions materialize-redshift/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
stdsql "database/sql"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -24,9 +23,8 @@ import (
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"

_ "github.com/jackc/pgx/v5/stdlib"
)

var _ sql.SchemaManager = (*client)(nil)
Expand Down Expand Up @@ -83,14 +81,12 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
return fmt.Errorf("application logic error - specBytes was not a spec: %w", err)
}

var gzb bytes.Buffer
w := gzip.NewWriter(&gzb)
if _, err := w.Write(specBytes); err != nil {
compressed, err := compressBytes(specBytes)
if err != nil {
return fmt.Errorf("compressing spec bytes: %w", err)
} else if err := w.Close(); err != nil {
return fmt.Errorf("closing gzip writer: %w", err)
}
updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(gzb.Bytes())

updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(compressed)

_, err = c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...)
return err
Expand Down Expand Up @@ -259,55 +255,46 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr {
}

func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) {
specHex, version, err := sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization)
if err != nil {
var version, spec string

if err := c.db.QueryRowContext(
ctx,
fmt.Sprintf(
"SELECT version, FROM_VARBYTE(spec, 'utf8') FROM %s WHERE materialization = %s;",
specs.Identifier,
specs.Keys[0].Placeholder,
),
materialization.String(),
).Scan(&version, &spec); err != nil {
return "", "", err
}

specBytesFromHex, err := hex.DecodeString(specHex)
if err != nil {
return "", "", fmt.Errorf("hex.DecodeString: %w", err)
}

specBytes, err := base64.StdEncoding.DecodeString(string(specBytesFromHex))
if err != nil {
if specBytes, err := base64.StdEncoding.DecodeString(spec); err != nil {
return "", "", fmt.Errorf("base64.DecodeString: %w", err)
}

// Handle specs that were persisted prior to compressing their byte content, as well as current
// specs that are compressed.
if specBytes[0] == 0x1f && specBytes[1] == 0x8b { // Valid gzip header bytes
if r, err := gzip.NewReader(bytes.NewReader(specBytes)); err != nil {
return "", "", err
} else if specBytes, err = io.ReadAll(r); err != nil {
return "", "", fmt.Errorf("reading compressed specBytes: %w", err)
}
} else if specBytes, err = maybeDecompressBytes(specBytes); err != nil {
return "", "", fmt.Errorf("decompressing spec: %w", err)
} else {
// Legacy spec that hasn't been re-persisted yet.
log.Info("loaded uncompressed spec")
return base64.StdEncoding.EncodeToString(specBytes), version, nil
}

return base64.StdEncoding.EncodeToString(specBytes), version, nil
}

func (c *client) ExecStatements(ctx context.Context, statements []string) error {
return c.withDB(func(db *stdsql.DB) error { return sql.StdSQLExecStatements(ctx, db, statements) })
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
var err = c.withDB(func(db *stdsql.DB) error {
if err := c.withDB(func(db *stdsql.DB) error {
var err error
fence, err = sql.StdInstallFence(ctx, db, checkpoints, fence, func(fenceHex string) ([]byte, error) {
fenceHexBytes, err := hex.DecodeString(fenceHex)
if err != nil {
return nil, err
}
fence, err = installFence(ctx, db, checkpoints, fence)
if err != nil {
return fmt.Errorf("installing fence: %w", err)
}
return nil
}); err != nil {
return sql.Fence{}, err
}

return base64.StdEncoding.DecodeString(string(fenceHexBytes))
})
return err
})
return fence, err
return fence, nil
}

func (c *client) Close() {
Expand All @@ -322,3 +309,162 @@ func (c *client) withDB(fn func(*stdsql.DB) error) error {
defer db.Close()
return fn(db)
}

// compressBytes gzip-compresses b and returns the complete gzip stream
// (header, compressed data and trailer). An error is returned if writing to or
// closing the gzip writer fails.
func compressBytes(b []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	zw := gzip.NewWriter(buf)

	_, err := zw.Write(b)
	if err != nil {
		return nil, fmt.Errorf("compressing bytes: %w", err)
	}

	// Close flushes any buffered data and writes the gzip trailer; the output
	// is not a complete stream until it succeeds.
	if err = zw.Close(); err != nil {
		return nil, fmt.Errorf("closing gzip writer: %w", err)
	}

	return buf.Bytes(), nil
}

// maybeDecompressBytes returns the gzip-decompressed content of b when b
// starts with the gzip magic bytes (0x1f, 0x8b). Any other input, including
// input too short to hold the two magic bytes, is treated as uncompressed and
// returned unchanged. An error is returned only when b looks like gzip but
// cannot be read as a gzip stream.
func maybeDecompressBytes(b []byte) ([]byte, error) {
	// Check the length before indexing: an empty or single-byte value (for
	// example an empty checkpoint persisted before compression was introduced)
	// cannot be gzip, and indexing it would panic.
	if len(b) < 2 || b[0] != 0x1f || b[1] != 0x8b {
		log.Info("loaded uncompressed bytes")
		return b, nil
	}

	r, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, fmt.Errorf("decompressing bytes: %w", err)
	}

	var out bytes.Buffer
	if _, err := io.Copy(&out, r); err != nil {
		return nil, fmt.Errorf("reading decompressed bytes: %w", err)
	}
	return out.Bytes(), nil
}

// installFence is a modified version of sql.StdInstallFence that handles
// compression of the checkpoint and reading varbyte values from Redshift.
//
// Within a single transaction it:
//  1. increments the fence of every row of the checkpoints table for this
//     materialization whose key range overlaps [fence.KeyBegin, fence.KeyEnd];
//  2. reads the fence and checkpoint of the narrowest row whose key range
//     fully covers that range, base64-decoding and (if gzip-compressed)
//     decompressing the stored checkpoint;
//  3. inserts a row for the exact key range, with a compressed and
//     base64-encoded checkpoint, if the row read does not have that exact
//     range or no row was found.
//
// On success the returned fence is the input fence with Fence and Checkpoint
// replaced by the values read in step 2 (when a row was found). On any error
// a zero sql.Fence is returned and the transaction is rolled back.
func installFence(ctx context.Context, db *stdsql.DB, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
	// TODO(whb): With the historical usage of sql.StdInstallFence, we were actually
	// base64 encoding the checkpoint bytes and then sending that UTF8 string to
	// Redshift, which stores those characters as bytes in the VARBYTE column. A
	// slightly more direct & efficient way to handle this would be to store the
	// bytes directly using TO_VARBYTE(checkpoint, 'base64'). This would require
	// handling for the pre-existing checkpoints that were encoded in the previous
	// way, and is not being implemented right now.
	txn, err := db.BeginTx(ctx, nil)
	if err != nil {
		return sql.Fence{}, fmt.Errorf("db.BeginTx: %w", err)
	}
	defer func() {
		if txn != nil {
			// Best-effort cleanup on an error path; the original error is
			// what gets reported to the caller.
			_ = txn.Rollback()
		}
	}()

	// Increment the fence value of _any_ checkpoint which overlaps our key range.
	if _, err = txn.ExecContext(
		ctx,
		fmt.Sprintf(`
UPDATE %s
SET fence=fence+1
WHERE materialization=%s
AND key_end>=%s
AND key_begin<=%s
;
`,
			checkpoints.Identifier,
			checkpoints.Keys[0].Placeholder,
			checkpoints.Keys[1].Placeholder,
			checkpoints.Keys[2].Placeholder,
		),
		fence.Materialization,
		fence.KeyBegin,
		fence.KeyEnd,
	); err != nil {
		return sql.Fence{}, fmt.Errorf("incrementing fence: %w", err)
	}

	// Read the checkpoint with the narrowest [key_begin, key_end] which fully overlaps our range.
	var readBegin, readEnd uint32
	var checkpoint string

	err = txn.QueryRowContext(
		ctx,
		fmt.Sprintf(`
SELECT fence, key_begin, key_end, FROM_VARBYTE(checkpoint, 'utf8')
FROM %s
WHERE materialization=%s
AND key_begin<=%s
AND key_end>=%s
ORDER BY key_end - key_begin ASC
LIMIT 1
;
`,
			checkpoints.Identifier,
			checkpoints.Keys[0].Placeholder,
			checkpoints.Keys[1].Placeholder,
			checkpoints.Keys[2].Placeholder,
		),
		fence.Materialization,
		fence.KeyBegin,
		fence.KeyEnd,
	).Scan(&fence.Fence, &readBegin, &readEnd, &checkpoint)

	switch {
	case errors.Is(err, stdsql.ErrNoRows):
		// Set an invalid range, which compares as unequal to trigger an insertion below.
		readBegin, readEnd = 1, 0
	case err != nil:
		return sql.Fence{}, fmt.Errorf("scanning fence and checkpoint: %w", err)
	default:
		// The stored value is the base64 text of the (possibly gzip-compressed)
		// checkpoint bytes.
		decoded, err := base64.StdEncoding.DecodeString(checkpoint)
		if err != nil {
			return sql.Fence{}, fmt.Errorf("decoding base64 checkpoint: %w", err)
		}
		if fence.Checkpoint, err = maybeDecompressBytes(decoded); err != nil {
			return sql.Fence{}, fmt.Errorf("decompressing checkpoint: %w", err)
		}
	}

	// If a checkpoint for this exact range doesn't exist then insert it now.
	if readBegin != fence.KeyBegin || readEnd != fence.KeyEnd {
		compressedCheckpoint, err := compressBytes(fence.Checkpoint)
		if err != nil {
			return sql.Fence{}, fmt.Errorf("compressing checkpoint: %w", err)
		}

		if _, err := txn.ExecContext(
			ctx,
			fmt.Sprintf(
				"INSERT INTO %s (materialization, key_begin, key_end, fence, checkpoint) VALUES (%s, %s, %s, %s, %s);",
				checkpoints.Identifier,
				checkpoints.Keys[0].Placeholder,
				checkpoints.Keys[1].Placeholder,
				checkpoints.Keys[2].Placeholder,
				checkpoints.Values[0].Placeholder,
				checkpoints.Values[1].Placeholder,
			),
			fence.Materialization,
			fence.KeyBegin,
			fence.KeyEnd,
			fence.Fence,
			base64.StdEncoding.EncodeToString(compressedCheckpoint),
		); err != nil {
			return sql.Fence{}, fmt.Errorf("inserting fence: %w", err)
		}
	}

	err = txn.Commit()
	txn = nil // Disable deferred rollback.

	if err != nil {
		return sql.Fence{}, fmt.Errorf("txn.Commit: %w", err)
	}
	return fence, nil
}

// updateFence writes the fence's checkpoint, gzip-compressed and then
// base64-encoded, to the row matching the fence's materialization, key range
// and fence value, using the provided transaction. It returns an error if the
// checkpoint cannot be compressed, if the UPDATE fails, or if the UPDATE did
// not affect exactly one row, which is reported as this instance having been
// fenced off by another. Because the checkpoint must be compressed first, this
// is used instead of the typical templated fence update query.
func updateFence(ctx context.Context, txn pgx.Tx, dialect sql.Dialect, fence sql.Fence) error {
	gzipped, err := compressBytes(fence.Checkpoint)
	if err != nil {
		return fmt.Errorf("compressing checkpoint: %w", err)
	}

	query := fmt.Sprintf(
		"UPDATE %s SET checkpoint = $1 WHERE materialization = $2 AND key_begin = $3 AND key_end = $4 AND fence = $5;",
		dialect.Identifier(fence.TablePath...),
	)

	tag, err := txn.Exec(
		ctx,
		query,
		base64.StdEncoding.EncodeToString(gzipped),
		fence.Materialization,
		fence.KeyBegin,
		fence.KeyEnd,
		fence.Fence,
	)
	if err != nil {
		return fmt.Errorf("fetching fence update rows: %w", err)
	}

	// The WHERE clause includes the fence value, so anything other than one
	// affected row means the stored row no longer matches this fence.
	if tag.RowsAffected() != 1 {
		return errors.New("this instance was fenced off by another")
	}

	return nil
}
19 changes: 4 additions & 15 deletions materialize-redshift/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,20 +769,11 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err))
}

var fenceUpdate strings.Builder
if err := d.templates.updateFence.Execute(&fenceUpdate, d.fence); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("evaluating fence update template: %w", err))
}

return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, fenceUpdate.String(), varcharColumnUpdates) })
return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, varcharColumnUpdates) })
}, nil
}

func (d *transactor) commit(
ctx context.Context,
fenceUpdate string,
varcharColumnUpdates map[string][]string,
) error {
func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string][]string) error {
defer func() {
for _, b := range d.bindings {
// Arrange to clean up any staged files once this commit attempt is
Expand Down Expand Up @@ -912,10 +903,8 @@ func (d *transactor) commit(

log.Info("store: finished encoding and uploading of files")

if fenceRes, err := txn.Exec(ctx, fenceUpdate); err != nil {
return fmt.Errorf("fetching fence update rows: %w", err)
} else if fenceRes.RowsAffected() != 1 {
return errors.New("this instance was fenced off by another")
if err := updateFence(ctx, txn, d.dialect, d.fence); err != nil {
return err
} else if err := txn.Commit(ctx); err != nil {
return fmt.Errorf("committing store transaction: %w", err)
}
Expand Down
Loading

0 comments on commit 9882d72

Please sign in to comment.