diff --git a/materialize-bigquery/.snapshots/TestValidateAndApplyMigrations b/materialize-bigquery/.snapshots/TestValidateAndApplyMigrations new file mode 100644 index 000000000..5173d44dc --- /dev/null +++ b/materialize-bigquery/.snapshots/TestValidateAndApplyMigrations @@ -0,0 +1,71 @@ +Base Initial Constraints: +{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"} +{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"} +{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"} +{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"} +{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"} + +Base Re-validated Constraints: +{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current 
materialization"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"} + +Migratable Changes Before Apply Schema: +{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"} +{"Name":"flow_document","Nullable":"NO","Type":"STRING"} +{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"} +{"Name":"key","Nullable":"NO","Type":"STRING"} +{"Name":"multiple","Nullable":"YES","Type":"JSON"} +{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"} +{"Name":"numericString","Nullable":"YES","Type":"BIGNUMERIC(38)"} +{"Name":"optional","Nullable":"YES","Type":"STRING"} +{"Name":"scalarValue","Nullable":"NO","Type":"STRING"} + + +Migratable Changes Before Apply Data: +key (STRING), _meta_flow_truncated (BOOLEAN), flow_published_at (TIMESTAMP), multiple (JSON), 
nonScalarValue (STRING), numericString (BIGNUMERIC), optional (STRING), scalarValue (STRING), flow_document (STRING) +1, false, 2024-09-13 01:01:01 +0000 UTC, , , 123/1, , test, {} + +Migratable Changes Constraints: +{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"} +{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"} +{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"} +{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"} +{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"} + +Migratable Changes Applied Schema: +{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"} +{"Name":"flow_document","Nullable":"NO","Type":"STRING"} +{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"} 
+{"Name":"key","Nullable":"NO","Type":"STRING"}
+{"Name":"multiple","Nullable":"YES","Type":"JSON"}
+{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"}
+{"Name":"numericString","Nullable":"YES","Type":"STRING"}
+{"Name":"optional","Nullable":"YES","Type":"STRING"}
+{"Name":"scalarValue","Nullable":"NO","Type":"STRING"}
+
+
+Migratable Changes Applied Data:
+key (STRING), _meta_flow_truncated (BOOLEAN), flow_published_at (TIMESTAMP), multiple (JSON), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), flow_document (STRING), numericString (STRING)
+1, false, 2024-09-13 01:01:01 +0000 UTC, , , , test, {}, 123
+
diff --git a/materialize-bigquery/bigquery_test.go b/materialize-bigquery/bigquery_test.go
index ac72e5362..7f4bf475b 100644
--- a/materialize-bigquery/bigquery_test.go
+++ b/materialize-bigquery/bigquery_test.go
@@ -127,6 +127,152 @@ func TestValidateAndApply(t *testing.T) {
 	)
 }
 
+// TestValidateAndApplyMigrations runs the shared column-migration test suite against a real
+// BigQuery dataset, snapshotting the table schema and data before and after the migration.
+func TestValidateAndApplyMigrations(t *testing.T) {
+	ctx := context.Background()
+
+	cfg := mustGetCfg(t)
+
+	resourceConfig := tableConfig{
+		Table:     "target",
+		Dataset:   cfg.Dataset,
+		projectID: cfg.ProjectID,
+	}
+
+	client, err := cfg.client(ctx)
+	require.NoError(t, err)
+	defer client.Close()
+
+	sql.RunValidateAndApplyMigrationsTests(
+		t,
+		newBigQueryDriver(),
+		cfg,
+		resourceConfig,
+		// Dump the target table's columns, sorted by name, as one JSON object per line.
+		func(t *testing.T) string {
+			job, err := client.query(ctx, fmt.Sprintf(
+				"select column_name, is_nullable, data_type from %s where table_name = %s;",
+				bqDialect.Identifier(cfg.Dataset, "INFORMATION_SCHEMA", "COLUMNS"),
+				bqDialect.Literal(resourceConfig.Table),
+			))
+			require.NoError(t, err)
+
+			it, err := job.Read(ctx)
+			require.NoError(t, err)
+
+			type foundColumn struct {
+				Name     string `bigquery:"column_name"`
+				Nullable string `bigquery:"is_nullable"`
+				Type     string `bigquery:"data_type"`
+			}
+
+			cols := []foundColumn{}
+			for {
+				var c foundColumn
+				if err = it.Next(&c); err == iterator.Done {
+					break
+				} else if err != nil {
+					require.NoError(t, err)
+				}
+				cols = append(cols, c)
+			}
+
+			slices.SortFunc(cols, func(a, b foundColumn) int {
+				return strings.Compare(a.Name, b.Name)
+			})
+
+			var out strings.Builder
+			enc := json.NewEncoder(&out)
+			for _, c := range cols {
+				require.NoError(t, enc.Encode(c))
+			}
+
+			return out.String()
+		},
+		// Insert one row made of the given columns plus the standard metadata and document columns.
+		func(t *testing.T, cols []string, values []string) {
+			t.Helper()
+
+			var keys = make([]string, len(cols))
+			for i, col := range cols {
+				keys[i] = bqDialect.Identifier(col)
+			}
+			keys = append(keys, bqDialect.Identifier("_meta/flow_truncated"))
+			values = append(values, "false")
+			keys = append(keys, bqDialect.Identifier("flow_published_at"))
+			values = append(values, "'2024-09-13 01:01:01'")
+			keys = append(keys, bqDialect.Identifier("flow_document"))
+			values = append(values, "'{}'")
+			_, err = client.query(ctx, fmt.Sprintf(
+				"insert into %s (%s) VALUES (%s);",
+				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), strings.Join(keys, ","), strings.Join(values, ","),
+			))
+
+			require.NoError(t, err)
+		},
+		// Dump the table's rows, ordered by key, preceded by a "name (TYPE)" header line.
+		func(t *testing.T) string {
+			t.Helper()
+			var b strings.Builder
+
+			var sql = fmt.Sprintf("select * from %s order by %s asc;", bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), bqDialect.Identifier("key"))
+
+			job, err := client.query(ctx, sql)
+			require.NoError(t, err)
+
+			it, err := job.Read(ctx)
+			require.NoError(t, err)
+
+			var values []bigquery.Value
+			var firstResult = true
+			for {
+				if err = it.Next(&values); err == iterator.Done {
+					break
+				} else if err != nil {
+					require.NoError(t, err)
+				}
+
+				if firstResult {
+					for i, col := range it.Schema {
+						if i > 0 {
+							b.WriteString(", ")
+						}
+						b.WriteString(col.Name)
+						b.WriteString(" (" + string(col.Type) + ")")
+					}
+					b.WriteString("\n")
+
+					firstResult = false
+				}
+
+				for i, v := range values {
+					if i > 0 {
+						b.WriteString(", ")
+					}
+					b.WriteString(fmt.Sprintf("%v", v))
+				}
+			}
+
+			return b.String()
+		},
+		// Best-effort cleanup: errors are deliberately ignored since the objects may not exist.
+		func(t *testing.T, materialization pf.Materialization) {
+			t.Helper()
+
+			_, _ = client.query(ctx, fmt.Sprintf(
+				"drop table %s;",
+				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
+			))
+
+			_, _ = client.query(ctx, fmt.Sprintf(
+				"delete from %s where materialization = 'test/sqlite'",
+				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
+			))
+		},
+	)
+}
+
 func TestFencingCases(t *testing.T) {
 	// Because of the number of round-trips to bigquery required for this test to run it is not run
 	// normally. Enable it via the RUN_FENCE_TESTS environment variable. It will take several
diff --git a/materialize-bigquery/client.go b/materialize-bigquery/client.go
index e7504041f..6a4543d23 100644
--- a/materialize-bigquery/client.go
+++ b/materialize-bigquery/client.go
@@ -38,6 +38,79 @@ func newClient(ctx context.Context, ep *sql.Endpoint) (sql.Client, error) {
 }
 
 func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boilerplate.InfoSchema, error) {
+	// Resume any column type migration that a previous AlterTable left unfinished. This has to
+	// happen before the InfoSchema is constructed so that it describes the migrated columns.
+	var migrationsTable = bqDialect.Identifier(c.cfg.ProjectID, c.cfg.Dataset, "flow_migrations")
+	// We check for existence of the table by requesting metadata, this is the recommended approach by BigQuery docs
+	// see https://pkg.go.dev/cloud.google.com/go/bigquery#Dataset.Table
+	// NOTE(review): every Metadata error, not just "not found", is treated as "nothing to resume".
+	_, err := c.bigqueryClient.DatasetInProject(c.cfg.ProjectID, c.cfg.Dataset).Table("flow_migrations").Metadata(ctx)
+	if err == nil {
+		job, err := c.query(ctx, fmt.Sprintf("SELECT table, step, col_identifier, col_field, col_ddl FROM %s", migrationsTable))
+		if err != nil {
+			return nil, fmt.Errorf("finding flow_migrations job: %w", err)
+		}
+		it, err := job.Read(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("finding flow_migrations: %w", err)
+		}
+		type migration struct {
+			TableIdentifier string `bigquery:"table"`
+			Step            int    `bigquery:"step"`
+			ColIdentifier   string `bigquery:"col_identifier"`
+			ColField        string `bigquery:"col_field"`
+			DDL             string `bigquery:"col_ddl"`
+		}
+		for {
+			var m migration
+			if err := it.Next(&m); err == iterator.Done {
+				break
+			} else if err != nil {
+				return nil, fmt.Errorf("reading flow_migrations row: %w", err)
+			}
+
+			// Every row tracks exactly one column, so the bookkeeping statements are scoped to
+			// that (table, column) pair and never touch the progress of sibling columns.
+			var rowFilter = fmt.Sprintf(
+				"table=%s AND col_identifier=%s",
+				bqDialect.Literal(m.TableIdentifier),
+				bqDialect.Literal(m.ColIdentifier),
+			)
+			var steps, acceptableErrors = c.columnChangeSteps(m.TableIdentifier, m.ColField, m.ColIdentifier, m.DDL)
+
+			var stmts [][]string
+			for s := m.Step; s < len(steps); s++ {
+				stmts = append(stmts, []string{steps[s], acceptableErrors[s]})
+				stmts = append(stmts, []string{fmt.Sprintf("UPDATE %s SET step = %d WHERE %s;", migrationsTable, s+1, rowFilter)})
+			}
+
+			stmts = append(stmts, []string{fmt.Sprintf("DELETE FROM %s WHERE %s;", migrationsTable, rowFilter)})
+
+			for _, stmt := range stmts {
+				query := stmt[0]
+				var acceptableError string
+				if len(stmt) > 1 {
+					acceptableError = stmt[1]
+				}
+
+				log.WithField("q", query).WithField("acceptableError", acceptableError).Info("migration")
+				if _, err := c.query(ctx, query); err != nil {
+					// A step that already ran before the interruption fails with a known
+					// message; only such errors are tolerated.
+					if acceptableError == "" || !strings.Contains(err.Error(), acceptableError) {
+						return nil, fmt.Errorf("resuming column migration of %s: %w", m.TableIdentifier, err)
+					}
+				}
+			}
+		}
+
+		// Drop the bookkeeping table only once every row has been processed; dropping it inside
+		// the loop would make the statements of any later row fail.
+		if _, err := c.query(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s;", migrationsTable)); err != nil {
+			return nil, fmt.Errorf("dropping flow_migrations: %w", err)
+		}
+	}
+
 	is := boilerplate.NewInfoSchema(
 		sql.ToLocatePathFn(bqDialect.TableLocator),
 		bqDialect.ColumnLocator,
@@ -132,6 +205,56 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
 	}, nil
 }
 
