From 28f48e464ac028c6e0c3bea763b7eda4c69155e4 Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 30 Oct 2024 20:05:30 -0700 Subject: [PATCH] Adds coverage to mutable state builder (#6388) * Adds coverage to mutable state builder --- .../execution/mutable_state_builder.go | 5 +- .../execution/mutable_state_builder_test.go | 400 ++++++++++++++++++ 2 files changed, 401 insertions(+), 4 deletions(-) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 5bac87ef07c..dfe9d27617d 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -380,6 +380,7 @@ func (e *mutableStateBuilder) Load( } } } + return nil } @@ -1365,10 +1366,6 @@ func (e *mutableStateBuilder) GetTransferTasks() []persistence.Task { return e.insertTransferTasks } -func (e *mutableStateBuilder) GetCrossClusterTasks() []persistence.Task { - return e.insertCrossClusterTasks -} - func (e *mutableStateBuilder) GetTimerTasks() []persistence.Task { return e.insertTimerTasks } diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 037d71de12d..5006f8e3535 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -40,9 +40,11 @@ import ( "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" + commonConfig "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" @@ -51,6 +53,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/events" + "github.com/uber/cadence/service/history/query" "github.com/uber/cadence/service/history/shard" shardCtx "github.com/uber/cadence/service/history/shard" ) @@ -2476,6 +2479,403 @@ func TestGetCronRetryBackoffDuration(t *testing.T) { } } +func TestStartTransactionHandleFailover(t *testing.T) { + + tests := map[string]struct { + incomingTaskVersion int64 + currentVersion int64 + decisionManagerAffordance func(m *MockmutableStateDecisionTaskManager) + expectFlushBeforeReady bool + expectedErr bool + }{ + "Failing over from cluster2 to cluster1 - passive -> passive: There's an inflight decision, but it's from an earlier version": { + incomingTaskVersion: 10, + currentVersion: 2, + decisionManagerAffordance: func(m *MockmutableStateDecisionTaskManager) { + m.EXPECT().GetInFlightDecision().Return(&DecisionInfo{ + Version: 2, + }, true) + }, + expectFlushBeforeReady: false, + }, + // todo: David.porter - look a bit more into why this could occur and write a better description + // about what the intent is, because this is a unit test without a clear intent or outcome. + // At the time of writing this test I believe this is a migration case, but I'm not 100% sure and + // need to do some runtime debugging. + "empty version": { + incomingTaskVersion: common.EmptyVersion, + currentVersion: 2, + decisionManagerAffordance: func(m *MockmutableStateDecisionTaskManager) { + m.EXPECT().GetInFlightDecision().Return(&DecisionInfo{ + Version: 2, + }, true) + }, + expectFlushBeforeReady: false, + }, + "active -> passive - when there's an inflight decision from an earlier version": { + incomingTaskVersion: 12, + currentVersion: 11, + decisionManagerAffordance: func(m *MockmutableStateDecisionTaskManager) { + m.EXPECT().GetInFlightDecision().Return(&DecisionInfo{ + Version: 2, + StartedID: 123, + ScheduleID: 124, + RequestID: "requestID", + }, true) + m.EXPECT().AddDecisionTaskFailedEvent(int64(124), int64(123), types.DecisionTaskFailedCauseFailoverCloseDecision, gomock.Any(), "history-service", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()) + + m.EXPECT().HasInFlightDecision().Return(true) + m.EXPECT().HasInFlightDecision().Return(true) + m.EXPECT().HasPendingDecision().Return(true) + }, + expectFlushBeforeReady: true, + }, + "There's a decision for for the same level as the failover version": { + incomingTaskVersion: 10, + currentVersion: 10, + decisionManagerAffordance: func(m *MockmutableStateDecisionTaskManager) { + m.EXPECT().GetInFlightDecision().Return(&DecisionInfo{ + Version: 1, + }, true) + }, + expectFlushBeforeReady: false, + expectedErr: true, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shardCtx.NewMockContext(ctrl) + + decisionManager := NewMockmutableStateDecisionTaskManager(ctrl) + td.decisionManagerAffordance(decisionManager) + + shardContext.EXPECT().GetConfig().Return(&config.Config{ + NumberOfShards: 3, + IsAdvancedVisConfigExist: false, + MaxResponseSize: 0, + MutableStateChecksumInvalidateBefore: dynamicconfig.GetFloatPropertyFn(10), + MutableStateChecksumVerifyProbability: dynamicconfig.GetIntPropertyFilteredByDomain(0.0), + EnableReplicationTaskGeneration: func(_ string, _ string) bool { return true }, + HostName: "test-host", + }).Times(1) + + clusterMetadata := cluster.NewMetadata( + 10, + "cluster0", + "cluster0", + map[string]commonConfig.ClusterInformation{ + "cluster0": commonConfig.ClusterInformation{ + Enabled: true, + InitialFailoverVersion: 1, + }, + "cluster1": commonConfig.ClusterInformation{ + Enabled: true, + InitialFailoverVersion: 0, + }, + "cluster2": commonConfig.ClusterInformation{ + Enabled: true, + InitialFailoverVersion: 2, + }, + }, + func(string) bool { return false }, + metrics.NewNoopMetricsClient(), + loggerimpl.NewNopLogger(), + ) + + domainEntry := cache.NewDomainCacheEntryForTest(&persistence.DomainInfo{ + ID: "domain-id", + Name: "domain", + }, + &persistence.DomainConfig{}, + true, + &persistence.DomainReplicationConfig{ + ActiveClusterName: "cluster0", + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: "cluster0"}, + {ClusterName: "cluster1"}, + {ClusterName: "cluster2"}, + }, + }, 0, nil, 0, 0, 0) + + msb := mutableStateBuilder{ + decisionTaskManager: decisionManager, + shard: shardContext, + domainEntry: domainEntry, + clusterMetadata: clusterMetadata, + currentVersion: td.currentVersion, + versionHistories: &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("token"), + Items: []*persistence.VersionHistoryItem{ + { + EventID: 3, + Version: 10, + }, + { + EventID: 2, + Version: 2, + }, + }, + }, + }, + }, + executionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "domainID", + WorkflowID: "workflowID", + RunID: "some-example-run", + FirstExecutionRunID: "", + ParentDomainID: "", + ParentWorkflowID: "", + ParentRunID: "", + InitiatedID: 0, + CompletionEventBatchID: 0, + CompletionEvent: nil, + State: 0, + CloseStatus: 0, + LastFirstEventID: 0, + LastEventTaskID: 0, + NextEventID: 0, + LastProcessedEvent: 0, + }, + } + + msb.hBuilder = NewHistoryBuilder(&msb) + + flushBeforeReady, err := msb.startTransactionHandleDecisionFailover(td.incomingTaskVersion) + if td.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, td.expectFlushBeforeReady, flushBeforeReady) + }) + } +} + +func TestSimpleGetters(t *testing.T) { + + msb := createMSB() + assert.Equal(t, msb.versionHistories, msb.GetVersionHistories()) + + branchToken, err := msb.GetCurrentBranchToken() + assert.Equal(t, msb.versionHistories.Histories[0].BranchToken, branchToken) + assert.NoError(t, err) + assert.Equal(t, msb.currentVersion, msb.GetCurrentVersion()) + assert.Equal(t, msb.domainEntry, msb.GetDomainEntry()) + assert.Equal(t, msb.executionInfo, msb.GetExecutionInfo()) + assert.Equal(t, msb.hBuilder, msb.GetHistoryBuilder()) + assert.Equal(t, msb.executionStats.HistorySize, msb.GetHistorySize()) + assert.Equal(t, msb.executionInfo.LastFirstEventID, msb.GetLastFirstEventID()) + lastWriteVersion, err := msb.GetLastWriteVersion() + + item, err := msb.versionHistories.Histories[0].GetLastItem() + assert.NoError(t, err) + assert.Equal(t, item.Version, lastWriteVersion) + + assert.Equal(t, msb.executionInfo.NextEventID, msb.GetNextEventID()) + assert.Equal(t, msb.pendingRequestCancelInfoIDs, msb.GetPendingRequestCancelExternalInfos()) + assert.Equal(t, msb.executionInfo.LastProcessedEvent, msb.GetPreviousStartedEventID()) + assert.Equal(t, msb.queryRegistry, msb.GetQueryRegistry()) + + startVersion, err := msb.GetStartVersion() + assert.NoError(t, err) + assert.Equal(t, msb.versionHistories.Histories[0].Items[0].Version, startVersion) + assert.Equal(t, msb.insertTimerTasks, msb.GetTimerTasks()) + assert.Equal(t, msb.insertTransferTasks, msb.GetTransferTasks()) + assert.Equal(t, msb.nextEventIDInDB, msb.GetUpdateCondition()) + assert.Equal(t, msb.versionHistories, msb.GetVersionHistories()) + + state, closeStatus := msb.GetWorkflowStateCloseStatus() + assert.Equal(t, msb.executionInfo.CloseStatus, closeStatus) + assert.Equal(t, msb.executionInfo.State, state) + assert.Equal(t, &types.WorkflowType{Name: msb.executionInfo.WorkflowTypeName}, msb.GetWorkflowType()) + + pendingActivityInfo, activityInfoIsPresent := msb.GetActivityInfo(1232) + assert.Equal(t, msb.pendingActivityInfoIDs[1232], pendingActivityInfo) + assert.True(t, activityInfoIsPresent) + + assert.Equal(t, msb.pendingActivityInfoIDs, msb.GetPendingActivityInfos()) + + pendingRequestCancelledInfo, ok := msb.GetRequestCancelInfo(13) + assert.Equal(t, msb.pendingRequestCancelInfoIDs[13], pendingRequestCancelledInfo) + assert.True(t, ok) + + pendingChildExecutions, ok := msb.GetChildExecutionInfo(1) + assert.Equal(t, msb.pendingChildExecutionInfoIDs[1], pendingChildExecutions) + assert.True(t, ok) + +} + +func TestMutableState_IsCurrentWorkflowGuaranteed(t *testing.T) { + tests := map[string]struct { + state int + expected bool + }{ + "created": { + state: persistence.WorkflowStateCreated, + expected: true, + }, + "running": { + state: persistence.WorkflowStateCreated, + expected: true, + }, + "completed": { + state: persistence.WorkflowStateCompleted, + expected: false, + }, + "void": { + state: persistence.WorkflowStateVoid, + expected: false, + }, + "zombie state": { + state: persistence.WorkflowStateZombie, + expected: false, + }, + "corrupted state": { + state: persistence.WorkflowStateCorrupted, + expected: false, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + msb := mutableStateBuilder{ + stateInDB: td.state, + } + assert.Equal(t, td.expected, msb.IsCurrentWorkflowGuaranteed()) + }) + } +} + +func createMSB() mutableStateBuilder { + + sampleDomain := cache.NewDomainCacheEntryForTest(&persistence.DomainInfo{ID: "domain-id", Name: "domain"}, &persistence.DomainConfig{}, true, nil, 0, nil, 0, 0, 0) + + return mutableStateBuilder{ + pendingActivityInfoIDs: map[int64]*persistence.ActivityInfo{ + 1232: &persistence.ActivityInfo{ActivityID: "activityID"}, + }, + pendingActivityIDToEventID: map[string]int64{ + "activityID": 6, + }, + updateActivityInfos: map[int64]*persistence.ActivityInfo{ + 7: &persistence.ActivityInfo{DomainID: "domainID"}, + }, + deleteActivityInfos: map[int64]struct{}{ + 8: struct{}{}, + }, + syncActivityTasks: map[int64]struct{}{}, + pendingTimerInfoIDs: map[string]*persistence.TimerInfo{ + "testdata-pendingTimerInfoIDs": &persistence.TimerInfo{ + Version: 1, + TimerID: "1232", + }, + }, + pendingTimerEventIDToID: map[int64]string{}, + updateTimerInfos: map[string]*persistence.TimerInfo{ + "testdata-updatedtimerinfos": &persistence.TimerInfo{ + Version: 1, + TimerID: "1232", + }, + }, + deleteTimerInfos: map[string]struct{}{}, + pendingChildExecutionInfoIDs: map[int64]*persistence.ChildExecutionInfo{ + 1: &persistence.ChildExecutionInfo{ + WorkflowTypeName: "sample-workflow", + }, + }, + updateChildExecutionInfos: map[int64]*persistence.ChildExecutionInfo{ + 8: &persistence.ChildExecutionInfo{DomainID: "updateChildInfosDomainID"}, + }, + deleteChildExecutionInfos: map[int64]struct{}{ + 12: struct{}{}, + }, + pendingRequestCancelInfoIDs: map[int64]*persistence.RequestCancelInfo{ + 13: &persistence.RequestCancelInfo{InitiatedID: 16}, + }, + updateRequestCancelInfos: map[int64]*persistence.RequestCancelInfo{}, + deleteRequestCancelInfos: map[int64]struct{}{ + 15: struct{}{}, + }, + pendingSignalInfoIDs: map[int64]*persistence.SignalInfo{}, + updateSignalInfos: map[int64]*persistence.SignalInfo{}, + deleteSignalInfos: map[int64]struct{}{}, + pendingSignalRequestedIDs: map[string]struct{}{}, + updateSignalRequestedIDs: map[string]struct{}{}, + deleteSignalRequestedIDs: map[string]struct{}{}, + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + clearBufferedEvents: false, + executionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "d9cbf563-3056-4387-b2ac-5fddd868fe4d", + WorkflowID: "53fc235c-093e-4b15-9d9d-045e61354b91", + RunID: "a2901718-ac12-443e-873d-b100f45d55d8", + FirstExecutionRunID: "a2901718-ac12-443e-873d-b100f45d55d8", + InitiatedID: -7, + TaskList: "tl", + WorkflowTypeName: "test", + WorkflowTimeout: 600000000, + DecisionStartToCloseTimeout: 10, + State: 1, + LastFirstEventID: 1, + LastEventTaskID: 2097153, + NextEventID: 3, + LastProcessedEvent: -23, + StartTimestamp: time.Date(2024, 10, 21, 20, 58, 1, 275000000, time.UTC), + LastUpdatedTimestamp: time.Date(2024, 10, 21, 20, 58, 1, 275000000, time.UTC), + CreateRequestID: "f33ee669-9ff6-4221-a2b0-feb2959667b8", + DecisionVersion: 1, + DecisionScheduleID: 2, + DecisionStartedID: -23, + DecisionRequestID: "emptyUuid", + DecisionTimeout: 10, + DecisionScheduledTimestamp: 1729544281275414000, + DecisionOriginalScheduledTimestamp: 1729544281275414000, + AutoResetPoints: &types.ResetPoints{}, + }, + versionHistories: &persistence.VersionHistories{ + Histories: []*persistence.VersionHistory{ + { + BranchToken: []byte("a branch token"), + Items: []*persistence.VersionHistoryItem{{ + EventID: 2, + Version: 1, + }}, + }, + }, + }, + currentVersion: int64(-24), + hasBufferedEventsInDB: false, + stateInDB: int(1), + nextEventIDInDB: int64(3), + domainEntry: sampleDomain, + appliedEvents: map[string]struct{}{}, + insertTransferTasks: []persistence.Task{ + &persistence.DecisionTask{ + DomainID: "decsion task", + }, + }, + insertReplicationTasks: []persistence.Task{}, + insertTimerTasks: []persistence.Task{ + &persistence.ActivityRetryTimerTask{ + TaskData: persistence.TaskData{}, + EventID: 123, + Attempt: 4, + }, + }, + workflowRequests: map[persistence.WorkflowRequest]struct{}{}, + checksum: checksum.Checksum{}, + executionStats: &persistence.ExecutionStats{HistorySize: 403}, + queryRegistry: query.NewRegistry(), + } +} + func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking