From 73078cc1522c475fcacadc1249f92884144352b2 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Mon, 30 Sep 2024 11:33:06 -0400 Subject: [PATCH 1/2] materialize-postgres: materialize UUID formatted strings as UUID columns Materialize UUID formatted strings as UUID columns, but continue to allow TEXT columns to validate for these fields since there are pre-existing columns and we don't want to force a backfill of them. Transferring the values of the fields is the same if they are UUID columns vs. TEXT columns. --- materialize-postgres/.snapshots/TestValidateAndApply | 6 +++--- materialize-postgres/sqlgen.go | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/materialize-postgres/.snapshots/TestValidateAndApply b/materialize-postgres/.snapshots/TestValidateAndApply index 97ae359c0f..6d3a000500 100644 --- a/materialize-postgres/.snapshots/TestValidateAndApply +++ b/materialize-postgres/.snapshots/TestValidateAndApply @@ -113,7 +113,7 @@ Big Schema Changed Types Constraints: {"Field":"stringUriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriReferenceField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} {"Field":"stringUriTemplateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"stringUuidField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"stringUuidField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUuidField' is already being materialized as endpoint type 'UUID' but endpoint type 'TEXT' is required by its schema '{ type: [string] }'"} Big Schema Materialized Resource Schema With All Fields Required: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"boolean"} @@ -151,7 +151,7 @@ Big Schema Materialized Resource Schema With All Fields Required: {"Name":"stringUriField","Nullable":"NO","Type":"text"} {"Name":"stringUriReferenceField","Nullable":"NO","Type":"text"} {"Name":"stringUriTemplateField","Nullable":"NO","Type":"text"} -{"Name":"stringUuidField","Nullable":"NO","Type":"text"} +{"Name":"stringUuidField","Nullable":"NO","Type":"uuid"} Big Schema Materialized Resource Schema With No Fields Required: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"boolean"} @@ -189,7 +189,7 @@ Big Schema Materialized Resource Schema With No Fields Required: {"Name":"stringUriField","Nullable":"YES","Type":"text"} {"Name":"stringUriReferenceField","Nullable":"YES","Type":"text"} {"Name":"stringUriTemplateField","Nullable":"YES","Type":"text"} -{"Name":"stringUuidField","Nullable":"YES","Type":"text"} +{"Name":"stringUuidField","Nullable":"YES","Type":"uuid"} Big Schema Changed Types With Table Replacement Constraints: {"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"} diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 489518b946..0a414b2477 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -43,6 +43,10 @@ var pgDialect = func() sql.Dialect { "macaddr": sql.MapStatic("MACADDR"), "macaddr8": sql.MapStatic("MACADDR8"), "time": sql.MapStatic("TIME", sql.AlsoCompatibleWith("time without time zone")), + // UUID format was added on 30-Sept-2024, and pre-existing + // text type of columns are allowed to validate for + // compatibility with pre-existing columns. + "uuid": sql.MapStatic("UUID", sql.AlsoCompatibleWith("text", "character varying")), }, }), }, From 1b9d6ae27675c5268b463f1f402a33d43d99ca18 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Mon, 30 Sep 2024 12:56:35 -0400 Subject: [PATCH 2/2] materialize-postgres: create load table columns based on existing table key columns In scenarios where there may be more than one allowed pre-existing column type, the load table must be created with a column type corresponding to the existing table column to ensure that join query comparisons work correctly. This required threading through a hydrated `InfoSchema` to the `NewTransactor` constructor, and that has been added as a generally available capability. This simplifies `materialize-redshift` and `materialize-mysql` which were already making an `InfoSchema` in their own bespoke way, and eventually most other materializations will probably need to do something like `materialize-postgres` does with its load table columns. Currently the only other materialization that matches its load table column types to the existing table column types is `materialize-bigquery`, and it may be useful to refactor this handling in terms of the `InfoSchema` at some point as well. --- materialize-bigquery/transactor.go | 1 + materialize-databricks/driver.go | 1 + materialize-motherduck/driver.go | 2 ++ materialize-mysql/driver.go | 18 ++------------ materialize-postgres/driver.go | 38 ++++++++++++++++++++---------- materialize-postgres/sqlgen.go | 10 ++++++++ materialize-redshift/driver.go | 34 ++------------------------ materialize-snowflake/snowflake.go | 1 + materialize-sql/driver.go | 13 +++++++++- materialize-sql/endpoint.go | 2 +- materialize-sqlite/sqlite.go | 1 + materialize-sqlserver/driver.go | 3 ++- materialize-starburst/starburst.go | 1 + 13 files changed, 62 insertions(+), 63 deletions(-) diff --git a/materialize-bigquery/transactor.go b/materialize-bigquery/transactor.go index 41f572d324..3c8d9ccb22 100644 --- a/materialize-bigquery/transactor.go +++ b/materialize-bigquery/transactor.go @@ -40,6 +40,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { cfg := ep.Config.(*config) diff --git a/materialize-databricks/driver.go b/materialize-databricks/driver.go index 6bb1e72da9..bacdd4ade3 100644 --- a/materialize-databricks/driver.go +++ b/materialize-databricks/driver.go @@ -135,6 +135,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) diff --git a/materialize-motherduck/driver.go b/materialize-motherduck/driver.go index 0080fca1da..238417d782 100644 --- a/materialize-motherduck/driver.go +++ b/materialize-motherduck/driver.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" m "github.com/estuary/connectors/go/protocols/materialize" + boilerplate "github.com/estuary/connectors/materialize-boilerplate" sql "github.com/estuary/connectors/materialize-sql" pf "github.com/estuary/flow/go/protocols/flow" pm "github.com/estuary/flow/go/protocols/materialize" @@ -197,6 +198,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { cfg := ep.Config.(*config) diff --git a/materialize-mysql/driver.go b/materialize-mysql/driver.go index efbd50731d..0695a238be 100644 --- a/materialize-mysql/driver.go +++ b/materialize-mysql/driver.go @@ -379,13 +379,14 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error func prepareNewTransactor( dialect sql.Dialect, templates templates, -) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open) (m.Transactor, error) { +) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open, *boilerplate.InfoSchema) (m.Transactor, error) { return func( ctx context.Context, ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) var d = &transactor{dialect: dialect, templates: templates, cfg: cfg} @@ -403,21 +404,6 @@ func prepareNewTransactor( return nil, fmt.Errorf("store db.Conn: %w", err) } - db, err := stdsql.Open("mysql", cfg.ToURI()) - if err != nil { - return nil, fmt.Errorf("newTransactor sql.Open: %w", err) - } - defer db.Close() - - resourcePaths := make([][]string, 0, len(open.Materialization.Bindings)) - for _, b := range open.Materialization.Bindings { - resourcePaths = append(resourcePaths, b.ResourcePath) - } - is, err := sql.StdFetchInfoSchema(ctx, db, ep.Dialect, "def", resourcePaths) - if err != nil { - return nil, err - } - for _, binding := range bindings { if err = d.addBinding(ctx, binding, is); err != nil { return nil, fmt.Errorf("addBinding of %s: %w", binding.Path, err) diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index 5f45500a67..b267cc2953 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -260,6 +260,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) @@ -283,7 +284,7 @@ func newTransactor( } for _, binding := range bindings { - if err = d.addBinding(ctx, binding); err != nil { + if err = d.addBinding(ctx, binding, is); err != nil { return nil, fmt.Errorf("addBinding of %s: %w", binding.Path, err) } } @@ -299,23 +300,21 @@ func newTransactor( } type binding struct { - target sql.Table - createLoadTableSQL string - loadInsertSQL string - storeUpdateSQL string - storeInsertSQL string - deleteQuerySQL string - loadQuerySQL string + target sql.Table + loadInsertSQL string + storeUpdateSQL string + storeInsertSQL string + deleteQuerySQL string + loadQuerySQL string } -func (t *transactor) addBinding(ctx context.Context, target sql.Table) error { +func (t *transactor) addBinding(ctx context.Context, target sql.Table, is *boilerplate.InfoSchema) error { var b = &binding{target: target} for _, m := range []struct { sql *string tpl *template.Template }{ - {&b.createLoadTableSQL, tplCreateLoadTable}, {&b.loadInsertSQL, tplLoadInsert}, {&b.storeInsertSQL, tplStoreInsert}, {&b.storeUpdateSQL, tplStoreUpdate}, @@ -331,8 +330,23 @@ func (t *transactor) addBinding(ctx context.Context, target sql.Table) error { t.bindings = append(t.bindings, b) // Create a binding-scoped temporary table for staged keys to load. - if _, err := t.load.conn.Exec(ctx, b.createLoadTableSQL); err != nil { - return fmt.Errorf("Exec(%s): %w", b.createLoadTableSQL, err) + input := loadTableColumns{Binding: b.target.Binding} + for _, k := range b.target.Keys { + existing, err := is.GetField(b.target.Path, k.Field) + if err != nil { + return fmt.Errorf("getting existing key field %s for binding %s: %w", k.Field, b.target.Path, err) + } + input.Keys = append(input.Keys, loadTableKey{ + Identifier: k.Identifier, + DDL: existing.Type + " NOT NULL", // nullable key fields are not allowed + }) + } + + var w strings.Builder + if err := tplCreateLoadTable.Execute(&w, &input); err != nil { + return fmt.Errorf("executing createLoadTable template: %w", err) + } else if _, err := t.load.conn.Exec(ctx, w.String()); err != nil { + return fmt.Errorf("Exec(%s): %w", w.String(), err) } return nil diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 0a414b2477..70829c3f67 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -82,6 +82,16 @@ var pgDialect = func() sql.Dialect { } }() +type loadTableKey struct { + Identifier string + DDL string +} + +type loadTableColumns struct { + Binding int + Keys []loadTableKey +} + var ( tplAll = sql.MustParseTemplate(pgDialect, "root", ` {{ define "temp_name" -}} diff --git a/materialize-redshift/driver.go b/materialize-redshift/driver.go index e735d10af2..5ceba29dfa 100644 --- a/materialize-redshift/driver.go +++ b/materialize-redshift/driver.go @@ -8,7 +8,6 @@ import ( "fmt" "net" "net/url" - "slices" "strings" "text/template" @@ -286,13 +285,14 @@ type transactor struct { func prepareNewTransactor( templates templates, caseSensitiveIdentifierEnabled bool, -) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open) (m.Transactor, error) { +) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open, *boilerplate.InfoSchema) (m.Transactor, error) { return func( ctx context.Context, ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) @@ -314,36 +314,6 @@ func prepareNewTransactor( return nil, err } - db, err := stdsql.Open("pgx", d.cfg.toURI()) - if err != nil { - return nil, err - } - defer db.Close() - - schemas := []string{} - for _, b := range bindings { - if !slices.Contains(schemas, b.InfoLocation.TableSchema) { - schemas = append(schemas, b.InfoLocation.TableSchema) - } - } - - catalog := cfg.Database - if catalog == "" { - // An endpoint-level database configuration is not required, so query for the active - // database if that's the case. - if err := db.QueryRowContext(ctx, "select current_database();").Scan(&catalog); err != nil { - return nil, fmt.Errorf("querying for connected database: %w", err) - } - } - resourcePaths := make([][]string, 0, len(open.Materialization.Bindings)) - for _, b := range open.Materialization.Bindings { - resourcePaths = append(resourcePaths, b.ResourcePath) - } - is, err := sql.StdFetchInfoSchema(ctx, db, ep.Dialect, catalog, resourcePaths) - if err != nil { - return nil, err - } - for idx, target := range bindings { if err = d.addBinding( idx, diff --git a/materialize-snowflake/snowflake.go b/materialize-snowflake/snowflake.go index 18c9ada2be..6fc62bdaa6 100644 --- a/materialize-snowflake/snowflake.go +++ b/materialize-snowflake/snowflake.go @@ -216,6 +216,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) diff --git a/materialize-sql/driver.go b/materialize-sql/driver.go index 275a6b77ff..76041d1f5f 100644 --- a/materialize-sql/driver.go +++ b/materialize-sql/driver.go @@ -246,7 +246,10 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Tra } defer client.Close() + var resourcePaths [][]string if endpoint.MetaSpecs != nil { + resourcePaths = append(resourcePaths, endpoint.MetaSpecs.Path) + if _, loadedVersion, err = loadSpec(ctx, client, endpoint, open.Materialization.Name); err != nil { return nil, nil, fmt.Errorf("loading prior applied materialization spec: %w", err) } else if loadedVersion == "" { @@ -261,6 +264,7 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Tra var tables []Table for index, spec := range open.Materialization.Bindings { var resource = endpoint.NewResource(endpoint) + resourcePaths = append(resourcePaths, resource.Path()) if err := pf.UnmarshalStrict(spec.ResourceConfigJson, resource); err != nil { return nil, nil, fmt.Errorf("resource binding for collection %q: %w", spec.Collection.Name, err) @@ -285,6 +289,8 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Tra } if endpoint.MetaCheckpoints != nil { + resourcePaths = append(resourcePaths, endpoint.MetaCheckpoints.Path) + // We must install a fence to prevent another (zombie) instances of this // materialization from committing further transactions. var metaCheckpoints, err = ResolveTable(*endpoint.MetaCheckpoints, endpoint.Dialect) @@ -303,7 +309,12 @@ func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Tra } } - transactor, err := endpoint.NewTransactor(ctx, endpoint, fence, tables, open) + is, err := client.InfoSchema(ctx, resourcePaths) + if err != nil { + return nil, nil, fmt.Errorf("getting info schema: %w", err) + } + + transactor, err := endpoint.NewTransactor(ctx, endpoint, fence, tables, open, is) if err != nil { return nil, nil, fmt.Errorf("building transactor: %w", err) } diff --git a/materialize-sql/endpoint.go b/materialize-sql/endpoint.go index a0a0b99ea4..81d162748b 100644 --- a/materialize-sql/endpoint.go +++ b/materialize-sql/endpoint.go @@ -111,7 +111,7 @@ type Endpoint struct { // which will be parsed into and validated from a resource configuration. NewResource func(*Endpoint) Resource // NewTransactor returns a Transactor ready for pm.RunTransactions. - NewTransactor func(ctx context.Context, _ *Endpoint, _ Fence, bindings []Table, open pm.Request_Open) (m.Transactor, error) + NewTransactor func(ctx context.Context, _ *Endpoint, _ Fence, bindings []Table, open pm.Request_Open, is *boilerplate.InfoSchema) (m.Transactor, error) // Tenant owning this task, as determined from the task name. Tenant string // ConcurrentApply of Apply actions, for system that may benefit from a scatter/gather strategy diff --git a/materialize-sqlite/sqlite.go b/materialize-sqlite/sqlite.go index 83bd944945..0926e7e3b9 100644 --- a/materialize-sqlite/sqlite.go +++ b/materialize-sqlite/sqlite.go @@ -155,6 +155,7 @@ func newTransactor( fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var d = &transactor{ dialect: &sqliteDialect, diff --git a/materialize-sqlserver/driver.go b/materialize-sqlserver/driver.go index bac5766ed3..1e61e49d17 100644 --- a/materialize-sqlserver/driver.go +++ b/materialize-sqlserver/driver.go @@ -241,13 +241,14 @@ type transactor struct { func prepareNewTransactor( templates templates, -) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open) (m.Transactor, error) { +) func(context.Context, *sql.Endpoint, sql.Fence, []sql.Table, pm.Request_Open, *boilerplate.InfoSchema) (m.Transactor, error) { return func( ctx context.Context, ep *sql.Endpoint, fence sql.Fence, bindings []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) var d = &transactor{templates: templates, cfg: cfg} diff --git a/materialize-starburst/starburst.go b/materialize-starburst/starburst.go index d5c31cf46c..6e6b9a15a5 100644 --- a/materialize-starburst/starburst.go +++ b/materialize-starburst/starburst.go @@ -157,6 +157,7 @@ func newTransactor( _ sql.Fence, tables []sql.Table, open pm.Request_Open, + is *boilerplate.InfoSchema, ) (_ m.Transactor, err error) { var cfg = ep.Config.(*config) var templates = renderTemplates(starburstTrinoDialect)