From 61327f6c8c5f5e61e6cf4116cecc770a1749ff50 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Thu, 19 Sep 2024 15:16:03 +0100 Subject: [PATCH] materialize-motherduck: formatted string to string migration --- .../.snapshots/TestValidateAndApplyMigrations | 75 +++++++++++++++++ materialize-motherduck/client.go | 9 +++ materialize-motherduck/driver_test.go | 81 +++++++++++++++++++ materialize-motherduck/sqlgen.go | 3 + 4 files changed, 168 insertions(+) create mode 100644 materialize-motherduck/.snapshots/TestValidateAndApplyMigrations diff --git a/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations b/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations new file mode 100644 index 0000000000..cb8d024d2e --- /dev/null +++ b/materialize-motherduck/.snapshots/TestValidateAndApplyMigrations @@ -0,0 +1,75 @@ +Base Initial Constraints: +{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"} +{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"} +{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} 
+{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"} + +Base Re-validated Constraints: +{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} + +Migratable Changes Before Apply Schema: +{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"} +{"Name":"flow_document","Nullable":"NO","Type":"JSON"} +{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP WITH 
TIME ZONE"} +{"Name":"key","Nullable":"NO","Type":"VARCHAR"} +{"Name":"multiple","Nullable":"YES","Type":"JSON"} +{"Name":"nonScalarValue","Nullable":"YES","Type":"JSON"} +{"Name":"numericString","Nullable":"YES","Type":"HUGEINT"} +{"Name":"optional","Nullable":"YES","Type":"JSON"} +{"Name":"scalarValue","Nullable":"NO","Type":"VARCHAR"} +{"Name":"second_root","Nullable":"NO","Type":"JSON"} + + +Migratable Changes Before Apply Data: +key (VARCHAR), _meta/flow_truncated (BOOLEAN), flow_document (VARCHAR), flow_published_at (TIMESTAMPTZ), multiple (VARCHAR), nonScalarValue (VARCHAR), numericString (HUGEINT), optional (VARCHAR), scalarValue (VARCHAR), second_root (VARCHAR) + +1, false, {}, 2024-09-13 01:01:01 +0000 UTC, , , 123, , test, {} + +Migratable Changes Constraints: +{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} 
+{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} + +Migratable Changes Applied Schema: +{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"} +{"Name":"flow_document","Nullable":"NO","Type":"JSON"} +{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP WITH TIME ZONE"} +{"Name":"key","Nullable":"NO","Type":"VARCHAR"} +{"Name":"multiple","Nullable":"YES","Type":"JSON"} +{"Name":"nonScalarValue","Nullable":"YES","Type":"JSON"} +{"Name":"numericString","Nullable":"YES","Type":"VARCHAR"} +{"Name":"optional","Nullable":"YES","Type":"JSON"} +{"Name":"scalarValue","Nullable":"NO","Type":"VARCHAR"} +{"Name":"second_root","Nullable":"NO","Type":"JSON"} + + +Migratable Changes Applied Data: +key (VARCHAR), _meta/flow_truncated (BOOLEAN), flow_document (VARCHAR), flow_published_at (TIMESTAMPTZ), multiple (VARCHAR), nonScalarValue (VARCHAR), numericString (VARCHAR), optional (VARCHAR), scalarValue (VARCHAR), second_root (VARCHAR) + +1, false, {}, 2024-09-13 01:01:01 +0000 UTC, , , 123, , test, {} + diff --git a/materialize-motherduck/client.go b/materialize-motherduck/client.go index 75215bcf8f..bb55812aa8 100644 --- a/materialize-motherduck/client.go +++ b/materialize-motherduck/client.go @@ -88,6 +88,15 @@ func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boi )) } + for _, f := range ta.ColumnTypeChanges { + stmts = append(stmts, fmt.Sprintf( + "ALTER TABLE %s ALTER COLUMN %s TYPE %s;", + ta.Identifier, + f.Identifier, + f.DDL, + )) + } + return strings.Join(stmts, "\n"), func(ctx context.Context) error { for _, stmt := range stmts { if _, err := c.db.ExecContext(ctx, stmt); err != nil { diff --git a/materialize-motherduck/driver_test.go b/materialize-motherduck/driver_test.go index 9ff2be5033..02a047b5a5 100644 
--- a/materialize-motherduck/driver_test.go
+++ b/materialize-motherduck/driver_test.go
@@ -94,6 +94,100 @@ func TestValidateAndApply(t *testing.T) {
 	)
 }
 
+// TestValidateAndApplyMigrations drives sql.RunValidateAndApplyMigrationsTests
+// against the "target" table with Delta set, supplying callbacks that dump the
+// table schema, insert a test row, dump the table contents, and clean up.
+func TestValidateAndApplyMigrations(t *testing.T) {
+	ctx := context.Background()
+
+	cfg := mustGetCfg(t)
+
+	resourceConfig := tableConfig{
+		Table:    "target",
+		Schema:   cfg.Schema,
+		Delta:    true,
+		database: cfg.Database,
+	}
+
+	sql.RunValidateAndApplyMigrationsTests(
+		t,
+		newDuckDriver(),
+		cfg,
+		resourceConfig,
+		// Returns the result of sql.StdGetSchema for the target table.
+		func(t *testing.T) string {
+			t.Helper()
+
+			db, err := cfg.db(ctx)
+			require.NoError(t, err)
+			defer db.Close()
+
+			sch, err := sql.StdGetSchema(ctx, db, cfg.Database, resourceConfig.Schema, resourceConfig.Table)
+			require.NoError(t, err)
+
+			return sch
+		},
+		// Inserts one row made of the given cols/values plus fixed values for
+		// the metadata and document columns appended below.
+		func(t *testing.T, cols []string, values []string) {
+			t.Helper()
+			db, err := cfg.db(ctx)
+			require.NoError(t, err)
+			// Close the connection like the other callbacks do, so it is not leaked.
+			defer db.Close()
+
+			var keys = make([]string, len(cols))
+			for i, col := range cols {
+				keys[i] = duckDialect.Identifier(col)
+			}
+			keys = append(keys, duckDialect.Identifier("_meta/flow_truncated"))
+			values = append(values, "FALSE")
+			keys = append(keys, duckDialect.Identifier("flow_published_at"))
+			values = append(values, "'2024-09-13 01:01:01'")
+			keys = append(keys, duckDialect.Identifier("flow_document"))
+			values = append(values, "'{}'")
+			keys = append(keys, duckDialect.Identifier("second_root"))
+			values = append(values, "'{}'")
+			q := fmt.Sprintf("insert into %s (%s) VALUES (%s);", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table), strings.Join(keys, ","), strings.Join(values, ","))
+			_, err = db.ExecContext(ctx, q)
+
+			require.NoError(t, err)
+		},
+		// Returns the output of sql.DumpTestTable for the target table.
+		func(t *testing.T) string {
+			t.Helper()
+
+			db, err := cfg.db(ctx)
+			require.NoError(t, err)
+			// Close the connection like the other callbacks do, so it is not leaked.
+			defer db.Close()
+
+			rows, err := sql.DumpTestTable(t, db, duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table), duckDialect.Identifier("key"))
+
+			require.NoError(t, err)
+
+			return rows
+		},
+		// Cleanup: drops the target table and deletes rows matching this
+		// materialization from the sql.DefaultFlowMaterializations table.
+		// Errors from both statements are discarded.
+		func(t *testing.T, materialization pf.Materialization) {
+			t.Helper()
+
+			db, err := cfg.db(ctx)
+			require.NoError(t, err)
+			defer db.Close()
+
+			_, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table)))
+
+			_, _ = db.ExecContext(ctx, fmt.Sprintf(
+				"delete from %s where materialization = %s",
+				duckDialect.Identifier(cfg.Database, cfg.Schema, sql.DefaultFlowMaterializations),
+				duckDialect.Literal(materialization.String()),
+			))
+		},
+	)
+}
+
 func TestFencingCases(t *testing.T) {
 	var ctx = context.Background()
 
diff --git a/materialize-motherduck/sqlgen.go b/materialize-motherduck/sqlgen.go
index 159377a482..47f35ae8a8 100644
--- a/materialize-motherduck/sqlgen.go
+++ b/materialize-motherduck/sqlgen.go
@@ -44,6 +44,9 @@ var duckDialect = func() sql.Dialect {
 	)
 
 	return sql.Dialect{
+		MigratableTypes: map[sql.FlatType][]string{
+			sql.STRING: {"double", "bigint", "hugeint"},
+		},
 		TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
 			return sql.InfoTableLocation{TableSchema: path[1], TableName: path[2]}
 		}),