Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use epoch for wf started column in MySQL backend #72

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/storage/mysql/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ WHERE

-- name: GetWorkflowLastStarted :one
SELECT
last_created_at
last_created_unix
FROM
wf_status
WHERE
Expand Down
3 changes: 3 additions & 0 deletions engine/storage/mysql/schema.00002.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE wf_status
DROP COLUMN last_created_at,
ADD COLUMN last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP());
6 changes: 5 additions & 1 deletion engine/storage/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ CREATE TABLE wf_status (
enrollment_id VARCHAR(255) NOT NULL,
workflow_name VARCHAR(255) NOT NULL,

last_created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- this was intended to be "DEFAULT (UNIX_TIMESTAMP() * 1000)"
-- which would complement the Golang `time.Time{}.UnixMilli()`.
-- however sqlc seems to not support that syntax, so we'll settle
-- for less precision.
last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP()),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
Expand Down
10 changes: 5 additions & 5 deletions engine/storage/mysql/sqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions engine/storage/mysql/sqlc/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 8 additions & 15 deletions engine/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,11 @@ func (s *MySQLStorage) CancelSteps(ctx context.Context, id, workflowName string)

// RetrieveWorkflowStarted returns the last time a workflow was started for id.
func (s *MySQLStorage) RetrieveWorkflowStarted(ctx context.Context, id, workflowName string) (time.Time, error) {
ret, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName})
epoch, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName})
if errors.Is(err, sql.ErrNoRows) {
return time.Time{}, nil
} else if err != nil {
return time.Time{}, err
}
parsedTime, err := time.Parse(mySQLTimestampFormat, ret)
if err != nil {
return time.Time{}, fmt.Errorf("parsing time: %w", err)
}
return parsedTime, err
return time.Unix(epoch, 0), err
}

// RecordWorkflowStarted stores the started time for workflowName for ids.
Expand All @@ -265,26 +259,25 @@ func (s *MySQLStorage) RecordWorkflowStarted(ctx context.Context, ids []string,
}
const numFields = 3
const subst = ", (?, ?, ?)"
fmt.Println(len(ids), len(ids)-1)
parms := make([]interface{}, len(ids)*numFields)
startedFormat := started.Format(mySQLTimestampFormat)
startedUnix := started.Unix()
for i, id := range ids {
// these must match the SQL query, below
parms[i*numFields] = id
parms[i*numFields+1] = workflowName
parms[i*numFields+2] = startedFormat
parms[i*numFields+2] = startedUnix
}
val := subst[2:] + strings.Repeat(subst, len(ids)-1)
values := strings.Repeat(subst, len(ids))[2:]
_, err := s.db.ExecContext(
ctx,
`
INSERT INTO wf_status
(enrollment_id, workflow_name, last_created_at)
(enrollment_id, workflow_name, last_created_unix)
VALUES
`+val+` AS new
`+values+` AS new
ON DUPLICATE KEY
UPDATE
last_created_at = new.last_created_at;`,
last_created_unix = new.last_created_unix;`,
parms...,
)
return err
Expand Down
11 changes: 4 additions & 7 deletions engine/storage/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,7 @@ func mainTest(t *testing.T, s storage.AllStorage) {
// t.Fatalf("invalid test data: step enqueueing with config: %v", err)
// }

// some backends may truncate the time and drop TZ
// so let's truncate ourselves and eliminate the TZ.
// since this value is used to compare the retrived value
// we'll stick with that.
storedAt := time.Now().UTC().Truncate(time.Second)
storedAt := time.Now()

err := s.StoreStep(ctx, tStep.step, storedAt)
if tStep.shouldError && err == nil {
Expand All @@ -301,8 +297,9 @@ func mainTest(t *testing.T, s storage.AllStorage) {
}
if ts.IsZero() {
t.Errorf("RetrieveWorkflowStarted: nil timestamp for id=%s, step=%s err=%v", id, tStep.step.WorkflowName, err)
} else if ts != storedAt {
t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v", id, tStep.step.WorkflowName, storedAt, ts)
} else if t1, t2 := ts.Truncate(time.Second), storedAt.Truncate(time.Second); t1.Compare(t2) != 0 {
// truncate comparison in case backends don't persist precision less than 1s (e.g. SQL textual dates)
t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v compare=%v", id, tStep.step.WorkflowName, t2, t1, t1.Compare(t2))
}
}
}
Expand Down