Skip to content

Commit

Permalink
Add FirstRunAt field to start workflow option logic (#6178)
Browse files Browse the repository at this point in the history
* Add logic for FirstRunAt field and add compatibility with IDL changes
  • Loading branch information
timl3136 authored Jul 26, 2024
1 parent 8522abd commit f3350d0
Show file tree
Hide file tree
Showing 23 changed files with 1,454 additions and 1,062 deletions.
208 changes: 200 additions & 8 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

767 changes: 385 additions & 382 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

767 changes: 385 additions & 382 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

469 changes: 235 additions & 234 deletions .gen/proto/shared/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0 h1:r4ZCsIfOVK06jnr8nBh9mR8Npxunh7aoldONrz6Kb9o=
github.com/uber/cadence-idl v0.0.0-20240627204638-12f43fe756a0/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d h1:1dX3Pr0wEW0TQFhj0lwCJPuYUtd7pOhScbiiwNiL1Tw=
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
6 changes: 6 additions & 0 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,7 @@ func FromSignalWithStartWorkflowExecutionRequest(t *types.SignalWithStartWorkflo
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
},
SignalName: t.SignalName,
SignalInput: FromPayload(t.SignalInput),
Expand Down Expand Up @@ -3418,6 +3419,7 @@ func ToSignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowE
Header: ToHeader(t.StartRequest.Header),
DelayStartSeconds: durationToSeconds(t.StartRequest.DelayStart),
JitterStartSeconds: durationToSeconds(t.StartRequest.JitterStart),
FirstRunAtTimestamp: timeToUnixNano(t.StartRequest.FirstRunAt),
}
}

Expand Down Expand Up @@ -3568,6 +3570,7 @@ func FromStartChildWorkflowExecutionInitiatedEventAttributes(t *types.StartChild
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimestamp),
}
}

Expand All @@ -3594,6 +3597,7 @@ func ToStartChildWorkflowExecutionInitiatedEventAttributes(t *apiv1.StartChildWo
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
FirstRunAtTimestamp: timeToUnixNano(t.FirstRunAt),
}
}

Expand Down Expand Up @@ -3743,6 +3747,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
Header: FromHeader(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
FirstRunAt: unixNanoToTime(t.FirstRunAtTimeStamp),
}
}

Expand All @@ -3768,6 +3773,7 @@ func ToStartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *ty
Header: ToHeader(t.Header),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
FirstRunAtTimeStamp: timeToUnixNano(t.FirstRunAt),
}
}

Expand Down
6 changes: 6 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5046,6 +5046,7 @@ func FromSignalWithStartWorkflowExecutionRequest(t *types.SignalWithStartWorkflo
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
Header: FromHeader(t.Header),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5073,6 +5074,7 @@ func ToSignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflow
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
Header: ToHeader(t.Header),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5210,6 +5212,7 @@ func FromStartChildWorkflowExecutionInitiatedEventAttributes(t *types.StartChild
Header: FromHeader(t.Header),
Memo: FromMemo(t.Memo),
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand All @@ -5235,6 +5238,7 @@ func ToStartChildWorkflowExecutionInitiatedEventAttributes(t *shared.StartChildW
Header: ToHeader(t.Header),
Memo: ToMemo(t.Memo),
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
FirstRunAtTimestamp: t.FirstRunAtTimestamp,
}
}

Expand Down Expand Up @@ -5369,6 +5373,7 @@ func FromStartWorkflowExecutionRequest(t *types.StartWorkflowExecutionRequest) *
Header: FromHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
FirstRunAtTimestamp: t.FirstRunAtTimeStamp,
}
}

Expand All @@ -5395,6 +5400,7 @@ func ToStartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *t
Header: ToHeader(t.Header),
DelayStartSeconds: t.DelayStartSeconds,
JitterStartSeconds: t.JitterStartSeconds,
FirstRunAtTimeStamp: t.FirstRunAtTimestamp,
}
}

