From 7f747bdddcf636e24cf7e29d38814e6be6b976e9 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Tue, 24 Sep 2024 16:56:39 +0100 Subject: [PATCH] materialize-sql: detect changes in materialize-sql.UpdateResource --- materialize-bigquery/sqlgen.go | 6 ++- materialize-boilerplate/apply.go | 30 ------------ materialize-boilerplate/validate.go | 4 +- materialize-boilerplate/validate_test.go | 2 +- materialize-databricks/sqlgen.go | 8 ++- .../.snapshots/TestValidateAndApplyMigrations | 13 ----- materialize-motherduck/sqlgen.go | 6 ++- .../.snapshots/TestValidateAndApplyMigrations | 13 ----- materialize-mysql/sqlgen.go | 6 ++- .../.snapshots/TestValidateAndApplyMigrations | 13 ----- materialize-postgres/sqlgen.go | 6 ++- materialize-redshift/sqlgen.go | 6 ++- materialize-snowflake/snowflake_test.go | 2 +- materialize-snowflake/sqlgen.go | 5 +- materialize-sql/apply.go | 49 +++++++++++++++---- materialize-sql/dialect.go | 7 +-- materialize-sql/driver.go | 2 +- materialize-sql/type_mapping.go | 35 ++++++++++--- materialize-sql/validate_test.go | 6 +-- .../.snapshots/TestValidateAndApplyMigrations | 13 ----- materialize-sqlserver/sqlgen.go | 5 +- 21 files changed, 111 insertions(+), 126 deletions(-) diff --git a/materialize-bigquery/sqlgen.go b/materialize-bigquery/sqlgen.go index 5f8507f0a7..869ef72b1f 100644 --- a/materialize-bigquery/sqlgen.go +++ b/materialize-bigquery/sqlgen.go @@ -98,8 +98,10 @@ var bqDialect = func() sql.Dialect { ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"integer", "bignumeric", "float"}, + MigratableTypes: map[string][]string{ + "integer": {"string"}, + "bignumeric": {"string"}, + "float": {"string"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { return sql.InfoTableLocation{ diff --git a/materialize-boilerplate/apply.go b/materialize-boilerplate/apply.go index 3a76e4e8c1..5e4815f0f0 100644 --- a/materialize-boilerplate/apply.go +++ b/materialize-boilerplate/apply.go @@ -37,10 +37,6 @@ type BindingUpdate struct { // applied materialization spec, and is now delta updates. Some systems may need to do things // like drop primary key restraints in response to this change. NewlyDeltaUpdates bool - - // ChangedFieldTypes is a list of projections that have a changed type. The connector may or may - // not support migrating these cases - ChangedFieldTypes []pf.Projection } // Applier represents the capabilities needed for an endpoint to apply changes to materialized @@ -190,21 +186,6 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i // modified to be made nullable since it may need to hold null values now. params.NewlyNullableFields = append(params.NewlyNullableFields, existingField) } - - existingProjection := existingBinding.Collection.GetProjection(field) - existingNonNullTypes := nonNullTypes(existingProjection.Inference.Types) - newNonNullTypes := nonNullTypes(projection.Inference.Types) - - if !slices.Equal(existingNonNullTypes, newNonNullTypes) { - params.ChangedFieldTypes = append(params.ChangedFieldTypes, projection) - } else if existingProjection.Inference.String_ != nil && projection.Inference.String_ != nil { - var existingFormat = existingProjection.Inference.String_.Format - var newFormat = projection.Inference.String_.Format - - if existingFormat != newFormat { - params.ChangedFieldTypes = append(params.ChangedFieldTypes, projection) - } - } } else { // Field does not exist in the materialized resource, so this is a new // projection to add to it. @@ -276,14 +257,3 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil } - -func nonNullTypes(types []string) []string { - var filtered []string - for _, t := range types { - if t != pf.JsonTypeNull { - filtered = append(filtered, t) - } - } - - return filtered -} diff --git a/materialize-boilerplate/validate.go b/materialize-boilerplate/validate.go index 6a6b88e44e..5fb0a67a3a 100644 --- a/materialize-boilerplate/validate.go +++ b/materialize-boilerplate/validate.go @@ -245,7 +245,9 @@ func (v Validator) validateMatchesExistingBinding( rawConfig := fieldConfigJsonMap[p.Field] if compatible, err := v.c.Compatible(existingField, &p, rawConfig); err != nil { return nil, fmt.Errorf("determining compatibility for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err) - } else if compatible { + } else if migratable, err := v.c.Migratable(existingField, &p, rawConfig); err != nil { + return nil, fmt.Errorf("determining migratability for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err) + } else if compatible || migratable { if p.IsPrimaryKey { c = &pm.Response_Validated_Constraint{ Type: pm.Response_Validated_Constraint_FIELD_REQUIRED, diff --git a/materialize-boilerplate/validate_test.go b/materialize-boilerplate/validate_test.go index 2d4ecd7001..761f2c06ad 100644 --- a/materialize-boilerplate/validate_test.go +++ b/materialize-boilerplate/validate_test.go @@ -203,7 +203,7 @@ func TestValidate(t *testing.T) { require.NoError(t, err) snap.WriteString(tt.name + ":\n") - snap.WriteString(snapshotConstraints(t, cs) + "\n") + snap.WriteString(SnapshotConstraints(t, cs) + "\n") }) } cupaloy.SnapshotT(t, snap.String()) diff --git a/materialize-databricks/sqlgen.go b/materialize-databricks/sqlgen.go index a983330dee..22bae3ba4e 100644 --- a/materialize-databricks/sqlgen.go +++ b/materialize-databricks/sqlgen.go @@ -59,8 +59,10 @@ var databricksDialect = func() sql.Dialect { ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"long", "decimal", "double"}, + MigratableTypes: map[string][]string{ + "decimal": {"string"}, + "long": {"string"}, + "double": {"string"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { return sql.InfoTableLocation{ @@ -110,6 +112,8 @@ var databricksDialect = func() sql.Dialect { var ( tplAll = sql.MustParseTemplate(databricksDialect, "root", ` -- Templated creation of a materialized table definition and comments: +-- delta.columnMapping.mode enables column renaming in Databricks. Column renaming was introduced in Databricks Runtime 10.4 LTS which was released in March 2022. +-- See https://docs.databricks.com/en/release-notes/runtime/10.4lts.html {{ define "createTargetTable" }} CREATE TABLE IF NOT EXISTS {{$.Identifier}} ( {{- range $ind, $col := $.Columns }} diff --git a/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations b/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations index cb8d024d2e..ce70497102 100644 --- a/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations +++ b/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations @@ -11,19 +11,6 @@ Base Initial Constraints: {"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"} -Base Re-validated Constraints: -{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} -{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} - Migratable Changes Before Apply Schema: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"} {"Name":"flow_document","Nullable":"NO","Type":"JSON"} diff --git a/materialize-motherduck/sqlgen.go b/materialize-motherduck/sqlgen.go index 47f35ae8a8..dc0aa46cfe 100644 --- a/materialize-motherduck/sqlgen.go +++ b/materialize-motherduck/sqlgen.go @@ -44,8 +44,10 @@ var duckDialect = func() sql.Dialect { ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"double", "bigint", "hugeint"}, + MigratableTypes: map[string][]string{ + "double": {"varchar"}, + "bigint": {"varchar"}, + "hugeint": {"varchar"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { return sql.InfoTableLocation{TableSchema: path[1], TableName: path[2]} diff --git a/materialize-mysql/.snapshots/TestValidateAndApplyMigrations b/materialize-mysql/.snapshots/TestValidateAndApplyMigrations index fde4deafcf..012247e547 100644 --- a/materialize-mysql/.snapshots/TestValidateAndApplyMigrations +++ b/materialize-mysql/.snapshots/TestValidateAndApplyMigrations @@ -11,19 +11,6 @@ Base Initial Constraints: {"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"} -Base Re-validated Constraints: -{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} -{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} -{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"} - Migratable Changes Before Apply Schema: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"tinyint"} {"Name":"flow_document","Nullable":"NO","Type":"json"} diff --git a/materialize-mysql/sqlgen.go b/materialize-mysql/sqlgen.go index 8988fb8a3d..1302bbed9f 100644 --- a/materialize-mysql/sqlgen.go +++ b/materialize-mysql/sqlgen.go @@ -86,8 +86,10 @@ var mysqlDialect = func(tzLocation *time.Location, database string) sql.Dialect ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"decimal", "bigint", "double"}, + MigratableTypes: map[string][]string{ + "decimal": {"varchar", "longtext"}, + "bigint": {"varchar", "longtext"}, + "double": {"varchar", "longtext"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { // For MySQL, the table_catalog is always "def", and table_schema is the name of the diff --git a/materialize-postgres/.snapshots/TestValidateAndApplyMigrations b/materialize-postgres/.snapshots/TestValidateAndApplyMigrations index 89acf6a30e..5ebb54aafb 100644 --- a/materialize-postgres/.snapshots/TestValidateAndApplyMigrations +++ b/materialize-postgres/.snapshots/TestValidateAndApplyMigrations @@ -11,19 +11,6 @@ Base Initial Constraints: {"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"} -Base Re-validated Constraints: -{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} -{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} -{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"} - Migratable Changes Before Apply Schema: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"boolean"} {"Name":"flow_document","Nullable":"NO","Type":"json"} diff --git a/materialize-postgres/sqlgen.go b/materialize-postgres/sqlgen.go index 17b96c8b83..ce96e07e12 100644 --- a/materialize-postgres/sqlgen.go +++ b/materialize-postgres/sqlgen.go @@ -50,8 +50,10 @@ var pgDialect = func() sql.Dialect { ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"numeric", "integer", "double precision"}, + MigratableTypes: map[string][]string{ + "numeric": {"character varying", "text"}, + "integer": {"character varying", "text"}, + "double precision": {"character varying", "text"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-redshift/sqlgen.go b/materialize-redshift/sqlgen.go index f65070fcbf..b30b2157c0 100644 --- a/materialize-redshift/sqlgen.go +++ b/materialize-redshift/sqlgen.go @@ -97,8 +97,10 @@ var rsDialect = func(caseSensitiveIdentifierEnabled bool) sql.Dialect { } return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"numeric", "bigint", "double precision"}, + MigratableTypes: map[string][]string{ + "numeric": {"character varying", "text"}, + "bigint": {"character varying", "text"}, + "double precision": {"character varying", "text"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-snowflake/snowflake_test.go b/materialize-snowflake/snowflake_test.go index adbbb563e0..72c57bb0f4 100644 --- a/materialize-snowflake/snowflake_test.go +++ b/materialize-snowflake/snowflake_test.go @@ -147,7 +147,7 @@ func TestValidateAndApplyMigrations(t *testing.T) { func(t *testing.T) string { t.Helper() - rows, err := sql.DumpTestTable(t, db, testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table), testDialect.Identifier("key")) + rows, err := sql.DumpTestTable(t, db, testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table)) require.NoError(t, err) diff --git a/materialize-snowflake/sqlgen.go b/materialize-snowflake/sqlgen.go index 3d662845ec..47d30142b5 100644 --- a/materialize-snowflake/sqlgen.go +++ b/materialize-snowflake/sqlgen.go @@ -87,8 +87,9 @@ var snowflakeDialect = func(configSchema string, timestampMapping timestampTypeM } return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"number", "float"}, + MigratableTypes: map[string][]string{ + "number": {"text"}, + "float": {"text"}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 { diff --git a/materialize-sql/apply.go b/materialize-sql/apply.go index 26a2635255..3360b38e00 100644 --- a/materialize-sql/apply.go +++ b/materialize-sql/apply.go @@ -56,16 +56,18 @@ type MetaSpecsUpdate struct { var _ boilerplate.Applier = (*sqlApplier)(nil) type sqlApplier struct { - client Client - is *boilerplate.InfoSchema - endpoint *Endpoint + client Client + is *boilerplate.InfoSchema + endpoint *Endpoint + constrainter constrainter } -func newSqlApplier(client Client, is *boilerplate.InfoSchema, endpoint *Endpoint) *sqlApplier { +func newSqlApplier(client Client, is *boilerplate.InfoSchema, endpoint *Endpoint, constrainter constrainter) *sqlApplier { return &sqlApplier{ - client: client, - is: is, - endpoint: endpoint, + client: client, + is: is, + endpoint: endpoint, + constrainter: constrainter, } } @@ -269,12 +271,39 @@ func (a *sqlApplier) UpdateResource(ctx context.Context, spec *pf.Materializatio alter.AddColumns = append(alter.AddColumns, col) } - for _, changedType := range bindingUpdate.ChangedFieldTypes { - col, err := getColumn(changedType.Field) + var changedFieldTypes []Column + + var binding = spec.Bindings[bindingIndex] + var collection = binding.Collection + for _, proposed := range collection.Projections { + existing, err := a.is.GetField(table.Path, proposed.Field) if err != nil { - return "", nil, err + continue + } + + var rawFieldConfig = binding.FieldSelection.FieldConfigJsonMap[proposed.Field] + compatible, err := a.constrainter.compatible(existing, &proposed, rawFieldConfig) + if err != nil { + return "", nil, fmt.Errorf("checking compatibility of %q: %w", proposed.Field, err) + } + + migratable, err := a.constrainter.migratable(existing, &proposed, rawFieldConfig) + if err != nil { + return "", nil, fmt.Errorf("checking migratability of %q: %w", proposed.Field, err) } + // If the types are not compatible, but are migratable, attempt to migrate + if !compatible && migratable { + col, err := getColumn(proposed.Field) + if err != nil { + return "", nil, err + } + changedFieldTypes = append(changedFieldTypes, col) + } + } + + for _, col := range changedFieldTypes { + if migrationSteps := a.isFieldPendingMigration(table.Path, col.Field); len(migrationSteps) > 0 { alter.ColumnTypeChangeMigrations = append(alter.ColumnTypeChangeMigrations, ColumnTypeMigration{ OriginalField: col.Field, diff --git a/materialize-sql/dialect.go b/materialize-sql/dialect.go index 13de2d1720..3ad6d9218d 100644 --- a/materialize-sql/dialect.go +++ b/materialize-sql/dialect.go @@ -19,9 +19,10 @@ type Dialect struct { // CaseInsensitiveColumns is provided as the "caseInsensitiveFields" parameter to boilerplate.NewValidator. CaseInsensitiveColumns bool - // MigratableTypes is a mapp of target mapped type (the type to be migrated to) as key, and a slice of endpoint - // types (as represented by endpoint) which can be migrated to the key type - MigratableTypes map[FlatType][]string + // MigratableTypes is a mapp of current column DDL as key, and a slice of DDLs + // which the key type can be migrated to. + // For example, "decimal": {"string"} means decimal columns can be migrated to string type + MigratableTypes map[string][]string } // TableLocatorer produces an InfoTableLocation for a given path. diff --git a/materialize-sql/driver.go b/materialize-sql/driver.go index 275a6b77ff..707338cf89 100644 --- a/materialize-sql/driver.go +++ b/materialize-sql/driver.go @@ -222,7 +222,7 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response } } - return boilerplate.ApplyChanges(ctx, req, newSqlApplier(client, is, endpoint), is, endpoint.ConcurrentApply) + return boilerplate.ApplyChanges(ctx, req, newSqlApplier(client, is, endpoint, constrainter{dialect: endpoint.Dialect}), is, endpoint.ConcurrentApply) } func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Transactor, *pm.Response_Opened, error) { diff --git a/materialize-sql/type_mapping.go b/materialize-sql/type_mapping.go index 60e026a1e4..1bfec8b428 100644 --- a/materialize-sql/type_mapping.go +++ b/materialize-sql/type_mapping.go @@ -452,7 +452,7 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp return &constraint } -func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) { +func (c constrainter) compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) { proj := buildProjection(proposed, rawFieldConfig) mapped, err := c.dialect.MapType(&proj) if err != nil { @@ -470,15 +470,34 @@ func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *p return strings.EqualFold(existing.Type, compatibleType) }) - isMigratableType := false - proposedFlatType, _ := proj.AsFlatType() - if migratableList, ok := c.dialect.MigratableTypes[proposedFlatType]; ok { - isMigratableType = slices.ContainsFunc(migratableList, func(migratableType string) bool { - return strings.EqualFold(existing.Type, migratableType) - }) + return isCompatibleType, nil +} + +func (c constrainter) migratable(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) { + proj := buildProjection(proposed, rawFieldConfig) + mapped, err := c.dialect.MapType(&proj) + if err != nil { + return false, fmt.Errorf("mapping type: %w", err) + } + + var migratableTypes = c.dialect.MigratableTypes + // If the types are not compatible, but are migratable, attempt to migrate + if slices.Contains(migratableTypes[strings.ToLower(existing.Type)], strings.ToLower(mapped.DDL)) { + return true, nil } - return isCompatibleType || isMigratableType, nil + return false, nil +} + +func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) { + if compatible, err := c.compatible(existing, proposed, rawFieldConfig); err != nil { + return false, err + } else if !compatible { + return false, nil + } else { + migratable, err := c.migratable(existing, proposed, rawFieldConfig) + return migratable, err + } } func (c constrainter) DescriptionForType(p *pf.Projection, rawFieldConfig json.RawMessage) (string, error) { diff --git a/materialize-sql/validate_test.go b/materialize-sql/validate_test.go index d7dcd83c69..0a869ad16a 100644 --- a/materialize-sql/validate_test.go +++ b/materialize-sql/validate_test.go @@ -72,7 +72,7 @@ func TestValidateMigrations(t *testing.T) { require.NoError(t, err) snap.WriteString(tt.name + ":\n") - snap.WriteString(snapshotConstraints(t, cs) + "\n") + snap.WriteString(boilerplate.SnapshotConstraints(t, cs) + "\n") }) } cupaloy.SnapshotT(t, snap.String()) @@ -81,8 +81,8 @@ func TestValidateMigrations(t *testing.T) { type testConstrainter struct{} func (testConstrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, _ json.RawMessage) (bool, error) { - var formattedToString = existing.Type == "integer,string" && strings.Join(proposed.Inference.Types, ",") == "string" - return existing.Type == strings.Join(proposed.Inference.Types, ",") || formattedToString, nil + var migratable = existing.Type == "integer,string" && strings.Join(proposed.Inference.Types, ",") == "string" + return existing.Type == strings.Join(proposed.Inference.Types, ",") || migratable, nil } func (testConstrainter) DescriptionForType(p *pf.Projection, _ json.RawMessage) (string, error) { diff --git a/materialize-sqlserver/.snapshots/TestValidateAndApplyMigrations b/materialize-sqlserver/.snapshots/TestValidateAndApplyMigrations index 99dbccde04..2344fd4779 100644 --- a/materialize-sqlserver/.snapshots/TestValidateAndApplyMigrations +++ b/materialize-sqlserver/.snapshots/TestValidateAndApplyMigrations @@ -11,19 +11,6 @@ Base Initial Constraints: {"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} {"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"} -Base Re-validated Constraints: -{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} -{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} -{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} -{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} -{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"} - Migratable Changes Before Apply Schema: {"Name":"_meta/flow_truncated","Nullable":"NO","Type":"bit"} {"Name":"flow_document","Nullable":"NO","Type":"varchar"} diff --git a/materialize-sqlserver/sqlgen.go b/materialize-sqlserver/sqlgen.go index be35606b37..66a38c8039 100644 --- a/materialize-sqlserver/sqlgen.go +++ b/materialize-sqlserver/sqlgen.go @@ -85,8 +85,9 @@ var sqlServerDialect = func(collation string, defaultSchema string) sql.Dialect ) return sql.Dialect{ - MigratableTypes: map[sql.FlatType][]string{ - sql.STRING: {"float", "bigint"}, + MigratableTypes: map[string][]string{ + "float": {strings.ToLower(textType), stringType}, + "bigint": {strings.ToLower(textType), stringType}, }, TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation { if len(path) == 1 {