Skip to content

Commit

Permalink
materialize-bigquery: formatted string to string migration
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 19, 2024
1 parent c4afa59 commit 704b4db
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 3 deletions.
71 changes: 71 additions & 0 deletions materialize-bigquery/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
Base Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Before Apply Schema:
{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"}
{"Name":"flow_document","Nullable":"NO","Type":"STRING"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"}
{"Name":"key","Nullable":"NO","Type":"STRING"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"}
{"Name":"numericString","Nullable":"YES","Type":"BIGNUMERIC(38)"}
{"Name":"optional","Nullable":"YES","Type":"STRING"}
{"Name":"scalarValue","Nullable":"NO","Type":"STRING"}


Migratable Changes Before Apply Data:
key (STRING), _meta_flow_truncated (BOOLEAN), flow_published_at (TIMESTAMP), multiple (JSON), nonScalarValue (STRING), numericString (BIGNUMERIC), optional (STRING), scalarValue (STRING), flow_document (STRING)
1, false, 2024-09-13 01:01:01 +0000 UTC, <nil>, <nil>, 123/1, <nil>, test, {}

Migratable Changes Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Applied Schema:
{"Name":"_meta_flow_truncated","Nullable":"NO","Type":"BOOL"}
{"Name":"flow_document","Nullable":"NO","Type":"STRING"}
{"Name":"flow_published_at","Nullable":"NO","Type":"TIMESTAMP"}
{"Name":"key","Nullable":"NO","Type":"STRING"}
{"Name":"multiple","Nullable":"YES","Type":"JSON"}
{"Name":"nonScalarValue","Nullable":"YES","Type":"STRING"}
{"Name":"numericString","Nullable":"YES","Type":"STRING"}
{"Name":"optional","Nullable":"YES","Type":"STRING"}
{"Name":"scalarValue","Nullable":"NO","Type":"STRING"}


Migratable Changes Applied Data:
key (STRING), _meta_flow_truncated (BOOLEAN), flow_published_at (TIMESTAMP), multiple (JSON), nonScalarValue (STRING), optional (STRING), scalarValue (STRING), flow_document (STRING), numericString (STRING)
1, false, 2024-09-13 01:01:01 +0000 UTC, <nil>, <nil>, <nil>, test, {}, 123

140 changes: 140 additions & 0 deletions materialize-bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,146 @@ func TestValidateAndApply(t *testing.T) {
)
}

// TestValidateAndApplyMigrations exercises column type migrations end-to-end
// against a live BigQuery dataset: it runs the shared migration test harness,
// providing callbacks that (1) dump the current table schema from
// INFORMATION_SCHEMA, (2) insert a seed row, (3) dump table contents, and
// (4) clean up the target table and the test's metadata row.
func TestValidateAndApplyMigrations(t *testing.T) {
	ctx := context.Background()

	cfg := mustGetCfg(t)

	resourceConfig := tableConfig{
		Table:     "target",
		Dataset:   cfg.Dataset,
		projectID: cfg.ProjectID,
	}

	client, err := cfg.client(ctx)
	require.NoError(t, err)
	defer client.Close()

	sql.RunValidateAndApplyMigrationsTests(
		t,
		newBigQueryDriver(),
		cfg,
		resourceConfig,
		// dumpSchema: render the target table's columns (name, nullability,
		// type) as newline-delimited JSON, sorted by column name for
		// deterministic snapshots.
		func(t *testing.T) string {
			job, err := client.query(ctx, fmt.Sprintf(
				"select column_name, is_nullable, data_type from %s where table_name = %s;",
				bqDialect.Identifier(cfg.Dataset, "INFORMATION_SCHEMA", "COLUMNS"),
				bqDialect.Literal(resourceConfig.Table),
			))
			require.NoError(t, err)

			it, err := job.Read(ctx)
			require.NoError(t, err)

			type foundColumn struct {
				Name     string `bigquery:"column_name"`
				Nullable string `bigquery:"is_nullable"`
				// Tag fixed to exactly match the selected column name
				// (was "data_Type"), rather than relying on the client's
				// case-folding when mapping rows into the struct.
				Type string `bigquery:"data_type"`
			}

			cols := []foundColumn{}
			for {
				var c foundColumn
				if err = it.Next(&c); err == iterator.Done {
					break
				} else if err != nil {
					require.NoError(t, err)
				}
				cols = append(cols, c)
			}

			slices.SortFunc(cols, func(a, b foundColumn) int {
				return strings.Compare(a.Name, b.Name)
			})

			var out strings.Builder
			enc := json.NewEncoder(&out)
			for _, c := range cols {
				require.NoError(t, enc.Encode(c))
			}

			return out.String()
		},
		// insertRow: insert a single row built from the harness-provided
		// columns/values, plus the standard metadata columns every
		// materialized table carries.
		func(t *testing.T, cols []string, values []string) {
			t.Helper()

			var keys = make([]string, len(cols))
			for i, col := range cols {
				keys[i] = bqDialect.Identifier(col)
			}
			keys = append(keys, bqDialect.Identifier("_meta/flow_truncated"))
			values = append(values, "false")
			keys = append(keys, bqDialect.Identifier("flow_published_at"))
			values = append(values, "'2024-09-13 01:01:01'")
			keys = append(keys, bqDialect.Identifier("flow_document"))
			values = append(values, "'{}'")
			// Declare a fresh err here instead of clobbering the outer err
			// captured from the enclosing scope.
			_, err := client.query(ctx, fmt.Sprintf(
				"insert into %s (%s) VALUES (%s);",
				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), strings.Join(keys, ","), strings.Join(values, ","),
			))

			require.NoError(t, err)
		},
		// dumpData: render the table's rows as text, with a header line of
		// "name (TYPE)" pairs emitted once before the first row.
		func(t *testing.T) string {
			t.Helper()
			var b strings.Builder

			var sql = fmt.Sprintf("select * from %s order by %s asc;", bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table), bqDialect.Identifier("key"))

			job, err := client.query(ctx, sql)
			require.NoError(t, err)

			it, err := job.Read(ctx)
			require.NoError(t, err)

			var values []bigquery.Value
			var firstResult = true
			for {
				if err = it.Next(&values); err == iterator.Done {
					break
				} else if err != nil {
					require.NoError(t, err)
				}

				if firstResult {
					for i, col := range it.Schema {
						if i > 0 {
							b.WriteString(", ")
						}
						b.WriteString(col.Name)
						b.WriteString(" (" + string(col.Type) + ")")
					}
					b.WriteString("\n")

					firstResult = false
				}

				for i, v := range values {
					if i > 0 {
						b.WriteString(", ")
					}
					b.WriteString(fmt.Sprintf("%v", v))
				}
			}

			return b.String()
		},
		// cleanup: best-effort drop of the target table and removal of the
		// test materialization's metadata row; errors are deliberately
		// ignored since the objects may not exist.
		func(t *testing.T, materialization pf.Materialization) {
			t.Helper()

			_, _ = client.query(ctx, fmt.Sprintf(
				"drop table %s;",
				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
			))

			_, _ = client.query(ctx, fmt.Sprintf(
				"delete from %s where materialization = 'test/sqlite'",
				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
			))
		},
	)
}