Expand Down
11 changes: 11 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5932,6 +5932,7 @@ type SignalWithStartWorkflowExecutionRequest struct {
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimestamp *int64 `json:"firstRunAtTimestamp,omitempty"`
}

func (v *SignalWithStartWorkflowExecutionRequest) SerializeForLogging() (string, error) {
Expand Down Expand Up @@ -6226,6 +6227,7 @@ type StartChildWorkflowExecutionInitiatedEventAttributes struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimestamp *int64 `json:"firstRunAtTimestamp,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -6352,6 +6354,7 @@ type StartWorkflowExecutionRequest struct {
Header *Header `json:"header,omitempty"`
DelayStartSeconds *int32 `json:"delayStartSeconds,omitempty"`
JitterStartSeconds *int32 `json:"jitterStartSeconds,omitempty"`
FirstRunAtTimeStamp *int64 `json:"firstRunAtTimeStamp,omitempty"`
}

func (v *StartWorkflowExecutionRequest) SerializeForLogging() (string, error) {
Expand Down Expand Up @@ -6409,6 +6412,14 @@ func (v *StartWorkflowExecutionRequest) GetJitterStartSeconds() (o int32) {
return
}

// GetFirstRunAtTimeStamp is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetFirstRunAtTimeStamp() (o int64) {
if v != nil && v.FirstRunAtTimeStamp != nil {
return *v.FirstRunAtTimeStamp
}
return
}

// GetRequestID is an internal getter (TBD...)
func (v *StartWorkflowExecutionRequest) GetRequestID() (o string) {
if v != nil {
Expand Down
24 changes: 16 additions & 8 deletions common/types/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSignalWithStartWorkflowExecutionRequestSerializeForLogging(t *testing.T
}{
"complete request without error": {
input: createNewSignalWithStartWorkflowExecutionRequest(),
expectedOutput: "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1}",
expectedOutput: "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1,\"firstRunAtTimestamp\":1}",
expectedErrorOutput: nil,
},

Expand Down Expand Up @@ -187,7 +187,13 @@ func TestSerializeRequest(t *testing.T) {
testReq := createNewSignalWithStartWorkflowExecutionRequest()
serializeRes, err := SerializeRequest(testReq)

expectRes := "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"},\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1,\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\",\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\",\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1,\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1},\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1}"
expectRes := "{\"domain\":\"testDomain\",\"workflowId\":\"testWorkflowID\",\"workflowType\":{\"name\":\"testWorkflowType\"}," +
"\"taskList\":{\"name\":\"testTaskList\",\"kind\":\"STICKY\"},\"executionStartToCloseTimeoutSeconds\":1," +
"\"taskStartToCloseTimeoutSeconds\":1,\"identity\":\"testIdentity\",\"requestId\":\"DF66E35D-A5B0-425D-8731-6AAC4A4B6368\"," +
"\"workflowIdReusePolicy\":\"AllowDuplicate\",\"signalName\":\"testRequest\",\"control\":\"dGVzdENvbnRyb2w=\"," +
"\"retryPolicy\":{\"initialIntervalInSeconds\":1,\"backoffCoefficient\":1,\"maximumIntervalInSeconds\":1," +
"\"maximumAttempts\":1,\"nonRetriableErrorReasons\":[\"testArray\"],\"expirationIntervalInSeconds\":1}," +
"\"cronSchedule\":\"testSchedule\",\"header\":{},\"delayStartSeconds\":1,\"jitterStartSeconds\":1,\"firstRunAtTimestamp\":1}"
expectErr := error(nil)

assert.Equal(t, expectRes, serializeRes)
Expand All @@ -206,6 +212,7 @@ func createNewSignalWithStartWorkflowExecutionRequest() *SignalWithStartWorkflow
testWorkflowIDReusePolicy := WorkflowIDReusePolicy(1)
testDelayStartSeconds := int32(1)
testJitterStartSeconds := int32(1)
testFirstRunAtTimestamp := int64(1)
piiTestArray := []byte("testInputPII")
piiTestMap := make(map[string][]byte)
piiTestMap["PII"] = piiTestArray
Expand Down Expand Up @@ -235,12 +242,13 @@ func createNewSignalWithStartWorkflowExecutionRequest() *SignalWithStartWorkflow
NonRetriableErrorReasons: []string{"testArray"},
ExpirationIntervalInSeconds: 1,
},
CronSchedule: "testSchedule",
Memo: &Memo{Fields: piiTestMap},
SearchAttributes: &SearchAttributes{IndexedFields: piiTestMap},
Header: &Header{Fields: map[string][]byte{}},
DelayStartSeconds: &testDelayStartSeconds,
JitterStartSeconds: &testJitterStartSeconds,
CronSchedule: "testSchedule",
Memo: &Memo{Fields: piiTestMap},
SearchAttributes: &SearchAttributes{IndexedFields: piiTestMap},
Header: &Header{Fields: map[string][]byte{}},
DelayStartSeconds: &testDelayStartSeconds,
JitterStartSeconds: &testJitterStartSeconds,
FirstRunAtTimestamp: &testFirstRunAtTimestamp,
}
return testReq
}
Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ var (
Header: &Header,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
FirstRunAtTimestamp: &Timestamp1,
}
StartChildWorkflowExecutionFailedEventAttributes = types.StartChildWorkflowExecutionFailedEventAttributes{
Domain: DomainName,
Expand Down
2 changes: 2 additions & 0 deletions common/types/testdata/service_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ var (
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
FirstRunAtTimeStamp: &Timestamp1,
}
StartWorkflowExecutionResponse = types.StartWorkflowExecutionResponse{
RunID: RunID,
Expand Down Expand Up @@ -356,6 +357,7 @@ var (
Memo: &Memo,
SearchAttributes: &SearchAttributes,
Header: &Header,
FirstRunAtTimestamp: &Timestamp1,
}
SignalWithStartWorkflowExecutionAsyncRequest = types.SignalWithStartWorkflowExecutionAsyncRequest{
SignalWithStartWorkflowExecutionRequest: &SignalWithStartWorkflowExecutionRequest,
Expand Down
40 changes: 26 additions & 14 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,22 +547,34 @@ func CreateHistoryStartWorkflowRequest(

delayStartSeconds := startRequest.GetDelayStartSeconds()
jitterStartSeconds := startRequest.GetJitterStartSeconds()
firstRunAtTimestamp := startRequest.GetFirstRunAtTimeStamp()

firstDecisionTaskBackoffSeconds := delayStartSeconds
if len(startRequest.GetCronSchedule()) > 0 {
delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
var err error
firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
if err != nil {
return nil, err
}

// backoff seconds was calculated based on delayed start time, so we need to
// add the delayStartSeconds to that backoff.
firstDecisionTaskBackoffSeconds += delayStartSeconds
} else if jitterStartSeconds > 0 {
// Add a random jitter to start time, if requested.
firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
// if the user specified a timestamp for the first run, we will use that as the start time,
// ignoring the delayStartSeconds, jitterStartSeconds, and cronSchedule
// The following condition guarantees two things:
// - The logic is only triggered when the user specifies a first run timestamp
// - AND that timestamp is only triggered ONCE hence not interfering with other scheduling logic
if firstRunAtTimestamp > now.UnixNano() {
firstDecisionTaskBackoffSeconds = int32((firstRunAtTimestamp - now.UnixNano()) / int64(time.Second))
} else {
if len(startRequest.GetCronSchedule()) > 0 {
delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
var err error
firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
if err != nil {
return nil, err
}

// backoff seconds was calculated based on delayed start time, so we need to
// add the delayStartSeconds to that backoff.
firstDecisionTaskBackoffSeconds += delayStartSeconds
} else if jitterStartSeconds > 0 {
// Add a random jitter to start time, if requested.
firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
}
}

histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)
Expand Down
Loading

0 comments on commit f3350d0

Please sign in to comment.