Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-bigquery-batch: Fix behavior of DATETIME cursors with fractional-second precision #2086

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions source-bigquery-batch/.snapshots/TestDatetimeCursor-Capture
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ================================
# Collection "acmeCo/test/datetime_cursor_132448": 3 Documents
# ================================
{"_meta":{"polled":"<TIMESTAMP>","index":999},"data":"Value for row \"2023-08-10T07:54:54.123\"","id":"2023-08-10T07:54:54.123000"}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"data":"Value for row \"2024-10-23T03:22:31.456\"","id":"2024-10-23T03:22:31.456000"}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"data":"Value for row \"2024-10-23T03:23:00.789\"","id":"2024-10-23T03:23:00.789000"}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"datetime_cursor_132448":{"CursorNames":["id"],"CursorValues":["2024-10-23T03:23:00.789000"],"LastPolled":"<TIMESTAMP>"}}}

51 changes: 51 additions & 0 deletions source-bigquery-batch/.snapshots/TestDatetimeCursor-Discovery
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Binding 0:
{
"resource_config_json": {
"name": "datetime_cursor_132448",
"template": "{{/* Default query template which adapts to cursor field selection */}}\n{{- if not .CursorFields -}}\n SELECT * FROM `testdata`.`datetime_cursor_132448`;\n{{- else -}}\n SELECT * FROM `testdata`.`datetime_cursor_132448`\n {{- if not .IsFirstQuery -}}\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = @p{{$j}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e @p{{$i}}\n\t{{- end -}})\n {{- end}}\n ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- end}}"
},
"resource_path": [
"datetime_cursor_132448"
],
"collection": {
"name": "acmeCo/test/datetime_cursor_132448",
"read_schema_json": {
"type": "object",
"required": [
"_meta"
],
"properties": {
"_meta": {
"$schema": "http://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
}
},
"type": "object",
"required": [
"polled",
"index"
]
}
},
"x-infer-schema": true
},
"key": [
"/_meta/polled",
"/_meta/index"
],
"projections": null
},
"state_key": "datetime_cursor_132448"
}

12 changes: 11 additions & 1 deletion source-bigquery-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
documentsPerCheckpoint = 1000
)

// TestShutdownAfterQuery is a test-only behavior flag. When set, the capture
// shuts down after issuing a single query to each binding. It is always
// false in normal operation.
var TestShutdownAfterQuery = false

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
// by a config schema, connect function, and value translation logic.
type BatchSQLDriver struct {
Expand Down Expand Up @@ -537,6 +544,9 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error {
if err := c.poll(ctx, binding, queryTemplate); err != nil {
return fmt.Errorf("error polling binding %q: %w", res.Name, err)
}
if TestShutdownAfterQuery {
return nil // In tests, we want each worker to shut down after one poll
}
}
return ctx.Err()
}
Expand Down Expand Up @@ -601,7 +611,7 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template

log.WithFields(log.Fields{
"query": query,
"args": cursorValues,
"args": fmt.Sprintf("%#v", cursorValues),
}).Info("executing query")
var pollTime = time.Now().UTC()

Expand Down
9 changes: 9 additions & 0 deletions source-bigquery-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"math"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
Expand Down Expand Up @@ -62,8 +64,15 @@ func (c *Config) SetDefaults() {
}
}

// datetimeFormatMicros is the time layout used to render BigQuery DATETIME
// values. Google Cloud DATETIME columns support microsecond precision at
// most, so the layout always emits exactly six fractional digits.
const datetimeFormatMicros = "2006-01-02T15:04:05.000000"

