Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

materialize-sql: migratable columns #1928

Merged
merged 23 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit. Hold Shift + click to select a range.
d75ca92
materialize-boilerplate: migratable columns in applier
mdibaiee Sep 12, 2024
07aa4ae
materialize-mysql: implement formatted string migration
mdibaiee Sep 12, 2024
aa9de93
materialize-sqlserver: implement formatted string migration
mdibaiee Sep 12, 2024
19c6cc5
materialize-redshift: formatted string to string migration
mdibaiee Sep 13, 2024
2df6146
materialize-sql: update test snapshots
mdibaiee Sep 16, 2024
6b03953
materialize-databricks: formatted string to string migration
mdibaiee Sep 16, 2024
836c832
materialize-bigquery: formatted string to string migration
mdibaiee Sep 18, 2024
4a6e73b
materialize-snowflake: formatted string to string migration
mdibaiee Sep 19, 2024
9d5611e
materialize-motherduck: formatted string to string migration
mdibaiee Sep 19, 2024
baf1f61
materialize-sql: refactor ColumnTypeChangeMigrations
mdibaiee Sep 24, 2024
7dfcf88
materialize-sql: refactor ValidateMigrations tests
mdibaiee Sep 24, 2024
ea1d32f
materialize-sql: detect changes in materialize-sql.UpdateResource
mdibaiee Sep 24, 2024
ec3f06b
materialize-sql: fix bug in materialize-sql.Compatible
mdibaiee Sep 27, 2024
c2d77e9
materialize-sql: refactor & simplify ColumnChangeMigration
mdibaiee Sep 27, 2024
31cf45f
materialize-sql: more migratable types
mdibaiee Sep 27, 2024
69bd828
materialize-mysql: support string to date-time conversion
mdibaiee Sep 30, 2024
906ac84
materialize-sql: update test snapshots
mdibaiee Sep 30, 2024
82393d7
materialize-{sql}: use non-transactional Alter Table statements
mdibaiee Oct 1, 2024
fba469f
materialize-sql: simplify column migration steps
mdibaiee Oct 1, 2024
ffc904c
materialize-*: only support migrations when a cast is infallible
mdibaiee Oct 2, 2024
47a098b
materialize-sql: custom CastSQL function and migration mapping spec
mdibaiee Oct 3, 2024
18f70c5
materialize-sql: small change requests for migrations
mdibaiee Oct 4, 2024
0e6f78a
materialize-snowflake: verify deletion of files succeeds
mdibaiee Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions materialize-bigquery/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
Base Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"}
{"Field":"dateValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"datetimeValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"int64","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"}
{"Field":"timeValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}

Migratable Changes Before Apply Schema:
{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"}
{"Name":"dateValue","Nullable":"YES","Type":"DATE"}
{"Name":"datetimeValue","Nullable":"YES","Type":"TIMESTAMP"}
{"Name":"flow_document","Nullable":"NO","Type":"STRING"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"}
{"Name":"int64","Nullable":"YES","Type":"INT64"}
{"Name":"key","Nullable":"NO","Type":"STRING"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"}
{"Name":"numericString","Nullable":"YES","Type":"BIGNUMERIC(38)"}
{"Name":"optional","Nullable":"YES","Type":"STRING"}
{"Name":"scalarValue","Nullable":"NO","Type":"STRING"}
{"Name":"timeValue","Nullable":"YES","Type":"STRING"}


Migratable Changes Before Apply Data:
key (STRING), _meta_flow_truncated (BOOLEAN), dateValue (DATE), datetimeValue (TIMESTAMP), flow_published_at (TIMESTAMP), int64 (INTEGER), multiple (JSON), nonScalarValue (STRING), numericString (BIGNUMERIC), optional (STRING), scalarValue (STRING), timeValue (STRING), flow_document (STRING)
1, false, 2024-01-01, 2024-01-01 01:01:01.111111 +0000 UTC, 2024-09-13 01:01:01 +0000 UTC, 1, <nil>, <nil>, 123/1, <nil>, test, 01:01:01, {}

Migratable Changes Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"dateValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"datetimeValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"int64","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}
{"Field":"timeValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}

Migratable Changes Applied Schema:
{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"}
{"Name":"dateValue","Nullable":"YES","Type":"STRING"}
{"Name":"datetimeValue","Nullable":"YES","Type":"STRING"}
{"Name":"flow_document","Nullable":"NO","Type":"STRING"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"}
{"Name":"int64","Nullable":"YES","Type":"INT64"}
{"Name":"key","Nullable":"NO","Type":"STRING"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"}
{"Name":"numericString","Nullable":"YES","Type":"STRING"}
{"Name":"optional","Nullable":"YES","Type":"STRING"}
{"Name":"scalarValue","Nullable":"NO","Type":"STRING"}
{"Name":"timeValue","Nullable":"YES","Type":"STRING"}


Migratable Changes Applied Data:
key (STRING), _meta_flow_truncated (BOOLEAN), flow_published_at (TIMESTAMP), int64 (INTEGER), multiple (JSON), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), timeValue (STRING), flow_document (STRING), dateValue (STRING), datetimeValue (STRING), numericString (STRING)
1, false, 2024-09-13 01:01:01 +0000 UTC, 1, <nil>, <nil>, <nil>, test, 01:01:01, {}, 2024-01-01, 2024-01-01T01:01:01.111111Z, 123

164 changes: 141 additions & 23 deletions materialize-bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,45 +71,120 @@ func TestValidateAndApply(t *testing.T) {
resourceConfig,
func(t *testing.T) string {
t.Helper()
return dumpSchema(t, ctx, client, cfg, resourceConfig)
},
func(t *testing.T, materialization pf.Materialization) {
t.Helper()

_, _ = client.query(ctx, fmt.Sprintf(
"drop table %s;",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
))

_, _ = client.query(ctx, fmt.Sprintf(
"delete from %s where materialization = 'test/sqlite'",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
))
},
)
}

func TestValidateAndApplyMigrations(t *testing.T) {
ctx := context.Background()

cfg := mustGetCfg(t)

resourceConfig := tableConfig{
Table: "target",
Dataset: cfg.Dataset,
projectID: cfg.ProjectID,
}

client, err := cfg.client(ctx)
require.NoError(t, err)
defer client.Close()

sql.RunValidateAndApplyMigrationsTests(
t,
newBigQueryDriver(),
cfg,
resourceConfig,
func(t *testing.T) string {
t.Helper()
return dumpSchema(t, ctx, client, cfg, resourceConfig)
},
func(t *testing.T, cols []string, values []string) {
t.Helper()

var keys = make([]string, len(cols))
for i, col := range cols {
keys[i] = bqDialect.Identifier(col)
}
keys = append(keys, bqDialect.Identifier("_meta/flow_truncated"))
values = append(values, "false")
keys = append(keys, bqDialect.Identifier("flow_published_at"))
values = append(values, "'2024-09-13 01:01:01'")
keys = append(keys, bqDialect.Identifier("flow_document"))
values = append(values, "'{}'")

// bigquery does not support more than 6 fractional second precision, and will fail if we try
// to insert a value with 9
for i, _ := range values {
if keys[i] == "datetimeValue" {
values[i] = "'2024-01-01 01:01:01.111111'"
}
}

job, err := client.query(ctx, fmt.Sprintf(
"select column_name, is_nullable, data_type from %s where table_name = %s;",
bqDialect.Identifier(cfg.Dataset, "INFORMATION_SCHEMA", "COLUMNS"),
bqDialect.Literal(resourceConfig.Table),
_, err = client.query(ctx, fmt.Sprintf(
"insert into %s (%s) VALUES (%s);",
bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), strings.Join(keys, ","), strings.Join(values, ","),
))

require.NoError(t, err)
},
func(t *testing.T) string {
t.Helper()
var b strings.Builder

it, err := job.Read(ctx)
var sql = fmt.Sprintf("select * from %s order by %s asc;", bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), bqDialect.Identifier("key"))

job, err := client.query(ctx, sql)
require.NoError(t, err)

type foundColumn struct {
Name string `bigquery:"column_name"`
Nullable string `bigquery:"is_nullable"`
Type string `bigquery:"data_Type"`
}
it, err := job.Read(ctx)
require.NoError(t, err)

cols := []foundColumn{}
var values []bigquery.Value
var firstResult = true
for {
var c foundColumn
if err = it.Next(&c); err == iterator.Done {
if err = it.Next(&values); err == iterator.Done {
break
} else if err != nil {
require.NoError(t, err)
}
cols = append(cols, c)
}

slices.SortFunc(cols, func(a, b foundColumn) int {
return strings.Compare(a.Name, b.Name)
})
if firstResult {
for i, col := range it.Schema {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(col.Name)
b.WriteString(" (" + string(col.Type) + ")")
}
b.WriteString("\n")

var out strings.Builder
enc := json.NewEncoder(&out)
for _, c := range cols {
require.NoError(t, enc.Encode(c))
firstResult = false
}

for i, v := range values {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(fmt.Sprintf("%v", v))
}
}

return out.String()
return b.String()
},
func(t *testing.T, materialization pf.Materialization) {
t.Helper()
Expand Down Expand Up @@ -261,3 +336,46 @@ func TestSpecification(t *testing.T) {

cupaloy.SnapshotT(t, formatted)
}

// dumpSchema returns a stable, human-readable snapshot of the test table's
// columns: one JSON object per column (name, nullability, data type),
// sorted by column name so the output is deterministic across runs.
func dumpSchema(t *testing.T, ctx context.Context, client *client, cfg config, resourceConfig tableConfig) string {
	t.Helper()

	// Query INFORMATION_SCHEMA for the table's column metadata.
	job, err := client.query(ctx, fmt.Sprintf(
		"select column_name, is_nullable, data_type from %s where table_name = %s;",
		bqDialect.Identifier(cfg.Dataset, "INFORMATION_SCHEMA", "COLUMNS"),
		bqDialect.Literal(resourceConfig.Table),
	))
	require.NoError(t, err)

	it, err := job.Read(ctx)
	require.NoError(t, err)

	type foundColumn struct {
		Name     string `bigquery:"column_name"`
		Nullable string `bigquery:"is_nullable"`
		// Tag fixed from "data_Type" to "data_type" to exactly match the
		// selected column name; the previous spelling relied on the
		// client's case-insensitive fallback matching.
		Type string `bigquery:"data_type"`
	}

	var cols []foundColumn
	for {
		var c foundColumn
		err := it.Next(&c)
		if err == iterator.Done {
			break
		}
		require.NoError(t, err)
		cols = append(cols, c)
	}

	// INFORMATION_SCHEMA row order is not guaranteed; sort for a stable snapshot.
	slices.SortFunc(cols, func(a, b foundColumn) int {
		return strings.Compare(a.Name, b.Name)
	})

	var out strings.Builder
	enc := json.NewEncoder(&out)
	for _, c := range cols {
		require.NoError(t, enc.Encode(c))
	}

	return out.String()
}
32 changes: 25 additions & 7 deletions materialize-bigquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,33 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
// columns exist, we'll at least get a coherent error message when this happens and can know that
// the workaround is to re-backfill the table so it gets created fresh with the needed columns.
func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boilerplate.ActionApplyFn, error) {
var alterColumnStmtBuilder strings.Builder
if err := tplAlterTableColumns.Execute(&alterColumnStmtBuilder, ta); err != nil {
return "", nil, fmt.Errorf("rendering alter table columns statement: %w", err)
var stmts []string
if len(ta.DropNotNulls) > 0 || len(ta.AddColumns) > 0 {
var alterColumnStmtBuilder strings.Builder
if err := tplAlterTableColumns.Execute(&alterColumnStmtBuilder, ta); err != nil {
return "", nil, fmt.Errorf("rendering alter table columns statement: %w", err)
}
alterColumnStmt := alterColumnStmtBuilder.String()
stmts = append(stmts, alterColumnStmt)
}
alterColumnStmt := alterColumnStmtBuilder.String()

return alterColumnStmt, func(ctx context.Context) error {
_, err := c.query(ctx, alterColumnStmt)
return err
if len(ta.ColumnTypeChanges) > 0 {
for _, m := range ta.ColumnTypeChanges {
if steps, err := sql.StdColumnTypeMigration(ctx, bqDialect, ta.Table, m); err != nil {
return "", nil, fmt.Errorf("rendering column migration steps: %w", err)
} else {
stmts = append(stmts, steps...)
}
}
}

return strings.Join(stmts, "\n"), func(ctx context.Context) error {
for _, stmt := range stmts {
if _, err := c.query(ctx, stmt); err != nil {
return err
}
}
return nil
}, nil
}

Expand Down
11 changes: 11 additions & 0 deletions materialize-bigquery/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ var bqDialect = func() sql.Dialect {
)

return sql.Dialect{
MigratableTypes: sql.MigrationSpecs{
"integer": {sql.NewMigrationSpec([]string{"string"})},
"bignumeric": {sql.NewMigrationSpec([]string{"string"})},
"float": {sql.NewMigrationSpec([]string{"string"})},
"date": {sql.NewMigrationSpec([]string{"string"})},
"timestamp": {sql.NewMigrationSpec([]string{"string"}, sql.WithCastSQL(datetimeToStringCast))},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
return sql.InfoTableLocation{
TableSchema: path[1],
Expand All @@ -124,6 +131,10 @@ var bqDialect = func() sql.Dialect {
}
}()

// datetimeToStringCast renders the SQL expression used when migrating a
// TIMESTAMP column to STRING: the value is formatted as an RFC 3339 UTC
// timestamp (e.g. 2024-01-01T01:01:01.111111Z), with %E*S presumably
// preserving the full fractional-second precision.
func datetimeToStringCast(migration sql.ColumnTypeMigration) string {
	// Built by plain concatenation; the trailing space matches the
	// original rendered output exactly.
	return "FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E*SZ', " + migration.Identifier + ", 'UTC') "
}

var (
tplAll = sql.MustParseTemplate(bqDialect, "root", `
{{ define "tempTableName" -}}
Expand Down
Loading
Loading