diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 08c37ac8f0c..037d71de12d 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -35,6 +35,7 @@ import ( "github.com/uber-go/tally" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/clock" @@ -2017,21 +2018,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2068,22 +2055,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) - + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2102,22 +2074,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) - + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2127,20 +2084,6 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { "a workflow with reset point which is running but which has child workflows": { policyIn: TransactionPolicyActive, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) }, mutableStateBuilderStartingState: func(m *mutableStateBuilder) { // the workflow's running @@ -2160,19 +2103,6 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { "Transaction policy passive - no expected resets": { policyIn: TransactionPolicyPassive, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) }, mutableStateBuilderStartingState: func(m *mutableStateBuilder) { // the workflow's running @@ -2204,7 +2134,8 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { nowClock := clock.NewMockedTimeSourceAt(now) - msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + td.mutableStateBuilderStartingState(msb) msb.timeSource = nowClock @@ -2214,3 +2145,356 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { }) } } + +func TestMutableStateBuilder_GetVersionHistoriesStart(t *testing.T) { + + tests := map[string]struct { + mutableStateBuilderStartingState func(m *mutableStateBuilder) + + expectedVersion int64 + expectedErr error + }{ + "A mutable state with version history": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 100, + Version: 23, + }, + { + EventID: 401, + Version: 424, + }, + }, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 200, + Version: 123, + }, + { + EventID: 201, + Version: 124, + }, + }, + }, + }, + } + }, + expectedVersion: 23, + }, + "invalid / partial version history ": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + { + BranchToken: []byte("branch-token2"), + Items: []*persistence.VersionHistoryItem{}, + }, + }, + } + }, + expectedErr: &types.BadRequestError{Message: "version history is empty."}, + expectedVersion: 0, + }, + "invalid / partial version history - branch not available": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 10, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + }, + } + }, + expectedErr: &types.BadRequestError{Message: "getting branch index: 10, available branch count: 2"}, + expectedVersion: 0, + }, + "nil version history": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + }, + expectedVersion: common.EmptyVersion, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + td.mutableStateBuilderStartingState(msb) + + res, err := msb.GetStartVersion() + assert.Equal(t, td.expectedErr, err) + assert.Equal(t, td.expectedVersion, res) + }) + } +} + +func TestIsCurrentWorkflowGuaranteed(t *testing.T) { + tests := []struct { + name string + stateInDB int + expectedResult bool + }{ + { + name: "Workflow is created", + stateInDB: persistence.WorkflowStateCreated, + expectedResult: true, + }, + { + name: "Workflow is running", + stateInDB: persistence.WorkflowStateRunning, + expectedResult: true, + }, + { + name: "Workflow is completed", + stateInDB: persistence.WorkflowStateCompleted, + expectedResult: false, + }, + { + name: "Workflow is zombie", + stateInDB: persistence.WorkflowStateZombie, + expectedResult: false, + }, + { + name: "Workflow is void", + stateInDB: persistence.WorkflowStateVoid, + expectedResult: false, + }, + { + name: "Workflow is corrupted", + stateInDB: persistence.WorkflowStateCorrupted, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msb := mutableStateBuilder{} + msb.stateInDB = tt.stateInDB + result := msb.IsCurrentWorkflowGuaranteed() + assert.Equal(t, tt.expectedResult, result) + }) + } + + assert.Panics(t, func() { + msb := mutableStateBuilder{} + msb.stateInDB = 123 + msb.IsCurrentWorkflowGuaranteed() + }) +} + +// this is a pretty poor test, the actual logic is better tested in the +// unit tests for getBackoffInterval() +func TestGetRetryBackoffDuration(t *testing.T) { + + tests := []struct { + name string + retryPolicy *persistence.WorkflowExecutionInfo + errorReason string + expectedBackoff time.Duration + }{ + { + name: "NoRetryPolicy", + retryPolicy: &persistence.WorkflowExecutionInfo{ + HasRetryPolicy: false, + }, + errorReason: "some error reason", + expectedBackoff: backoff.NoBackoff, + }, + { + name: "WithRetryPolicy", + retryPolicy: &persistence.WorkflowExecutionInfo{ + HasRetryPolicy: true, + ExpirationTime: time.Now().Add(time.Hour), + Attempt: 1, + MaximumAttempts: 5, + BackoffCoefficient: 2.0, + InitialInterval: 12, + NonRetriableErrors: []string{"non-retriable-error"}, + }, + errorReason: "some error reason", + expectedBackoff: 24 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + t1 := time.Unix(1730247795, 0) + + msb := mutableStateBuilder{} + + msb.executionInfo = tt.retryPolicy + msb.timeSource = clock.NewMockedTimeSourceAt(t1) + + duration := msb.GetRetryBackoffDuration(tt.errorReason) + assert.Equal(t, tt.expectedBackoff, duration) + }) + } +} + +func TestGetCronRetryBackoffDuration(t *testing.T) { + + t1 := time.Unix(1730247795, 0) + + sampleVersionHistory := &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 100, + Version: 23, + }, + { + EventID: 401, + Version: 424, + }, + }, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 200, + Version: 123, + }, + { + EventID: 201, + Version: 124, + }, + }, + }, + }, + } + + startEvent := &types.HistoryEvent{ + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{}, + } + + tests := map[string]struct { + startingExecutionInfo *persistence.WorkflowExecutionInfo + expectedErr bool + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedBackoff time.Duration + }{ + "with simple, valid cron schedule": { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + CronSchedule: "* * * * *", + RunID: "run-id", + WorkflowID: "wid", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetShardID().Return(12) + mockCache.EXPECT().GetEvent(gomock.Any(), 12, "domain-id", "wid", "run-id", int64(1), int64(1), []byte("branch-token1")).Return(startEvent, nil) + }, + expectedBackoff: 45 * time.Second, + }, + "with no cron schedule": { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + }, + expectedBackoff: backoff.NoBackoff, + }, + "with invalid start event": { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", + CronSchedule: "* * * * *", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetShardID().Return(12) + mockCache.EXPECT().GetEvent(gomock.Any(), 12, "domain-id", "wid", "run-id", int64(1), int64(1), []byte("branch-token1")).Return(nil, assert.AnError) + }, + expectedBackoff: backoff.NoBackoff, + expectedErr: true, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.executionInfo = td.startingExecutionInfo + msb.versionHistories = sampleVersionHistory + msb.timeSource = clock.NewMockedTimeSourceAt(t1) + + duration, err := msb.GetCronBackoffDuration(context.Background()) + assert.Equal(t, td.expectedBackoff, duration) + if td.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { + // the MSB constructor calls a bunch of endpoints on the mocks, so + // put them in here as a set of fixed expectations so the actual mocking + // code can just make expectations on the calls on the returned MSB object + // and not get cluttered with constructor calls + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) + + msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + return msb +}