func translateBigQueryValue(val any, fieldType bigquery.FieldType) (any, error) {
switch val := val.(type) {
case civil.DateTime:
return val.In(time.UTC).Format(datetimeFormatMicros), nil
case string:
if fieldType == "JSON" && json.Valid([]byte(val)) {
return json.RawMessage([]byte(val)), nil
Expand Down
108 changes: 81 additions & 27 deletions source-bigquery-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,12 @@ func TestBasicCapture(t *testing.T) {
var uniqueID = "826935"
var tableName = fmt.Sprintf("testdata.basic_capture_%s", uniqueID)

executeSetupQuery(ctx, t, client, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
t.Cleanup(func() { executeSetupQuery(ctx, t, client, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) })
executeSetupQuery(ctx, t, client, fmt.Sprintf("CREATE TABLE %s(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING)", tableName))

// Discover the table and verify discovery snapshot
createTestTable(ctx, t, client, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING)")
cs.Bindings = discoverStreams(ctx, t, cs, regexp.MustCompile(uniqueID))
t.Run("Discovery", func(t *testing.T) {
var summary = new(strings.Builder)
for idx, binding := range cs.Bindings {
fmt.Fprintf(summary, "Binding %d:\n", idx)
bs, err := json.MarshalIndent(binding, " ", " ")
require.NoError(t, err)
io.Copy(summary, bytes.NewReader(bs))
fmt.Fprintf(summary, "\n")
}
if len(cs.Bindings) == 0 {
fmt.Fprintf(summary, "(no output)")
}
cupaloy.SnapshotT(t, summary.String())
})

// Edit the discovered binding to use the ID column as a cursor
var res Resource
require.NoError(t, json.Unmarshal(cs.Bindings[0].ResourceConfigJson, &res))
res.Cursor = []string{"id"}
resourceConfigBytes, err := json.Marshal(res)
require.NoError(t, err)
cs.Bindings[0].ResourceConfigJson = resourceConfigBytes
t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) })

setCursorColumns(t, cs.Bindings[0], "id")

t.Run("Capture", func(t *testing.T) {
// Spawn a worker thread which will insert 10 rows of data as distinct inserts.
Expand Down Expand Up @@ -155,6 +133,34 @@ func TestBasicCapture(t *testing.T) {
})
}

// TestDatetimeCursor exercises capture of a table whose cursor column is a
// BigQuery DATETIME holding fractional-second values, verifying (via the
// snapshot) that cursor values round-trip with microsecond precision.
func TestDatetimeCursor(t *testing.T) {
	var ctx, cs = context.Background(), testCaptureSpec(t)
	var client = testBigQueryClient(ctx, t)
	var uniqueID = "132448"
	var tableName = fmt.Sprintf("testdata.datetime_cursor_%s", uniqueID)

	// Seed three rows whose DATETIME keys carry distinct millisecond parts.
	createTestTable(ctx, t, client, tableName, "(id DATETIME, data STRING)")
	for _, x := range []string{"2023-08-10T07:54:54.123", "2024-10-23T03:22:31.456", "2024-10-23T03:23:00.789"} {
		executeSetupQuery(ctx, t, client, fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), x, fmt.Sprintf("Value for row %q", x))
	}

	cs.Bindings = discoverStreams(ctx, t, cs, regexp.MustCompile(uniqueID))

	t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) })

	// LIMIT 1 forces each poll to return at most one row, so observing all
	// three rows requires correctly resuming from the serialized DATETIME
	// cursor value between polls — the behavior under test.
	setShutdownAfterQuery(t, true)
	setCursorColumns(t, cs.Bindings[0], "id")
	setQueryLimit(t, cs.Bindings[0], 1)

	t.Run("Capture", func(t *testing.T) {
		// Run repeated single-poll captures until the deadline; each Capture
		// call resumes from the prior checkpoint. NOTE(review): this loops for
		// the full 10s even once all rows are captured — presumably the extra
		// polls return no rows past the final cursor value; confirm they emit
		// no duplicate documents into the snapshot.
		var deadline = time.Now().Add(10 * time.Second)
		for time.Now().Before(deadline) {
			cs.Capture(ctx, t, nil)
		}
		cupaloy.SnapshotT(t, cs.Summary())
	})
}