func TestFencingCases(t *testing.T) {
// Because of the number of round-trips to bigquery required for this test to run it is not run
// normally. Enable it via the RUN_FENCE_TESTS environment variable. It will take several
Expand Down
149 changes: 146 additions & 3 deletions materialize-bigquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,69 @@ func newClient(ctx context.Context, ep *sql.Endpoint) (sql.Client, error) {
}

func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boilerplate.InfoSchema, error) {
// First check if there are any interrupted column migrations which must be resumed, before we
// construct the InfoSchema
var migrationsTable = bqDialect.Identifier(c.cfg.ProjectID, c.cfg.Dataset, "flow_migrations")
// We check for existence of the table by requesting metadata, this is the recommended approach by BigQuery docs
// see https://pkg.go.dev/cloud.google.com/go/bigquery#Dataset.Table
_, err := c.bigqueryClient.DatasetInProject(c.cfg.ProjectID, c.cfg.Dataset).Table("flow_migrations").Metadata(ctx)
if err == nil {
job, err := c.query(ctx, fmt.Sprintf("SELECT table, step, col_identifier, col_field, col_ddl FROM %s", migrationsTable))
if err != nil {
return nil, fmt.Errorf("finding flow_migrations job: %w", err)
}
it, err := job.Read(ctx)
if err != nil {
return nil, fmt.Errorf("finding flow_migrations: %w", err)
}
type migration struct {
TableIdentifier string `bigquery:"table"`
Step int `bigquery:"step"`
ColIdentifier string `bigquery:"col_identifier"`
ColField string `bigquery:"col_field"`
DDL string `bigquery:"col_ddl"`
}
for {
var stmts [][]string
var m migration
if err := it.Next(&m); err == iterator.Done {
break
} else if err != nil {
return nil, fmt.Errorf("reading flow_migrations row: %w", err)
}

var steps, acceptableErrors = c.columnChangeSteps(m.TableIdentifier, m.ColField, m.ColIdentifier, m.DDL)

for s := m.Step; s < len(steps); s++ {
stmts = append(stmts, []string{steps[s], acceptableErrors[s]})
stmts = append(stmts, []string{fmt.Sprintf("UPDATE %s SET step = %d WHERE table='%s';", migrationsTable, s+1, m.TableIdentifier)})
}

stmts = append(stmts, []string{fmt.Sprintf("DELETE FROM %s WHERE table='%s';", migrationsTable, m.TableIdentifier)})

for _, stmt := range stmts {
query := stmt[0]
var acceptableError string
if len(stmt) > 1 {
acceptableError = stmt[1]
}

log.WithField("q", query).WithField("acceptableError", acceptableError).Info("migration")
if _, err := c.query(ctx, query); err != nil {
if acceptableError != "" && strings.Contains(err.Error(), acceptableError) {
// This is an acceptable error for this step, so we do not throw an error
} else {
return nil, err
}
}
}

if _, err := c.query(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s;", migrationsTable)); err != nil {
return nil, fmt.Errorf("dropping flow_migrations: %w", err)
}
}
}

is := boilerplate.NewInfoSchema(
sql.ToLocatePathFn(bqDialect.TableLocator),
bqDialect.ColumnLocator,
Expand Down Expand Up @@ -132,6 +195,51 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
}, nil
}