+// columnChangeSteps returns the ordered statements that change the type of a column by way of a
+// temporary column: add the temporary column, copy the cast values into it, rename the original
+// out of the way, rename the temporary column into place, and drop the original. The second
+// slice holds, index for index, the error text that is tolerated when the matching step is
+// re-run after an interruption; an empty entry tolerates nothing. The receiver is not used.
+func (c *client) columnChangeSteps(tableIdentifier, colField, colIdentifier, DDL string) ([]string, []string) {
+	var tempColumnName = fmt.Sprintf("%s_flowtmp1", colField)
+	var tempColumnIdentifier = bqDialect.Identifier(tempColumnName)
+	var tempOriginalRename = fmt.Sprintf("%s_flowtmp2", colField)
+	var tempOriginalRenameIdentifier = bqDialect.Identifier(tempOriginalRename)
+	return []string{
+			fmt.Sprintf(
+				"ALTER TABLE %s ADD COLUMN %s %s;",
+				tableIdentifier,
+				tempColumnIdentifier,
+				DDL,
+			),
+			fmt.Sprintf(
+				"UPDATE %s SET %s = CAST(%s AS %s) WHERE true;",
+				tableIdentifier,
+				tempColumnIdentifier,
+				colIdentifier,
+				DDL,
+			),
+			fmt.Sprintf(
+				"ALTER TABLE %s RENAME COLUMN %s TO %s;",
+				tableIdentifier,
+				colIdentifier,
+				tempOriginalRenameIdentifier,
+			),
+			fmt.Sprintf(
+				"ALTER TABLE %s RENAME COLUMN %s TO %s;",
+				tableIdentifier,
+				tempColumnIdentifier,
+				colIdentifier,
+			),
+			fmt.Sprintf(
+				"ALTER TABLE %s DROP COLUMN %s;",
+				tableIdentifier,
+				tempOriginalRenameIdentifier,
+			),
+		}, []string{
+			fmt.Sprintf("Column already exists: %s", tempColumnName),
+			"",
+			fmt.Sprintf("Column already exists: %s", tempOriginalRename),
+			fmt.Sprintf("Column already exists: %s", colField),
+			fmt.Sprintf("Column not found: %s", tempOriginalRename),
+		}
+}
+
 // TODO(whb): In display of needless cruelty, BigQuery will throw an error if you try to use an
 // ALTER TABLE sql statement to add columns to a table with no pre-existing columns, claiming that
 // it does not have a schema. I believe the client API would allow us to set the schema, but this
