Skip to content

Commit

Permalink
Update mutable state to generate and replicate requestIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Apr 15, 2024
1 parent ba39678 commit 1497967
Show file tree
Hide file tree
Showing 33 changed files with 546 additions and 91 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,13 @@ const (
// Allowed filters: DomainName
EnableRetryForChecksumFailure

// EnableStrongIdempotency enables strong idempotency for APIs
// KeyName: history.enableStrongIdempotency
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableStrongIdempotency

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4310,6 +4317,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails",
DefaultValue: false,
},
EnableStrongIdempotency: DynamicBool{
KeyName: "history.enableStrongIdempotency",
Filters: []Filter{DomainName},
Description: "EnableStrongIdempotency enables strong idempotency for APIs",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func WorkflowSignalName(signalName string) Tag {
return newStringTag("wf-signal-name", signalName)
}

// WorkflowRequestID returns tag for WorkflowRequestID
func WorkflowRequestID(requestID string) Tag {
return newStringTag("wf-request-id", requestID)
}

// WorkflowState returns tag for WorkflowState
func WorkflowState(s int) Tag {
return newInt("wf-state", s)
Expand Down
3 changes: 3 additions & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
history.useNewInitialFailoverVersion:
- value: true
constraints: {}
history.enableStrongIdempotency:
- value: true
constraints: {}
frontend.validSearchAttributes:
Expand Down
5 changes: 5 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ type Config struct {
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn

EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter

// HostName for machine running the service
HostName string
}
Expand Down Expand Up @@ -596,6 +598,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),

EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),

HostName: hostname,
}

Expand Down Expand Up @@ -637,6 +641,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
}))
panicIfErr(inMem.UpdateValue(dynamicconfig.QueueProcessorRandomSplitProbability, 0.5))
panicIfErr(inMem.UpdateValue(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnableStrongIdempotency, true))

dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false, "")
Expand Down
20 changes: 20 additions & 0 deletions service/history/engine/engineimpl/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,13 @@ func (e *historyEngineImpl) startWorkflowHelper(
createMode,
prevRunID,
prevLastWriteVersion,
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
// handle already started error
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {

Expand Down Expand Up @@ -853,7 +859,13 @@ func (e *historyEngineImpl) startWorkflowHelper(
createMode,
prevRunID,
t.LastWriteVersion,
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2620,6 +2632,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil
}
if execution.IsConflictError(err) {
continue Just_Signal_Loop
}
Expand Down Expand Up @@ -3014,6 +3029,11 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
nil,
request.GetSkipSignalReapply(),
); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
return &types.ResetWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
return nil, err
}
return &types.ResetWorkflowExecutionResponse{
Expand Down
202 changes: 202 additions & 0 deletions service/history/engine/engineimpl/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,37 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() {
s.Equal(int64(4), executionBuilder.GetNextEventID())
}

func (s *engine2Suite) TestRequestCancelWorkflowExecutionDuplicateRequestError() {
domainID := constants.TestDomainID
workflowExecution := types.WorkflowExecution{
WorkflowID: "wId",
RunID: constants.TestRunID,
}

identity := "testIdentity"
tl := "testTaskList"

msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, false)
ms1 := execution.CreatePersistenceMutableState(msBuilder)
gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()

err := s.historyEngine.RequestCancelWorkflowExecution(context.Background(), &types.HistoryRequestCancelWorkflowExecutionRequest{
DomainUUID: domainID,
CancelRequest: &types.RequestCancelWorkflowExecutionRequest{
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowExecution.WorkflowID,
RunID: workflowExecution.RunID,
},
Identity: "identity",
},
})
s.Nil(err)
}

func (s *engine2Suite) TestRequestCancelWorkflowExecutionAlreadyCancelled_Success() {
domainID := constants.TestDomainID
workflowExecution := types.WorkflowExecution{
Expand Down Expand Up @@ -1057,6 +1088,40 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
s.NotNil(resp.RunID)
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew_DuplicateRequestError() {
domainID := constants.TestDomainID
workflowID := "workflowID"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
partitionConfig := map[string]string{
"zone": "phx",
}

s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
})).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()

requestID := uuid.New()
resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: domainID,
StartRequest: &types.StartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
RequestID: requestID,
},
PartitionConfig: partitionConfig,
})
s.NoError(err)
s.Equal("test-run-id", resp.RunID)
}

func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
domainID := constants.TestDomainID
workflowID := "workflowID"
Expand Down Expand Up @@ -1348,6 +1413,48 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
s.Equal(runID, resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal_DuplicateRequestError() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)

domainID := constants.TestDomainID
workflowID := "wId"
runID := constants.TestRunID
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
Identity: identity,
SignalName: signalName,
Input: input,
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
testlogger.New(s.Suite.T()),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(err)
s.Equal("test-run-id", resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
Expand Down Expand Up @@ -1395,6 +1502,52 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() {
s.NotNil(resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist_DuplicateRequestError() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Error(err)

domainID := constants.TestDomainID
workflowID := "wId"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
requestID := uuid.New()
partitionConfig := map[string]string{
"zone": "phx",
}

sRequest = &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
SignalName: signalName,
Input: input,
RequestID: requestID,
},
PartitionConfig: partitionConfig,
}

notExistErr := &types.EntityNotExistsError{Message: "Workflow not exist"}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(nil, notExistErr).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.CreateWorkflowExecutionRequest) bool {
return !request.NewWorkflowSnapshot.ExecutionInfo.StartTimestamp.IsZero() && reflect.DeepEqual(partitionConfig, request.NewWorkflowSnapshot.ExecutionInfo.PartitionConfig)
})).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(err)
s.Equal("test-run-id", resp.GetRunID())
}
func (s *engine2Suite) TestSignalWithStartWorkflowExecution_CreateTimeout() {
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{}
_, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
Expand Down Expand Up @@ -1555,6 +1708,55 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque
s.Equal(runID, resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateRequestError() {
domainID := constants.TestDomainID
workflowID := "wId"
runID := constants.TestRunID
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
requestID := "testRequestID"
policy := types.WorkflowIDReusePolicyAllowDuplicate
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
Domain: domainID,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{Name: workflowType},
TaskList: &types.TaskList{Name: taskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: identity,
SignalName: signalName,
Input: input,
RequestID: requestID,
WorkflowIDReusePolicy: &policy,
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
testlogger.New(s.Suite.T()),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.DuplicateRequestError{RunID: "test-run-id"}).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(err)
s.Equal("test-run-id", resp.GetRunID())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlreadyStarted() {
domainID := constants.TestDomainID
workflowID := "wId"
Expand Down
Loading

0 comments on commit 1497967

Please sign in to comment.