From e263242187dcc14b939f3422e55d0960d9006b4e Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 29 Oct 2024 14:19:30 -0700 Subject: [PATCH 1/7] adds some coverage for the mutable state builder's reset --- .../execution/mutable_state_builder_test.go | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 879081ada29..3059a7c9e0d 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -1959,3 +1959,226 @@ func TestMutableStateBuilder_CopyToPersistence_roundtrip(t *testing.T) { } } + +func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { + + t1 := time.Unix(123, 0) + now := time.Unix(500, 0) + + badBinaryID := "bad-binary-id" + + mockDomainEntryWithBadBinary := cache.NewLocalDomainCacheEntryForTest(&persistence.DomainInfo{Name: "domain"}, &persistence.DomainConfig{ + BadBinaries: types.BadBinaries{ + Binaries: map[string]*types.BadBinaryInfo{ + badBinaryID: &types.BadBinaryInfo{ + Reason: "some-reason", + Operator: "", + CreatedTimeNano: common.Ptr(t1.UnixNano()), + }, + }, + }, + }, "cluster0") + + mockDomainEntryWithoutBadBinary := cache.NewLocalDomainCacheEntryForTest(nil, &persistence.DomainConfig{ + BadBinaries: types.BadBinaries{ + Binaries: map[string]*types.BadBinaryInfo{}, + }, + }, "cluster0") + + tests := map[string]struct { + policyIn TransactionPolicy + shardContextExpectations func(mockCache *events.MockCache, shard *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + mutableStateBuilderStartingState func(m *mutableStateBuilder) + + expectedEndState func(t *testing.T, m *mutableStateBuilder) + expectedErr error + }{ + "a workflow with reset point which is running - the expectation is that this should be able to successfully find the domain to reset and add a transfer task": { + + policyIn: TransactionPolicyActive, + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + // the workflow's running + m.executionInfo = &persistence.WorkflowExecutionInfo{ + CloseStatus: persistence.WorkflowCloseStatusNone, + DomainID: "some-domain-id", + WorkflowID: "wf-id", + AutoResetPoints: &types.ResetPoints{ + Points: []*types.ResetPointInfo{ + { + BinaryChecksum: badBinaryID, + RunID: "", + FirstDecisionCompletedID: 0, + CreatedTimeNano: common.Ptr(t1.UnixNano()), + ExpiringTimeNano: nil, + Resettable: true, + }, + }, + }, + } + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) + mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithBadBinary, nil) + }, + expectedEndState: func(t *testing.T, m *mutableStateBuilder) { + assert.Equal(t, []persistence.Task{ + &persistence.ResetWorkflowTask{ + TaskData: persistence.TaskData{ + Version: common.EmptyVersion, + }, + }, + }, m.insertTransferTasks) + }, + }, + "a workflow with reset point which is running for a domain without a bad binary - the expectation is this will not add any transfer tasks": { + + policyIn: TransactionPolicyActive, + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + // the workflow's running + m.executionInfo = &persistence.WorkflowExecutionInfo{ + CloseStatus: persistence.WorkflowCloseStatusNone, + DomainID: "some-domain-id", + WorkflowID: "wf-id", + AutoResetPoints: &types.ResetPoints{ + Points: []*types.ResetPointInfo{ + { + BinaryChecksum: badBinaryID, + RunID: "", + FirstDecisionCompletedID: 0, + CreatedTimeNano: common.Ptr(t1.UnixNano()), + ExpiringTimeNano: nil, + Resettable: true, + }, + }, + }, + } + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) + + mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) + }, + expectedEndState: func(t *testing.T, m *mutableStateBuilder) { + assert.Equal(t, []persistence.Task(nil), m.insertTransferTasks) + }, + }, + "a workflow withithout auto-reset point which is running for a domain": { + + policyIn: TransactionPolicyActive, + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + // the workflow's running + m.executionInfo = &persistence.WorkflowExecutionInfo{ + CloseStatus: persistence.WorkflowCloseStatusNone, + DomainID: "some-domain-id", + WorkflowID: "wf-id", + } + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) + + mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) + }, + expectedEndState: func(t *testing.T, m *mutableStateBuilder) { + assert.Equal(t, []persistence.Task(nil), m.insertTransferTasks) + }, + }, + "a workflow with reset point which is running but which has child workflows": { + policyIn: TransactionPolicyActive, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) + }, + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + // the workflow's running + m.executionInfo = &persistence.WorkflowExecutionInfo{ + CloseStatus: persistence.WorkflowCloseStatusNone, + } + + // there's some child workflow that's due to be updated + m.pendingChildExecutionInfoIDs = map[int64]*persistence.ChildExecutionInfo{ + 1: &persistence.ChildExecutionInfo{}, + } + }, + expectedEndState: func(t *testing.T, m *mutableStateBuilder) { + assert.Equal(t, []persistence.Task(nil), m.insertTransferTasks) + }, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + nowClock := clock.NewMockedTimeSourceAt(now) + + msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + td.mutableStateBuilderStartingState(msb) + + msb.timeSource = nowClock + err := msb.closeTransactionHandleWorkflowReset(td.policyIn) + assert.Equal(t, td.expectedErr, err) + td.expectedEndState(t, msb) + }) + } +} From 1293d33a7c81733919d767af206d2bdae33fc5cb Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 29 Oct 2024 15:13:59 -0700 Subject: [PATCH 2/7] additional case --- .../execution/mutable_state_builder_test.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 3059a7c9e0d..08c37ac8f0c 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2157,6 +2157,38 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { assert.Equal(t, []persistence.Task(nil), m.insertTransferTasks) }, }, + "Transaction policy passive - no expected resets": { + policyIn: TransactionPolicyPassive, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) + }, + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + // the workflow's running + m.executionInfo = &persistence.WorkflowExecutionInfo{ + CloseStatus: persistence.WorkflowCloseStatusNone, + } + + // there's some child workflow that's due to be updated + m.pendingChildExecutionInfoIDs = map[int64]*persistence.ChildExecutionInfo{ + 1: &persistence.ChildExecutionInfo{}, + } + }, + expectedEndState: func(t *testing.T, m *mutableStateBuilder) { + assert.Equal(t, []persistence.Task(nil), m.insertTransferTasks) + }, + }, } for name, td := range tests { From 65d12eb5fb9fdaf459112eef865dadead173b65a Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 29 Oct 2024 16:36:16 -0700 Subject: [PATCH 3/7] Small amount of coverage added --- .../execution/mutable_state_builder_test.go | 212 +++++++++++------- 1 file changed, 137 insertions(+), 75 deletions(-) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 08c37ac8f0c..6c8a5c3866a 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2017,21 +2017,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2068,22 +2054,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) - + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2102,22 +2073,7 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { } }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(2) - + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomainEntryWithoutBadBinary, nil) }, expectedEndState: func(t *testing.T, m *mutableStateBuilder) { @@ -2127,20 +2083,6 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { "a workflow with reset point which is running but which has child workflows": { policyIn: TransactionPolicyActive, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) }, mutableStateBuilderStartingState: func(m *mutableStateBuilder) { // the workflow's running @@ -2160,19 +2102,6 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { "Transaction policy passive - no expected resets": { policyIn: TransactionPolicyPassive, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { - shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) - shardContext.EXPECT().GetEventsCache().Return(mockCache) - shardContext.EXPECT().GetConfig().Return(&config.Config{ - NumberOfShards: 2, - IsAdvancedVisConfigExist: false, - MaxResponseSize: 0, - MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), - MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), - HostName: "test-host", - }).Times(1) - shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) }, mutableStateBuilderStartingState: func(m *mutableStateBuilder) { // the workflow's running @@ -2204,7 +2133,8 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { nowClock := clock.NewMockedTimeSourceAt(now) - msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + td.mutableStateBuilderStartingState(msb) msb.timeSource = nowClock @@ -2214,3 +2144,135 @@ func TestMutableStateBuilder_closeTransactionHandleWorkflowReset(t *testing.T) { }) } } + +func TestMutableStateBuilder_GetVersionHistoriesStart(t *testing.T) { + + tests := map[string]struct { + mutableStateBuilderStartingState func(m *mutableStateBuilder) + + expectedVersion int64 + expectedErr error + }{ + "A mutable state with version history": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 100, + Version: 23, + }, + { + EventID: 401, + Version: 424, + }, + }, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 200, + Version: 123, + }, + { + EventID: 201, + Version: 124, + }, + }, + }, + }, + } + }, + expectedVersion: 23, + }, + "invalid / partial version history ": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + { + BranchToken: []byte("branch-token2"), + Items: []*persistence.VersionHistoryItem{}, + }, + }, + } + }, + expectedErr: &types.BadRequestError{Message: "version history is empty."}, + expectedVersion: 0, + }, + "invalid / partial version history - branch not available": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + m.versionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 10, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{}, + }, + }, + } + }, + expectedErr: &types.BadRequestError{Message: "getting branch index: 10, available branch count: 2"}, + expectedVersion: 0, + }, + "nil version history": { + mutableStateBuilderStartingState: func(m *mutableStateBuilder) { + }, + expectedVersion: common.EmptyVersion, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + td.mutableStateBuilderStartingState(msb) + + res, err := msb.GetStartVersion() + assert.Equal(t, td.expectedErr, err) + assert.Equal(t, td.expectedVersion, res) + }) + } +} + +func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { + // the MSB constructor calls a bunch of endpoints on the mocks, so + // put them in here as a set of fixed expectations so the actual mocking + // code can just make expectations on the calls on the returned MSB object + // and not get cluttered with constructor calls + shardContext.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).Times(2) + shardContext.EXPECT().GetEventsCache().Return(mockCache) + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 2, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + HostName: "test-host", + }).Times(1) + shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1) + + msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + return msb +} From 0e12a6e32c8eef049b46bc229e6c7b973fc83c32 Mon Sep 17 00:00:00 2001 From: "david.porter" Date: Wed, 30 Oct 2024 01:35:17 +0000 Subject: [PATCH 4/7] more tests --- .../execution/mutable_state_builder_test.go | 227 ++++++++++++++++++ 1 file changed, 227 insertions(+) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 6c8a5c3866a..0b9e753625a 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -35,6 +35,7 @@ import ( "github.com/uber-go/tally" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/clock" @@ -2254,6 +2255,232 @@ func TestMutableStateBuilder_GetVersionHistoriesStart(t *testing.T) { } } + +func TestIsCurrentWorkflowGuaranteed(t *testing.T) { + tests := []struct { + name string + stateInDB int + expectedResult bool + }{ + { + name: "Workflow is created", + stateInDB: persistence.WorkflowStateCreated, + expectedResult: true, + }, + { + name: "Workflow is running", + stateInDB: persistence.WorkflowStateRunning, + expectedResult: true, + }, + { + name: "Workflow is completed", + stateInDB: persistence.WorkflowStateCompleted, + expectedResult: false, + }, + { + name: "Workflow is zombie", + stateInDB: persistence.WorkflowStateZombie, + expectedResult: false, + }, + { + name: "Workflow is void", + stateInDB: persistence.WorkflowStateVoid, + expectedResult: false, + }, + { + name: "Workflow is corrupted", + stateInDB: persistence.WorkflowStateCorrupted, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msb := mutableStateBuilder{} + msb.stateInDB = tt.stateInDB + result := msb.IsCurrentWorkflowGuaranteed() + assert.Equal(t, tt.expectedResult, result) + }) + } + + assert.Panics(t, func(){ + msb := mutableStateBuilder{} + msb.stateInDB = 123 + msb.IsCurrentWorkflowGuaranteed() + }) +} + +// this is a pretty poor test, the actual logic is better tested in the +// unit tests for getBackoffInterval() +func TestGetRetryBackoffDuration(t *testing.T) { + + tests := []struct { + name string + retryPolicy *persistence.WorkflowExecutionInfo + errorReason string + expectedBackoff time.Duration + }{ + { + name: "NoRetryPolicy", + retryPolicy: &persistence.WorkflowExecutionInfo{ + HasRetryPolicy: false, + }, + errorReason: "some error reason", + expectedBackoff: backoff.NoBackoff, + }, + { + name: "WithRetryPolicy", + retryPolicy: &persistence.WorkflowExecutionInfo{ + HasRetryPolicy: true, + ExpirationTime: time.Now().Add(time.Hour), + Attempt: 1, + MaximumAttempts: 5, + BackoffCoefficient: 2.0, + InitialInterval: 12, + NonRetriableErrors: []string{"non-retriable-error"}, + }, + errorReason: "some error reason", + expectedBackoff: 24 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + t1 := time.Unix(1730247795, 0) + + msb := mutableStateBuilder{} + + msb.executionInfo = tt.retryPolicy + msb.timeSource = clock.NewMockedTimeSourceAt(t1) + + duration := msb.GetRetryBackoffDuration(tt.errorReason) + assert.Equal(t, tt.expectedBackoff, duration) + }) + } +} + +func TestGetCronRetryBackoffDuration(t *testing.T) { + + t1 := time.Unix(1730247795, 0) + + sampleVersionHistory := &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 100, + Version: 23, + }, + { + EventID: 401, + Version: 424, + }, + }, + }, + { + BranchToken: []byte("branch-token1"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 200, + Version: 123, + }, + { + EventID: 201, + Version: 124, + }, + }, + }, + }, + } + + startEvent := &types.HistoryEvent{ + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + + }, + } + + tests := map[string]struct { + startingExecutionInfo *persistence.WorkflowExecutionInfo + expectedErr bool + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedBackoff time.Duration + }{ + "with simple, valid cron schedule" : { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + CronSchedule: "* * * * *", + RunID: "run-id", + WorkflowID: "wid", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetShardID().Return(12) + mockCache.EXPECT().GetEvent(gomock.Any(), 12, "domain-id", "wid", "run-id", int64(1), int64(1), []byte("branch-token1")).Return(startEvent, nil) + }, + expectedBackoff: 45 * time.Second, + }, + "with no cron schedule" : { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + }, + expectedBackoff: backoff.NoBackoff, + }, + "with invalid start event" : { + startingExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", + CronSchedule: "* * * * *", + StartTimestamp: t1, + }, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GetShardID().Return(12) + mockCache.EXPECT().GetEvent(gomock.Any(), 12, "domain-id", "wid", "run-id", int64(1), int64(1), []byte("branch-token1")).Return(nil, assert.AnError) + }, + expectedBackoff: backoff.NoBackoff, + expectedErr: true, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.executionInfo = td.startingExecutionInfo + msb.versionHistories = sampleVersionHistory + msb.timeSource = clock.NewMockedTimeSourceAt(t1) + + duration, err := msb.GetCronBackoffDuration(context.Background()) + assert.Equal(t, td.expectedBackoff, duration) + if td.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + + + func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking From 8bab069584cfd3166213ab19796b83aea4f32e15 Mon Sep 17 00:00:00 2001 From: "david.porter" Date: Wed, 30 Oct 2024 01:39:46 +0000 Subject: [PATCH 5/7] linty --- .../execution/mutable_state_builder_test.go | 73 +++++++++---------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 0b9e753625a..037d71de12d 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2255,7 +2255,6 @@ func TestMutableStateBuilder_GetVersionHistoriesStart(t *testing.T) { } } - func TestIsCurrentWorkflowGuaranteed(t *testing.T) { tests := []struct { name string @@ -2303,10 +2302,10 @@ func TestIsCurrentWorkflowGuaranteed(t *testing.T) { }) } - assert.Panics(t, func(){ - msb := mutableStateBuilder{} - msb.stateInDB = 123 - msb.IsCurrentWorkflowGuaranteed() + assert.Panics(t, func() { + msb := mutableStateBuilder{} + msb.stateInDB = 123 + msb.IsCurrentWorkflowGuaranteed() }) } @@ -2315,10 +2314,10 @@ func TestIsCurrentWorkflowGuaranteed(t *testing.T) { func TestGetRetryBackoffDuration(t *testing.T) { tests := []struct { - name string - retryPolicy *persistence.WorkflowExecutionInfo - errorReason string - expectedBackoff time.Duration + name string + retryPolicy *persistence.WorkflowExecutionInfo + errorReason string + expectedBackoff time.Duration }{ { name: "NoRetryPolicy", @@ -2331,12 +2330,12 @@ func TestGetRetryBackoffDuration(t *testing.T) { { name: "WithRetryPolicy", retryPolicy: &persistence.WorkflowExecutionInfo{ - HasRetryPolicy: true, - ExpirationTime: time.Now().Add(time.Hour), - Attempt: 1, - MaximumAttempts: 5, + HasRetryPolicy: true, + ExpirationTime: time.Now().Add(time.Hour), + Attempt: 1, + MaximumAttempts: 5, BackoffCoefficient: 2.0, - InitialInterval: 12, + InitialInterval: 12, NonRetriableErrors: []string{"non-retriable-error"}, }, errorReason: "some error reason", @@ -2397,24 +2396,22 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { } startEvent := &types.HistoryEvent{ - ID: 1, - WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ - - }, + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{}, } tests := map[string]struct { - startingExecutionInfo *persistence.WorkflowExecutionInfo - expectedErr bool - shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) - expectedBackoff time.Duration + startingExecutionInfo *persistence.WorkflowExecutionInfo + expectedErr bool + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedBackoff time.Duration }{ - "with simple, valid cron schedule" : { + "with simple, valid cron schedule": { startingExecutionInfo: &persistence.WorkflowExecutionInfo{ - DomainID: "domain-id", - CronSchedule: "* * * * *", - RunID: "run-id", - WorkflowID: "wid", + DomainID: "domain-id", + CronSchedule: "* * * * *", + RunID: "run-id", + WorkflowID: "wid", StartTimestamp: t1, }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { @@ -2423,23 +2420,23 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { }, expectedBackoff: 45 * time.Second, }, - "with no cron schedule" : { + "with no cron schedule": { startingExecutionInfo: &persistence.WorkflowExecutionInfo{ - DomainID: "domain-id", - RunID: "run-id", - WorkflowID: "wid", + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", StartTimestamp: t1, }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { }, expectedBackoff: backoff.NoBackoff, }, - "with invalid start event" : { + "with invalid start event": { startingExecutionInfo: &persistence.WorkflowExecutionInfo{ - DomainID: "domain-id", - RunID: "run-id", - WorkflowID: "wid", - CronSchedule: "* * * * *", + DomainID: "domain-id", + RunID: "run-id", + WorkflowID: "wid", + CronSchedule: "* * * * *", StartTimestamp: t1, }, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { @@ -2447,7 +2444,7 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { mockCache.EXPECT().GetEvent(gomock.Any(), 12, "domain-id", "wid", "run-id", int64(1), int64(1), []byte("branch-token1")).Return(nil, assert.AnError) }, expectedBackoff: backoff.NoBackoff, - expectedErr: true, + expectedErr: true, }, } @@ -2479,8 +2476,6 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { } } - - func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking From 9383cb657820d36061a42d37f952ab0c292fcf78 Mon Sep 17 00:00:00 2001 From: "david.porter" Date: Wed, 30 Oct 2024 20:57:53 +0000 Subject: [PATCH 6/7] WIP --- .../execution/mutable_state_builder.go | 18 - .../execution/mutable_state_builder_test.go | 310 ++++++++++++++++++ 2 files changed, 310 insertions(+), 18 deletions(-) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 5bac87ef07c..66a2182b7f1 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -144,7 +144,6 @@ type ( appliedEvents map[string]struct{} insertTransferTasks []persistence.Task - insertCrossClusterTasks []persistence.Task insertReplicationTasks []persistence.Task insertTimerTasks []persistence.Task @@ -1347,12 +1346,6 @@ func (e *mutableStateBuilder) AddTransferTasks( e.insertTransferTasks = append(e.insertTransferTasks, transferTasks...) } -func (e *mutableStateBuilder) AddCrossClusterTasks( - crossClusterTasks ...persistence.Task, -) { - e.insertCrossClusterTasks = append(e.insertCrossClusterTasks, crossClusterTasks...) -} - // TODO convert AddTimerTasks to prepareTimerTasks func (e *mutableStateBuilder) AddTimerTasks( timerTasks ...persistence.Task, @@ -1365,10 +1358,6 @@ func (e *mutableStateBuilder) GetTransferTasks() []persistence.Task { return e.insertTransferTasks } -func (e *mutableStateBuilder) GetCrossClusterTasks() []persistence.Task { - return e.insertCrossClusterTasks -} - func (e *mutableStateBuilder) GetTimerTasks() []persistence.Task { return e.insertTimerTasks } @@ -1377,10 +1366,6 @@ func (e *mutableStateBuilder) DeleteTransferTasks() { e.insertTransferTasks = nil } -func (e *mutableStateBuilder) DeleteCrossClusterTasks() { - e.insertCrossClusterTasks = nil -} - func (e *mutableStateBuilder) DeleteTimerTasks() { e.insertTimerTasks = nil } @@ -1487,7 +1472,6 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( ClearBufferedEvents: e.clearBufferedEvents, TransferTasks: e.insertTransferTasks, - CrossClusterTasks: e.insertCrossClusterTasks, ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, @@ -1567,7 +1551,6 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( SignalRequestedIDs: convertStringSetToSlice(e.pendingSignalRequestedIDs), TransferTasks: e.insertTransferTasks, - CrossClusterTasks: e.insertCrossClusterTasks, ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, @@ -1678,7 +1661,6 @@ func (e *mutableStateBuilder) cleanupTransaction() error { e.nextEventIDInDB = e.GetNextEventID() e.insertTransferTasks = nil - e.insertCrossClusterTasks = nil e.insertReplicationTasks = nil e.insertTimerTasks = nil diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 037d71de12d..a3197973b14 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2476,6 +2476,316 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { } } +func TestMutableStateBuilder_GetTransferTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTransferTasks: []persistence.Task{ + &persistence.ActivityTask{}, + &persistence.DecisionTask{}, + }, + } + tasks := msb.GetTransferTasks() + assert.Equal(t, 2, len(tasks)) + assert.IsType(t, &persistence.ActivityTask{}, tasks[0]) + assert.IsType(t, &persistence.DecisionTask{}, tasks[1]) +} + + +func TestMutableStateBuilder_GetTimerTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTimerTasks: []persistence.Task{ + &persistence.UserTimerTask{}, + }, + } + tasks := msb.GetTimerTasks() + assert.Equal(t, 1, len(tasks)) + assert.IsType(t, &persistence.UserTimerTask{}, tasks[0]) +} + +func TestMutableStateBuilder_DeleteTransferTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTransferTasks: []persistence.Task{ + &persistence.ActivityTask{}, + }, + } + msb.DeleteTransferTasks() + assert.Nil(t, msb.insertTransferTasks) +} + + +func TestMutableStateBuilder_DeleteTimerTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTimerTasks: []persistence.Task{ + &persistence.UserTimerTask{}, + }, + } + msb.DeleteTimerTasks() + assert.Nil(t, msb.insertTimerTasks) +} + +func TestMutableStateBuilder_SetUpdateCondition(t *testing.T) { + msb := &mutableStateBuilder{} + msb.SetUpdateCondition(123) + assert.Equal(t, int64(123), msb.nextEventIDInDB) +} + +func TestMutableStateBuilder_GetUpdateCondition(t *testing.T) { + msb := &mutableStateBuilder{ + nextEventIDInDB: 123, + } + assert.Equal(t, int64(123), msb.GetUpdateCondition()) +} + +func TestCheckAndClearTimerFiredEvent(t *testing.T) { + tests := []struct { + name string + timerID string + bufferedEvents []*types.HistoryEvent + updateBufferedEvents []*types.HistoryEvent + history []*types.HistoryEvent + expectedTimerEvent *types.HistoryEvent + expectedBufferedEvents []*types.HistoryEvent + expectedUpdateBufferedEvents []*types.HistoryEvent + expectedHistory []*types.HistoryEvent + }{ + { + name: "TimerFiredEventInBufferedEvents", + timerID: "timer1", + bufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + }, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + }, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, + }, + { + name: "TimerFiredEventInUpdateBufferedEvents", + timerID: "timer2", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + history: []*types.HistoryEvent{}, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{ + }, + expectedUpdateBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + expectedHistory: []*types.HistoryEvent{ + }, + }, + { + name: "TimerFiredEventInHistory", + timerID: "timer3", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{}, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + }, + }, + { + name: "NoTimerFiredEvent", + timerID: "timer4", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, + expectedTimerEvent: nil, + expectedBufferedEvents: []*types.HistoryEvent{}, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msb := &mutableStateBuilder{ + bufferedEvents: tt.bufferedEvents, + updateBufferedEvents: tt.updateBufferedEvents, + hBuilder: &HistoryBuilder{history: tt.history}, + } + + timerEvent := msb.checkAndClearTimerFiredEvent(tt.timerID) + + assert.Equal(t, tt.expectedTimerEvent, timerEvent) + assert.Equal(t, tt.expectedBufferedEvents, msb.bufferedEvents) + assert.Equal(t, tt.expectedUpdateBufferedEvents, msb.updateBufferedEvents) + assert.Equal(t, tt.expectedHistory, msb.hBuilder.history) + }) + } +} + + +func TestAssignTaskIDToEvents(t *testing.T) { + + tests := map[string]struct { + transientHistory []*types.HistoryEvent + taskID int64 + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedEvents []*types.HistoryEvent + expectedErr error + }{ + "AssignTaskIDToSingleEvent": { + transientHistory: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + }, + }, + taskID: 123, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: 123, + }, + }, + }, + // "AssignTaskIDToMultipleEvents": { + // transientHistory: []*types.HistoryEvent{ + // { + // ID: 1, + // EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + // }, + // { + // ID: 2, + // EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + // }, + // }, + // taskID: 456, + // expectedEvents: []*types.HistoryEvent{ + // { + // ID: 1, + // EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + // TaskID: 456, + // }, + // { + // ID: 2, + // EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + // TaskID: 456, + // }, + // }, + // }, + // "NoEvents": { + // transientHistory: []*types.HistoryEvent{}, + // taskID: 789, + // expectedEvents: []*types.HistoryEvent{}, + // }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.hBuilder.history = td.transientHistory + + err := msb.assignTaskIDToEvents() + + assert.Equal(t, td.expectedEvents, msb.hBuilder.history) + assert.Equal(t, td.expectedErr, err) + }) + } +} + + + func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking From cf2b4db15a229d875dfe832519990e3b5b3ad602 Mon Sep 17 00:00:00 2001 From: "david.porter" Date: Thu, 31 Oct 2024 06:07:16 +0000 Subject: [PATCH 7/7] fmt --- .../execution/mutable_state_builder.go | 18 +-- .../execution/mutable_state_builder_test.go | 135 +++++++----------- 2 files changed, 60 insertions(+), 93 deletions(-) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 2ceed7e2489..64f9422bfd5 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -143,9 +143,9 @@ type ( // TODO: persist this to db appliedEvents map[string]struct{} - insertTransferTasks []persistence.Task - insertReplicationTasks []persistence.Task - insertTimerTasks []persistence.Task + insertTransferTasks []persistence.Task + insertReplicationTasks []persistence.Task + insertTimerTasks []persistence.Task workflowRequests map[persistence.WorkflowRequest]struct{} @@ -1472,9 +1472,9 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( NewBufferedEvents: e.updateBufferedEvents, ClearBufferedEvents: e.clearBufferedEvents, - TransferTasks: e.insertTransferTasks, - ReplicationTasks: e.insertReplicationTasks, - TimerTasks: e.insertTimerTasks, + TransferTasks: e.insertTransferTasks, + ReplicationTasks: e.insertReplicationTasks, + TimerTasks: e.insertTimerTasks, WorkflowRequests: convertWorkflowRequests(e.workflowRequests), @@ -1551,9 +1551,9 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( SignalInfos: convertPendingSignalInfos(e.pendingSignalInfoIDs), SignalRequestedIDs: convertStringSetToSlice(e.pendingSignalRequestedIDs), - TransferTasks: e.insertTransferTasks, - ReplicationTasks: e.insertReplicationTasks, - TimerTasks: e.insertTimerTasks, + TransferTasks: e.insertTransferTasks, + ReplicationTasks: e.insertReplicationTasks, + TimerTasks: e.insertTimerTasks, WorkflowRequests: convertWorkflowRequests(e.workflowRequests), diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index b6d354b8c51..767e157e697 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2889,7 +2889,6 @@ func TestMutableStateBuilder_GetTransferTasks(t *testing.T) { assert.IsType(t, &persistence.DecisionTask{}, tasks[1]) } - func TestMutableStateBuilder_GetTimerTasks(t *testing.T) { msb := &mutableStateBuilder{ insertTimerTasks: []persistence.Task{ @@ -2911,7 +2910,6 @@ func TestMutableStateBuilder_DeleteTransferTasks(t *testing.T) { assert.Nil(t, msb.insertTransferTasks) } - func TestMutableStateBuilder_DeleteTimerTasks(t *testing.T) { msb := &mutableStateBuilder{ insertTimerTasks: []persistence.Task{ @@ -2937,18 +2935,18 @@ func TestMutableStateBuilder_GetUpdateCondition(t *testing.T) { func TestCheckAndClearTimerFiredEvent(t *testing.T) { tests := []struct { - name string - timerID string - bufferedEvents []*types.HistoryEvent - updateBufferedEvents []*types.HistoryEvent - history []*types.HistoryEvent - expectedTimerEvent *types.HistoryEvent - expectedBufferedEvents []*types.HistoryEvent + name string + timerID string + bufferedEvents []*types.HistoryEvent + updateBufferedEvents []*types.HistoryEvent + history []*types.HistoryEvent + expectedTimerEvent *types.HistoryEvent + expectedBufferedEvents []*types.HistoryEvent expectedUpdateBufferedEvents []*types.HistoryEvent - expectedHistory []*types.HistoryEvent + expectedHistory []*types.HistoryEvent }{ { - name: "TimerFiredEventInBufferedEvents", + name: "TimerFiredEventInBufferedEvents", timerID: "timer1", bufferedEvents: []*types.HistoryEvent{ { @@ -2965,7 +2963,7 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { }, }, updateBufferedEvents: []*types.HistoryEvent{}, - history: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, expectedTimerEvent: &types.HistoryEvent{ EventType: types.EventTypeTimerFired.Ptr(), TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ @@ -2981,11 +2979,11 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { }, }, expectedUpdateBufferedEvents: []*types.HistoryEvent{}, - expectedHistory: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, }, { - name: "TimerFiredEventInUpdateBufferedEvents", - timerID: "timer2", + name: "TimerFiredEventInUpdateBufferedEvents", + timerID: "timer2", bufferedEvents: []*types.HistoryEvent{}, updateBufferedEvents: []*types.HistoryEvent{ { @@ -3014,8 +3012,7 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { TimerID: "timer2", }, }, - expectedBufferedEvents: []*types.HistoryEvent{ - }, + expectedBufferedEvents: []*types.HistoryEvent{}, expectedUpdateBufferedEvents: []*types.HistoryEvent{ { EventType: types.EventTypeTimerFired.Ptr(), @@ -3030,13 +3027,12 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { }, }, }, - expectedHistory: []*types.HistoryEvent{ - }, + expectedHistory: []*types.HistoryEvent{}, }, { - name: "TimerFiredEventInHistory", - timerID: "timer3", - bufferedEvents: []*types.HistoryEvent{}, + name: "TimerFiredEventInHistory", + timerID: "timer3", + bufferedEvents: []*types.HistoryEvent{}, updateBufferedEvents: []*types.HistoryEvent{}, history: []*types.HistoryEvent{ { @@ -3058,7 +3054,7 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { TimerID: "timer3", }, }, - expectedBufferedEvents: []*types.HistoryEvent{}, + expectedBufferedEvents: []*types.HistoryEvent{}, expectedUpdateBufferedEvents: []*types.HistoryEvent{}, expectedHistory: []*types.HistoryEvent{ { @@ -3070,15 +3066,15 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { }, }, { - name: "NoTimerFiredEvent", - timerID: "timer4", - bufferedEvents: []*types.HistoryEvent{}, - updateBufferedEvents: []*types.HistoryEvent{}, - history: []*types.HistoryEvent{}, - expectedTimerEvent: nil, - expectedBufferedEvents: []*types.HistoryEvent{}, + name: "NoTimerFiredEvent", + timerID: "timer4", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, + expectedTimerEvent: nil, + expectedBufferedEvents: []*types.HistoryEvent{}, expectedUpdateBufferedEvents: []*types.HistoryEvent{}, - expectedHistory: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, }, } @@ -3100,22 +3096,21 @@ func TestCheckAndClearTimerFiredEvent(t *testing.T) { } } - func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { tests := map[string]struct { - transientHistory []*types.HistoryEvent - taskID int64 + transientHistory []*types.HistoryEvent + taskID int64 shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) - expectedEvents []*types.HistoryEvent - expectedErr error + expectedEvents []*types.HistoryEvent + expectedErr error }{ "AssignTaskIDToSingleEvent - transient": { transientHistory: []*types.HistoryEvent{ { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 123, @@ -3135,12 +3130,12 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, { ID: 2, EventType: types.EventTypeDecisionTaskScheduled.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 456, @@ -3162,7 +3157,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { }, "NoEvents - transient events": { transientHistory: []*types.HistoryEvent{}, - taskID: 789, + taskID: 789, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { }, expectedEvents: []*types.HistoryEvent{}, @@ -3172,7 +3167,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 456, @@ -3183,7 +3178,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, expectedErr: assert.AnError, @@ -3216,18 +3211,18 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { func TestAssignTaskIDToHistoryEvents(t *testing.T) { tests := map[string]struct { - history []*types.HistoryEvent - taskID int64 + history []*types.HistoryEvent + taskID int64 shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) - expectedEvents []*types.HistoryEvent - expectedErr error + expectedEvents []*types.HistoryEvent + expectedErr error }{ "AssignTaskIDToSingleEvent": { history: []*types.HistoryEvent{ { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 123, @@ -3247,12 +3242,12 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, { ID: 2, EventType: types.EventTypeDecisionTaskScheduled.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 456, @@ -3274,7 +3269,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) { }, "NoEvents - transient events": { history: []*types.HistoryEvent{}, - taskID: 789, + taskID: 789, shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { }, expectedEvents: []*types.HistoryEvent{}, @@ -3284,7 +3279,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, taskID: 456, @@ -3295,7 +3290,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) { { ID: 1, EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), - TaskID: common.EmptyEventTaskID, + TaskID: common.EmptyEventTaskID, }, }, expectedErr: assert.AnError, @@ -3348,7 +3343,7 @@ func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { mutableStateBuilderSetup: func(m *mutableStateBuilder) { }, expectedEvent: &types.HistoryEvent{ - ID: 1, + ID: 1, EventType: types.EventTypeUpsertWorkflowSearchAttributes.Ptr(), UpsertWorkflowSearchAttributesEventAttributes: &types.UpsertWorkflowSearchAttributesEventAttributes{ DecisionTaskCompletedEventID: 123, @@ -3358,8 +3353,8 @@ func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { }, }, }, - TaskID: common.EmptyEventTaskID, - Version: common.EmptyVersion, + TaskID: common.EmptyEventTaskID, + Version: common.EmptyVersion, Timestamp: common.Ptr(now.UnixNano()), }, expectedErr: nil, @@ -3379,32 +3374,6 @@ func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { expectedEvent: nil, expectedErr: &types.InternalServiceError{Message: "invalid mutable state action: mutation after finish"}, }, - // "replication fails": { - // decisionCompletedEventID: 123, - // request: &types.UpsertWorkflowSearchAttributesDecisionAttributes{ - // SearchAttributes: &types.SearchAttributes{ - // IndexedFields: map[string][]byte{ - // "CustomKeywordField": []byte("keyword"), - // }, - // }, - // }, - // mutableStateBuilderSetup: func(m *mutableStateBuilder) { - // m.hBuilder = &HistoryBuilder{} - // m.hBuilder.AddUpsertWorkflowSearchAttributesEvent = func(decisionCompletedEventID int64, request *types.UpsertWorkflowSearchAttributesDecisionAttributes) *types.HistoryEvent { - // return &types.HistoryEvent{ - // EventType: types.EventTypeUpsertWorkflowSearchAttributes.Ptr(), - // UpsertWorkflowSearchAttributesEventAttributes: &types.UpsertWorkflowSearchAttributesEventAttributes{ - // SearchAttributes: request.SearchAttributes, - // }, - // } - // } - // m.ReplicateUpsertWorkflowSearchAttributesEvent = func(event *types.HistoryEvent) error { - // return errors.New("replication failed") - // } - // }, - // expectedEvent: nil, - // expectedErr: errors.New("replication failed"), - // }, } for name, td := range tests { @@ -3416,13 +3385,12 @@ func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { mockCache := events.NewMockCache(ctrl) mockDomainCache := cache.NewMockDomainCache(ctrl) - nowClock := clock.NewMockedTimeSourceAt(now) msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) msb.hBuilder = &HistoryBuilder{ - history: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, msBuilder: msb, } @@ -3438,7 +3406,6 @@ func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { } } - func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking