diff --git a/service/worker/batcher/batcher_test.go b/service/worker/batcher/batcher_test.go index 529fe711bf7..8c7ce5e71dc 100644 --- a/service/worker/batcher/batcher_test.go +++ b/service/worker/batcher/batcher_test.go @@ -33,9 +33,7 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" - mmocks "github.com/uber/cadence/common/metrics/mocks" "github.com/uber/cadence/common/resource" - "github.com/uber/cadence/common/types" ) func Test__Start(t *testing.T) { @@ -50,30 +48,17 @@ func setuptest(t *testing.T) (*Batcher, *resource.Test) { mockResource := resource.NewTest(t, ctrl, metrics.Worker) mockClientBean := client.NewMockBean(ctrl) - mockResource.FrontendClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()).Return(&types.DescribeDomainResponse{}, nil).AnyTimes() - mockResource.FrontendClient.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&types.ListWorkflowExecutionsResponse{ - Executions: []*types.WorkflowExecutionInfo{{Execution: &types.WorkflowExecution{WorkflowID: "wid", RunID: "rid"}}}, - NextPageToken: nil, - }, nil).AnyTimes() - mockResource.FrontendClient.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&types.CountWorkflowExecutionsResponse{ - Count: 1, - }, nil).AnyTimes() - mockClientBean.EXPECT().GetFrontendClient().Return(mockResource.FrontendClient).AnyTimes() - - mockClientBean.EXPECT().GetRemoteAdminClient(gomock.Any()).Return(mockResource.RemoteAdminClient).AnyTimes() - mockResource.SDKClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.DescribeDomainResponse{}, nil).AnyTimes() mockResource.SDKClient.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForDecisionTaskResponse{}, nil).AnyTimes() mockResource.SDKClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForActivityTaskResponse{}, nil).AnyTimes() sdkClient := mockResource.GetSDKClient() + mockClientBean.EXPECT().GetFrontendClient().Return(mockResource.FrontendClient).AnyTimes() + mockClientBean.EXPECT().GetRemoteAdminClient(gomock.Any()).Return(mockResource.RemoteAdminClient).AnyTimes() - metricsMock := &mmocks.Client{} - metricsMock.On("IncCounter", metrics.BatcherScope, metrics.BatcherProcessorFailures).Once() return New(&BootstrapParams{ Logger: testlogger.New(t), ServiceClient: sdkClient, ClientBean: mockClientBean, - MetricsClient: metricsMock, TallyScope: tally.TestScope(nil), }), mockResource } diff --git a/service/worker/batcher/workflow_test.go b/service/worker/batcher/workflow_test.go index ad8ff645ad8..69a00a7c4a8 100644 --- a/service/worker/batcher/workflow_test.go +++ b/service/worker/batcher/workflow_test.go @@ -24,14 +24,18 @@ package batcher import ( "context" - "testing" - + "github.com/golang/mock/gomock" "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/metrics" + mmocks "github.com/uber/cadence/common/metrics/mocks" + "github.com/uber/cadence/common/types" "go.uber.org/cadence/testsuite" "go.uber.org/cadence/worker" + "testing" ) type workflowSuite struct { @@ -51,6 +55,34 @@ func (s *workflowSuite) SetupTest() { s.activityEnv = s.NewTestActivityEnvironment() s.activityEnv.RegisterActivity(BatchActivity) + + batcher, mockResource := setuptest(s.T()) + + metricsMock := &mmocks.Client{} + metricsMock.On("IncCounter", metrics.BatcherScope, metrics.BatcherProcessorSuccess).Once() + batcher.metricsClient = metricsMock + + mockResource.FrontendClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()).Return(&types.DescribeDomainResponse{}, nil).AnyTimes() + mockResource.FrontendClient.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&types.ListWorkflowExecutionsResponse{ + Executions: []*types.WorkflowExecutionInfo{{Execution: &types.WorkflowExecution{WorkflowID: "wid", RunID: "rid"}}}, + NextPageToken: nil, + }, nil).AnyTimes() + mockResource.FrontendClient.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&types.CountWorkflowExecutionsResponse{Count: 1}, nil).AnyTimes() + mockResource.FrontendClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockResource.FrontendClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).Return(&types.DescribeWorkflowExecutionResponse{}, nil).AnyTimes() + mockResource.FrontendClient.EXPECT().SignalWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockResource.FrontendClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + mockResource.RemoteAdminClient.EXPECT().ResendReplicationTasks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + ctx := context.WithValue(context.Background(), batcherContextKey, batcher) + workerOpts := worker.Options{ + MetricsScope: tally.TestScope(nil), + BackgroundActivityContext: ctx, + Tracer: opentracing.GlobalTracer(), + } + s.activityEnv.SetWorkerOptions(workerOpts) + } func (s *workflowSuite) TestWorkflow() { @@ -79,26 +111,18 @@ func (s *workflowSuite) TestWorkflow() { } func (s *workflowSuite) TestActivity_BatchCancel() { - batcher, _ := setuptest(s.T()) - ctx := context.WithValue(context.Background(), batcherContextKey, batcher) - workerOpts := worker.Options{ - MetricsScope: tally.TestScope(nil), - BackgroundActivityContext: ctx, - Tracer: opentracing.GlobalTracer(), - } - s.activityEnv.SetWorkerOptions(workerOpts) - s.activityEnv.SetHeartbeatDetails(HeartBeatDetails{}) - params := BatchParams{ - DomainName: "test-domain", - Query: "Closetime=missing", - Reason: "unit-test", - BatchType: BatchTypeCancel, - TerminateParams: TerminateParams{}, - CancelParams: CancelParams{}, + DomainName: "test-domain", + Query: "Closetime=missing", + Reason: "unit-test", + BatchType: BatchTypeCancel, + TerminateParams: TerminateParams{}, + CancelParams: CancelParams{ + CancelChildren: common.BoolPtr(true), + }, SignalParams: SignalParams{}, ReplicateParams: ReplicateParams{}, - RPS: 0, + RPS: 5, Concurrency: 5, PageSize: 10, AttemptsOnRetryableError: 0, @@ -112,15 +136,6 @@ func (s *workflowSuite) TestActivity_BatchCancel() { } func (s *workflowSuite) TestActivity_BatchTerminate() { - batcher, _ := setuptest(s.T()) - ctx := context.WithValue(context.Background(), batcherContextKey, batcher) - workerOpts := worker.Options{ - MetricsScope: tally.TestScope(nil), - BackgroundActivityContext: ctx, - Tracer: opentracing.GlobalTracer(), - } - s.activityEnv.SetWorkerOptions(workerOpts) - params := BatchParams{ DomainName: "test-domain", Query: "Closetime=missing", @@ -130,7 +145,7 @@ func (s *workflowSuite) TestActivity_BatchTerminate() { CancelParams: CancelParams{}, SignalParams: SignalParams{}, ReplicateParams: ReplicateParams{}, - RPS: 0, + RPS: 5, Concurrency: 5, PageSize: 10, AttemptsOnRetryableError: 0, @@ -144,15 +159,6 @@ func (s *workflowSuite) TestActivity_BatchTerminate() { } func (s *workflowSuite) TestActivity_BatchSignal() { - batcher, _ := setuptest(s.T()) - ctx := context.WithValue(context.Background(), batcherContextKey, batcher) - workerOpts := worker.Options{ - MetricsScope: tally.TestScope(nil), - BackgroundActivityContext: ctx, - Tracer: opentracing.GlobalTracer(), - } - s.activityEnv.SetWorkerOptions(workerOpts) - params := BatchParams{ DomainName: "test-domain", Query: "Closetime=missing", @@ -162,7 +168,7 @@ func (s *workflowSuite) TestActivity_BatchSignal() { CancelParams: CancelParams{}, SignalParams: SignalParams{}, ReplicateParams: ReplicateParams{}, - RPS: 0, + RPS: 5, Concurrency: 5, PageSize: 10, AttemptsOnRetryableError: 0, @@ -176,15 +182,6 @@ func (s *workflowSuite) TestActivity_BatchSignal() { } func (s *workflowSuite) TestActivity_BatchReplicate() { - batcher, _ := setuptest(s.T()) - ctx := context.WithValue(context.Background(), batcherContextKey, batcher) - workerOpts := worker.Options{ - MetricsScope: tally.TestScope(nil), - BackgroundActivityContext: ctx, - Tracer: opentracing.GlobalTracer(), - } - s.activityEnv.SetWorkerOptions(workerOpts) - params := BatchParams{ DomainName: "test-domain", Query: "Closetime=missing", @@ -194,7 +191,7 @@ func (s *workflowSuite) TestActivity_BatchReplicate() { CancelParams: CancelParams{}, SignalParams: SignalParams{}, ReplicateParams: ReplicateParams{}, - RPS: 0, + RPS: 5, Concurrency: 5, PageSize: 10, AttemptsOnRetryableError: 0,