diff --git a/engine/storage/mysql/query.sql b/engine/storage/mysql/query.sql index 250231c..88fb22c 100644 --- a/engine/storage/mysql/query.sql +++ b/engine/storage/mysql/query.sql @@ -155,7 +155,7 @@ WHERE -- name: GetWorkflowLastStarted :one SELECT - last_created_at + last_created_unix FROM wf_status WHERE diff --git a/engine/storage/mysql/schema.00002.sql b/engine/storage/mysql/schema.00002.sql new file mode 100644 index 0000000..ba1f8f8 --- /dev/null +++ b/engine/storage/mysql/schema.00002.sql @@ -0,0 +1,3 @@ +ALTER TABLE wf_status + DROP COLUMN last_created_at, + ADD COLUMN last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP()); diff --git a/engine/storage/mysql/schema.sql b/engine/storage/mysql/schema.sql index e11171c..3a055f9 100644 --- a/engine/storage/mysql/schema.sql +++ b/engine/storage/mysql/schema.sql @@ -84,7 +84,11 @@ CREATE TABLE wf_status ( enrollment_id VARCHAR(255) NOT NULL, workflow_name VARCHAR(255) NOT NULL, - last_created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- this was intended to be "DEFAULT (UNIX_TIMESTAMP() * 1000)" + -- which would complement the Golang `time.Time{}.UnixMilli()`. + -- however sqlc seems to not support that syntax, so we'll settle + -- for less precision. + last_created_unix BIGINT NOT NULL DEFAULT (UNIX_TIMESTAMP()), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, diff --git a/engine/storage/mysql/sqlc/models.go b/engine/storage/mysql/sqlc/models.go index 28d3646..964c0b3 100644 --- a/engine/storage/mysql/sqlc/models.go +++ b/engine/storage/mysql/sqlc/models.go @@ -53,9 +53,9 @@ type WfEvent struct { } type WfStatus struct { - EnrollmentID string - WorkflowName string - LastCreatedAt string - CreatedAt sql.NullTime - UpdatedAt sql.NullTime + EnrollmentID string + WorkflowName string + LastCreatedUnix int64 + CreatedAt sql.NullTime + UpdatedAt sql.NullTime } diff --git a/engine/storage/mysql/sqlc/query.sql.go b/engine/storage/mysql/sqlc/query.sql.go index 128ac63..8b608dd 100644 --- a/engine/storage/mysql/sqlc/query.sql.go +++ b/engine/storage/mysql/sqlc/query.sql.go @@ -376,7 +376,7 @@ func (q *Queries) GetStepByID(ctx context.Context, id int64) (GetStepByIDRow, er const getWorkflowLastStarted = `-- name: GetWorkflowLastStarted :one SELECT - last_created_at + last_created_unix FROM wf_status WHERE @@ -389,11 +389,11 @@ type GetWorkflowLastStartedParams struct { WorkflowName string } -func (q *Queries) GetWorkflowLastStarted(ctx context.Context, arg GetWorkflowLastStartedParams) (string, error) { +func (q *Queries) GetWorkflowLastStarted(ctx context.Context, arg GetWorkflowLastStartedParams) (int64, error) { row := q.db.QueryRowContext(ctx, getWorkflowLastStarted, arg.EnrollmentID, arg.WorkflowName) - var last_created_at string - err := row.Scan(&last_created_at) - return last_created_at, err + var last_created_unix int64 + err := row.Scan(&last_created_unix) + return last_created_unix, err } const removeIDCommandsByStepID = `-- name: RemoveIDCommandsByStepID :exec diff --git a/engine/storage/mysql/storage.go b/engine/storage/mysql/storage.go index 22d9df1..7cd9e99 100644 --- a/engine/storage/mysql/storage.go +++ b/engine/storage/mysql/storage.go @@ -245,17 +245,11 @@ func (s *MySQLStorage) CancelSteps(ctx context.Context, id, workflowName string) // RetrieveWorkflowStarted returns the last time a workflow was started for id. func (s *MySQLStorage) RetrieveWorkflowStarted(ctx context.Context, id, workflowName string) (time.Time, error) { - ret, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName}) + epoch, err := s.q.GetWorkflowLastStarted(ctx, sqlc.GetWorkflowLastStartedParams{EnrollmentID: id, WorkflowName: workflowName}) if errors.Is(err, sql.ErrNoRows) { return time.Time{}, nil - } else if err != nil { - return time.Time{}, err } - parsedTime, err := time.Parse(mySQLTimestampFormat, ret) - if err != nil { - return time.Time{}, fmt.Errorf("parsing time: %w", err) - } - return parsedTime, err + return time.Unix(epoch, 0), err } // RecordWorkflowStarted stores the started time for workflowName for ids. @@ -265,26 +259,25 @@ func (s *MySQLStorage) RecordWorkflowStarted(ctx context.Context, ids []string, } const numFields = 3 const subst = ", (?, ?, ?)" - fmt.Println(len(ids), len(ids)-1) parms := make([]interface{}, len(ids)*numFields) - startedFormat := started.Format(mySQLTimestampFormat) + startedUnix := started.Unix() for i, id := range ids { // these must match the SQL query, below parms[i*numFields] = id parms[i*numFields+1] = workflowName - parms[i*numFields+2] = startedFormat + parms[i*numFields+2] = startedUnix } - val := subst[2:] + strings.Repeat(subst, len(ids)-1) + values := strings.Repeat(subst, len(ids))[2:] _, err := s.db.ExecContext( ctx, ` INSERT INTO wf_status - (enrollment_id, workflow_name, last_created_at) + (enrollment_id, workflow_name, last_created_unix) VALUES - `+val+` AS new + `+values+` AS new ON DUPLICATE KEY UPDATE - last_created_at = new.last_created_at;`, + last_created_unix = new.last_created_unix;`, parms..., ) return err diff --git a/engine/storage/test/test.go b/engine/storage/test/test.go index 63b8eba..d7d17f9 100644 --- a/engine/storage/test/test.go +++ b/engine/storage/test/test.go @@ -273,11 +273,7 @@ func mainTest(t *testing.T, s storage.AllStorage) { // t.Fatalf("invalid test data: step enqueueing with config: %v", err) // } - // some backends may truncate the time and drop TZ - // so let's truncate ourselves and eliminate the TZ. - // since this value is used to compare the retrived value - // we'll stick with that. - storedAt := time.Now().UTC().Truncate(time.Second) + storedAt := time.Now() err := s.StoreStep(ctx, tStep.step, storedAt) if tStep.shouldError && err == nil { @@ -301,8 +297,9 @@ func mainTest(t *testing.T, s storage.AllStorage) { } if ts.IsZero() { t.Errorf("RetrieveWorkflowStarted: nil timestamp for id=%s, step=%s err=%v", id, tStep.step.WorkflowName, err) - } else if ts != storedAt { - t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v", id, tStep.step.WorkflowName, storedAt, ts) + } else if t1, t2 := ts.Truncate(time.Second), storedAt.Truncate(time.Second); t1.Compare(t2) != 0 { + // truncate comparison in case backends don't persist precision less than 1s (e.g. SQL textual dates) + t.Errorf("RetrieveWorkflowStarted: timestamp mismatch for id=%s, step=%s expected=%v got=%v compare=%v", id, tStep.step.WorkflowName, t2, t1, t1.Compare(t2)) } } }