diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index dfe9d27617d..64f9422bfd5 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -143,10 +143,9 @@ type ( // TODO: persist this to db appliedEvents map[string]struct{} - insertTransferTasks []persistence.Task - insertCrossClusterTasks []persistence.Task - insertReplicationTasks []persistence.Task - insertTimerTasks []persistence.Task + insertTransferTasks []persistence.Task + insertReplicationTasks []persistence.Task + insertTimerTasks []persistence.Task workflowRequests map[persistence.WorkflowRequest]struct{} @@ -1348,12 +1347,6 @@ func (e *mutableStateBuilder) AddTransferTasks( e.insertTransferTasks = append(e.insertTransferTasks, transferTasks...) } -func (e *mutableStateBuilder) AddCrossClusterTasks( - crossClusterTasks ...persistence.Task, -) { - e.insertCrossClusterTasks = append(e.insertCrossClusterTasks, crossClusterTasks...) -} - // TODO convert AddTimerTasks to prepareTimerTasks func (e *mutableStateBuilder) AddTimerTasks( timerTasks ...persistence.Task, @@ -1374,10 +1367,6 @@ func (e *mutableStateBuilder) DeleteTransferTasks() { e.insertTransferTasks = nil } -func (e *mutableStateBuilder) DeleteCrossClusterTasks() { - e.insertCrossClusterTasks = nil -} - func (e *mutableStateBuilder) DeleteTimerTasks() { e.insertTimerTasks = nil } @@ -1483,10 +1472,9 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( NewBufferedEvents: e.updateBufferedEvents, ClearBufferedEvents: e.clearBufferedEvents, - TransferTasks: e.insertTransferTasks, - CrossClusterTasks: e.insertCrossClusterTasks, - ReplicationTasks: e.insertReplicationTasks, - TimerTasks: e.insertTimerTasks, + TransferTasks: e.insertTransferTasks, + ReplicationTasks: e.insertReplicationTasks, + TimerTasks: e.insertTimerTasks, WorkflowRequests: convertWorkflowRequests(e.workflowRequests), @@ -1563,10 +1551,9 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( SignalInfos: convertPendingSignalInfos(e.pendingSignalInfoIDs), SignalRequestedIDs: convertStringSetToSlice(e.pendingSignalRequestedIDs), - TransferTasks: e.insertTransferTasks, - CrossClusterTasks: e.insertCrossClusterTasks, - ReplicationTasks: e.insertReplicationTasks, - TimerTasks: e.insertTimerTasks, + TransferTasks: e.insertTransferTasks, + ReplicationTasks: e.insertReplicationTasks, + TimerTasks: e.insertTimerTasks, WorkflowRequests: convertWorkflowRequests(e.workflowRequests), @@ -1675,7 +1662,6 @@ func (e *mutableStateBuilder) cleanupTransaction() error { e.nextEventIDInDB = e.GetNextEventID() e.insertTransferTasks = nil - e.insertCrossClusterTasks = nil e.insertReplicationTasks = nil e.insertTimerTasks = nil diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 5006f8e3535..767e157e697 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -2876,6 +2876,536 @@ func createMSB() mutableStateBuilder { } } +func TestMutableStateBuilder_GetTransferTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTransferTasks: []persistence.Task{ + &persistence.ActivityTask{}, + &persistence.DecisionTask{}, + }, + } + tasks := msb.GetTransferTasks() + assert.Equal(t, 2, len(tasks)) + assert.IsType(t, &persistence.ActivityTask{}, tasks[0]) + assert.IsType(t, &persistence.DecisionTask{}, tasks[1]) +} + +func TestMutableStateBuilder_GetTimerTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTimerTasks: []persistence.Task{ + &persistence.UserTimerTask{}, + }, + } + tasks := msb.GetTimerTasks() + assert.Equal(t, 1, len(tasks)) + assert.IsType(t, &persistence.UserTimerTask{}, tasks[0]) +} + +func TestMutableStateBuilder_DeleteTransferTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTransferTasks: []persistence.Task{ + &persistence.ActivityTask{}, + }, + } + msb.DeleteTransferTasks() + assert.Nil(t, msb.insertTransferTasks) +} + +func TestMutableStateBuilder_DeleteTimerTasks(t *testing.T) { + msb := &mutableStateBuilder{ + insertTimerTasks: []persistence.Task{ + &persistence.UserTimerTask{}, + }, + } + msb.DeleteTimerTasks() + assert.Nil(t, msb.insertTimerTasks) +} + +func TestMutableStateBuilder_SetUpdateCondition(t *testing.T) { + msb := &mutableStateBuilder{} + msb.SetUpdateCondition(123) + assert.Equal(t, int64(123), msb.nextEventIDInDB) +} + +func TestMutableStateBuilder_GetUpdateCondition(t *testing.T) { + msb := &mutableStateBuilder{ + nextEventIDInDB: 123, + } + assert.Equal(t, int64(123), msb.GetUpdateCondition()) +} + +func TestCheckAndClearTimerFiredEvent(t *testing.T) { + tests := []struct { + name string + timerID string + bufferedEvents []*types.HistoryEvent + updateBufferedEvents []*types.HistoryEvent + history []*types.HistoryEvent + expectedTimerEvent *types.HistoryEvent + expectedBufferedEvents []*types.HistoryEvent + expectedUpdateBufferedEvents []*types.HistoryEvent + expectedHistory []*types.HistoryEvent + }{ + { + name: "TimerFiredEventInBufferedEvents", + timerID: "timer1", + bufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + }, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + }, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, + }, + { + name: "TimerFiredEventInUpdateBufferedEvents", + timerID: "timer2", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + history: []*types.HistoryEvent{}, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer2", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{}, + expectedUpdateBufferedEvents: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + expectedHistory: []*types.HistoryEvent{}, + }, + { + name: "TimerFiredEventInHistory", + timerID: "timer3", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + }, + expectedTimerEvent: &types.HistoryEvent{ + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer3", + }, + }, + expectedBufferedEvents: []*types.HistoryEvent{}, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{ + { + EventType: types.EventTypeTimerFired.Ptr(), + TimerFiredEventAttributes: &types.TimerFiredEventAttributes{ + TimerID: "timer1", + }, + }, + }, + }, + { + name: "NoTimerFiredEvent", + timerID: "timer4", + bufferedEvents: []*types.HistoryEvent{}, + updateBufferedEvents: []*types.HistoryEvent{}, + history: []*types.HistoryEvent{}, + expectedTimerEvent: nil, + expectedBufferedEvents: []*types.HistoryEvent{}, + expectedUpdateBufferedEvents: []*types.HistoryEvent{}, + expectedHistory: []*types.HistoryEvent{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msb := &mutableStateBuilder{ + bufferedEvents: tt.bufferedEvents, + updateBufferedEvents: tt.updateBufferedEvents, + hBuilder: &HistoryBuilder{history: tt.history}, + } + + timerEvent := msb.checkAndClearTimerFiredEvent(tt.timerID) + + assert.Equal(t, tt.expectedTimerEvent, timerEvent) + assert.Equal(t, tt.expectedBufferedEvents, msb.bufferedEvents) + assert.Equal(t, tt.expectedUpdateBufferedEvents, msb.updateBufferedEvents) + assert.Equal(t, tt.expectedHistory, msb.hBuilder.history) + }) + } +} + +func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) { + + tests := map[string]struct { + transientHistory []*types.HistoryEvent + taskID int64 + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedEvents []*types.HistoryEvent + expectedErr error + }{ + "AssignTaskIDToSingleEvent - transient": { + transientHistory: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 123, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: 123, + }, + }, + }, + "AssignTaskIDToMultipleEvents - transient": { + transientHistory: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + { + ID: 2, + EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 456, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(2).Return([]int64{123, 124}, nil).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: 123, + }, + { + ID: 2, + EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + TaskID: 124, + }, + }, + }, + "NoEvents - transient events": { + transientHistory: []*types.HistoryEvent{}, + taskID: 789, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + }, + expectedEvents: []*types.HistoryEvent{}, + }, + "error returned": { + transientHistory: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 456, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(1).Return(nil, assert.AnError).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + expectedErr: assert.AnError, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.hBuilder.transientHistory = td.transientHistory + + err := msb.assignTaskIDToEvents() + + assert.Equal(t, td.expectedEvents, msb.hBuilder.transientHistory) + assert.Equal(t, td.expectedErr, err) + }) + } +} + +func TestAssignTaskIDToHistoryEvents(t *testing.T) { + + tests := map[string]struct { + history []*types.HistoryEvent + taskID int64 + shardContextExpectations func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) + expectedEvents []*types.HistoryEvent + expectedErr error + }{ + "AssignTaskIDToSingleEvent": { + history: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 123, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: 123, + }, + }, + }, + "AssignTaskIDToMultipleEvents": { + history: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + { + ID: 2, + EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 456, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(2).Return([]int64{123, 124}, nil).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: 123, + }, + { + ID: 2, + EventType: types.EventTypeDecisionTaskScheduled.Ptr(), + TaskID: 124, + }, + }, + }, + "NoEvents - transient events": { + history: []*types.HistoryEvent{}, + taskID: 789, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + }, + expectedEvents: []*types.HistoryEvent{}, + }, + "error returned": { + history: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + taskID: 456, + shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) { + shardContext.EXPECT().GenerateTransferTaskIDs(1).Return(nil, assert.AnError).Times(1) + }, + expectedEvents: []*types.HistoryEvent{ + { + ID: 1, + EventType: types.EventTypeWorkflowExecutionStarted.Ptr(), + TaskID: common.EmptyEventTaskID, + }, + }, + expectedErr: assert.AnError, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + td.shardContextExpectations(mockCache, shardContext, mockDomainCache) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.hBuilder.history = td.history + + err := msb.assignTaskIDToEvents() + + assert.Equal(t, td.expectedEvents, msb.hBuilder.history) + assert.Equal(t, td.expectedErr, err) + }) + } +} + +func TestAddUpsertWorkflowSearchAttributesEvent(t *testing.T) { + + now := time.Unix(1730353941, 0) + + tests := map[string]struct { + decisionCompletedEventID int64 + request *types.UpsertWorkflowSearchAttributesDecisionAttributes + mutableStateBuilderSetup func(m *mutableStateBuilder) + expectedEvent *types.HistoryEvent + expectedErr error + }{ + "successful upsert": { + decisionCompletedEventID: 123, + request: &types.UpsertWorkflowSearchAttributesDecisionAttributes{ + SearchAttributes: &types.SearchAttributes{ + IndexedFields: map[string][]byte{ + "CustomKeywordField": []byte("keyword"), + }, + }, + }, + mutableStateBuilderSetup: func(m *mutableStateBuilder) { + }, + expectedEvent: &types.HistoryEvent{ + ID: 1, + EventType: types.EventTypeUpsertWorkflowSearchAttributes.Ptr(), + UpsertWorkflowSearchAttributesEventAttributes: &types.UpsertWorkflowSearchAttributesEventAttributes{ + DecisionTaskCompletedEventID: 123, + SearchAttributes: &types.SearchAttributes{ + IndexedFields: map[string][]byte{ + "CustomKeywordField": []byte("keyword"), + }, + }, + }, + TaskID: common.EmptyEventTaskID, + Version: common.EmptyVersion, + Timestamp: common.Ptr(now.UnixNano()), + }, + expectedErr: nil, + }, + "mutability check fails": { + decisionCompletedEventID: 123, + request: &types.UpsertWorkflowSearchAttributesDecisionAttributes{ + SearchAttributes: &types.SearchAttributes{ + IndexedFields: map[string][]byte{ + "CustomKeywordField": []byte("keyword"), + }, + }, + }, + mutableStateBuilderSetup: func(m *mutableStateBuilder) { + m.executionInfo.State = persistence.WorkflowStateCompleted + }, + expectedEvent: nil, + expectedErr: &types.InternalServiceError{Message: "invalid mutable state action: mutation after finish"}, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shard.NewMockContext(ctrl) + mockCache := events.NewMockCache(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + + nowClock := clock.NewMockedTimeSourceAt(now) + + msb := createMSBWithMocks(mockCache, shardContext, mockDomainCache) + + msb.hBuilder = &HistoryBuilder{ + history: []*types.HistoryEvent{}, + msBuilder: msb, + } + + td.mutableStateBuilderSetup(msb) + + msb.timeSource = nowClock + + event, err := msb.AddUpsertWorkflowSearchAttributesEvent(td.decisionCompletedEventID, td.request) + + assert.Equal(t, td.expectedEvent, event) + assert.Equal(t, td.expectedErr, err) + }) + } +} + func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) *mutableStateBuilder { // the MSB constructor calls a bunch of endpoints on the mocks, so // put them in here as a set of fixed expectations so the actual mocking