Skip to content

Commit

Permalink
materialize-sql: detect changes in materialize-sql.UpdateResource
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 25, 2024
1 parent ac2ef5e commit 7f747bd
Show file tree
Hide file tree
Showing 21 changed files with 111 additions and 126 deletions.
6 changes: 4 additions & 2 deletions materialize-bigquery/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ var bqDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"integer", "bignumeric", "float"},
MigratableTypes: map[string][]string{
"integer": {"string"},
"bignumeric": {"string"},
"float": {"string"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
return sql.InfoTableLocation{
Expand Down
30 changes: 0 additions & 30 deletions materialize-boilerplate/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ type BindingUpdate struct {
// applied materialization spec, and is now delta updates. Some systems may need to do things
// like drop primary key restraints in response to this change.
NewlyDeltaUpdates bool

// ChangedFieldTypes is a list of projections that have a changed type. The connector may or may
// not support migrating these cases
ChangedFieldTypes []pf.Projection
}

// Applier represents the capabilities needed for an endpoint to apply changes to materialized
Expand Down Expand Up @@ -190,21 +186,6 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
// modified to be made nullable since it may need to hold null values now.
params.NewlyNullableFields = append(params.NewlyNullableFields, existingField)
}

existingProjection := existingBinding.Collection.GetProjection(field)
existingNonNullTypes := nonNullTypes(existingProjection.Inference.Types)
newNonNullTypes := nonNullTypes(projection.Inference.Types)

if !slices.Equal(existingNonNullTypes, newNonNullTypes) {
params.ChangedFieldTypes = append(params.ChangedFieldTypes, projection)
} else if existingProjection.Inference.String_ != nil && projection.Inference.String_ != nil {
var existingFormat = existingProjection.Inference.String_.Format
var newFormat = projection.Inference.String_.Format

if existingFormat != newFormat {
params.ChangedFieldTypes = append(params.ChangedFieldTypes, projection)
}
}
} else {
// Field does not exist in the materialized resource, so this is a new
// projection to add to it.
Expand Down Expand Up @@ -276,14 +257,3 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i

return &pm.Response_Applied{ActionDescription: strings.Join(actionDescriptions, "\n")}, nil
}

func nonNullTypes(types []string) []string {
var filtered []string
for _, t := range types {
if t != pf.JsonTypeNull {
filtered = append(filtered, t)
}
}

return filtered
}
4 changes: 3 additions & 1 deletion materialize-boilerplate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ func (v Validator) validateMatchesExistingBinding(
rawConfig := fieldConfigJsonMap[p.Field]
if compatible, err := v.c.Compatible(existingField, &p, rawConfig); err != nil {
return nil, fmt.Errorf("determining compatibility for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err)
} else if compatible {
} else if migratable, err := v.c.Migratable(existingField, &p, rawConfig); err != nil {
return nil, fmt.Errorf("determining migratability for endpoint field %q vs. selected field %q: %w", existingField.Name, p.Field, err)
} else if compatible || migratable {
if p.IsPrimaryKey {
c = &pm.Response_Validated_Constraint{
Type: pm.Response_Validated_Constraint_FIELD_REQUIRED,
Expand Down
2 changes: 1 addition & 1 deletion materialize-boilerplate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestValidate(t *testing.T) {
require.NoError(t, err)

snap.WriteString(tt.name + ":\n")
snap.WriteString(snapshotConstraints(t, cs) + "\n")
snap.WriteString(SnapshotConstraints(t, cs) + "\n")
})
}
cupaloy.SnapshotT(t, snap.String())
Expand Down
8 changes: 6 additions & 2 deletions materialize-databricks/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ var databricksDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"long", "decimal", "double"},
MigratableTypes: map[string][]string{
"decimal": {"string"},
"long": {"string"},
"double": {"string"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
return sql.InfoTableLocation{
Expand Down Expand Up @@ -110,6 +112,8 @@ var databricksDialect = func() sql.Dialect {
var (
tplAll = sql.MustParseTemplate(databricksDialect, "root", `
-- Templated creation of a materialized table definition and comments:
-- delta.columnMapping.mode enables column renaming in Databricks. Column renaming was introduced in Databricks Runtime 10.4 LTS which was released in March 2022.
-- See https://docs.databricks.com/en/release-notes/runtime/10.4lts.html
{{ define "createTargetTable" }}
CREATE TABLE IF NOT EXISTS {{$.Identifier}} (
{{- range $ind, $col := $.Columns }}
Expand Down
13 changes: 0 additions & 13 deletions materialize-motherduck/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ Base Initial Constraints:
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}

Migratable Changes Before Apply Schema:
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"}
{"Name":"flow_document","Nullable":"NO","Type":"JSON"}
Expand Down
6 changes: 4 additions & 2 deletions materialize-motherduck/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ var duckDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"double", "bigint", "hugeint"},
MigratableTypes: map[string][]string{
"double": {"varchar"},
"bigint": {"varchar"},
"hugeint": {"varchar"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
return sql.InfoTableLocation{TableSchema: path[1], TableName: path[2]}
Expand Down
13 changes: 0 additions & 13 deletions materialize-mysql/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ Base Initial Constraints:
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Before Apply Schema:
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"tinyint"}
{"Name":"flow_document","Nullable":"NO","Type":"json"}
Expand Down
6 changes: 4 additions & 2 deletions materialize-mysql/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ var mysqlDialect = func(tzLocation *time.Location, database string) sql.Dialect
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"decimal", "bigint", "double"},
MigratableTypes: map[string][]string{
"decimal": {"varchar", "longtext"},
"bigint": {"varchar", "longtext"},
"double": {"varchar", "longtext"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
// For MySQL, the table_catalog is always "def", and table_schema is the name of the
Expand Down
13 changes: 0 additions & 13 deletions materialize-postgres/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ Base Initial Constraints:
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Before Apply Schema:
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"boolean"}
{"Name":"flow_document","Nullable":"NO","Type":"json"}
Expand Down
6 changes: 4 additions & 2 deletions materialize-postgres/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ var pgDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"numeric", "integer", "double precision"},
MigratableTypes: map[string][]string{
"numeric": {"character varying", "text"},
"integer": {"character varying", "text"},
"double precision": {"character varying", "text"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
Expand Down
6 changes: 4 additions & 2 deletions materialize-redshift/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ var rsDialect = func(caseSensitiveIdentifierEnabled bool) sql.Dialect {
}

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"numeric", "bigint", "double precision"},
MigratableTypes: map[string][]string{
"numeric": {"character varying", "text"},
"bigint": {"character varying", "text"},
"double precision": {"character varying", "text"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
Expand Down
2 changes: 1 addition & 1 deletion materialize-snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestValidateAndApplyMigrations(t *testing.T) {
func(t *testing.T) string {
t.Helper()

rows, err := sql.DumpTestTable(t, db, testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table), testDialect.Identifier("key"))
rows, err := sql.DumpTestTable(t, db, testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table))

require.NoError(t, err)

Expand Down
5 changes: 3 additions & 2 deletions materialize-snowflake/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ var snowflakeDialect = func(configSchema string, timestampMapping timestampTypeM
}

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"number", "float"},
MigratableTypes: map[string][]string{
"number": {"text"},
"float": {"text"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
Expand Down
49 changes: 39 additions & 10 deletions materialize-sql/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ type MetaSpecsUpdate struct {
var _ boilerplate.Applier = (*sqlApplier)(nil)

type sqlApplier struct {
client Client
is *boilerplate.InfoSchema
endpoint *Endpoint
client Client
is *boilerplate.InfoSchema
endpoint *Endpoint
constrainter constrainter
}

func newSqlApplier(client Client, is *boilerplate.InfoSchema, endpoint *Endpoint) *sqlApplier {
func newSqlApplier(client Client, is *boilerplate.InfoSchema, endpoint *Endpoint, constrainter constrainter) *sqlApplier {
return &sqlApplier{
client: client,
is: is,
endpoint: endpoint,
client: client,
is: is,
endpoint: endpoint,
constrainter: constrainter,
}
}

Expand Down Expand Up @@ -269,12 +271,39 @@ func (a *sqlApplier) UpdateResource(ctx context.Context, spec *pf.Materializatio
alter.AddColumns = append(alter.AddColumns, col)
}

for _, changedType := range bindingUpdate.ChangedFieldTypes {
col, err := getColumn(changedType.Field)
var changedFieldTypes []Column

var binding = spec.Bindings[bindingIndex]
var collection = binding.Collection
for _, proposed := range collection.Projections {
existing, err := a.is.GetField(table.Path, proposed.Field)
if err != nil {
return "", nil, err
continue
}

var rawFieldConfig = binding.FieldSelection.FieldConfigJsonMap[proposed.Field]
compatible, err := a.constrainter.compatible(existing, &proposed, rawFieldConfig)
if err != nil {
return "", nil, fmt.Errorf("checking compatibility of %q: %w", proposed.Field, err)
}

migratable, err := a.constrainter.migratable(existing, &proposed, rawFieldConfig)
if err != nil {
return "", nil, fmt.Errorf("checking migratability of %q: %w", proposed.Field, err)
}

// If the types are not compatible, but are migratable, attempt to migrate
if !compatible && migratable {
col, err := getColumn(proposed.Field)
if err != nil {
return "", nil, err
}
changedFieldTypes = append(changedFieldTypes, col)
}
}

for _, col := range changedFieldTypes {

if migrationSteps := a.isFieldPendingMigration(table.Path, col.Field); len(migrationSteps) > 0 {
alter.ColumnTypeChangeMigrations = append(alter.ColumnTypeChangeMigrations, ColumnTypeMigration{
OriginalField: col.Field,
Expand Down
7 changes: 4 additions & 3 deletions materialize-sql/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ type Dialect struct {
// CaseInsensitiveColumns is provided as the "caseInsensitiveFields" parameter to boilerplate.NewValidator.
CaseInsensitiveColumns bool

// MigratableTypes is a mapp of target mapped type (the type to be migrated to) as key, and a slice of endpoint
// types (as represented by endpoint) which can be migrated to the key type
MigratableTypes map[FlatType][]string
// MigratableTypes is a mapp of current column DDL as key, and a slice of DDLs
// which the key type can be migrated to.
// For example, "decimal": {"string"} means decimal columns can be migrated to string type
MigratableTypes map[string][]string
}

// TableLocatorer produces an InfoTableLocation for a given path.
Expand Down
2 changes: 1 addition & 1 deletion materialize-sql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response
}
}

return boilerplate.ApplyChanges(ctx, req, newSqlApplier(client, is, endpoint), is, endpoint.ConcurrentApply)
return boilerplate.ApplyChanges(ctx, req, newSqlApplier(client, is, endpoint, constrainter{dialect: endpoint.Dialect}), is, endpoint.ConcurrentApply)
}

func (d *Driver) NewTransactor(ctx context.Context, open pm.Request_Open) (m.Transactor, *pm.Response_Opened, error) {
Expand Down
35 changes: 27 additions & 8 deletions materialize-sql/type_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (constrainter) NewConstraints(p *pf.Projection, deltaUpdates bool) *pm.Resp
return &constraint
}

func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) {
func (c constrainter) compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) {
proj := buildProjection(proposed, rawFieldConfig)
mapped, err := c.dialect.MapType(&proj)
if err != nil {
Expand All @@ -470,15 +470,34 @@ func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *p
return strings.EqualFold(existing.Type, compatibleType)
})

isMigratableType := false
proposedFlatType, _ := proj.AsFlatType()
if migratableList, ok := c.dialect.MigratableTypes[proposedFlatType]; ok {
isMigratableType = slices.ContainsFunc(migratableList, func(migratableType string) bool {
return strings.EqualFold(existing.Type, migratableType)
})
return isCompatibleType, nil
}

func (c constrainter) migratable(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) {
proj := buildProjection(proposed, rawFieldConfig)
mapped, err := c.dialect.MapType(&proj)
if err != nil {
return false, fmt.Errorf("mapping type: %w", err)
}

var migratableTypes = c.dialect.MigratableTypes
// If the types are not compatible, but are migratable, attempt to migrate
if slices.Contains(migratableTypes[strings.ToLower(existing.Type)], strings.ToLower(mapped.DDL)) {
return true, nil
}

return isCompatibleType || isMigratableType, nil
return false, nil
}

func (c constrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, rawFieldConfig json.RawMessage) (bool, error) {
if compatible, err := c.compatible(existing, proposed, rawFieldConfig); err != nil {
return false, err
} else if !compatible {
return false, nil
} else {
migratable, err := c.migratable(existing, proposed, rawFieldConfig)
return migratable, err
}
}

func (c constrainter) DescriptionForType(p *pf.Projection, rawFieldConfig json.RawMessage) (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions materialize-sql/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestValidateMigrations(t *testing.T) {
require.NoError(t, err)

snap.WriteString(tt.name + ":\n")
snap.WriteString(snapshotConstraints(t, cs) + "\n")
snap.WriteString(boilerplate.SnapshotConstraints(t, cs) + "\n")
})
}
cupaloy.SnapshotT(t, snap.String())
Expand All @@ -81,8 +81,8 @@ func TestValidateMigrations(t *testing.T) {
type testConstrainter struct{}

func (testConstrainter) Compatible(existing boilerplate.EndpointField, proposed *pf.Projection, _ json.RawMessage) (bool, error) {
var formattedToString = existing.Type == "integer,string" && strings.Join(proposed.Inference.Types, ",") == "string"
return existing.Type == strings.Join(proposed.Inference.Types, ",") || formattedToString, nil
var migratable = existing.Type == "integer,string" && strings.Join(proposed.Inference.Types, ",") == "string"
return existing.Type == strings.Join(proposed.Inference.Types, ",") || migratable, nil
}

func (testConstrainter) DescriptionForType(p *pf.Projection, _ json.RawMessage) (string, error) {
Expand Down
Loading

0 comments on commit 7f747bd

Please sign in to comment.