Skip to content

Commit

Permalink
materialize-snowflake: formatted string to string migration
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Sep 19, 2024
1 parent 704b4db commit 2d6619e
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 3 deletions.
73 changes: 73 additions & 0 deletions materialize-snowflake/.snapshots/TestValidateAndApplyMigrations
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
Base Initial Constraints:
{"Field":"_meta/flow_truncated","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Metadata fields fields are able to be materialized"}
{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"key","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"All Locations that are part of the collections key are required"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This field is able to be materialized"}
{"Field":"nonScalarValue","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"optional","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"The projection has a single scalar type"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Only a single root document projection can be materialized for standard updates"}

Base Re-validated Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Before Apply Schema:
{"Name":"FLOW_DOCUMENT","Nullable":"NO","Type":"VARIANT"}
{"Name":"FLOW_PUBLISHED_AT","Nullable":"NO","Type":"TIMESTAMP_LTZ"}
{"Name":"KEY","Nullable":"NO","Type":"TEXT"}
{"Name":"MULTIPLE","Nullable":"YES","Type":"VARIANT"}
{"Name":"NONSCALARVALUE","Nullable":"YES","Type":"VARIANT"}
{"Name":"NUMERICSTRING","Nullable":"YES","Type":"NUMBER"}
{"Name":"OPTIONAL","Nullable":"YES","Type":"VARIANT"}
{"Name":"SCALARVALUE","Nullable":"NO","Type":"TEXT"}
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"}


Migratable Changes Before Apply Data:
KEY (TEXT), _meta/flow_truncated (BOOLEAN), FLOW_PUBLISHED_AT (TIMESTAMP_LTZ), MULTIPLE (VARIANT), NONSCALARVALUE (VARIANT), NUMERICSTRING (FIXED), OPTIONAL (VARIANT), SCALARVALUE (TEXT), FLOW_DOCUMENT (VARIANT)

1, 0, 2024-09-13 01:01:01 -0700 PDT, <nil>, <nil>, 123, <nil>, test, {}

Migratable Changes Constraints:
{"Field":"_meta/flow_truncated","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
{"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
{"Field":"multiple","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nonScalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"nullValue","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize a field where the only possible type is 'null'"}
{"Field":"numericString","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"optional","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"scalarValue","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
{"Field":"second_root","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"Cannot materialize root document projection 'second_root' because field 'flow_document' is already being materialized as the document"}

Migratable Changes Applied Schema:
{"Name":"FLOW_DOCUMENT","Nullable":"NO","Type":"VARIANT"}
{"Name":"FLOW_PUBLISHED_AT","Nullable":"NO","Type":"TIMESTAMP_LTZ"}
{"Name":"KEY","Nullable":"NO","Type":"TEXT"}
{"Name":"MULTIPLE","Nullable":"YES","Type":"VARIANT"}
{"Name":"NONSCALARVALUE","Nullable":"YES","Type":"VARIANT"}
{"Name":"NUMERICSTRING","Nullable":"YES","Type":"TEXT"}
{"Name":"OPTIONAL","Nullable":"YES","Type":"VARIANT"}
{"Name":"SCALARVALUE","Nullable":"NO","Type":"TEXT"}
{"Name":"_meta/flow_truncated","Nullable":"NO","Type":"BOOLEAN"}


Migratable Changes Applied Data:
KEY (TEXT), _meta/flow_truncated (BOOLEAN), FLOW_PUBLISHED_AT (TIMESTAMP_LTZ), MULTIPLE (VARIANT), NONSCALARVALUE (VARIANT), OPTIONAL (VARIANT), SCALARVALUE (TEXT), FLOW_DOCUMENT (VARIANT), NUMERICSTRING (TEXT)

1, 0, 2024-09-13 01:01:01 -0700 PDT, <nil>, <nil>, <nil>, test, {}, 123

134 changes: 131 additions & 3 deletions materialize-snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,55 @@ func newClient(ctx context.Context, ep *sql.Endpoint) (sql.Client, error) {
}

func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (is *boilerplate.InfoSchema, err error) {
// First check if there are any interrupted column migrations which must be resumed, before we
// construct the InfoSchema
migrationsTable := c.ep.Dialect.Identifier(c.cfg.Schema, "flow_migrations")
rows, err := c.db.QueryContext(ctx, fmt.Sprintf(`SELECT "table", step, col_identifier, col_field, col_ddl FROM %s`, migrationsTable))
if err != nil {
if !strings.Contains(err.Error(), "FLOW_MIGRATIONS' does not exist") {
return nil, fmt.Errorf("finding flow_migrations: %w", err)
}
}
if rows != nil {
defer rows.Close()
for rows.Next() {
var stmts [][]string
var tableIdentifier, colIdentifier, colField, DDL string
var step int
if err := rows.Scan(&tableIdentifier, &step, &colIdentifier, &colField, &DDL); err != nil {
return nil, fmt.Errorf("reading flow_migrations row: %w", err)
}
var steps, acceptableErrors = c.columnChangeSteps(tableIdentifier, colField, colIdentifier, DDL)

for s := step; s < len(steps); s++ {
stmts = append(stmts, []string{steps[s], acceptableErrors[s]})
stmts = append(stmts, []string{fmt.Sprintf(`UPDATE %s SET step = %d WHERE "table"='%s';`, migrationsTable, s+1, tableIdentifier)})
}

stmts = append(stmts, []string{fmt.Sprintf(`DELETE FROM %s WHERE "table"='%s';`, migrationsTable, tableIdentifier)})

for _, stmt := range stmts {
query := stmt[0]
var acceptableError string
if len(stmt) > 1 {
acceptableError = stmt[1]
}

if _, err := c.db.ExecContext(ctx, query); err != nil {
if acceptableError != "" && strings.Contains(err.Error(), acceptableError) {
// This is an acceptable error for this step, so we do not throw an error
} else {
return nil, fmt.Errorf("resume migration: %w", err)
}
}
}
}

if _, err := c.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s;", migrationsTable)); err != nil {
return nil, fmt.Errorf("dropping flow_migrations: %w", err)
}
}

// Currently the "catalog" is always the database value from the endpoint configuration in all
// capital letters. It is possible to connect to Snowflake databases that aren't in all caps by
// quoting the database name. We don't do that currently and it's hard to say if we ever will
Expand Down Expand Up @@ -76,16 +125,95 @@ func (c *client) DeleteTable(ctx context.Context, path []string) (string, boiler
}, nil
}

// columnChangeSteps builds the ordered SQL statements that change the type of a column by copying
// it through a scratch column, together with a parallel slice of tolerated error fragments.
//
// The statements, in order:
//  1. add a scratch column named "<colField>_flowtmp1" declared with DDL,
//  2. copy every value of the original column into the scratch column,
//  3. rename the original column to "<colField>_flowtmp2",
//  4. rename the scratch column to the original column's identifier,
//  5. drop the renamed original column.
//
// Both returned slices have the same length and are index-aligned. Entry i of the second slice is
// a substring of an error message that the caller may ignore for statement i (presumably the error
// raised when that statement had already been applied before an interruption); an empty entry
// means no error is tolerated for that statement.
func (c *client) columnChangeSteps(tableIdentifier, colField, colIdentifier, DDL string) ([]string, []string) {
	var (
		scratchCol = c.ep.Dialect.Identifier(colField + "_flowtmp1")
		retiredCol = c.ep.Dialect.Identifier(colField + "_flowtmp2")
	)

	const renameColumn = "ALTER TABLE %s RENAME COLUMN %s TO %s;"

	// Keeping each statement next to its tolerated error makes the pairing explicit; the pairs are
	// split into the two parallel slices callers expect just before returning.
	plan := []struct {
		query     string
		tolerated string
	}{
		{fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s;", tableIdentifier, scratchCol, DDL), "already exists"},
		{fmt.Sprintf("UPDATE %s SET %s = %s;", tableIdentifier, scratchCol, colIdentifier), ""},
		{fmt.Sprintf(renameColumn, tableIdentifier, colIdentifier, retiredCol), "does not exist"},
		{fmt.Sprintf(renameColumn, tableIdentifier, scratchCol, colIdentifier), "does not exist"},
		{fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s;", tableIdentifier, retiredCol), "does not exist"},
	}

	queries := make([]string, 0, len(plan))
	tolerated := make([]string, 0, len(plan))
	for _, p := range plan {
		queries = append(queries, p.query)
		tolerated = append(tolerated, p.tolerated)
	}
	return queries, tolerated
}

func (c *client) AlterTable(ctx context.Context, ta sql.TableAlter) (string, boilerplate.ActionApplyFn, error) {
var stmts []string
var alterColumnStmtBuilder strings.Builder
if err := renderTemplates(c.ep.Dialect).alterTableColumns.Execute(&alterColumnStmtBuilder, ta); err != nil {
return "", nil, fmt.Errorf("rendering alter table columns statement: %w", err)
}
alterColumnStmt := alterColumnStmtBuilder.String()
if len(strings.Trim(alterColumnStmt, "\n")) > 0 {
stmts = append(stmts, alterColumnStmt)
}

return alterColumnStmt, func(ctx context.Context) error {
_, err := c.db.ExecContext(ctx, alterColumnStmt)
return err
if len(ta.ColumnTypeChanges) > 0 {
var migrationsTable = c.ep.Dialect.Identifier(c.cfg.Schema, "flow_migrations")
stmts = append(stmts, fmt.Sprintf(`CREATE OR REPLACE TABLE %s("table" STRING, step INTEGER, col_identifier STRING, col_field STRING, col_ddl STRING);`, migrationsTable))

for _, ch := range ta.ColumnTypeChanges {
stmts = append(stmts, fmt.Sprintf(
`INSERT INTO %s("table", step, col_identifier, col_field, col_ddl) VALUES ('%s', 0, '%s', '%s', '%s');`,
migrationsTable,
ta.Identifier,
ch.Identifier,
ch.Field,
ch.DDL,
))
}

for _, ch := range ta.ColumnTypeChanges {
var steps, _ = c.columnChangeSteps(ta.Identifier, ch.Field, ch.Identifier, ch.DDL)

for s := 0; s < len(steps); s++ {
stmts = append(stmts, steps[s])
stmts = append(stmts, fmt.Sprintf(`UPDATE %s SET STEP=%d WHERE "table"='%s';`, migrationsTable, s+1, ta.Identifier))
}
}

stmts = append(stmts, fmt.Sprintf("DROP TABLE %s;", migrationsTable))
}

return strings.Join(stmts, "\n"), func(ctx context.Context) error {
for _, stmt := range stmts {
if _, err := c.db.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
}, nil
}

Expand Down
72 changes: 72 additions & 0 deletions materialize-snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"os"
"strings"
"testing"

"github.com/bradleyjkemp/cupaloy"
Expand Down Expand Up @@ -95,6 +96,77 @@ func TestValidateAndApply(t *testing.T) {
)
}

// TestValidateAndApplyMigrations drives sql.RunValidateAndApplyMigrationsTests against the
// Snowflake table PUBLIC.TARGET, supplying the callbacks that harness needs: one to dump the
// table's schema, one to insert a row, one to dump the table's rows, and one to clean up.
func TestValidateAndApplyMigrations(t *testing.T) {
	var (
		ctx            = context.Background()
		cfg            = mustGetCfg(t)
		resourceConfig = tableConfig{
			Table:  "TARGET",
			Schema: "PUBLIC",
		}
	)

	dsn, err := cfg.toURI("testing")
	require.NoError(t, err)

	db, err := stdsql.Open("snowflake", dsn)
	require.NoError(t, err)
	defer db.Close()

	// Fully-qualified, quoted identifier of the table under test; shared by the callbacks below.
	target := testDialect.Identifier(resourceConfig.Schema, resourceConfig.Table)

	// Returns the current schema of the target table as reported by sql.StdGetSchema.
	dumpSchema := func(t *testing.T) string {
		t.Helper()

		schema, err := sql.StdGetSchema(ctx, db, cfg.Database, resourceConfig.Schema, resourceConfig.Table)
		require.NoError(t, err)

		return schema
	}

	// Inserts one row made of the given columns and values, plus fixed values for the
	// _meta/flow_truncated, flow_published_at and flow_document columns.
	insertRow := func(t *testing.T, cols []string, values []string) {
		t.Helper()

		quoted := make([]string, 0, len(cols)+3)
		for _, col := range cols {
			quoted = append(quoted, testDialect.Identifier(col))
		}
		quoted = append(quoted,
			testDialect.Identifier("_meta/flow_truncated"),
			testDialect.Identifier("flow_published_at"),
			testDialect.Identifier("flow_document"),
		)
		values = append(values, "0", "'2024-09-13 01:01:01'", "PARSE_JSON('{}')")

		q := fmt.Sprintf(
			"insert into %s (%s) SELECT %s;",
			target,
			strings.Join(quoted, ","),
			strings.Join(values, ","),
		)
		_, err = db.ExecContext(ctx, q)

		require.NoError(t, err)
	}

	// Returns the rows of the target table as rendered by sql.DumpTestTable, passing the quoted
	// "key" column as its final argument.
	dumpData := func(t *testing.T) string {
		t.Helper()

		dump, err := sql.DumpTestTable(t, db, target, testDialect.Identifier("key"))

		require.NoError(t, err)

		return dump
	}

	// Drops the target table and deletes the materialization's row from the materializations
	// table. Both results are deliberately discarded.
	cleanup := func(t *testing.T, materialization pf.Materialization) {
		t.Helper()

		_, _ = db.ExecContext(ctx, fmt.Sprintf("drop table %s;", target))

		_, _ = db.ExecContext(ctx, fmt.Sprintf(
			"delete from %s where materialization = %s",
			testDialect.Identifier(cfg.Schema, sql.DefaultFlowMaterializations),
			testDialect.Literal(materialization.String()),
		))
	}

	sql.RunValidateAndApplyMigrationsTests(
		t,
		newSnowflakeDriver(),
		cfg,
		resourceConfig,
		dumpSchema,
		insertRow,
		dumpData,
		cleanup,
	)
}

func TestSpecification(t *testing.T) {
var resp, err = newSnowflakeDriver().
Spec(context.Background(), &pm.Request_Spec{})
Expand Down
3 changes: 3 additions & 0 deletions materialize-snowflake/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ var snowflakeDialect = func(configSchema string, timestampMapping timestampTypeM
}

return sql.Dialect{
MigratableTypes: map[sql.FlatType][]string{
sql.STRING: {"number", "float"},
},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
// A schema isn't required to be set on any resource, but the endpoint configuration
Expand Down

0 comments on commit 2d6619e

Please sign in to comment.