func testBigQueryClient(ctx context.Context, t testing.TB) *bigquery.Client {
t.Helper()
if os.Getenv("TEST_DATABASE") != "yes" {
Expand Down Expand Up @@ -195,7 +201,8 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec {
}

var sanitizers = make(map[string]*regexp.Regexp)
sanitizers[`"<TIMESTAMP>"`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"polled":"<TIMESTAMP>"`] = regexp.MustCompile(`"polled":"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"LastPolled":"<TIMESTAMP>"`] = regexp.MustCompile(`"LastPolled":"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"index":999`] = regexp.MustCompile(`"index":[0-9]+`)

return &st.CaptureSpec{
Expand Down Expand Up @@ -246,3 +253,50 @@ func executeSetupQuery(ctx context.Context, t testing.TB, client *bigquery.Clien
require.NoError(t, err)
require.NoError(t, status.Err())
}

// createTestTable drops any existing table with the given name, creates it
// fresh from tableDef, and registers a cleanup which drops it again when the
// test finishes.
func createTestTable(ctx context.Context, t testing.TB, client *bigquery.Client, tableName, tableDef string) {
	t.Helper()
	var dropQuery = fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)
	executeSetupQuery(ctx, t, client, dropQuery)
	t.Cleanup(func() { executeSetupQuery(ctx, t, client, dropQuery) })
	executeSetupQuery(ctx, t, client, fmt.Sprintf("CREATE TABLE %s%s", tableName, tableDef))
}

// summarizeBindings renders a human-readable summary of the provided capture
// bindings, one indented JSON document per binding, for snapshot comparison.
func summarizeBindings(t testing.TB, bindings []*pf.CaptureSpec_Binding) string {
	t.Helper()
	var sb = new(strings.Builder)
	if len(bindings) == 0 {
		fmt.Fprintf(sb, "(no output)")
		return sb.String()
	}
	for i, b := range bindings {
		fmt.Fprintf(sb, "Binding %d:\n", i)
		bs, err := json.MarshalIndent(b, " ", " ")
		require.NoError(t, err)
		io.Copy(sb, bytes.NewReader(bs))
		fmt.Fprintf(sb, "\n")
	}
	return sb.String()
}

// setCursorColumns edits a discovered binding's resource config in place so
// that the named column(s) are used as its capture cursor.
func setCursorColumns(t testing.TB, binding *pf.CaptureSpec_Binding, cursor ...string) {
	t.Helper() // Added for consistency with createTestTable/summarizeBindings.
	var res Resource
	require.NoError(t, json.Unmarshal(binding.ResourceConfigJson, &res))
	res.Cursor = cursor
	resourceConfigBytes, err := json.Marshal(res)
	require.NoError(t, err)
	binding.ResourceConfigJson = resourceConfigBytes
}

// setQueryLimit rewrites the binding's query template so each statement gains
// a LIMIT clause. Note this replaces every ";" in the template — correct for
// the default template (one terminator per branch), but it would misbehave on
// templates containing semicolons elsewhere.
func setQueryLimit(t testing.TB, binding *pf.CaptureSpec_Binding, limit int) {
	t.Helper() // Added for consistency with createTestTable/summarizeBindings.
	var res Resource
	require.NoError(t, json.Unmarshal(binding.ResourceConfigJson, &res))
	res.Template = strings.ReplaceAll(res.Template, ";", fmt.Sprintf(" LIMIT %d;", limit))
	resourceConfigBytes, err := json.Marshal(res)
	require.NoError(t, err)
	binding.ResourceConfigJson = resourceConfigBytes
}

// setShutdownAfterQuery sets the package-level TestShutdownAfterQuery flag for
// the duration of the test and restores the previous value on cleanup. Because
// the flag is a global, tests using this helper must not run in parallel with
// one another.
func setShutdownAfterQuery(t testing.TB, setting bool) {
	t.Helper() // Added for consistency with createTestTable/summarizeBindings.
	var oldSetting = TestShutdownAfterQuery
	TestShutdownAfterQuery = setting
	t.Cleanup(func() { TestShutdownAfterQuery = oldSetting })
}
Loading