From e169be29b0ca0c2d529ef5114295df77194fb93f Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Wed, 30 Oct 2024 18:08:57 +0100 Subject: [PATCH] [internal] Improve code coverage of internal_task_pollers.go --- Makefile | 1 - codecov.yml | 1 + internal/internal_public.go | 6 +- internal/internal_task_pollers.go | 205 +++++++++++++------------ internal/internal_task_pollers_test.go | 107 ++++++++++++- internal/mock_local_dispatcher.go | 90 +++++++++++ internal/mock_workflow_task_handler.go | 100 ++++++++++++ 7 files changed, 410 insertions(+), 100 deletions(-) create mode 100644 internal/mock_local_dispatcher.go create mode 100644 internal/mock_workflow_task_handler.go diff --git a/Makefile b/Makefile index 57a9cd362..61d69c264 100644 --- a/Makefile +++ b/Makefile @@ -219,7 +219,6 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc # this needs to be both the files defining the generate command, AND the files that define the interfaces. $(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery $Q $(BIN_PATH) go generate ./... - $Q touch $@ # ==================================== # other intermediates diff --git a/codecov.yml b/codecov.yml index df8694f03..041fbf497 100644 --- a/codecov.yml +++ b/codecov.yml @@ -31,6 +31,7 @@ codecov: ignore: - "**/*_generated.go" - "**/*_mock.go" + - "**/mock_*.go" - "**/testdata/**" - "**/*_test.go" - "**/*_testsuite.go" diff --git a/internal/internal_public.go b/internal/internal_public.go index 6e2d85fd4..e874b89c4 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -33,6 +33,8 @@ import ( s "go.uber.org/cadence/.gen/go/shared" ) +//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE + type ( decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error) @@ -71,7 +73,7 @@ type ( // WorkflowTaskHandler represents decision task handlers. WorkflowTaskHandler interface { - // Processes the workflow task + // ProcessWorkflowTask processes the workflow task // The response could be: // - RespondDecisionTaskCompletedRequest // - RespondDecisionTaskFailedRequest @@ -84,7 +86,7 @@ type ( // ActivityTaskHandler represents activity task handlers. ActivityTaskHandler interface { - // Executes the activity task + // Execute executes the activity task // The response is one of the types: // - RespondActivityTaskCompletedRequest // - RespondActivityTaskFailedRequest diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index c565a5833..382ed0623 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -45,6 +45,8 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) +//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE + const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta @@ -76,7 +78,7 @@ type ( identity string service workflowserviceclient.Interface taskHandler WorkflowTaskHandler - ldaTunnel *locallyDispatchedActivityTunnel + ldaTunnel LocalDispatcher metricsScope *metrics.TaggedScope logger *zap.Logger @@ -159,6 +161,11 @@ type ( stopCh <-chan struct{} metricsScope *metrics.TaggedScope } + + // LocalDispatcher is an interface to dispatch locally dispatched activities. + LocalDispatcher interface { + SendTask(task *locallyDispatchedActivityTask) bool + } ) func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel { @@ -214,7 +221,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit } } -func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool { +func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool { select { case ldat.taskCh <- task: return true @@ -349,7 +356,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { func(response interface{}, startTime time.Time) (*workflowTask, error) { wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId())) wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1) - heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime) + heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime) if err != nil { return nil, err } @@ -365,10 +372,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { if completedRequest == nil && err == nil { return nil } - if _, ok := err.(decisionHeartbeatError); ok { + if errors.As(err, new(*decisionHeartbeatError)) { return err } - response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime) + response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime) if err != nil { return err } @@ -397,8 +404,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa return nil } -func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { - +func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()) if taskErr != nil { metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1) @@ -416,7 +422,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime)) responseStartTime := time.Now() - if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil { + if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil { metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1) return } @@ -425,103 +431,114 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest return } -func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { ctx := context.Background() // Respond task completion. err = backoff.Retry(ctx, func() error { - tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags) - defer cancel() - var err1 error - switch request := completedRequest.(type) { - case *s.RespondDecisionTaskFailedRequest: - // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. - // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. - if task.Attempt != nil && task.GetAttempt() == 0 { - err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1)) - }) - } + response, err = wtp.respondTaskCompletedAttempt(completedRequest, task) + return err + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) + + return response, err +} + +func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) { + ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags) + defer cancel() + var ( + err error + response *s.RespondDecisionTaskCompletedResponse + operation string + ) + switch request := completedRequest.(type) { + case *s.RespondDecisionTaskFailedRequest: + err = wtp.handleDecisionFailedRequest(ctx, task, request) + operation = "RespondDecisionTaskFailed" + case *s.RespondDecisionTaskCompletedRequest: + response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request) + operation = "RespondDecisionTaskCompleted" + case *s.RespondQueryTaskCompletedRequest: + err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + operation = "RespondQueryTaskCompleted" + default: + // should not happen + panic("unknown request type from ProcessWorkflowTask()") + } + + traceLog(func() { + wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation)) + }) + + return response, err +} + +func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error { + // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. + // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. + if task.Attempt != nil && task.GetAttempt() == 0 { + return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + } + return nil +} + +func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) { + if request.StickyAttributes == nil && !wtp.disableStickyExecution { + request.StickyAttributes = &s.StickyExecutionAttributes{ + WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, + ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), + } + } else { + request.ReturnNewDecisionTask = common.BoolPtr(false) + } + + if wtp.ldaTunnel != nil { + var activityTasks []*locallyDispatchedActivityTask + for _, decision := range request.Decisions { + attr := decision.ScheduleActivityTaskDecisionAttributes + if attr != nil && wtp.taskListName == attr.TaskList.GetName() { + // assume the activity type is in registry otherwise the activity would be failed and retried from server + activityTask := &locallyDispatchedActivityTask{ + readyCh: make(chan bool, 1), + ActivityId: attr.ActivityId, + ActivityType: attr.ActivityType, + Input: attr.Input, + Header: attr.Header, + WorkflowDomain: common.StringPtr(wtp.domain), + ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, + StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, + WorkflowExecution: task.WorkflowExecution, + WorkflowType: task.WorkflowType, } - case *s.RespondDecisionTaskCompletedRequest: - if request.StickyAttributes == nil && !wtp.disableStickyExecution { - request.StickyAttributes = &s.StickyExecutionAttributes{ - WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, - ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), - } + if wtp.ldaTunnel.SendTask(activityTask) { + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) + decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) + activityTasks = append(activityTasks, activityTask) } else { - request.ReturnNewDecisionTask = common.BoolPtr(false) + // all pollers are busy - no room to optimize + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) } - var activityTasks []*locallyDispatchedActivityTask - if wtp.ldaTunnel != nil { - for _, decision := range request.Decisions { - attr := decision.ScheduleActivityTaskDecisionAttributes - if attr != nil && wtp.taskListName == attr.TaskList.GetName() { - // assume the activity type is in registry otherwise the activity would be failed and retried from server - activityTask := &locallyDispatchedActivityTask{ - readyCh: make(chan bool, 1), - ActivityId: attr.ActivityId, - ActivityType: attr.ActivityType, - Input: attr.Input, - Header: attr.Header, - WorkflowDomain: common.StringPtr(wtp.domain), - ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, - StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, - WorkflowExecution: task.WorkflowExecution, - WorkflowType: task.WorkflowType, - } - if wtp.ldaTunnel.sendTask(activityTask) { - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) - decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) - activityTasks = append(activityTasks, activityTask) - } else { - // all pollers are busy - no room to optimize - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) - } - } - } - } - defer func() { - for _, at := range activityTasks { - started := false - if response != nil && err1 == nil { - if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { - at.ScheduledTimestamp = adl.ScheduledTimestamp - at.StartedTimestamp = adl.StartedTimestamp - at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt - at.TaskToken = adl.TaskToken - started = true - } - } - at.readyCh <- started + } + } + defer func() { + for _, at := range activityTasks { + started := false + if response != nil && err == nil { + if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { + at.ScheduledTimestamp = adl.ScheduledTimestamp + at.StartedTimestamp = adl.StartedTimestamp + at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt + at.TaskToken = adl.TaskToken + started = true } - }() - response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1)) - }) } - - case *s.RespondQueryTaskCompletedRequest: - err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1)) - }) - } - default: - // should not happen - panic("unknown request type from ProcessWorkflowTask()") + at.readyCh <- started } + }() + } - return err1 - }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - - return + return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) } func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller { diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index b39084ca7..5318ddfd2 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -23,14 +23,31 @@ package internal import ( "context" "errors" - "testing" - "time" - + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" + s "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" + "go.uber.org/cadence/internal/common/metrics" "go.uber.org/zap/zaptest" + "testing" + "time" +) + +const ( + _testDomainName = "test-domain" + _testTaskList = "test-tasklist" + _testIdentity = "test-worker" ) +// Enable verbose logging for tests. +func TestMain(m *testing.M) { + EnableVerboseLogging(true) + m.Run() +} + func TestLocalActivityPanic(t *testing.T) { // regression: panics in local activities should not terminate the process s := WorkflowTestSuite{logger: zaptest.NewLogger(t)} @@ -54,3 +71,87 @@ func TestLocalActivityPanic(t *testing.T) { assert.Contains(t, perr.StackTrace(), "panic") assert.Contains(t, perr.StackTrace(), t.Name(), "should mention the source location of the local activity that panicked") } + +func TestRespondTaskCompleted_failed(t *testing.T) { + t.Run("fail sends RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) + t.Run("fail fails to send RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(assert.AnError) + + // We cannot test RespondTaskCompleted since it uses backoff and has a hardcoded retry mechanism for 60 seconds. + _, err := poller.respondTaskCompletedAttempt(errorToFailDecisionTask(testTaskToken, assert.AnError, _testIdentity), &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }) + assert.ErrorIs(t, err, assert.AnError) + }) + t.Run("fail skips sending for not the first attempt", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + Attempt: common.Int64Ptr(1), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) + +} + +func TestRespondTaskCompleted_Unsupported(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + assert.Panics(t, func() { + _, _ = poller.RespondTaskCompleted(assert.AnError, nil, &s.PollForDecisionTaskResponse{}, time.Now()) + }) +} + +func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *MockLocalDispatcher) { + ctrl := gomock.NewController(t) + mockService := workflowservicetest.NewMockClient(ctrl) + taskHandler := &MockWorkflowTaskHandler{} + lda := &MockLocalDispatcher{} + + return &workflowTaskPoller{ + basePoller: basePoller{ + shutdownC: make(<-chan struct{}), + }, + domain: _testDomainName, + taskListName: _testTaskList, + identity: _testIdentity, + service: mockService, + taskHandler: taskHandler, + ldaTunnel: lda, + metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, + logger: zaptest.NewLogger(t), + stickyUUID: "", + disableStickyExecution: false, + StickyScheduleToStartTimeout: time.Millisecond, + featureFlags: FeatureFlags{}, + }, mockService, taskHandler, lda +} diff --git a/internal/mock_local_dispatcher.go b/internal/mock_local_dispatcher.go new file mode 100644 index 000000000..fd72c5484 --- /dev/null +++ b/internal/mock_local_dispatcher.go @@ -0,0 +1,90 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// MockLocalDispatcher is an autogenerated mock type for the LocalDispatcher type +type MockLocalDispatcher struct { + mock.Mock +} + +type MockLocalDispatcher_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLocalDispatcher) EXPECT() *MockLocalDispatcher_Expecter { + return &MockLocalDispatcher_Expecter{mock: &_m.Mock} +} + +// SendTask provides a mock function with given fields: task +func (_m *MockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) bool { + ret := _m.Called(task) + + var r0 bool + if rf, ok := ret.Get(0).(func(*locallyDispatchedActivityTask) bool); ok { + r0 = rf(task) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockLocalDispatcher_SendTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTask' +type MockLocalDispatcher_SendTask_Call struct { + *mock.Call +} + +// SendTask is a helper method to define mock.On call +// - task *locallyDispatchedActivityTask +func (_e *MockLocalDispatcher_Expecter) SendTask(task interface{}) *MockLocalDispatcher_SendTask_Call { + return &MockLocalDispatcher_SendTask_Call{Call: _e.mock.On("SendTask", task)} +} + +func (_c *MockLocalDispatcher_SendTask_Call) Run(run func(task *locallyDispatchedActivityTask)) *MockLocalDispatcher_SendTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*locallyDispatchedActivityTask)) + }) + return _c +} + +func (_c *MockLocalDispatcher_SendTask_Call) Return(_a0 bool) *MockLocalDispatcher_SendTask_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockLocalDispatcher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockLocalDispatcher creates a new instance of MockLocalDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLocalDispatcher(t mockConstructorTestingTNewMockLocalDispatcher) *MockLocalDispatcher { + mock := &MockLocalDispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mock_workflow_task_handler.go b/internal/mock_workflow_task_handler.go new file mode 100644 index 000000000..db64a8dc7 --- /dev/null +++ b/internal/mock_workflow_task_handler.go @@ -0,0 +1,100 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// MockWorkflowTaskHandler is an autogenerated mock type for the WorkflowTaskHandler type +type MockWorkflowTaskHandler struct { + mock.Mock +} + +type MockWorkflowTaskHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWorkflowTaskHandler) EXPECT() *MockWorkflowTaskHandler_Expecter { + return &MockWorkflowTaskHandler_Expecter{mock: &_m.Mock} +} + +// ProcessWorkflowTask provides a mock function with given fields: task, f +func (_m *MockWorkflowTaskHandler) ProcessWorkflowTask(task *workflowTask, f decisionHeartbeatFunc) (interface{}, error) { + ret := _m.Called(task, f) + + var r0 interface{} + if rf, ok := ret.Get(0).(func(*workflowTask, decisionHeartbeatFunc) interface{}); ok { + r0 = rf(task, f) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(*workflowTask, decisionHeartbeatFunc) error); ok { + r1 = rf(task, f) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWorkflowTaskHandler_ProcessWorkflowTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessWorkflowTask' +type MockWorkflowTaskHandler_ProcessWorkflowTask_Call struct { + *mock.Call +} + +// ProcessWorkflowTask is a helper method to define mock.On call +// - task *workflowTask +// - f decisionHeartbeatFunc +func (_e *MockWorkflowTaskHandler_Expecter) ProcessWorkflowTask(task interface{}, f interface{}) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + return &MockWorkflowTaskHandler_ProcessWorkflowTask_Call{Call: _e.mock.On("ProcessWorkflowTask", task, f)} +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Run(run func(task *workflowTask, f decisionHeartbeatFunc)) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*workflowTask), args[1].(decisionHeartbeatFunc)) + }) + return _c +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Return(response interface{}, err error) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Return(response, err) + return _c +} + +type mockConstructorTestingTNewMockWorkflowTaskHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockWorkflowTaskHandler creates a new instance of MockWorkflowTaskHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockWorkflowTaskHandler(t mockConstructorTestingTNewMockWorkflowTaskHandler) *MockWorkflowTaskHandler { + mock := &MockWorkflowTaskHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}