Skip to content

Commit

Permalink
Merge branch 'master' into taylan/async_wf_q_config_replication_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx authored Apr 11, 2024
2 parents b5d6c8e + 7b4b90e commit 5cf1920
Showing 1 changed file with 212 additions and 4 deletions.
216 changes: 212 additions & 4 deletions service/history/decision/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,42 @@
package decision

import (
"context"
"errors"
"reflect"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
commonConfig "github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/constants"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/query"
"github.com/uber/cadence/service/history/shard"
)

const (
_testDomainUUID = "00000000000000000000000000000001"
_testInvalidDomainUUID = "some-invalid-UUID"
_testDomainName = "test-domain"
_testWorkflowID = "test-wfID"
_testRunID = "00000000000000000000000000000002"
_testCluster = "test-cluster"
_testShardID = 0
)

type (
Expand All @@ -47,8 +67,10 @@ type (
controller *gomock.Controller
mockMutableState *execution.MockMutableState

decisionHandler *handlerImpl
queryRegistry query.Registry
decisionHandler *handlerImpl
queryRegistry query.Registry
localDomainCacheEntry *cache.DomainCacheEntry
clusterMetadata cluster.Metadata
}
)

Expand All @@ -59,16 +81,23 @@ func TestDecisionHandlerSuite(t *testing.T) {
func (s *DecisionHandlerSuite) SetupTest() {
s.Assertions = require.New(s.T())
s.controller = gomock.NewController(s.T())

domainInfo := &persistence.DomainInfo{
ID: _testDomainUUID,
Name: _testDomainName,
}
s.localDomainCacheEntry = cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, _testCluster)
s.clusterMetadata = cluster.NewMetadata(0, _testCluster, _testCluster, map[string]commonConfig.ClusterInformation{}, func(domain string) bool {
return false
}, metrics.NewClient(tally.NoopScope, metrics.History), testlogger.New(s.T()))
s.decisionHandler = &handlerImpl{
versionChecker: client.NewVersionChecker(),
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
config: config.NewForTest(),
logger: testlogger.New(s.T()),
timeSource: clock.NewRealTimeSource(),
}
s.queryRegistry = s.constructQueryRegistry(10)
s.mockMutableState = execution.NewMockMutableState(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
workflowInfo := &persistence.WorkflowExecutionInfo{
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
Expand All @@ -80,34 +109,186 @@ func (s *DecisionHandlerSuite) TearDownTest() {
s.controller.Finish()
}

func (s *DecisionHandlerSuite) TestNewHandler() {
shardContext := shard.NewMockContext(s.controller)
tokenSerializer := common.NewMockTaskTokenSerializer(s.controller)
shardContext.EXPECT().GetConfig().Times(1).Return(&config.Config{})
shardContext.EXPECT().GetLogger().Times(2).Return(testlogger.New(s.T()))
shardContext.EXPECT().GetTimeSource().Times(1)
shardContext.EXPECT().GetDomainCache().Times(2)
shardContext.EXPECT().GetMetricsClient().Times(2)
shardContext.EXPECT().GetThrottledLogger().Times(1).Return(testlogger.New(s.T()))
h := NewHandler(shardContext, &execution.Cache{}, tokenSerializer)
s.NotNil(h)
s.Equal("handlerImpl", reflect.ValueOf(h).Elem().Type().Name())
}

func (s *DecisionHandlerSuite) TestHandleDecisionTaskScheduled() {
tests := []struct {
name string
domainID string
mutablestate *persistence.WorkflowMutableState
isfirstDecision bool
expectCalls func(shardContext *shard.MockContext)
expectErr bool
}{
{
name: "test HandleDecisionTaskScheduled - fail to retrieve domain From ID",
domainID: _testInvalidDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{},
},
expectCalls: func(shardContext *shard.MockContext) {},
expectErr: true,
},
{
name: "test HandleDecisionTaskScheduled - success",
domainID: _testDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{},
},
expectCalls: func(shardContext *shard.MockContext) {
shardContext.EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(s.controller))
},
expectErr: false,
},
{
name: "test HandleDecisionTaskScheduled - with completed workflow",
domainID: _testDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
// WorkflowStateCompleted = 2 from persistence WorkflowExecutionInfo.IsRunning()
State: 2,
},
},
expectCalls: func(shardContext *shard.MockContext) {
shardContext.EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(s.controller))
},
expectErr: true,
},
{
name: "test HandleDecisionTaskScheduled - with failure to get start event",
domainID: _testDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
// execution has no event yet
DecisionScheduleID: -23,
LastProcessedEvent: -23,
},
},
expectCalls: func(shardContext *shard.MockContext) {
eventsCache := events.NewMockCache(s.controller)
shardContext.EXPECT().GetEventsCache().Times(1).Return(eventsCache)
eventsCache.EXPECT().
GetEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Times(1).
Return(nil, &persistence.TimeoutError{Msg: "failed to get start event: request timeout"})
shardContext.EXPECT().GetShardID().Return(_testShardID).Times(1)
},
expectErr: true,
},
{
name: "test HandleDecisionTaskScheduled - first decision task scheduled failure",
domainID: _testDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DecisionScheduleID: -23,
LastProcessedEvent: -23,
},
BufferedEvents: append([]*types.HistoryEvent{}, &types.HistoryEvent{}),
},
expectCalls: func(shardContext *shard.MockContext) {
eventsCache := events.NewMockCache(s.controller)
shardContext.EXPECT().GetEventsCache().Times(1).Return(eventsCache)
eventsCache.EXPECT().
GetEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Times(1).
Return(&types.HistoryEvent{}, nil)
shardContext.EXPECT().GetShardID().Return(_testShardID).Times(1)
shardContext.EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
},
expectErr: true,
isfirstDecision: true,
},
{
name: "test HandleDecisionTaskScheduled - first decision task scheduled success",
domainID: _testDomainUUID,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DecisionScheduleID: -23,
LastProcessedEvent: -23,
},
},
expectCalls: func(shardContext *shard.MockContext) {
eventsCache := events.NewMockCache(s.controller)
shardContext.EXPECT().GetEventsCache().Times(1).Return(eventsCache)
eventsCache.EXPECT().
GetEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Times(1).
Return(&types.HistoryEvent{}, nil)
shardContext.EXPECT().GetShardID().Return(_testShardID).Times(1)
shardContext.EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
},
expectErr: true,
isfirstDecision: true,
},
}
for _, test := range tests {
s.Run(test.name, func() {
request := &types.ScheduleDecisionTaskRequest{
DomainUUID: test.domainID,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: _testWorkflowID,
RunID: _testRunID,
},
IsFirstDecision: test.isfirstDecision,
}
shardContext := shard.NewMockContext(s.controller)
test.expectCalls(shardContext)
s.expectHandleDecisionTaskScheduledCalls(test.domainID, test.mutablestate, shardContext)

handler := *s.decisionHandler
handler.executionCache = execution.NewCache(shardContext)
handler.shard = shardContext
err := handler.HandleDecisionTaskScheduled(context.Background(), request)
s.Equal(test.expectErr, err != nil)
})
}
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_ClientNotSupports() {
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, "0.0.0", nil, false, constants.TestGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 0, 0, 0, 10)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_HeartbeatDecision() {
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.GetBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, constants.TestGlobalDomainEntry, true)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_NewDecisionTask() {
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.GetBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, true, constants.TestGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 5, 5, 0, 0)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_NoNewDecisionTask() {
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.GetBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, constants.TestGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 0, 5, 5, 0)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_QueryTooLarge() {
s.mockMutableState.EXPECT().GetQueryRegistry().Return(s.queryRegistry)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
bufferedIDs := s.queryRegistry.GetBufferedIDs()
queryResults := s.constructQueryResults(bufferedIDs[0:5], 10)
Expand Down Expand Up @@ -144,3 +325,30 @@ func (s *DecisionHandlerSuite) assertQueryCounts(queryRegistry query.Registry, b
s.Len(queryRegistry.GetUnblockedIDs(), unblocked)
s.Len(queryRegistry.GetFailedIDs(), failed)
}

func (s *DecisionHandlerSuite) expectHandleDecisionTaskScheduledCalls(domainID string, state *persistence.WorkflowMutableState, shardContex *shard.MockContext) {
workflowExecutionResponse := &persistence.GetWorkflowExecutionResponse{
State: state,
MutableStateStats: &persistence.MutableStateStats{},
}
workflowExecutionResponse.State.ExecutionStats = &persistence.ExecutionStats{}
workflowExecutionResponse.State.ExecutionInfo.DomainID = domainID
workflowExecutionResponse.State.ExecutionInfo.WorkflowID = _testWorkflowID
workflowExecutionResponse.State.ExecutionInfo.RunID = _testRunID
shardContextConfig := config.NewForTest()
shardContextLogger := testlogger.New(s.T())
shardContextTimeSource := clock.NewMockedTimeSource()
shardContextMetricClient := metrics.NewClient(tally.NoopScope, metrics.History)
domainCacheMock := cache.NewMockDomainCache(s.controller)

shardContex.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).AnyTimes().Return(workflowExecutionResponse, nil)
shardContex.EXPECT().GetConfig().AnyTimes().Return(shardContextConfig)
shardContex.EXPECT().GetLogger().AnyTimes().Return(shardContextLogger)
shardContex.EXPECT().GetTimeSource().AnyTimes().Return(shardContextTimeSource)
shardContex.EXPECT().GetDomainCache().AnyTimes().Return(domainCacheMock)
shardContex.EXPECT().GetClusterMetadata().AnyTimes().Return(s.clusterMetadata)
shardContex.EXPECT().GetMetricsClient().AnyTimes().Return(shardContextMetricClient)
domainCacheMock.EXPECT().GetDomainByID(domainID).AnyTimes().Return(s.localDomainCacheEntry, nil)
domainCacheMock.EXPECT().GetDomainName(domainID).AnyTimes().Return(_testDomainName, nil)
shardContex.EXPECT().GetExecutionManager().Times(1)
}

0 comments on commit 5cf1920

Please sign in to comment.