Skip to content

Commit

Permalink
Revert "Update task executor to handle WorkflowAlreadyCompletedError …
Browse files Browse the repository at this point in the history
…for signal and cancel workflow (#5956)" (#6026)

This reverts commit d877674.
  • Loading branch information
Shaddoll authored May 17, 2024
1 parent f1a4bad commit 02c7efb
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 73 deletions.
10 changes: 0 additions & 10 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,6 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -269,7 +265,6 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(
taskInfo.TargetWorkflowID,
taskInfo.TargetRunID,
now,
cause,
)
}
return requestCancelExternalExecutionCompleted(
Expand Down Expand Up @@ -484,10 +479,6 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -497,7 +488,6 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(
taskInfo.TargetRunID,
signalInfo.Control,
now,
cause,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *crossClusterSourceTaskExecutorSuite) TestExecuteCancelExecution_Failure
&types.CrossClusterTaskResponse{
TaskType: types.CrossClusterTaskTypeCancelExecution.Ptr(),
TaskState: int16(processingStateInitialized),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted.Ptr(),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowNotExists.Ptr(),
},
func(
mutableState execution.MutableState,
Expand Down
22 changes: 3 additions & 19 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
return err
}
Expand All @@ -657,11 +656,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
// for retryable error just return
return err
}
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
if errors.As(err, &alreadyCompletedErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
task,
Expand All @@ -670,7 +664,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
cause,
)
}

Expand Down Expand Up @@ -757,7 +750,6 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
}

Expand All @@ -777,17 +769,12 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add RequestCancelFailed
// Check to see if the error is non-transient, in which case add SignalFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
return err
}
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if errors.As(err, &alreadyCompletedErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
task,
Expand All @@ -797,7 +784,6 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
cause,
)
}

Expand Down Expand Up @@ -1433,7 +1419,6 @@ func requestCancelExternalExecutionFailed(
targetWorkflowID string,
targetRunID string,
now time.Time,
cause types.CancelExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1454,7 +1439,7 @@ func requestCancelExternalExecutionFailed(
targetDomain,
targetWorkflowID,
targetRunID,
cause,
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
return err
},
Expand All @@ -1479,7 +1464,6 @@ func signalExternalExecutionFailed(
targetRunID string,
control []byte,
now time.Time,
cause types.SignalExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1501,7 +1485,7 @@ func signalExternalExecutionFailed(
targetWorkflowID,
targetRunID,
control,
cause,
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
)
return err
},
Expand Down
45 changes: 2 additions & 43 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotExistsError() {
func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
Expand All @@ -1067,27 +1067,6 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotEx
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_WorkflowAlreadyCompleted() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
requestCancelInfo *persistence.RequestCancelInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
cancelRequest := createTestRequestCancelWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), requestCancelInfo.CancelRequestID)
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), cancelRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Duplication() {
s.testProcessCancelExecution(
s.targetDomainID,
Expand Down Expand Up @@ -1223,7 +1202,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotExistsError() {
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
Expand All @@ -1244,26 +1223,6 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotEx
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_WorkflowAlreadyCompletedError() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
signalInfo *persistence.SignalInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
signalRequest := createTestSignalWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), signalInfo)
s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), signalRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() {
s.testProcessSignalExecution(
s.targetDomainID,
Expand Down

0 comments on commit 02c7efb

Please sign in to comment.