Skip to content

Commit

Permalink
do not generate workflow requests for reapplied events
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Apr 17, 2024
1 parent 3fc417c commit 4ff8737
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 32 deletions.
36 changes: 26 additions & 10 deletions service/history/engine/engineimpl/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,13 @@ func (e *historyEngineImpl) startWorkflowHelper(
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
// handle already started error
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
Expand Down Expand Up @@ -862,9 +866,13 @@ func (e *historyEngineImpl) startWorkflowHelper(
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
}
if err != nil {
Expand Down Expand Up @@ -2633,7 +2641,11 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
// the history and try the operation again.
if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil
if t.RequestType == persistence.WorkflowRequestTypeSignal {
return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
if execution.IsConflictError(err) {
continue Just_Signal_Loop
Expand Down Expand Up @@ -3030,9 +3042,13 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
request.GetSkipSignalReapply(),
); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.ResetWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
if t.RequestType == persistence.WorkflowRequestTypeReset {
return &types.ResetWorkflowExecutionResponse{
RunID: t.RunID,
}, nil

Check warning on line 3048 in service/history/engine/engineimpl/historyEngine.go

View check run for this annotation

Codecov / codecov/patch

service/history/engine/engineimpl/historyEngine.go#L3044-L3048

Added lines #L3044 - L3048 were not covered by tests
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t

Check warning on line 3051 in service/history/engine/engineimpl/historyEngine.go

View check run for this annotation

Codecov / codecov/patch

service/history/engine/engineimpl/historyEngine.go#L3050-L3051

Added lines #L3050 - L3051 were not covered by tests
}
return nil, err
}
Expand Down
250 changes: 249 additions & 1 deletion service/history/engine/engineimpl/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,40 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_DuplicateRequestError
s.Equal("test-run-id", resp.RunID)
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_DuplicateRequestError_TypeMismatch() {
domainID := constants.TestDomainID
workflowID := "workflowID"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
partitionConfig := map[string]string{
"zone": "phx",
}

s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
})).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeSignal, RunID: "test-run-id"}).Once()

requestID := uuid.New()
_, err := s.historyEngine.StartWorkflowExecution(context.Background(), &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: domainID,
StartRequest: &types.StartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
RequestID: requestID,
},
PartitionConfig: partitionConfig,
})
s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
}

func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
domainID := constants.TestDomainID
workflowID := "workflowID"
Expand Down Expand Up @@ -1197,6 +1231,130 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
s.Nil(resp)
}

func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess_DuplicateRequestError() {
domainID := constants.TestDomainID
workflowID := "workflowID"
runID := "runID"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
lastWriteVersion := common.EmptyVersion
partitionConfig := map[string]string{
"zone": "phx",
}

s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.Anything,
mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return request.Mode == p.CreateWorkflowModeBrandNew &&
!request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() &&
reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
}),
).Return(nil, &p.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: "oldRequestID",
RunID: runID,
State: p.WorkflowStateCompleted,
CloseStatus: p.WorkflowCloseStatusCompleted,
LastWriteVersion: lastWriteVersion,
}).Once()

s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.Anything,
mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return request.Mode == p.CreateWorkflowModeWorkflowIDReuse &&
request.PreviousRunID == runID &&
request.PreviousLastWriteVersion == lastWriteVersion &&
!request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() &&
reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
}),
).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeStart, RunID: "test-run-id"}).Once()

resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: domainID,
StartRequest: &types.StartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
RequestID: "newRequestID",
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
},
PartitionConfig: partitionConfig,
})

s.Nil(err)
s.Equal("test-run-id", resp.RunID)
}

func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess_DuplicateRequestError_TypeMismatch() {
domainID := constants.TestDomainID
workflowID := "workflowID"
runID := "runID"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
lastWriteVersion := common.EmptyVersion
partitionConfig := map[string]string{
"zone": "phx",
}

s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.Anything,
mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return request.Mode == p.CreateWorkflowModeBrandNew &&
!request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() &&
reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
}),
).Return(nil, &p.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: "oldRequestID",
RunID: runID,
State: p.WorkflowStateCompleted,
CloseStatus: p.WorkflowCloseStatusCompleted,
LastWriteVersion: lastWriteVersion,
}).Once()

s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.Anything,
mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return request.Mode == p.CreateWorkflowModeWorkflowIDReuse &&
request.PreviousRunID == runID &&
request.PreviousLastWriteVersion == lastWriteVersion &&
!request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() &&
reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
}),
).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeSignal, RunID: "test-run-id"}).Once()

_, err := s.historyEngine.StartWorkflowExecution(context.Background(), &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: domainID,
StartRequest: &types.StartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
RequestID: "newRequestID",
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
},
PartitionConfig: partitionConfig,
})

s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
}

func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
domainID := constants.TestDomainID
workflowID := "workflowID"
Expand Down Expand Up @@ -1448,13 +1606,55 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal_Duplicate
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeSignal, RunID: "test-run-id"}).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(err)
s.Equal("test-run-id", resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal_DuplicateRequestError_TypeMismatch() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)

domainID := constants.TestDomainID
workflowID := "wId"
runID := constants.TestRunID
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
Identity: identity,
SignalName: signalName,
Input: input,
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
testlogger.New(s.Suite.T()),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeStart, RunID: "test-run-id"}).Once()

_, err = s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
Expand Down Expand Up @@ -1548,6 +1748,54 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist_Dup
s.Nil(err)
s.Equal("test-run-id", resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist_DuplicateRequestError_TypeMismatch() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)

domainID := constants.TestDomainID
workflowID := "wId"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
requestID := uuid.New()
partitionConfig := map[string]string{
"zone": "phx",
}

sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
SignalName: signalName,
Input: input,
RequestID: requestID,
},
PartitionConfig: partitionConfig,
}

notExistErr := &types.EntityNotExistsError{Message: "Workflow not exist"}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(nil, notExistErr).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
})).Return(nil, &p.DuplicateRequestError{RequestType: p.WorkflowRequestTypeCancel, RunID: "test-run-id"}).Once()

_, err = s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_CreateTimeout() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
Expand Down
Loading

0 comments on commit 4ff8737

Please sign in to comment.