Skip to content

Commit

Permalink
materialize-motherduck: formatted string to string migration
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 19, 2024
1 parent a8b2fca commit 61327f6
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
75 changes: 75 additions & 0 deletions materialize-motherduck/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
Base Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"}
{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The root document should usually be materialized"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}

Migratable Changes Before Apply Schema:
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"}
{"Name":"flow_document","Nullable":"NO","Type":"JSON"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP WITH TIME ZONE"}
{"Name":"key","Nullable":"NO","Type":"VARCHAR"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"JSON"}
{"Name":"numericString","Nullable":"YES","Type":"HUGEINT"}
{"Name":"optional","Nullable":"YES","Type":"JSON"}
{"Name":"scalarValue","Nullable":"NO","Type":"VARCHAR"}
{"Name":"second_root","Nullable":"NO","Type":"JSON"}


Migratable Changes Before Apply Data:
key (VARCHAR), _meta/flow_truncated (BOOLEAN), flow_document (VARCHAR), flow_published_at (TIMESTAMPTZ), multiple (VARCHAR), nonScalarValue (VARCHAR), numericString (HUGEINT), optional (VARCHAR), scalarValue (VARCHAR), second_root (VARCHAR)

1, false, {}, 2024-09-13 01:01:01 +0000 UTC, <nil>, <nil>, 123, <nil>, test, {}

Migratable Changes Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}

Migratable Changes Applied Schema:
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"}
{"Name":"flow_document","Nullable":"NO","Type":"JSON"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP WITH TIME ZONE"}
{"Name":"key","Nullable":"NO","Type":"VARCHAR"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"JSON"}
{"Name":"numericString","Nullable":"YES","Type":"VARCHAR"}
{"Name":"optional","Nullable":"YES","Type":"JSON"}
{"Name":"scalarValue","Nullable":"NO","Type":"VARCHAR"}
{"Name":"second_root","Nullable":"NO","Type":"JSON"}


Migratable Changes Applied Data:
key (VARCHAR), _meta/flow_truncated (BOOLEAN), flow_document (VARCHAR), flow_published_at (TIMESTAMPTZ), multiple (VARCHAR), nonScalarValue (VARCHAR), numericString (VARCHAR), optional (VARCHAR), scalarValue (VARCHAR), second_root (VARCHAR)

1, false, {}, 2024-09-13 01:01:01 +0000 UTC, <nil>, <nil>, 123, <nil>, test, {}

9 changes: 9 additions & 0 deletions materialize-motherduck/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boi
))
}

for _, f := range ta.ColumnTypeChanges {
stmts = append(stmts, fmt.Sprintf(
"ALTER TABLE %s ALTER COLUMN %s TYPE %s;",
ta.Identifier,
f.Identifier,
f.DDL,
))
}

return strings.Join(stmts, "\n"), func(ctx context.Context) error {
for _, stmt := range stmts {
if _, err := c.db.ExecContext(ctx, stmt); err != nil {
Expand Down
81 changes: 81 additions & 0 deletions materialize-motherduck/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,87 @@ func TestValidateAndApply(t *testing.T) {
)
}

func TestValidateAndApplyMigrations(t *testing.T) {
ctx := context.Background()

cfg := mustGetCfg(t)

resourceConfig := tableConfig{
Table: "target",
Schema: cfg.Schema,
Delta: true,
database: cfg.Database,
}

sql.RunValidateAndApplyMigrationsTests(
t,
newDuckDriver(),
cfg,
resourceConfig,
func(t *testing.T) string {
t.Helper()

db, err := cfg.db(ctx)
require.NoError(t, err)
defer db.Close()

sch, err := sql.StdGetSchema(ctx, db, cfg.Database, resourceConfig.Schema, resourceConfig.Table)
require.NoError(t, err)

return sch
},
func(t *testing.T, cols []string, values []string) {
t.Helper()
db, err := cfg.db(ctx)
require.NoError(t, err)

var keys = make([]string, len(cols))
for i, col := range cols {
keys[i] = duckDialect.Identifier(col)
}
keys = append(keys, duckDialect.Identifier("_meta/flow_truncated"))
values = append(values, "FALSE")
keys = append(keys, duckDialect.Identifier("flow_published_at"))
values = append(values, "'2024-09-13 01:01:01'")
keys = append(keys, duckDialect.Identifier("flow_document"))
values = append(values, "'{}'")
keys = append(keys, duckDialect.Identifier("second_root"))
values = append(values, "'{}'")
q := fmt.Sprintf("insert into %s (%s) VALUES (%s);", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table), strings.Join(keys, ","), strings.Join(values, ","))
_, err = db.ExecContext(ctx, q)

require.NoError(t, err)
},
func(t *testing.T) string {
t.Helper()

db, err := cfg.db(ctx)
require.NoError(t, err)

rows, err := sql.DumpTestTable(t, db, duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table), duckDialect.Identifier("key"))

require.NoError(t, err)

return rows
},
func(t *testing.T, materialization pf.Materialization) {
t.Helper()

db, err := cfg.db(ctx)
require.NoError(t, err)
defer db.Close()

_, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", duckDialect.Identifier(cfg.Database, resourceConfig.Schema, resourceConfig.Table)))

_, _ = db.ExecContext(ctx, fmt.Sprintf(
"delete from %s where materialization = %s",
duckDialect.Identifier(cfg.Database, cfg.Schema, sql.DefaultFlowMaterializations),
duckDialect.Literal(materialization.String()),
))
},
)
}

func TestFencingCases(t *testing.T) {
var ctx = context.Background()

Expand Down
3 changes: 3 additions & 0 deletions materialize-motherduck/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var duckDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"double", "bigint", "hugeint"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
return sql.InfoTableLocation{TableSchema: path[1], TableName: path[2]}
}),
Expand Down

0 comments on commit 61327f6

Please sign in to comment.