diff --git a/materialize-motherduck/client.go b/materialize-motherduck/client.go
index 75215bcf8..610e229ec 100644
--- a/materialize-motherduck/client.go
+++ b/materialize-motherduck/client.go
@@ -3,7 +3,6 @@ package main
 import (
 	"context"
 	stdsql "database/sql"
-	"encoding/base64"
 	"errors"
 	"fmt"
 	"net/http"
@@ -202,7 +201,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
 }
 
 func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
-	return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
+	return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
 }
 
 func (c *client) Close() {
diff --git a/materialize-mysql/client.go b/materialize-mysql/client.go
index db483584b..d94299939 100644
--- a/materialize-mysql/client.go
+++ b/materialize-mysql/client.go
@@ -3,7 +3,6 @@ package main
 import (
 	"context"
 	stdsql "database/sql"
-	"encoding/base64"
 	"errors"
 	"fmt"
 	"net"
@@ -174,7 +173,7 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
 }
 
 func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
-	return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
+	return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
 }
 
 func (c *client) ExecStatements(ctx context.Context, statements []string) error {
diff --git a/materialize-postgres/client.go b/materialize-postgres/client.go
index de0eaf734..ea513c931 100644
--- a/materialize-postgres/client.go
+++ b/materialize-postgres/client.go
@@ -3,7 +3,6 @@ package main
 import (
 	"context"
 	stdsql "database/sql"
-	"encoding/base64"
 	"errors"
 	"fmt"
 	"net"
@@ -179,7 +178,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
 }
 
 func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
-	return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
+	return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
 }
 
 func (c *client) Close() {
diff --git a/materialize-redshift/.snapshots/TestSQLGeneration b/materialize-redshift/.snapshots/TestSQLGeneration
index a73ea755f..bae828923 100644
--- a/materialize-redshift/.snapshots/TestSQLGeneration
+++ b/materialize-redshift/.snapshots/TestSQLGeneration
@@ -146,15 +146,6 @@ WHERE "a-schema".key_value.key1 = r.key1 AND "a-schema".key_value.key2 = r.key2
 --- End "a-schema".delta_updates deleteQuery ---
 
---- Begin Fence Update ---
-UPDATE path."to".checkpoints
-	SET   checkpoint = 'AAECAwQFBgcICQ=='
-	WHERE materialization = 'some/Materialization'
-	AND   key_begin = 1122867
-	AND   key_end   = 4293844428
-	AND   fence     = 123;
---- End Fence Update ---
-
 --- Begin "a-schema".key_value createLoadTable (no varchar length) ---
 CREATE TEMPORARY TABLE flow_temp_table_0 (
 	key1 BIGINT,
diff --git a/materialize-redshift/client.go b/materialize-redshift/client.go
index 71eabb32f..265fb0562 100644
--- a/materialize-redshift/client.go
+++ b/materialize-redshift/client.go
@@ -6,7 +6,6 @@ import (
 	"context"
 	stdsql "database/sql"
 	"encoding/base64"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
@@ -24,9 +23,8 @@ import (
 	pf "github.com/estuary/flow/go/protocols/flow"
 	"github.com/google/uuid"
 	"github.com/jackc/pgconn"
+	"github.com/jackc/pgx/v5"
 	log "github.com/sirupsen/logrus"
-
-	_ "github.com/jackc/pgx/v5/stdlib"
 )
 
 var _ sql.SchemaManager = (*client)(nil)
@@ -83,14 +81,12 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
 		return fmt.Errorf("application logic error - specBytes was not a spec: %w", err)
 	}
 
-	var gzb bytes.Buffer
-	w := gzip.NewWriter(&gzb)
-	if _, err := w.Write(specBytes); err != nil {
+	compressed, err := compressBytes(specBytes)
+	if err != nil {
 		return fmt.Errorf("compressing spec bytes: %w", err)
-	} else if err := w.Close(); err != nil {
-		return fmt.Errorf("closing gzip writer: %w", err)
 	}
-	updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(gzb.Bytes())
+
+	updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(compressed)
 
 	_, err = c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...)
 	return err
@@ -259,35 +255,27 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr {
 }
 
 func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) {
-	specHex, version, err := sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization)
-	if err != nil {
+	var version, spec string
+
+	if err := c.db.QueryRowContext(
+		ctx,
+		fmt.Sprintf(
+			"SELECT version, FROM_VARBYTE(spec, 'utf8') FROM %s WHERE materialization = %s;",
+			specs.Identifier,
+			specs.Keys[0].Placeholder,
+		),
+		materialization.String(),
+	).Scan(&version, &spec); err != nil {
 		return "", "", err
 	}
 
-	specBytesFromHex, err := hex.DecodeString(specHex)
-	if err != nil {
-		return "", "", fmt.Errorf("hex.DecodeString: %w", err)
-	}
-
-	specBytes, err := base64.StdEncoding.DecodeString(string(specBytesFromHex))
-	if err != nil {
+	if specBytes, err := base64.StdEncoding.DecodeString(spec); err != nil {
 		return "", "", fmt.Errorf("base64.DecodeString: %w", err)
-	}
-
-	// Handle specs that were persisted prior to compressing their byte content, as well as current
-	// specs that are compressed.
-	if specBytes[0] == 0x1f && specBytes[1] == 0x8b { // Valid gzip header bytes
-		if r, err := gzip.NewReader(bytes.NewReader(specBytes)); err != nil {
-			return "", "", err
-		} else if specBytes, err = io.ReadAll(r); err != nil {
-			return "", "", fmt.Errorf("reading compressed specBytes: %w", err)
-		}
+	} else if specBytes, err = maybeDecompressBytes(specBytes); err != nil {
+		return "", "", fmt.Errorf("decompressing spec: %w", err)
 	} else {
-		// Legacy spec that hasn't been re-persisted yet.
- log.Info("loaded uncompressed spec") + return base64.StdEncoding.EncodeToString(specBytes), version, nil } - - return base64.StdEncoding.EncodeToString(specBytes), version, nil } func (c *client) ExecStatements(ctx context.Context, statements []string) error { @@ -295,19 +283,18 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error } func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) { - var err = c.withDB(func(db *stdsql.DB) error { + if err := c.withDB(func(db *stdsql.DB) error { var err error - fence, err = sql.StdInstallFence(ctx, db, checkpoints, fence, func(fenceHex string) ([]byte, error) { - fenceHexBytes, err := hex.DecodeString(fenceHex) - if err != nil { - return nil, err - } + fence, err = installFence(ctx, db, checkpoints, fence) + if err != nil { + return fmt.Errorf("installing fence: %w", err) + } + return nil + }); err != nil { + return sql.Fence{}, err + } - return base64.StdEncoding.DecodeString(string(fenceHexBytes)) - }) - return err - }) - return fence, err + return fence, nil } func (c *client) Close() { @@ -322,3 +309,162 @@ func (c *client) withDB(fn func(*stdsql.DB) error) error { defer db.Close() return fn(db) } + +func compressBytes(b []byte) ([]byte, error) { + var gzb bytes.Buffer + w := gzip.NewWriter(&gzb) + if _, err := w.Write(b); err != nil { + return nil, fmt.Errorf("compressing bytes: %w", err) + } else if err := w.Close(); err != nil { + return nil, fmt.Errorf("closing gzip writer: %w", err) + } + return gzb.Bytes(), nil +} + +func maybeDecompressBytes(b []byte) ([]byte, error) { + if b[0] == 0x1f && b[1] == 0x8b { // Valid gzip header bytes + var out bytes.Buffer + if r, err := gzip.NewReader(bytes.NewReader(b)); err != nil { + return nil, fmt.Errorf("decompressing bytes: %w", err) + } else if _, err = io.Copy(&out, r); err != nil { + return nil, fmt.Errorf("reading decompressed bytes: %w", err) + } + return out.Bytes(), nil + } else { + log.Info("loaded uncompressed bytes") + return b, nil + } +} + +// installFence is a modified version of sql.StdInstallFence that handles +// compression of the checkpoint of reading varbyte values from Redshift. +func installFence(ctx context.Context, db *stdsql.DB, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) { + // TODO(whb): With the historical usage of sql.StdInstallFence, we were actually + // base64 encoding the checkpoint bytes and then sending that UTF8 string to + // Redshift, which stores those characters as bytes in the VARBYTE column. A + // slightly more direct & efficient way to handle this would be to store the + // bytes directly using TO_VARBYTE(checkpoint, 'base64'). This would require + // handling for the pre-existing checkpoints that were encoded in the previous + // way, and is not being implemented right now. + var txn, err = db.BeginTx(ctx, nil) + if err != nil { + return sql.Fence{}, fmt.Errorf("db.BeginTx: %w", err) + } + defer func() { + if txn != nil { + _ = txn.Rollback() + } + }() + + // Increment the fence value of _any_ checkpoint which overlaps our key range. 
+	if _, err = txn.Exec(
+		fmt.Sprintf(`
+			UPDATE %s
+				SET   fence=fence+1
+				WHERE materialization=%s
+				AND   key_end>=%s
+				AND   key_begin<=%s
+			;
+			`,
+			checkpoints.Identifier,
+			checkpoints.Keys[0].Placeholder,
+			checkpoints.Keys[1].Placeholder,
+			checkpoints.Keys[2].Placeholder,
+		),
+		fence.Materialization,
+		fence.KeyBegin,
+		fence.KeyEnd,
+	); err != nil {
+		return sql.Fence{}, fmt.Errorf("incrementing fence: %w", err)
+	}
+
+	// Read the checkpoint with the narrowest [key_begin, key_end] which fully overlaps our range.
+	var readBegin, readEnd uint32
+	var checkpoint string
+
+	if err = txn.QueryRow(
+		fmt.Sprintf(`
+			SELECT fence, key_begin, key_end, FROM_VARBYTE(checkpoint, 'utf8')
+				FROM %s
+				WHERE materialization=%s
+				AND   key_begin<=%s
+				AND   key_end>=%s
+				ORDER BY key_end - key_begin ASC
+				LIMIT 1
+			;
+			`,
+			checkpoints.Identifier,
+			checkpoints.Keys[0].Placeholder,
+			checkpoints.Keys[1].Placeholder,
+			checkpoints.Keys[2].Placeholder,
+		),
+		fence.Materialization,
+		fence.KeyBegin,
+		fence.KeyEnd,
+	).Scan(&fence.Fence, &readBegin, &readEnd, &checkpoint); err == stdsql.ErrNoRows {
+		// Set an invalid range, which compares as unequal to trigger an insertion below.
+		readBegin, readEnd = 1, 0
+	} else if err != nil {
+		return sql.Fence{}, fmt.Errorf("scanning fence and checkpoint: %w", err)
+	} else if base64Bytes, err := base64.StdEncoding.DecodeString(checkpoint); err != nil {
+		return sql.Fence{}, fmt.Errorf("base64.Decode(checkpoint): %w", err)
+	} else if fence.Checkpoint, err = maybeDecompressBytes(base64Bytes); err != nil {
+		return sql.Fence{}, fmt.Errorf("maybeDecompressBytes(checkpoint): %w", err)
+	}
+
+	// If a checkpoint for this exact range doesn't exist then insert it now.
+	if readBegin == fence.KeyBegin && readEnd == fence.KeyEnd {
+		// Exists; no-op.
+	} else if compressedCheckpoint, err := compressBytes(fence.Checkpoint); err != nil {
+		return sql.Fence{}, fmt.Errorf("compressing checkpoint: %w", err)
+	} else if _, err = txn.Exec(
+		fmt.Sprintf(
+			"INSERT INTO %s (materialization, key_begin, key_end, fence, checkpoint) VALUES (%s, %s, %s, %s, %s);",
+			checkpoints.Identifier,
+			checkpoints.Keys[0].Placeholder,
+			checkpoints.Keys[1].Placeholder,
+			checkpoints.Keys[2].Placeholder,
+			checkpoints.Values[0].Placeholder,
+			checkpoints.Values[1].Placeholder,
+		),
+		fence.Materialization,
+		fence.KeyBegin,
+		fence.KeyEnd,
+		fence.Fence,
+		base64.StdEncoding.EncodeToString(compressedCheckpoint),
+	); err != nil {
+		return sql.Fence{}, fmt.Errorf("inserting fence: %w", err)
+	}
+
+	err = txn.Commit()
+	txn = nil // Disable deferred rollback.
+
+	if err != nil {
+		return sql.Fence{}, fmt.Errorf("txn.Commit: %w", err)
+	}
+	return fence, nil
+}
+
+// updateFence updates a fence and reports if the materialization instance was
+// fenced off. It handles compression of the checkpoint, and is used instead of
+// the typical templated fence update query because of that.
+func updateFence(ctx context.Context, txn pgx.Tx, dialect sql.Dialect, fence sql.Fence) error {
+	if compressedCheckpoint, err := compressBytes(fence.Checkpoint); err != nil {
+		return fmt.Errorf("compressing checkpoint: %w", err)
+	} else if res, err := txn.Exec(ctx, fmt.Sprintf(
+		"UPDATE %s SET checkpoint = $1 WHERE materialization = $2 AND key_begin = $3 AND key_end = $4 AND fence = $5;",
+		dialect.Identifier(fence.TablePath...),
+	),
+		base64.StdEncoding.EncodeToString(compressedCheckpoint),
+		fence.Materialization,
+		fence.KeyBegin,
+		fence.KeyEnd,
+		fence.Fence,
+	); err != nil {
+		return fmt.Errorf("updating fence: %w", err)
+	} else if res.RowsAffected() != 1 {
+		return fmt.Errorf("this instance was fenced off by another")
+	}
+
+	return nil
+}
diff --git a/materialize-redshift/driver.go b/materialize-redshift/driver.go
index e735d10af..8398c8366 100644
--- a/materialize-redshift/driver.go
+++ b/materialize-redshift/driver.go
@@ -769,20 +769,11 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
 			return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err))
 		}
 
-		var fenceUpdate strings.Builder
-		if err := d.templates.updateFence.Execute(&fenceUpdate, d.fence); err != nil {
-			return nil, m.FinishedOperation(fmt.Errorf("evaluating fence update template: %w", err))
-		}
-
-		return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, fenceUpdate.String(), varcharColumnUpdates) })
+		return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, varcharColumnUpdates) })
 	}, nil
 }
 
-func (d *transactor) commit(
-	ctx context.Context,
-	fenceUpdate string,
-	varcharColumnUpdates map[string][]string,
-) error {
+func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string][]string) error {
 	defer func() {
 		for _, b := range d.bindings {
 			// Arrange to clean up any staged files once this commit attempt is
@@ -912,10 +903,8 @@ func (d *transactor) commit(
 
 	log.Info("store: finished encoding and uploading of files")
 
-	if fenceRes, err := txn.Exec(ctx, fenceUpdate); err != nil {
-		return fmt.Errorf("fetching fence update rows: %w", err)
-	} else if fenceRes.RowsAffected() != 1 {
-		return errors.New("this instance was fenced off by another")
+	if err := updateFence(ctx, txn, d.dialect, d.fence); err != nil {
+		return err
 	} else if err := txn.Commit(ctx); err != nil {
 		return fmt.Errorf("committing store transaction: %w", err)
 	}
diff --git a/materialize-redshift/driver_test.go b/materialize-redshift/driver_test.go
index 442f4ce1d..ac0589ff3 100644
--- a/materialize-redshift/driver_test.go
+++ b/materialize-redshift/driver_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	stdsql "database/sql"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -15,9 +16,8 @@ import (
 	pf "github.com/estuary/flow/go/protocols/flow"
 	pm "github.com/estuary/flow/go/protocols/materialize"
 	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5"
 	"github.com/stretchr/testify/require"
-
-	_ "github.com/jackc/pgx/v5/stdlib"
 )
 
 func mustGetCfg(t *testing.T) config {
@@ -112,16 +112,61 @@ func TestFencingCases(t *testing.T) {
 		testDialect,
 		templates.createTargetTable,
 		func(table sql.Table, fence sql.Fence) error {
-			var fenceUpdate strings.Builder
-			if err := templates.updateFence.Execute(&fenceUpdate, fence); err != nil {
-				return fmt.Errorf("evaluating fence template: %w", err)
+			conn, err := pgx.Connect(ctx, cfg.toURI())
+			if err != nil {
+				return fmt.Errorf("store pgx.Connect: %w", err)
+			}
+			defer conn.Close(ctx)
+
+			txn, err := conn.BeginTx(ctx, pgx.TxOptions{})
+			if err != nil {
+				return err
+			}
+			defer txn.Rollback(ctx)
+
+			if err := updateFence(ctx, txn, testDialect, fence); err != nil {
+				return err
 			}
-			return c.ExecStatements(ctx, []string{fenceUpdate.String()})
+
+			return txn.Commit(ctx)
 		},
 		func(table sql.Table) (out string, err error) {
 			err = c.(*client).withDB(func(db *stdsql.DB) error {
-				out, err = sql.StdDumpTable(ctx, db, table)
-				return err
+				query := fmt.Sprintf(
+					"select materialization, key_begin, key_end, fence, FROM_VARBYTE(checkpoint, 'utf8') from %s order by materialization, key_begin, key_end asc;",
+					table.Identifier,
+				)
+
+				rows, err := db.Query(query)
+				if err != nil {
+					return err
+				}
+				defer rows.Close()
+
+				var b strings.Builder
+				b.WriteString("materialization, key_begin, key_end, fence, checkpoint")
+
+				for rows.Next() {
+					b.WriteString("\n")
+					var materialization string
+					var keyBegin, keyEnd, fence int
+					var checkpoint string
+					if err = rows.Scan(&materialization, &keyBegin, &keyEnd, &fence, &checkpoint); err != nil {
+						return err
+					} else if base64Bytes, err := base64.StdEncoding.DecodeString(checkpoint); err != nil {
+						return err
+					} else if decompressed, err := maybeDecompressBytes(base64Bytes); err != nil {
+						return err
+					} else {
+						b.WriteString(fmt.Sprintf(
+							"%s, %d, %d, %d, %s",
+							materialization, keyBegin, keyEnd, fence, base64.StdEncoding.EncodeToString(decompressed),
+						))
+					}
+				}
+
+				out = b.String()
+				return nil
 			})
 			return
 		},
diff --git a/materialize-redshift/sqlgen.go b/materialize-redshift/sqlgen.go
index 46f9c77ef..265ecd02b 100644
--- a/materialize-redshift/sqlgen.go
+++ b/materialize-redshift/sqlgen.go
@@ -160,7 +160,6 @@ type templates struct {
 	mergeInto   *template.Template
 	deleteQuery *template.Template
 	loadQuery   *template.Template
-	updateFence *template.Template
 	copyFromS3  *template.Template
 }
 
@@ -286,17 +285,6 @@ SELECT * FROM (SELECT -1, CAST(NULL AS SUPER) LIMIT 0) as nodoc
 {{- end }}
 {{ end }}
 
--- Templated update of a fence checkpoint.
-
-{{ define "updateFence" }}
-UPDATE {{ Identifier $.TablePath }}
-	SET   checkpoint = {{ Literal (Base64Std $.Checkpoint) }}
-	WHERE materialization = {{ Literal $.Materialization.String }}
-	AND   key_begin = {{ $.KeyBegin }}
-	AND   key_end   = {{ $.KeyEnd }}
-	AND   fence     = {{ $.Fence }};
-{{ end }}
-
 -- Templated command to copy data from an S3 file into the destination table. Note the 'ignorecase'
 -- JSON option: This is necessary since by default Redshift lowercases all identifiers.
 
@@ -329,7 +317,6 @@ TRUNCATECOLUMNS;
 		mergeInto:   tplAll.Lookup("mergeInto"),
 		deleteQuery: tplAll.Lookup("deleteQuery"),
 		loadQuery:   tplAll.Lookup("loadQuery"),
-		updateFence: tplAll.Lookup("updateFence"),
 		copyFromS3:  tplAll.Lookup("copyFromS3"),
 	}
 }
diff --git a/materialize-redshift/sqlgen_test.go b/materialize-redshift/sqlgen_test.go
index 2fd1bc15c..ae57bc5c6 100644
--- a/materialize-redshift/sqlgen_test.go
+++ b/materialize-redshift/sqlgen_test.go
@@ -35,7 +35,6 @@ func TestSQLGeneration(t *testing.T) {
 			templates.createDeleteTable,
 			templates.deleteQuery,
 		},
-		TplUpdateFence: templates.updateFence,
 	},
 )
 
diff --git a/materialize-sql/std_sql.go b/materialize-sql/std_sql.go
index da9c4e699..9f039c34c 100644
--- a/materialize-sql/std_sql.go
+++ b/materialize-sql/std_sql.go
@@ -69,7 +69,7 @@ func StdSQLExecStatements(ctx context.Context, db *sql.DB, statements []string)
 
 // StdInstallFence is a convenience for Client implementations which
 // use Go's standard `sql.DB` type under the hood.
-func StdInstallFence(ctx context.Context, db *sql.DB, checkpoints Table, fence Fence, decodeFence func(string) ([]byte, error)) (Fence, error) {
+func StdInstallFence(ctx context.Context, db *sql.DB, checkpoints Table, fence Fence) (Fence, error) {
 	var txn, err = db.BeginTx(ctx, nil)
 	if err != nil {
 		return Fence{}, fmt.Errorf("db.BeginTx: %w", err)
@@ -130,8 +130,8 @@ func StdInstallFence(ctx context.Context, db *sql.DB, checkpoints Table, fence F
 		readBegin, readEnd = 1, 0
 	} else if err != nil {
 		return Fence{}, fmt.Errorf("scanning fence and checkpoint: %w", err)
-	} else if fence.Checkpoint, err = decodeFence(checkpoint); err != nil {
-		return Fence{}, fmt.Errorf("decodeFence(checkpoint): %w", err)
+	} else if fence.Checkpoint, err = base64.StdEncoding.DecodeString(checkpoint); err != nil {
+		return Fence{}, fmt.Errorf("base64.Decode(checkpoint): %w", err)
 	}
 
 	// If a checkpoint for this exact range doesn't exist then insert it now.
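
The following self-contained Go sketch is illustrative only and not part of the patch: compress and maybeDecompress are stand-ins for the connector's compressBytes and maybeDecompressBytes helpers, and the sample checkpoint value is made up. It shows the round trip this change settles on for checkpoints and specs stored in Redshift VARBYTE columns: gzip-compress and base64-encode on write, and on read base64-decode, then decompress only when the gzip magic bytes are present so legacy uncompressed rows still load.

// Illustrative sketch of the gzip + base64 round trip used for VARBYTE-stored
// checkpoints and specs. Assumes the same conventions as the patch above; it
// is not the connector's actual code.
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
)

func compress(b []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(b); err != nil {
		return nil, fmt.Errorf("compressing bytes: %w", err)
	} else if err := w.Close(); err != nil {
		return nil, fmt.Errorf("closing gzip writer: %w", err)
	}
	return buf.Bytes(), nil
}

func maybeDecompress(b []byte) ([]byte, error) {
	// 0x1f 0x8b is the gzip magic header; anything else is treated as a
	// legacy, uncompressed value and returned unchanged.
	if len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b {
		r, err := gzip.NewReader(bytes.NewReader(b))
		if err != nil {
			return nil, err
		}
		defer r.Close()
		return io.ReadAll(r)
	}
	return b, nil
}

func main() {
	checkpoint := []byte(`{"example":"driver checkpoint"}`) // hypothetical value

	compressed, err := compress(checkpoint)
	if err != nil {
		panic(err)
	}
	// This base64 text is what is written to the VARBYTE column and later read
	// back as UTF-8 via FROM_VARBYTE(checkpoint, 'utf8').
	stored := base64.StdEncoding.EncodeToString(compressed)

	decoded, err := base64.StdEncoding.DecodeString(stored)
	if err != nil {
		panic(err)
	}
	restored, err := maybeDecompress(decoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(restored)) // {"example":"driver checkpoint"}
}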