From 1497967cd75186e852366e10d39dae032bcbc251 Mon Sep 17 00:00:00 2001 From: Zijian Date: Tue, 26 Mar 2024 18:39:48 -0700 Subject: [PATCH] Update mutable state to generate and replicate requestIDs --- common/dynamicconfig/constants.go | 13 ++ common/log/tag/tags.go | 5 + config/dynamicconfig/development.yaml | 3 + service/history/config/config.go | 5 + .../engine/engineimpl/historyEngine.go | 20 ++ .../engine/engineimpl/historyEngine2_test.go | 202 ++++++++++++++++++ service/history/execution/context.go | 67 +++++- service/history/execution/context_mock.go | 16 +- service/history/execution/context_test.go | 59 ++--- .../history/execution/history_builder_test.go | 12 ++ service/history/execution/mutable_state.go | 4 +- .../execution/mutable_state_builder.go | 49 ++++- .../execution/mutable_state_builder_test.go | 32 ++- .../mutable_state_decision_task_manager.go | 49 ++++- ...utable_state_decision_task_manager_mock.go | 16 +- .../history/execution/mutable_state_mock.go | 10 +- .../history/execution/mutable_state_util.go | 9 + .../execution/mutable_state_util_test.go | 21 ++ service/history/execution/state_builder.go | 4 +- .../history/execution/state_builder_test.go | 4 +- service/history/ndc/activity_replicator.go | 1 + .../history/ndc/activity_replicator_test.go | 2 + .../existing_workflow_transaction_manager.go | 1 + ...sting_workflow_transaction_manager_test.go | 2 + .../ndc/new_workflow_transaction_manager.go | 4 + .../new_workflow_transaction_manager_test.go | 5 + service/history/ndc/transaction_manager.go | 1 + .../history/ndc/transaction_manager_test.go | 12 +- service/history/reset/resetter.go | 1 + service/history/reset/resetter_test.go | 1 + service/history/shard/context.go | 2 + .../task/timer_standby_task_executor_test.go | 2 + service/history/workflow/util.go | 3 + 33 files changed, 546 insertions(+), 91 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 9faca3600ee..89d12e83d44 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2003,6 +2003,13 @@ const ( // Allowed filters: DomainName EnableRetryForChecksumFailure + // EnableStrongIdempotency enables strong idempotency for APIs + // KeyName: history.enableStrongIdempotency + // Value type: Bool + // Default value: false + // Allowed filters: DomainName + EnableStrongIdempotency + // LastBoolKey must be the last one in this const group LastBoolKey ) @@ -4310,6 +4317,12 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails", DefaultValue: false, }, + EnableStrongIdempotency: DynamicBool{ + KeyName: "history.enableStrongIdempotency", + Filters: []Filter{DomainName}, + Description: "EnableStrongIdempotency enables strong idempotency for APIs", + DefaultValue: false, + }, } var FloatKeys = map[FloatKey]DynamicFloat{ diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 2b00c54ba18..7e61cbd83c2 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -123,6 +123,11 @@ func WorkflowSignalName(signalName string) Tag { return newStringTag("wf-signal-name", signalName) } +// WorkflowRequestID returns tag for WorkflowRequestID +func WorkflowRequestID(requestID string) Tag { + return newStringTag("wf-request-id", requestID) +} + // WorkflowState returns tag for WorkflowState func WorkflowState(s int) Tag { return newInt("wf-state", s) diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index 6805841d27f..964686ff600 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -8,6 +8,9 @@ history.EnableConsistentQueryByDomain: - value: true constraints: {} history.useNewInitialFailoverVersion: +- value: true + constraints: {} +history.enableStrongIdempotency: - value: true constraints: {} frontend.validSearchAttributes: diff --git a/service/history/config/config.go b/service/history/config/config.go index c146c90972e..ee72acdc0fe 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -338,6 +338,8 @@ type Config struct { LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn + EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter + // HostName for machine running the service HostName string } @@ -596,6 +598,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold), LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold), + EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency), + HostName: hostname, } @@ -637,6 +641,7 @@ func NewForTestByShardNumber(shardNumber int) *Config { })) panicIfErr(inMem.UpdateValue(dynamicconfig.QueueProcessorRandomSplitProbability, 0.5)) panicIfErr(inMem.UpdateValue(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true)) + panicIfErr(inMem.UpdateValue(dynamicconfig.EnableStrongIdempotency, true)) dc := dynamicconfig.NewCollection(inMem, log.NewNoop()) config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false, "") diff --git a/service/history/engine/engineimpl/historyEngine.go b/service/history/engine/engineimpl/historyEngine.go index 7fcb2bfdfb9..17c00861bed 100644 --- a/service/history/engine/engineimpl/historyEngine.go +++ b/service/history/engine/engineimpl/historyEngine.go @@ -787,7 +787,13 @@ func (e *historyEngineImpl) startWorkflowHelper( createMode, prevRunID, prevLastWriteVersion, + persistence.CreateWorkflowRequestModeNew, ) + if t, ok := err.(*persistence.DuplicateRequestError); ok { + return &types.StartWorkflowExecutionResponse{ + RunID: t.RunID, + }, nil + } // handle already started error if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { @@ -853,7 +859,13 @@ func (e *historyEngineImpl) startWorkflowHelper( createMode, prevRunID, t.LastWriteVersion, + persistence.CreateWorkflowRequestModeNew, ) + if t, ok := err.(*persistence.DuplicateRequestError); ok { + return &types.StartWorkflowExecutionResponse{ + RunID: t.RunID, + }, nil + } } if err != nil { return nil, err @@ -2620,6 +2632,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( // We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload // the history and try the operation again. if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil { + if t, ok := err.(*persistence.DuplicateRequestError); ok { + return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil + } if execution.IsConflictError(err) { continue Just_Signal_Loop } @@ -3014,6 +3029,11 @@ func (e *historyEngineImpl) ResetWorkflowExecution( nil, request.GetSkipSignalReapply(), ); err != nil { + if t, ok := err.(*persistence.DuplicateRequestError); ok { + return &types.ResetWorkflowExecutionResponse{ + RunID: t.RunID, + }, nil + } return nil, err } return &types.ResetWorkflowExecutionResponse{ diff --git a/service/history/engine/engineimpl/historyEngine2_test.go b/service/history/engine/engineimpl/historyEngine2_test.go index bd3c2e931f7..0fba428e2de 100644 --- a/service/history/engine/engineimpl/historyEngine2_test.go +++ b/service/history/engine/engineimpl/historyEngine2_test.go @@ -838,6 +838,37 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() { s.Equal(int64(4), executionBuilder.GetNextEventID()) } +func (s *engine2Suite) TestRequestCancelWorkflowExecutionDuplicateRequestError() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, false) + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once() + + err := s.historyEngine.RequestCancelWorkflowExecution(context.Background(), &types.HistoryRequestCancelWorkflowExecutionRequest{ + DomainUUID: domainID, + CancelRequest: &types.RequestCancelWorkflowExecutionRequest{ + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: workflowExecution.WorkflowID, + RunID: workflowExecution.RunID, + }, + Identity: "identity", + }, + }) + s.Nil(err) +} + func (s *engine2Suite) TestRequestCancelWorkflowExecutionAlreadyCancelled_Success() { domainID := constants.TestDomainID workflowExecution := types.WorkflowExecution{ @@ -1057,6 +1088,40 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() { s.NotNil(resp.RunID) } +func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_DuplicateRequestError() { + domainID := constants.TestDomainID + workflowID := "workflowID" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + partitionConfig := map[string]string{ + "zone": "phx", + } + + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool { + return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig) + })).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once() + + requestID := uuid.New() + resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &types.HistoryStartWorkflowExecutionRequest{ + DomainUUID: domainID, + StartRequest: &types.StartWorkflowExecutionRequest{ + Domain: domainID, + WorkflowID: workflowID, + WorkflowType: &types.WorkflowType{Name: workflowType}, + TaskList: &types.TaskList{Name: taskList}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: identity, + RequestID: requestID, + }, + PartitionConfig: partitionConfig, + }) + s.NoError(err) + s.Equal("test-run-id", resp.RunID) +} + func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { domainID := constants.TestDomainID workflowID := "workflowID" @@ -1348,6 +1413,48 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() { s.Equal(runID, resp.GetRunID()) } +func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal_DuplicateRequestError() { + sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{} + _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) + s.Error(err) + + domainID := constants.TestDomainID + workflowID := "wId" + runID := constants.TestRunID + identity := "testIdentity" + signalName := "my signal name" + input := []byte("test input") + sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{ + DomainUUID: domainID, + SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{ + Domain: domainID, + WorkflowID: workflowID, + Identity: identity, + SignalName: signalName, + Input: input, + }, + } + + msBuilder := execution.NewMutableStateBuilderWithEventV2( + s.historyEngine.shard, + testlogger.New(s.Suite.T()), + runID, + constants.TestLocalDomainEntry, + ) + ms := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms} + gceResponse := &p.GetCurrentExecutionResponse{RunID: runID} + + s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once() + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once() + + resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) + s.Nil(err) + s.Equal("test-run-id", resp.GetRunID()) +} + func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() { sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{} _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) @@ -1395,6 +1502,52 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() { s.NotNil(resp.GetRunID()) } +func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist_DuplicateRequestError() { + sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{} + _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) + s.Error(err) + + domainID := constants.TestDomainID + workflowID := "wId" + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + signalName := "my signal name" + input := []byte("test input") + requestID := uuid.New() + partitionConfig := map[string]string{ + "zone": "phx", + } + + sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{ + DomainUUID: domainID, + SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{ + Domain: domainID, + WorkflowID: workflowID, + WorkflowType: &types.WorkflowType{Name: workflowType}, + TaskList: &types.TaskList{Name: taskList}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: identity, + SignalName: signalName, + Input: input, + RequestID: requestID, + }, + PartitionConfig: partitionConfig, + } + + notExistErr := &types.EntityNotExistsError{Message: "Workflow not exist"} + + s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(nil, notExistErr).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool { + return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig) + })).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once() + + resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) + s.Nil(err) + s.Equal("test-run-id", resp.GetRunID()) +} func (s *engine2Suite) TestSignalWithStartWorkflowExecution_CreateTimeout() { sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{} _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) @@ -1555,6 +1708,55 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque s.Equal(runID, resp.GetRunID()) } +func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateRequestError() { + domainID := constants.TestDomainID + workflowID := "wId" + runID := constants.TestRunID + workflowType := "workflowType" + taskList := "testTaskList" + identity := "testIdentity" + signalName := "my signal name" + input := []byte("test input") + requestID := "testRequestID" + policy := types.WorkflowIDReusePolicyAllowDuplicate + sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{ + DomainUUID: domainID, + SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{ + Domain: domainID, + WorkflowID: workflowID, + WorkflowType: &types.WorkflowType{Name: workflowType}, + TaskList: &types.TaskList{Name: taskList}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: identity, + SignalName: signalName, + Input: input, + RequestID: requestID, + WorkflowIDReusePolicy: &policy, + }, + } + + msBuilder := execution.NewMutableStateBuilderWithEventV2( + s.historyEngine.shard, + testlogger.New(s.Suite.T()), + runID, + constants.TestLocalDomainEntry, + ) + ms := execution.CreatePersistenceMutableState(msBuilder) + ms.ExecutionInfo.State = p.WorkflowStateCompleted + gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms} + gceResponse := &p.GetCurrentExecutionResponse{RunID: runID} + + s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once() + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once() + + resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) + s.Nil(err) + s.Equal("test-run-id", resp.GetRunID()) +} + func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlreadyStarted() { domainID := constants.TestDomainID workflowID := "wId" diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 0e5fe1ee150..59f471ea2e2 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -115,6 +115,7 @@ type ( createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, + workflowRequestMode persistence.CreateWorkflowRequestMode, ) error ConflictResolveWorkflowExecution( ctx context.Context, @@ -155,6 +156,7 @@ type ( newMutableState MutableState, currentWorkflowTransactionPolicy TransactionPolicy, newWorkflowTransactionPolicy *TransactionPolicy, + workflowRequestMode persistence.CreateWorkflowRequestMode, ) error UpdateWorkflowExecutionTasks( ctx context.Context, @@ -182,7 +184,7 @@ type ( getWorkflowExecutionFn func(context.Context, *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) createWorkflowExecutionFn func(context.Context, *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) updateWorkflowExecutionFn func(context.Context, *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) - updateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + updateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error notifyTasksFromWorkflowSnapshotFn func(*persistence.WorkflowSnapshot, events.PersistedBlobs, bool) notifyTasksFromWorkflowMutationFn func(*persistence.WorkflowMutation, events.PersistedBlobs, bool) emitSessionUpdateStatsFn func(string, *persistence.MutableStateUpdateSessionStats) @@ -410,6 +412,7 @@ func (c *contextImpl) CreateWorkflowExecution( createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, + workflowRequestMode persistence.CreateWorkflowRequestMode, ) (retError error) { defer func() { @@ -417,6 +420,10 @@ func (c *contextImpl) CreateWorkflowExecution( c.Clear() } }() + err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode) + if err != nil { + c.logger.Error("workflow requests and mode validation error", tag.Error(err)) + } domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID) if errorDomainName != nil { return errorDomainName @@ -426,9 +433,9 @@ func (c *contextImpl) CreateWorkflowExecution( Mode: createMode, PreviousRunID: prevRunID, PreviousLastWriteVersion: prevLastWriteVersion, - - NewWorkflowSnapshot: *newWorkflow, - DomainName: domain, + NewWorkflowSnapshot: *newWorkflow, + WorkflowRequestMode: workflowRequestMode, + DomainName: domain, } historySize := int64(len(persistedHistory.Data)) @@ -499,7 +506,9 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( newContext.Clear() } }() - + if len(resetWorkflow.WorkflowRequests) != 0 && len(newWorkflow.WorkflowRequests) != 0 { + c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for ConflictResolveWorkflowExecution") + } newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(now, TransactionPolicyPassive) if err != nil { return err @@ -526,11 +535,13 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( currentContext.Clear() } }() - currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(now, *currentTransactionPolicy) if err != nil { return err } + if len(currentWorkflow.WorkflowRequests) != 0 { + c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution") + } currentWorkflowSize := currentContext.GetHistorySize() for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.persistNonStartWorkflowBatchEventsFn(ctx, workflowEvents) @@ -564,6 +575,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( ResetWorkflowSnapshot: *resetWorkflow, NewWorkflowSnapshot: newWorkflow, CurrentWorkflowMutation: currentWorkflow, + WorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, // Encoding, this is set by shard context DomainName: domain, }) @@ -616,7 +628,7 @@ func (c *contextImpl) UpdateWorkflowExecutionAsActive( ctx context.Context, now time.Time, ) error { - return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, TransactionPolicyActive, nil) + return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, TransactionPolicyActive, nil, persistence.CreateWorkflowRequestModeNew) } func (c *contextImpl) UpdateWorkflowExecutionWithNewAsActive( @@ -625,14 +637,14 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNewAsActive( newContext Context, newMutableState MutableState, ) error { - return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, newContext, newMutableState, TransactionPolicyActive, TransactionPolicyActive.Ptr()) + return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, newContext, newMutableState, TransactionPolicyActive, TransactionPolicyActive.Ptr(), persistence.CreateWorkflowRequestModeNew) } func (c *contextImpl) UpdateWorkflowExecutionAsPassive( ctx context.Context, now time.Time, ) error { - return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, TransactionPolicyPassive, nil) + return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, TransactionPolicyPassive, nil, persistence.CreateWorkflowRequestModeReplicated) } func (c *contextImpl) UpdateWorkflowExecutionWithNewAsPassive( @@ -641,7 +653,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNewAsPassive( newContext Context, newMutableState MutableState, ) error { - return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, newContext, newMutableState, TransactionPolicyPassive, TransactionPolicyPassive.Ptr()) + return c.updateWorkflowExecutionWithNewFn(ctx, now, persistence.UpdateWorkflowModeUpdateCurrent, newContext, newMutableState, TransactionPolicyPassive, TransactionPolicyPassive.Ptr(), persistence.CreateWorkflowRequestModeReplicated) } func (c *contextImpl) UpdateWorkflowExecutionTasks( @@ -664,6 +676,10 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks( Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events", } } + if len(currentWorkflow.WorkflowRequests) != 0 { + // TODO: convert this log to an error once we enable this feature for a long time in production + c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests") + } currentWorkflow.ExecutionStats = &persistence.ExecutionStats{ HistorySize: c.GetHistorySize(), } @@ -698,6 +714,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( newMutableState MutableState, currentWorkflowTransactionPolicy TransactionPolicy, newWorkflowTransactionPolicy *TransactionPolicy, + workflowRequestMode persistence.CreateWorkflowRequestMode, ) (retError error) { defer func() { if retError != nil { @@ -709,6 +726,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( if err != nil { return err } + err = validateWorkflowRequestsAndMode(currentWorkflow.WorkflowRequests, workflowRequestMode) + if err != nil { + c.logger.Error("workflow requests and mode validation error", tag.Error(err)) + } var persistedBlobs events.PersistedBlobs currentWorkflowSize := c.GetHistorySize() oldWorkflowSize := currentWorkflowSize @@ -736,7 +757,16 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( newContext.Clear() } }() + if len(newWorkflow.WorkflowRequests) != 0 && len(currentWorkflow.WorkflowRequests) != 0 { + // TODO: convert it to an error if we've verified in production + c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for UpdateWorkflowExecution") + } + err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode) + if err != nil { + // TODO: convert it to an error if we've verified in production + c.logger.Error("workflow requests and mode validation error", tag.Error(err)) + } newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot( now, *newWorkflowTransactionPolicy, @@ -785,6 +815,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( Mode: updateMode, UpdateWorkflowMutation: *currentWorkflow, NewWorkflowSnapshot: newWorkflow, + WorkflowRequestMode: workflowRequestMode, // Encoding, this is set by shard context DomainName: domain, }) @@ -1371,3 +1402,19 @@ func isOperationPossiblySuccessfulError(err error) bool { return !IsConflictError(err) } } + +func validateWorkflowRequestsAndMode(requests []*persistence.WorkflowRequest, mode persistence.CreateWorkflowRequestMode) error { + if mode == persistence.CreateWorkflowRequestModeNew { + if len(requests) > 2 { + return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."} + } else if len(requests) == 2 { + // SignalWithStartWorkflow API can generate 2 workflow requests + if (requests[0].RequestType == persistence.WorkflowRequestTypeStart && requests[1].RequestType == persistence.WorkflowRequestTypeSignal) || + (requests[1].RequestType == persistence.WorkflowRequestTypeStart && requests[0].RequestType == persistence.WorkflowRequestTypeSignal) { + return nil + } + return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."} + } + } + return nil +} diff --git a/service/history/execution/context_mock.go b/service/history/execution/context_mock.go index db3172303c7..34976fdb24a 100644 --- a/service/history/execution/context_mock.go +++ b/service/history/execution/context_mock.go @@ -88,17 +88,17 @@ func (mr *MockContextMockRecorder) ConflictResolveWorkflowExecution(ctx, now, co } // CreateWorkflowExecution mocks base method. -func (m *MockContext) CreateWorkflowExecution(ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, persistedHistory events.PersistedBlob, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64) error { +func (m *MockContext) CreateWorkflowExecution(ctx context.Context, newWorkflow *persistence.WorkflowSnapshot, persistedHistory events.PersistedBlob, createMode persistence.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, workflowRequestMode persistence.CreateWorkflowRequestMode) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion) + ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion, workflowRequestMode) ret0, _ := ret[0].(error) return ret0 } // CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. -func (mr *MockContextMockRecorder) CreateWorkflowExecution(ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) CreateWorkflowExecution(ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion, workflowRequestMode interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).CreateWorkflowExecution), ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).CreateWorkflowExecution), ctx, newWorkflow, persistedHistory, createMode, prevRunID, prevLastWriteVersion, workflowRequestMode) } // GetDomainID mocks base method. @@ -353,17 +353,17 @@ func (mr *MockContextMockRecorder) UpdateWorkflowExecutionTasks(ctx, now interfa } // UpdateWorkflowExecutionWithNew mocks base method. -func (m *MockContext) UpdateWorkflowExecutionWithNew(ctx context.Context, now time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentWorkflowTransactionPolicy TransactionPolicy, newWorkflowTransactionPolicy *TransactionPolicy) error { +func (m *MockContext) UpdateWorkflowExecutionWithNew(ctx context.Context, now time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentWorkflowTransactionPolicy TransactionPolicy, newWorkflowTransactionPolicy *TransactionPolicy, workflowRequestMode persistence.CreateWorkflowRequestMode) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateWorkflowExecutionWithNew", ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy) + ret := m.ctrl.Call(m, "UpdateWorkflowExecutionWithNew", ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy, workflowRequestMode) ret0, _ := ret[0].(error) return ret0 } // UpdateWorkflowExecutionWithNew indicates an expected call of UpdateWorkflowExecutionWithNew. -func (mr *MockContextMockRecorder) UpdateWorkflowExecutionWithNew(ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) UpdateWorkflowExecutionWithNew(ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy, workflowRequestMode interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecutionWithNew", reflect.TypeOf((*MockContext)(nil).UpdateWorkflowExecutionWithNew), ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecutionWithNew", reflect.TypeOf((*MockContext)(nil).UpdateWorkflowExecutionWithNew), ctx, now, updateMode, newContext, newMutableState, currentWorkflowTransactionPolicy, newWorkflowTransactionPolicy, workflowRequestMode) } // UpdateWorkflowExecutionWithNewAsActive mocks base method. diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index ae6e33ba727..a6bc7808137 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -1089,6 +1089,7 @@ func TestCreateWorkflowExecution(t *testing.T) { createMode persistence.CreateWorkflowMode prevRunID string prevLastWriteVersion int64 + createWorkflowRequestMode persistence.CreateWorkflowRequestMode mockCreateWorkflowExecutionFn func(context.Context, *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) mockNotifyTasksFromWorkflowSnapshotFn func(*persistence.WorkflowSnapshot, events.PersistedBlobs, bool) mockEmitSessionUpdateStatsFn func(string, *persistence.MutableStateUpdateSessionStats) @@ -1110,9 +1111,10 @@ func TestCreateWorkflowExecution(t *testing.T) { BranchToken: []byte{1, 2, 3}, FirstEventID: 1, }, - createMode: persistence.CreateWorkflowModeContinueAsNew, - prevRunID: "test-prev-run-id", - prevLastWriteVersion: 123, + createMode: persistence.CreateWorkflowModeContinueAsNew, + prevRunID: "test-prev-run-id", + prevLastWriteVersion: 123, + createWorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, mockCreateWorkflowExecutionFn: func(context.Context, *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) { return nil, &types.InternalServiceError{} }, @@ -1137,9 +1139,10 @@ func TestCreateWorkflowExecution(t *testing.T) { BranchToken: []byte{1, 2, 3}, FirstEventID: 1, }, - createMode: persistence.CreateWorkflowModeContinueAsNew, - prevRunID: "test-prev-run-id", - prevLastWriteVersion: 123, + createMode: persistence.CreateWorkflowModeContinueAsNew, + prevRunID: "test-prev-run-id", + prevLastWriteVersion: 123, + createWorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, mockCreateWorkflowExecutionFn: func(ctx context.Context, req *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) { assert.Equal(t, &persistence.CreateWorkflowExecutionRequest{ Mode: persistence.CreateWorkflowModeContinueAsNew, @@ -1155,7 +1158,8 @@ func TestCreateWorkflowExecution(t *testing.T) { HistorySize: 3, }, }, - DomainName: "test-domain", + WorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, + DomainName: "test-domain", }, req) return &persistence.CreateWorkflowExecutionResponse{ MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{ @@ -1213,7 +1217,7 @@ func TestCreateWorkflowExecution(t *testing.T) { if tc.mockEmitSessionUpdateStatsFn != nil { ctx.emitSessionUpdateStatsFn = tc.mockEmitSessionUpdateStatsFn } - err := ctx.CreateWorkflowExecution(context.Background(), tc.newWorkflow, tc.history, tc.createMode, tc.prevRunID, tc.prevLastWriteVersion) + err := ctx.CreateWorkflowExecution(context.Background(), tc.newWorkflow, tc.history, tc.createMode, tc.prevRunID, tc.prevLastWriteVersion, tc.createWorkflowRequestMode) if tc.wantErr { assert.Error(t, err) } else { @@ -1381,6 +1385,7 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { newContext Context currentWorkflowTransactionPolicy TransactionPolicy newWorkflowTransactionPolicy *TransactionPolicy + workflowRequestMode persistence.CreateWorkflowRequestMode mockSetup func(*shard.MockContext, *cache.MockDomainCache, *MockMutableState, *MockMutableState, *engine.MockEngine) mockPersistNonStartWorkflowBatchEventsFn func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) mockPersistStartWorkflowBatchEventsFn func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) @@ -1626,6 +1631,7 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { updateMode: persistence.UpdateWorkflowModeUpdateCurrent, currentWorkflowTransactionPolicy: TransactionPolicyActive, newWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + workflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockEngine *engine.MockEngine) { mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), TransactionPolicyActive).Return(&persistence.WorkflowMutation{ ExecutionInfo: &persistence.WorkflowExecutionInfo{ @@ -1779,7 +1785,8 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { HistorySize: 2, }, }, - DomainName: "test-domain", + WorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, + DomainName: "test-domain", }, req, "case: success") return &persistence.UpdateWorkflowExecutionResponse{ MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{ @@ -1889,7 +1896,7 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { emitLargeWorkflowShardIDStatsFn: tc.mockEmitLargeWorkflowShardIDStatsFn, emitWorkflowCompletionStatsFn: tc.mockEmitWorkflowCompletionStatsFn, } - err := ctx.UpdateWorkflowExecutionWithNew(context.Background(), time.Unix(0, 0), tc.updateMode, tc.newContext, mockNewMutableState, tc.currentWorkflowTransactionPolicy, tc.newWorkflowTransactionPolicy) + err := ctx.UpdateWorkflowExecutionWithNew(context.Background(), time.Unix(0, 0), tc.updateMode, tc.newContext, mockNewMutableState, tc.currentWorkflowTransactionPolicy, tc.newWorkflowTransactionPolicy, tc.workflowRequestMode) if tc.wantErr { assert.Error(t, err) if tc.assertErr != nil { @@ -2782,7 +2789,7 @@ func TestLoadWorkflowExecutionWithTaskVersion(t *testing.T) { mockSetup func(*shard.MockContext, *MockMutableState, *cache.MockDomainCache) mockGetWorkflowExecutionFn func(context.Context, *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) mockEmitWorkflowExecutionStatsFn func(string, *persistence.MutableStateStats, int64) - mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error wantErr bool }{ { @@ -2896,7 +2903,7 @@ func TestLoadWorkflowExecutionWithTaskVersion(t *testing.T) { assert.Equal(t, "test-domain", domainName) assert.Equal(t, int64(123), size) }, - mockUpdateWorkflowExecutionWithNewFn: func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error { return nil }, wantErr: false, @@ -2937,24 +2944,25 @@ func TestLoadWorkflowExecutionWithTaskVersion(t *testing.T) { func TestUpdateWorkflowExecutionAsActive(t *testing.T) { testCases := []struct { name string - mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error wantErr bool }{ { name: "success", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, workflowRequestMode persistence.CreateWorkflowRequestMode) error { assert.Equal(t, persistence.UpdateWorkflowModeUpdateCurrent, updateMode) assert.Nil(t, newContext) assert.Nil(t, newMutableState) assert.Equal(t, TransactionPolicyActive, currentPolicy) assert.Nil(t, newPolicy) + assert.Equal(t, persistence.CreateWorkflowRequestModeNew, workflowRequestMode) return nil }, wantErr: false, }, { name: "error case", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, _ persistence.CreateWorkflowRequestMode) error { return errors.New("some error") }, wantErr: true, @@ -2979,24 +2987,25 @@ func TestUpdateWorkflowExecutionAsActive(t *testing.T) { func TestUpdateWorkflowExecutionWithNewAsActive(t *testing.T) { testCases := []struct { name string - mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error wantErr bool }{ { name: "success", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, workflowRequestMode persistence.CreateWorkflowRequestMode) error { assert.Equal(t, persistence.UpdateWorkflowModeUpdateCurrent, updateMode) assert.NotNil(t, newContext) assert.NotNil(t, newMutableState) assert.Equal(t, TransactionPolicyActive, currentPolicy) assert.Equal(t, TransactionPolicyActive.Ptr(), newPolicy) + assert.Equal(t, persistence.CreateWorkflowRequestModeNew, workflowRequestMode) return nil }, wantErr: false, }, { name: "error case", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, _ persistence.CreateWorkflowRequestMode) error { return errors.New("some error") }, wantErr: true, @@ -3021,24 +3030,25 @@ func TestUpdateWorkflowExecutionWithNewAsActive(t *testing.T) { func TestUpdateWorkflowExecutionAsPassive(t *testing.T) { testCases := []struct { name string - mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error wantErr bool }{ { name: "success", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, workflowRequestMode persistence.CreateWorkflowRequestMode) error { assert.Equal(t, persistence.UpdateWorkflowModeUpdateCurrent, updateMode) assert.Nil(t, newContext) assert.Nil(t, newMutableState) assert.Equal(t, TransactionPolicyPassive, currentPolicy) assert.Nil(t, newPolicy) + assert.Equal(t, persistence.CreateWorkflowRequestModeReplicated, workflowRequestMode) return nil }, wantErr: false, }, { name: "error case", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, _ persistence.CreateWorkflowRequestMode) error { return errors.New("some error") }, wantErr: true, @@ -3063,24 +3073,25 @@ func TestUpdateWorkflowExecutionAsPassive(t *testing.T) { func TestUpdateWorkflowExecutionWithNewAsPassive(t *testing.T) { testCases := []struct { name string - mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy) error + mockUpdateWorkflowExecutionWithNewFn func(context.Context, time.Time, persistence.UpdateWorkflowMode, Context, MutableState, TransactionPolicy, *TransactionPolicy, persistence.CreateWorkflowRequestMode) error wantErr bool }{ { name: "success", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, workflowRequestMode persistence.CreateWorkflowRequestMode) error { assert.Equal(t, persistence.UpdateWorkflowModeUpdateCurrent, updateMode) assert.NotNil(t, newContext) assert.NotNil(t, newMutableState) assert.Equal(t, TransactionPolicyPassive, currentPolicy) assert.Equal(t, TransactionPolicyPassive.Ptr(), newPolicy) + assert.Equal(t, persistence.CreateWorkflowRequestModeReplicated, workflowRequestMode) return nil }, wantErr: false, }, { name: "error case", - mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy) error { + mockUpdateWorkflowExecutionWithNewFn: func(_ context.Context, _ time.Time, updateMode persistence.UpdateWorkflowMode, newContext Context, newMutableState MutableState, currentPolicy TransactionPolicy, newPolicy *TransactionPolicy, _ persistence.CreateWorkflowRequestMode) error { return errors.New("some error") }, wantErr: true, diff --git a/service/history/execution/history_builder_test.go b/service/history/execution/history_builder_test.go index dc81a952172..7ab6a84c5f0 100644 --- a/service/history/execution/history_builder_test.go +++ b/service/history/execution/history_builder_test.go @@ -1066,6 +1066,12 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowExternalCancellationRequ s.Nil(s.msBuilder.FlushBufferedEvents()) s.validateWorkflowExecutionCancelRequestedEvent(cancellationEvent, 5, "cancel workflow", "identity", nil, types.WorkflowExecution{}, "b071cbe8-3a95-4223-a8ac-f308a42db383") s.Equal(int64(6), s.getNextEventID()) + _, exists := s.msBuilder.(*mutableStateBuilder).workflowRequests[persistence.WorkflowRequest{ + RequestID: "b071cbe8-3a95-4223-a8ac-f308a42db383", + RequestType: persistence.WorkflowRequestTypeCancel, + Version: s.msBuilder.GetCurrentVersion(), + }] + s.True(exists) } func (s *historyBuilderSuite) TestHistoryBuilderWorkflowExternalSignaled() { @@ -1126,6 +1132,12 @@ func (s *historyBuilderSuite) TestHistoryBuilderWorkflowExternalSignaled() { s.Nil(s.msBuilder.FlushBufferedEvents()) s.validateWorkflowExecutionSignaledEvent(signalEvent, 5, "test-signal", []byte("input"), "id", "3b8d0ec2-e1ff-4f61-915b-1ffca831361e") s.Equal(int64(6), s.getNextEventID()) + _, exists := s.msBuilder.(*mutableStateBuilder).workflowRequests[persistence.WorkflowRequest{ + RequestID: "3b8d0ec2-e1ff-4f61-915b-1ffca831361e", + RequestType: persistence.WorkflowRequestTypeSignal, + Version: s.msBuilder.GetCurrentVersion(), + }] + s.True(exists) } func (s *historyBuilderSuite) getNextEventID() int64 { return s.msBuilder.GetExecutionInfo().NextEventID diff --git a/service/history/execution/mutable_state.go b/service/history/execution/mutable_state.go index a804cebc195..d7557907b88 100644 --- a/service/history/execution/mutable_state.go +++ b/service/history/execution/mutable_state.go @@ -183,10 +183,10 @@ type ( ReplicateChildWorkflowExecutionTerminatedEvent(*types.HistoryEvent) error ReplicateChildWorkflowExecutionTimedOutEvent(*types.HistoryEvent) error ReplicateDecisionTaskCompletedEvent(*types.HistoryEvent) error - ReplicateDecisionTaskFailedEvent() error + ReplicateDecisionTaskFailedEvent(*types.HistoryEvent) error ReplicateDecisionTaskScheduledEvent(int64, int64, string, int32, int64, int64, int64, bool) (*DecisionInfo, error) ReplicateDecisionTaskStartedEvent(*DecisionInfo, int64, int64, int64, string, int64) (*DecisionInfo, error) - ReplicateDecisionTaskTimedOutEvent(types.TimeoutType) error + ReplicateDecisionTaskTimedOutEvent(*types.HistoryEvent) error ReplicateExternalWorkflowExecutionCancelRequested(*types.HistoryEvent) error ReplicateExternalWorkflowExecutionSignaled(*types.HistoryEvent) error ReplicateRequestCancelExternalWorkflowExecutionFailedEvent(*types.HistoryEvent) error diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 938584086df..176685324bd 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -149,6 +149,8 @@ type ( insertReplicationTasks []persistence.Task insertTimerTasks []persistence.Task + workflowRequests map[persistence.WorkflowRequest]struct{} + // do not rely on this, this is only updated on // Load() and closeTransactionXXX methods. So when // a transaction is in progress, this value will be @@ -232,6 +234,8 @@ func newMutableStateBuilder( timeSource: shard.GetTimeSource(), logger: logger, metricsClient: shard.GetMetricsClient(), + + workflowRequests: make(map[persistence.WorkflowRequest]struct{}), } s.executionInfo = &persistence.WorkflowExecutionInfo{ DecisionVersion: common.EmptyVersion, @@ -1800,7 +1804,15 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent( ) error { event := startEvent.WorkflowExecutionStartedEventAttributes + if event.GetRequestID() != "" { + requestID = event.GetRequestID() + } e.executionInfo.CreateRequestID = requestID + e.insertWorkflowRequest(persistence.WorkflowRequest{ + RequestID: requestID, + Version: startEvent.Version, + RequestType: persistence.WorkflowRequestTypeStart, + }) e.executionInfo.DomainID = e.domainEntry.GetInfo().ID e.executionInfo.WorkflowID = execution.GetWorkflowID() e.executionInfo.RunID = execution.GetRunID() @@ -2083,9 +2095,9 @@ func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent( } func (e *mutableStateBuilder) ReplicateDecisionTaskTimedOutEvent( - timeoutType types.TimeoutType, + event *types.HistoryEvent, ) error { - return e.decisionTaskManager.ReplicateDecisionTaskTimedOutEvent(timeoutType) + return e.decisionTaskManager.ReplicateDecisionTaskTimedOutEvent(event) } func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent( @@ -2152,8 +2164,8 @@ func (e *mutableStateBuilder) AddDecisionTaskFailedEvent( ) } -func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent() error { - return e.decisionTaskManager.ReplicateDecisionTaskFailedEvent() +func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent(event *types.HistoryEvent) error { + return e.decisionTaskManager.ReplicateDecisionTaskFailedEvent(event) } func (e *mutableStateBuilder) AddActivityTaskScheduledEvent( @@ -2800,9 +2812,6 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCancelRequestedEvent( if err := e.ReplicateWorkflowExecutionCancelRequestedEvent(event); err != nil { return nil, err } - - // Set the CancelRequestID on the active cluster. This information is not part of the history event. - e.executionInfo.CancelRequestID = request.CancelRequest.GetRequestID() return event, nil } @@ -2811,6 +2820,13 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionCancelRequestedEvent( ) error { e.executionInfo.CancelRequested = true + requestID := event.WorkflowExecutionCancelRequestedEventAttributes.RequestID + e.executionInfo.CancelRequestID = requestID + e.insertWorkflowRequest(persistence.WorkflowRequest{ + RequestID: requestID, + Version: event.Version, + RequestType: persistence.WorkflowRequestTypeCancel, + }) return nil } @@ -3364,6 +3380,11 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionSignaled( // Increment signal count in mutable state for this workflow execution e.executionInfo.SignalCount++ + e.insertWorkflowRequest(persistence.WorkflowRequest{ + RequestID: event.WorkflowExecutionSignaledEventAttributes.RequestID, + Version: event.Version, + RequestType: persistence.WorkflowRequestTypeSignal, + }) return nil } @@ -4117,6 +4138,8 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, + WorkflowRequests: convertWorkflowRequests(e.workflowRequests), + Condition: e.nextEventIDInDB, Checksum: checksum, } @@ -4195,6 +4218,8 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, + WorkflowRequests: convertWorkflowRequests(e.workflowRequests), + Condition: e.nextEventIDInDB, Checksum: checksum, } @@ -4304,6 +4329,7 @@ func (e *mutableStateBuilder) cleanupTransaction() error { e.insertReplicationTasks = nil e.insertTimerTasks = nil + e.workflowRequests = make(map[persistence.WorkflowRequest]struct{}) return nil } @@ -4753,6 +4779,15 @@ func (e *mutableStateBuilder) checkMutability( return nil } +func (e *mutableStateBuilder) insertWorkflowRequest(request persistence.WorkflowRequest) { + if e.domainEntry != nil && e.config.EnableStrongIdempotency(e.domainEntry.GetInfo().Name) && request.RequestID != "" { + if _, ok := e.workflowRequests[request]; ok { + e.logWarn("error encountering duplicate request", tag.WorkflowRequestID(request.RequestID)) + } + e.workflowRequests[request] = struct{}{} + } +} + func (e *mutableStateBuilder) generateChecksum() checksum.Checksum { if !e.shouldGenerateChecksum() { return checksum.Checksum{} diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 64ce4cb5bc4..47be9f2eeb3 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -565,8 +565,18 @@ func (s *mutableStateSuite) TestTransientDecisionTaskSchedule_CurrentVersionChan runID, constants.TestGlobalDomainEntry, ).(*mutableStateBuilder) - _, _ = s.prepareTransientDecisionCompletionFirstBatchReplicated(version, runID) - err := s.msBuilder.ReplicateDecisionTaskFailedEvent() + decisionScheduleEvent, decisionStartedEvent := s.prepareTransientDecisionCompletionFirstBatchReplicated(version, runID) + decisionFailedEvent := &types.HistoryEvent{ + Version: version, + ID: 3, + Timestamp: common.Int64Ptr(time.Now().UnixNano()), + EventType: types.EventTypeDecisionTaskFailed.Ptr(), + DecisionTaskFailedEventAttributes: &types.DecisionTaskFailedEventAttributes{ + ScheduledEventID: decisionScheduleEvent.ID, + StartedEventID: decisionStartedEvent.ID, + }, + } + err := s.msBuilder.ReplicateDecisionTaskFailedEvent(decisionFailedEvent) s.NoError(err) err = s.msBuilder.UpdateCurrentVersion(version+1, true) @@ -599,8 +609,18 @@ func (s *mutableStateSuite) TestTransientDecisionTaskStart_CurrentVersionChanged runID, constants.TestGlobalDomainEntry, ).(*mutableStateBuilder) - _, _ = s.prepareTransientDecisionCompletionFirstBatchReplicated(version, runID) - err := s.msBuilder.ReplicateDecisionTaskFailedEvent() + decisionScheduleEvent, decisionStartedEvent := s.prepareTransientDecisionCompletionFirstBatchReplicated(version, runID) + decisionFailedEvent := &types.HistoryEvent{ + Version: version, + ID: 3, + Timestamp: common.Int64Ptr(time.Now().UnixNano()), + EventType: types.EventTypeDecisionTaskFailed.Ptr(), + DecisionTaskFailedEventAttributes: &types.DecisionTaskFailedEventAttributes{ + ScheduledEventID: decisionScheduleEvent.ID, + StartedEventID: decisionStartedEvent.ID, + }, + } + err := s.msBuilder.ReplicateDecisionTaskFailedEvent(decisionFailedEvent) s.NoError(err) decisionScheduleID := int64(4) @@ -714,7 +734,7 @@ func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicat } eventID++ - _ = &types.HistoryEvent{ + decisionFailedEvent := &types.HistoryEvent{ Version: version, ID: eventID, Timestamp: common.Int64Ptr(now.UnixNano()), @@ -763,7 +783,7 @@ func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicat s.Nil(err) s.NotNil(di) - err = s.msBuilder.ReplicateDecisionTaskFailedEvent() + err = s.msBuilder.ReplicateDecisionTaskFailedEvent(decisionFailedEvent) s.Nil(err) decisionAttempt = int64(123) diff --git a/service/history/execution/mutable_state_decision_task_manager.go b/service/history/execution/mutable_state_decision_task_manager.go index 341870717ef..643cf0a615c 100644 --- a/service/history/execution/mutable_state_decision_task_manager.go +++ b/service/history/execution/mutable_state_decision_task_manager.go @@ -58,8 +58,8 @@ type ( timestamp int64, ) (*DecisionInfo, error) ReplicateDecisionTaskCompletedEvent(event *types.HistoryEvent) error - ReplicateDecisionTaskFailedEvent() error - ReplicateDecisionTaskTimedOutEvent(timeoutType types.TimeoutType) error + ReplicateDecisionTaskFailedEvent(*types.HistoryEvent) error + ReplicateDecisionTaskTimedOutEvent(*types.HistoryEvent) error AddDecisionTaskScheduleToStartTimeoutEvent(scheduleEventID int64) (*types.HistoryEvent, error) AddDecisionTaskScheduledEventAsHeartbeat( @@ -263,14 +263,22 @@ func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskCompletedEven return m.afterAddDecisionTaskCompletedEvent(event, maxResetPoints) } -func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskFailedEvent() error { +func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskFailedEvent(event *types.HistoryEvent) error { + if event != nil && event.DecisionTaskFailedEventAttributes.GetCause() == types.DecisionTaskFailedCauseResetWorkflow { + m.msb.insertWorkflowRequest(persistence.WorkflowRequest{ + RequestID: event.DecisionTaskFailedEventAttributes.RequestID, + Version: event.Version, + RequestType: persistence.WorkflowRequestTypeReset, + }) + } m.FailDecision(true) return nil } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskTimedOutEvent( - timeoutType types.TimeoutType, + event *types.HistoryEvent, ) error { + timeoutType := event.DecisionTaskTimedOutEventAttributes.GetTimeoutType() incrementAttempt := true // Do not increment decision attempt in the case of sticky scheduleToStart timeout to // prevent creating next decision as transient @@ -281,6 +289,13 @@ func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskTimedOutEvent m.msb.executionInfo.StickyTaskList != "" { incrementAttempt = false } + if event.DecisionTaskTimedOutEventAttributes.GetCause() == types.DecisionTaskTimedOutCauseReset { + m.msb.insertWorkflowRequest(persistence.WorkflowRequest{ + RequestID: event.DecisionTaskTimedOutEventAttributes.GetRequestID(), + Version: event.Version, + RequestType: persistence.WorkflowRequestTypeReset, + }) + } m.FailDecision(incrementAttempt) return nil } @@ -313,11 +328,19 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskScheduleToStartTime types.DecisionTaskTimedOutCauseTimeout, "", ) + if err := m.ReplicateDecisionTaskTimedOutEvent(event); err != nil { + return nil, err + } + } else { + if err := m.ReplicateDecisionTaskTimedOutEvent(&types.HistoryEvent{ + DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{ + TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(), + }, + }); err != nil { + return nil, err + } } - if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeScheduleToStart); err != nil { - return nil, err - } return event, nil } @@ -351,7 +374,7 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskResetTimeoutEvent( resetRequestID, ) - if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeScheduleToStart); err != nil { + if err := m.ReplicateDecisionTaskTimedOutEvent(event); err != nil { return nil, err } @@ -593,7 +616,7 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskFailedEvent( event = m.msb.hBuilder.AddDecisionTaskFailedEvent(attr) } - if err := m.ReplicateDecisionTaskFailedEvent(); err != nil { + if err := m.ReplicateDecisionTaskFailedEvent(event); err != nil { return nil, err } @@ -620,7 +643,11 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskTimedOutEvent( return nil, m.msb.createInternalServerError(opTag) } - var event *types.HistoryEvent + event := &types.HistoryEvent{ + DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{ + TimeoutType: types.TimeoutTypeStartToClose.Ptr(), + }, + } // Avoid creating new history events when decisions are continuously timing out if dt.Attempt == 0 { event = m.msb.hBuilder.AddDecisionTaskTimedOutEvent( @@ -636,7 +663,7 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskTimedOutEvent( ) } - if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeStartToClose); err != nil { + if err := m.ReplicateDecisionTaskTimedOutEvent(event); err != nil { return nil, err } return event, nil diff --git a/service/history/execution/mutable_state_decision_task_manager_mock.go b/service/history/execution/mutable_state_decision_task_manager_mock.go index 9e2fb8af3ac..44053da84f2 100644 --- a/service/history/execution/mutable_state_decision_task_manager_mock.go +++ b/service/history/execution/mutable_state_decision_task_manager_mock.go @@ -348,17 +348,17 @@ func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTask } // ReplicateDecisionTaskFailedEvent mocks base method. -func (m *MockmutableStateDecisionTaskManager) ReplicateDecisionTaskFailedEvent() error { +func (m *MockmutableStateDecisionTaskManager) ReplicateDecisionTaskFailedEvent(arg0 *types.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReplicateDecisionTaskFailedEvent") + ret := m.ctrl.Call(m, "ReplicateDecisionTaskFailedEvent", arg0) ret0, _ := ret[0].(error) return ret0 } // ReplicateDecisionTaskFailedEvent indicates an expected call of ReplicateDecisionTaskFailedEvent. -func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTaskFailedEvent() *gomock.Call { +func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTaskFailedEvent(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskFailedEvent", reflect.TypeOf((*MockmutableStateDecisionTaskManager)(nil).ReplicateDecisionTaskFailedEvent)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskFailedEvent", reflect.TypeOf((*MockmutableStateDecisionTaskManager)(nil).ReplicateDecisionTaskFailedEvent), arg0) } // ReplicateDecisionTaskScheduledEvent mocks base method. @@ -392,17 +392,17 @@ func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTask } // ReplicateDecisionTaskTimedOutEvent mocks base method. -func (m *MockmutableStateDecisionTaskManager) ReplicateDecisionTaskTimedOutEvent(timeoutType types.TimeoutType) error { +func (m *MockmutableStateDecisionTaskManager) ReplicateDecisionTaskTimedOutEvent(arg0 *types.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReplicateDecisionTaskTimedOutEvent", timeoutType) + ret := m.ctrl.Call(m, "ReplicateDecisionTaskTimedOutEvent", arg0) ret0, _ := ret[0].(error) return ret0 } // ReplicateDecisionTaskTimedOutEvent indicates an expected call of ReplicateDecisionTaskTimedOutEvent. -func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTaskTimedOutEvent(timeoutType interface{}) *gomock.Call { +func (mr *MockmutableStateDecisionTaskManagerMockRecorder) ReplicateDecisionTaskTimedOutEvent(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskTimedOutEvent", reflect.TypeOf((*MockmutableStateDecisionTaskManager)(nil).ReplicateDecisionTaskTimedOutEvent), timeoutType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskTimedOutEvent", reflect.TypeOf((*MockmutableStateDecisionTaskManager)(nil).ReplicateDecisionTaskTimedOutEvent), arg0) } // ReplicateTransientDecisionTaskScheduled mocks base method. diff --git a/service/history/execution/mutable_state_mock.go b/service/history/execution/mutable_state_mock.go index c588e65c7e4..fa1887b3a51 100644 --- a/service/history/execution/mutable_state_mock.go +++ b/service/history/execution/mutable_state_mock.go @@ -2040,17 +2040,17 @@ func (mr *MockMutableStateMockRecorder) ReplicateDecisionTaskCompletedEvent(arg0 } // ReplicateDecisionTaskFailedEvent mocks base method. -func (m *MockMutableState) ReplicateDecisionTaskFailedEvent() error { +func (m *MockMutableState) ReplicateDecisionTaskFailedEvent(arg0 *types.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReplicateDecisionTaskFailedEvent") + ret := m.ctrl.Call(m, "ReplicateDecisionTaskFailedEvent", arg0) ret0, _ := ret[0].(error) return ret0 } // ReplicateDecisionTaskFailedEvent indicates an expected call of ReplicateDecisionTaskFailedEvent. -func (mr *MockMutableStateMockRecorder) ReplicateDecisionTaskFailedEvent() *gomock.Call { +func (mr *MockMutableStateMockRecorder) ReplicateDecisionTaskFailedEvent(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskFailedEvent", reflect.TypeOf((*MockMutableState)(nil).ReplicateDecisionTaskFailedEvent)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateDecisionTaskFailedEvent", reflect.TypeOf((*MockMutableState)(nil).ReplicateDecisionTaskFailedEvent), arg0) } // ReplicateDecisionTaskScheduledEvent mocks base method. @@ -2084,7 +2084,7 @@ func (mr *MockMutableStateMockRecorder) ReplicateDecisionTaskStartedEvent(arg0, } // ReplicateDecisionTaskTimedOutEvent mocks base method. -func (m *MockMutableState) ReplicateDecisionTaskTimedOutEvent(arg0 types.TimeoutType) error { +func (m *MockMutableState) ReplicateDecisionTaskTimedOutEvent(arg0 *types.HistoryEvent) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReplicateDecisionTaskTimedOutEvent", arg0) ret0, _ := ret[0].(error) diff --git a/service/history/execution/mutable_state_util.go b/service/history/execution/mutable_state_util.go index a9757d78d2e..18a18d253b2 100644 --- a/service/history/execution/mutable_state_util.go +++ b/service/history/execution/mutable_state_util.go @@ -205,6 +205,15 @@ func convertUpdateSignalInfos( return outputs } +func convertWorkflowRequests(inputs map[persistence.WorkflowRequest]struct{}) []*persistence.WorkflowRequest { + outputs := make([]*persistence.WorkflowRequest, 0, len(inputs)) + for key := range inputs { + key := key // TODO: remove this trick once we upgrade go to 1.22 + outputs = append(outputs, &key) + } + return outputs +} + // FailDecision fails the current decision task func FailDecision( mutableState MutableState, diff --git a/service/history/execution/mutable_state_util_test.go b/service/history/execution/mutable_state_util_test.go index 030062fb97f..feeb9c19681 100644 --- a/service/history/execution/mutable_state_util_test.go +++ b/service/history/execution/mutable_state_util_test.go @@ -295,3 +295,24 @@ func TestTrimBinaryChecksums(t *testing.T) { assert.Equal(t, recentBinaryChecksums, trimedBinaryChecksums) assert.Equal(t, currResetPoints, trimedResetPoints) } + +func TestConvertWorkflowRequests(t *testing.T) { + inputs := map[persistence.WorkflowRequest]struct{}{} + inputs[persistence.WorkflowRequest{RequestID: "aaa", Version: 1, RequestType: persistence.WorkflowRequestTypeStart}] = struct{}{} + inputs[persistence.WorkflowRequest{RequestID: "aaa", Version: 1, RequestType: persistence.WorkflowRequestTypeSignal}] = struct{}{} + + expectedOutputs := []*persistence.WorkflowRequest{ + { + RequestID: "aaa", + Version: 1, + RequestType: persistence.WorkflowRequestTypeStart, + }, + { + RequestID: "aaa", + Version: 1, + RequestType: persistence.WorkflowRequestTypeSignal, + }, + } + + assert.ElementsMatch(t, expectedOutputs, convertWorkflowRequests(inputs)) +} diff --git a/service/history/execution/state_builder.go b/service/history/execution/state_builder.go index 032749d1b49..910508adc58 100644 --- a/service/history/execution/state_builder.go +++ b/service/history/execution/state_builder.go @@ -195,7 +195,7 @@ func (b *stateBuilderImpl) ApplyEvents( case types.EventTypeDecisionTaskTimedOut: if err := b.mutableState.ReplicateDecisionTaskTimedOutEvent( - event.DecisionTaskTimedOutEventAttributes.GetTimeoutType(), + event, ); err != nil { return nil, err } @@ -207,7 +207,7 @@ func (b *stateBuilderImpl) ApplyEvents( } case types.EventTypeDecisionTaskFailed: - if err := b.mutableState.ReplicateDecisionTaskFailedEvent(); err != nil { + if err := b.mutableState.ReplicateDecisionTaskFailedEvent(event); err != nil { return nil, err } diff --git a/service/history/execution/state_builder_test.go b/service/history/execution/state_builder_test.go index 5c68a6070cc..0ed33fb06de 100644 --- a/service/history/execution/state_builder_test.go +++ b/service/history/execution/state_builder_test.go @@ -706,7 +706,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeDecisionTaskTimedOut() { TimeoutType: types.TimeoutTypeStartToClose.Ptr(), }, } - s.mockMutableState.EXPECT().ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeStartToClose).Return(nil).Times(1) + s.mockMutableState.EXPECT().ReplicateDecisionTaskTimedOutEvent(event).Return(nil).Times(1) tasklist := "some random tasklist" executionInfo := &persistence.WorkflowExecutionInfo{ TaskList: tasklist, @@ -743,7 +743,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeDecisionTaskFailed() { StartedEventID: startedID, }, } - s.mockMutableState.EXPECT().ReplicateDecisionTaskFailedEvent().Return(nil).Times(1) + s.mockMutableState.EXPECT().ReplicateDecisionTaskFailedEvent(event).Return(nil).Times(1) tasklist := "some random tasklist" executionInfo := &persistence.WorkflowExecutionInfo{ TaskList: tasklist, diff --git a/service/history/ndc/activity_replicator.go b/service/history/ndc/activity_replicator.go index 0fc93d569c6..973d6e38e7d 100644 --- a/service/history/ndc/activity_replicator.go +++ b/service/history/ndc/activity_replicator.go @@ -204,6 +204,7 @@ func (r *activityReplicatorImpl) SyncActivity( nil, // no new workflow execution.TransactionPolicyPassive, nil, + persistence.CreateWorkflowRequestModeReplicated, ) } diff --git a/service/history/ndc/activity_replicator_test.go b/service/history/ndc/activity_replicator_test.go index 528bcb6ac88..3900071fa79 100644 --- a/service/history/ndc/activity_replicator_test.go +++ b/service/history/ndc/activity_replicator_test.go @@ -1143,6 +1143,7 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityRunning() { nil, execution.TransactionPolicyPassive, nil, + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err = s.activityReplicator.SyncActivity(ctx.Background(), request) s.NoError(err) @@ -1230,6 +1231,7 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityRunning_ZombieWorkflo nil, execution.TransactionPolicyPassive, nil, + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err = s.activityReplicator.SyncActivity(ctx.Background(), request) s.NoError(err) diff --git a/service/history/ndc/existing_workflow_transaction_manager.go b/service/history/ndc/existing_workflow_transaction_manager.go index 466fa157d05..84fa8a5c00b 100644 --- a/service/history/ndc/existing_workflow_transaction_manager.go +++ b/service/history/ndc/existing_workflow_transaction_manager.go @@ -316,6 +316,7 @@ func (r *transactionManagerForExistingWorkflowImpl) updateAsZombie( newMutableState, execution.TransactionPolicyPassive, newTransactionPolicy, + persistence.CreateWorkflowRequestModeReplicated, ) } diff --git a/service/history/ndc/existing_workflow_transaction_manager_test.go b/service/history/ndc/existing_workflow_transaction_manager_test.go index 9c9e410ba26..a1b3a90b3e7 100644 --- a/service/history/ndc/existing_workflow_transaction_manager_test.go +++ b/service/history/ndc/existing_workflow_transaction_manager_test.go @@ -344,6 +344,7 @@ func (s *transactionManagerForExistingWorkflowSuite) TestDispatchForExistingWork newMutableState, execution.TransactionPolicyPassive, execution.TransactionPolicyPassive.Ptr(), + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -416,6 +417,7 @@ func (s *transactionManagerForExistingWorkflowSuite) TestDispatchForExistingWork (execution.MutableState)(nil), execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) diff --git a/service/history/ndc/new_workflow_transaction_manager.go b/service/history/ndc/new_workflow_transaction_manager.go index a3c55baff0b..8e4fcf88e5f 100644 --- a/service/history/ndc/new_workflow_transaction_manager.go +++ b/service/history/ndc/new_workflow_transaction_manager.go @@ -192,6 +192,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsCurrent( createMode, prevRunID, prevLastWriteVersion, + persistence.CreateWorkflowRequestModeReplicated, ) } @@ -206,6 +207,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsCurrent( createMode, prevRunID, prevLastWriteVersion, + persistence.CreateWorkflowRequestModeReplicated, ) } @@ -276,6 +278,7 @@ func (r *transactionManagerForNewWorkflowImpl) createAsZombie( createMode, prevRunID, prevLastWriteVersion, + persistence.CreateWorkflowRequestModeReplicated, ) switch err.(type) { case nil: @@ -313,6 +316,7 @@ func (r *transactionManagerForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent targetWorkflow.GetMutableState(), currentWorkflowPolicy, execution.TransactionPolicyPassive.Ptr(), + persistence.CreateWorkflowRequestModeReplicated, ) } diff --git a/service/history/ndc/new_workflow_transaction_manager_test.go b/service/history/ndc/new_workflow_transaction_manager_test.go index 339f06b93ff..3351ecb7bc1 100644 --- a/service/history/ndc/new_workflow_transaction_manager_test.go +++ b/service/history/ndc/new_workflow_transaction_manager_test.go @@ -145,6 +145,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Brand persistence.CreateWorkflowModeBrandNew, "", int64(0), + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.createManager.dispatchForNewWorkflow(ctx, now, workflow) @@ -223,6 +224,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat persistence.CreateWorkflowModeWorkflowIDReuse, currentRunID, currentLastWriteVersion, + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.createManager.dispatchForNewWorkflow(ctx, now, targetWorkflow) @@ -298,6 +300,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat persistence.CreateWorkflowModeZombie, "", int64(0), + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) targetContext.EXPECT().ReapplyEvents(targetWorkflowEventsSeq).Return(nil).Times(1) @@ -374,6 +377,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Creat persistence.CreateWorkflowModeZombie, "", int64(0), + persistence.CreateWorkflowRequestModeReplicated, ).Return(&persistence.WorkflowExecutionAlreadyStartedError{}).Times(1) targetContext.EXPECT().ReapplyEvents(targetWorkflowEventsSeq).Return(nil).Times(1) @@ -434,6 +438,7 @@ func (s *transactionManagerForNewWorkflowSuite) TestDispatchForNewWorkflow_Suppr targetMutableState, currentWorkflowPolicy, execution.TransactionPolicyPassive.Ptr(), + persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.createManager.dispatchForNewWorkflow(ctx, now, targetWorkflow) diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index e256baf4e21..3e35a9e6a4a 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -261,6 +261,7 @@ func (r *transactionManagerImpl) backfillWorkflow( nil, transactionPolicy, nil, + persistence.CreateWorkflowRequestModeReplicated, ) } diff --git a/service/history/ndc/transaction_manager_test.go b/service/history/ndc/transaction_manager_test.go index a51c147f546..d951c3b8c89 100644 --- a/service/history/ndc/transaction_manager_test.go +++ b/service/history/ndc/transaction_manager_test.go @@ -162,7 +162,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Op mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{RunID: runID}).Times(1) context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyActive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyActive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) s.NoError(err) @@ -236,7 +236,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Cl context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) @@ -269,7 +269,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_O context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) s.NoError(err) @@ -316,7 +316,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_C context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) @@ -370,7 +370,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_NotCurrentWorkflow_Active context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) s.NoError(err) @@ -422,7 +422,7 @@ func (s *transactionManagerSuite) TestBackfillWorkflow_NotCurrentWorkflow_Passiv context.EXPECT().ReapplyEvents([]*persistence.WorkflowEvents{workflowEvents}).Times(1) context.EXPECT().PersistNonStartWorkflowBatchEvents(gomock.Any(), workflowEvents).Return(events.PersistedBlob{}, nil).Times(1) context.EXPECT().UpdateWorkflowExecutionWithNew( - gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), + gomock.Any(), now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, execution.TransactionPolicyPassive, (*execution.TransactionPolicy)(nil), persistence.CreateWorkflowRequestModeReplicated, ).Return(nil).Times(1) err := s.transactionManager.backfillWorkflow(ctx, now, workflow, workflowEvents) s.NoError(err) diff --git a/service/history/reset/resetter.go b/service/history/reset/resetter.go index 3761b8b944c..169e6400611 100644 --- a/service/history/reset/resetter.go +++ b/service/history/reset/resetter.go @@ -303,6 +303,7 @@ func (r *workflowResetterImpl) persistToDB( persistence.CreateWorkflowModeContinueAsNew, currentRunID, currentLastWriteVersion, + persistence.CreateWorkflowRequestModeNew, ) } diff --git a/service/history/reset/resetter_test.go b/service/history/reset/resetter_test.go index 2d2a371e8e9..018c96bd1db 100644 --- a/service/history/reset/resetter_test.go +++ b/service/history/reset/resetter_test.go @@ -203,6 +203,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() { persistence.CreateWorkflowModeContinueAsNew, s.currentRunID, currentLastWriteVersion, + persistence.CreateWorkflowRequestModeNew, ).Return(nil).Times(1) err := s.workflowResetter.persistToDB(context.Background(), currentWorkflowTerminated, currentWorkflow, resetWorkflow) diff --git a/service/history/shard/context.go b/service/history/shard/context.go index f3f223e360d..55b9d1108ac 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -648,6 +648,7 @@ func (s *contextImpl) CreateWorkflowExecution( case *types.WorkflowExecutionAlreadyStartedError, *persistence.WorkflowExecutionAlreadyStartedError, *persistence.CurrentWorkflowConditionFailedError, + *persistence.DuplicateRequestError, *types.ServiceBusyError: // No special handling required for these errors // We know write to DB fails if these errors are returned @@ -755,6 +756,7 @@ func (s *contextImpl) UpdateWorkflowExecution( s.updateMaxReadLevelLocked(transferMaxReadLevel) return resp, nil case *persistence.ConditionFailedError, + *persistence.DuplicateRequestError, *types.ServiceBusyError: // No special handling required for these errors // We know write to DB fails if these errors are returned diff --git a/service/history/task/timer_standby_task_executor_test.go b/service/history/task/timer_standby_task_executor_test.go index 4f79ffeb0a7..de988c22e6d 100644 --- a/service/history/task/timer_standby_task_executor_test.go +++ b/service/history/task/timer_standby_task_executor_test.go @@ -530,8 +530,10 @@ func (s *timerStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple_CanU NewBufferedEvents: nil, ClearBufferedEvents: false, VersionHistories: mutableState.GetVersionHistories(), + WorkflowRequests: []*persistence.WorkflowRequest{}, }, NewWorkflowSnapshot: nil, + WorkflowRequestMode: persistence.CreateWorkflowRequestModeReplicated, Encoding: common.EncodingType(s.mockShard.GetConfig().EventEncodingType(s.domainID)), DomainName: constants.TestDomainName, }, input) diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 2aeb04ae12a..7d9c3107eb9 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -261,6 +261,9 @@ UpdateHistoryLoop: } err = workflowContext.GetContext().UpdateWorkflowExecutionAsActive(ctx, now) + if _, ok := err.(*persistence.DuplicateRequestError); ok { + return nil + } if execution.IsConflictError(err) { if attempt != ConditionalRetryCount-1 { _, err = workflowContext.ReloadMutableState(ctx)