@@ -140,15 +263,61 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
 // columns exist, we'll at least get a coherent error message when this happens and can know that
 // the workaround is to re-backfill the table so it gets created fresh with the needed columns.
 func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boilerplate.ActionApplyFn, error) {
+	var stmts []string
 	var alterColumnStmtBuilder strings.Builder
 	if err := tplAlterTableColumns.Execute(&alterColumnStmtBuilder, ta); err != nil {
 		return "", nil, fmt.Errorf("rendering alter table columns statement: %w", err)
 	}
 	alterColumnStmt := alterColumnStmtBuilder.String()
+	if len(strings.Trim(alterColumnStmt, "\n")) > 0 {
+		stmts = append(stmts, alterColumnStmt)
+	}
 
-	return alterColumnStmt, func(ctx context.Context) error {
-		_, err := c.query(ctx, alterColumnStmt)
-		return err
+	if len(ta.ColumnTypeChanges) > 0 {
+		// A type change takes several statements, so the progress of each column is recorded in
+		// flow_migrations; InfoSchema reads that table to resume an interrupted migration.
+		// NOTE(review): the table is shared by the whole dataset and replaced here; confirm that
+		// alterations of different tables never run concurrently.
+		var migrationsTable = bqDialect.Identifier(c.cfg.ProjectID, c.cfg.Dataset, "flow_migrations")
+		stmts = append(stmts, fmt.Sprintf("CREATE OR REPLACE TABLE %s(table STRING, step INTEGER, col_identifier STRING, col_field STRING, col_ddl STRING);", migrationsTable))
+
+		for _, ch := range ta.ColumnTypeChanges {
+			stmts = append(stmts, fmt.Sprintf(
+				"INSERT INTO %s(table, step, col_identifier, col_field, col_ddl) VALUES (%s, 0, %s, %s, %s);",
+				migrationsTable,
+				bqDialect.Literal(ta.Identifier),
+				bqDialect.Literal(ch.Identifier),
+				bqDialect.Literal(ch.Field),
+				bqDialect.Literal(ch.DDL),
+			))
+		}
+
+		for _, ch := range ta.ColumnTypeChanges {
+			var steps, _ = c.columnChangeSteps(ta.Identifier, ch.Field, ch.Identifier, ch.DDL)
+
+			for s := 0; s < len(steps); s++ {
+				stmts = append(stmts, steps[s])
+				// Progress is tracked per column so that sibling columns keep their own step.
+				stmts = append(stmts, fmt.Sprintf(
+					"UPDATE %s SET step = %d WHERE table=%s AND col_identifier=%s;",
+					migrationsTable,
+					s+1,
+					bqDialect.Literal(ta.Identifier),
+					bqDialect.Literal(ch.Identifier),
+				))
+			}
+		}
+
+		stmts = append(stmts, fmt.Sprintf("DROP TABLE %s;", migrationsTable))
+	}
+
+	return strings.Join(stmts, "\n"), func(ctx context.Context) error {
+		for _, stmt := range stmts {
+			if _, err := c.query(ctx, stmt); err != nil {
+				return err
+			}
+		}
+		return nil
 	}, nil
 }
 
diff --git a/materialize-bigquery/sqlgen.go b/materialize-bigquery/sqlgen.go
index 720b32253..5f8507f0a 100644
--- a/materialize-bigquery/sqlgen.go
+++ b/materialize-bigquery/sqlgen.go
@@ -98,6 +98,9 @@ var bqDialect = func() sql.Dialect {
 	)
 
 	return sql.Dialect{
+		MigratableTypes: map[sql.FlatType][]string{
+			sql.STRING: {"integer", "bignumeric", "float"},
+		},
 		TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
 			return sql.InfoTableLocation{
 				TableSchema: path[1],