// columnChangeSteps returns the ordered SQL statements that migrate an
// existing column to a new type via a scratch column, together with a
// parallel slice of error-message substrings: if re-running a step after a
// partial failure yields an error containing the corresponding substring,
// that step has already been applied and the error is acceptable. An empty
// substring means no error is acceptable for that step.
func (c *client) columnChangeSteps(tableIdentifier, colField, colIdentifier, DDL string) ([]string, []string) {
	tmpNew := fmt.Sprintf("%s_flowtmp1", colField)
	tmpOld := fmt.Sprintf("%s_flowtmp2", colField)
	tmpNewIdent := bqDialect.Identifier(tmpNew)
	tmpOldIdent := bqDialect.Identifier(tmpOld)

	steps := []string{
		// 1. Add a scratch column with the target type.
		fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s;", tableIdentifier, tmpNewIdent, DDL),
		// 2. Backfill the scratch column by casting the existing values.
		fmt.Sprintf("UPDATE %s SET %s = CAST(%s AS %s) WHERE true;", tableIdentifier, tmpNewIdent, colIdentifier, DDL),
		// 3. Move the original column out of the way.
		fmt.Sprintf("ALTER TABLE %s RENAME COLUMN %s TO %s;", tableIdentifier, colIdentifier, tmpOldIdent),
		// 4. Promote the scratch column to the original name.
		fmt.Sprintf("ALTER TABLE %s RENAME COLUMN %s TO %s;", tableIdentifier, tmpNewIdent, colIdentifier),
		// 5. Drop the displaced original column.
		fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s;", tableIdentifier, tmpOldIdent),
	}

	acceptableErrors := []string{
		fmt.Sprintf("Column already exists: %s", tmpNew),
		"",
		fmt.Sprintf("Column already exists: %s", tmpOld),
		fmt.Sprintf("Column already exists: %s", colField),
		fmt.Sprintf("Column not found: %s", tmpOld),
	}

	return steps, acceptableErrors
}

// TODO(whb): In display of needless cruelty, BigQuery will throw an error if you try to use an
// ALTER TABLE sql statement to add columns to a table with no pre-existing columns, claiming that
// it does not have a schema. I believe the client API would allow us to set the schema, but this
Expand All @@ -140,15 +248,50 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
// columns exist, we'll at least get a coherent error message when this happens and can know that
// the workaround is to re-backfill the table so it gets created fresh with the needed columns.
func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boilerplate.ActionApplyFn, error) {
var stmts []string
var alterColumnStmtBuilder strings.Builder
if err := tplAlterTableColumns.Execute(&alterColumnStmtBuilder, ta); err != nil {
return "", nil, fmt.Errorf("rendering alter table columns statement: %w", err)
}
alterColumnStmt := alterColumnStmtBuilder.String()
if len(strings.Trim(alterColumnStmt, "\n")) > 0 {
stmts = append(stmts, alterColumnStmt)
}

return alterColumnStmt, func(ctx context.Context) error {
_, err := c.query(ctx, alterColumnStmt)
return err
if len(ta.ColumnTypeChanges) > 0 {
var migrationsTable = bqDialect.Identifier(c.cfg.ProjectID, c.cfg.Dataset, "flow_migrations")
stmts = append(stmts, fmt.Sprintf("CREATE OR REPLACE TABLE %s(table STRING, step INTEGER, col_identifier STRING, col_field STRING, col_ddl STRING);", migrationsTable))

for _, ch := range ta.ColumnTypeChanges {
stmts = append(stmts, fmt.Sprintf(
"INSERT INTO %s(table, step, col_identifier, col_field, col_ddl) VALUES ('%s', 0, '%s', '%s', '%s');",
migrationsTable,
ta.Identifier,
ch.Identifier,
ch.Field,
ch.DDL,
))
}

for _, ch := range ta.ColumnTypeChanges {
var steps, _ = c.columnChangeSteps(ta.Identifier, ch.Field, ch.Identifier, ch.DDL)

for s := 0; s < len(steps); s++ {
stmts = append(stmts, steps[s])
stmts = append(stmts, fmt.Sprintf("UPDATE %s SET STEP=%d WHERE table='%s';", migrationsTable, s+1, ta.Identifier))
}
}

stmts = append(stmts, fmt.Sprintf("DROP TABLE %s;", migrationsTable))
}

return strings.Join(stmts, "\n"), func(ctx context.Context) error {
for _, stmt := range stmts {
if _, err := c.query(ctx, stmt); err != nil {
return err
}
}
return nil
}, nil
}

Expand Down
Loading

0 comments on commit 704b4db

Please sign in to comment.