diff --git a/service/worker/indexer/indexer.go b/service/worker/indexer/indexer.go index 5e0650bdfae..d645911c8e9 100644 --- a/service/worker/indexer/indexer.go +++ b/service/worker/indexer/indexer.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination indexer_mock.go github.com/uber/cadence/service/worker/indexer ESProcessor + package indexer import ( diff --git a/service/worker/indexer/indexer_mock.go b/service/worker/indexer/indexer_mock.go new file mode 100644 index 00000000000..35a8d00f9f9 --- /dev/null +++ b/service/worker/indexer/indexer_mock.go @@ -0,0 +1,95 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: indexer.go + +// Package indexer is a generated GoMock package. +package indexer + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + bulk "github.com/uber/cadence/common/elasticsearch/bulk" + messaging "github.com/uber/cadence/common/messaging" +) + +// MockESProcessor is a mock of ESProcessor interface. +type MockESProcessor struct { + ctrl *gomock.Controller + recorder *MockESProcessorMockRecorder +} + +// MockESProcessorMockRecorder is the mock recorder for MockESProcessor. +type MockESProcessorMockRecorder struct { + mock *MockESProcessor +} + +// NewMockESProcessor creates a new mock instance. +func NewMockESProcessor(ctrl *gomock.Controller) *MockESProcessor { + mock := &MockESProcessor{ctrl: ctrl} + mock.recorder = &MockESProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockESProcessor) EXPECT() *MockESProcessorMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockESProcessor) Add(request *bulk.GenericBulkableAddRequest, key string, kafkaMsg messaging.Message) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Add", request, key, kafkaMsg) +} + +// Add indicates an expected call of Add. +func (mr *MockESProcessorMockRecorder) Add(request, key, kafkaMsg interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockESProcessor)(nil).Add), request, key, kafkaMsg) +} + +// Start mocks base method. +func (m *MockESProcessor) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockESProcessorMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockESProcessor)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockESProcessor) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockESProcessorMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockESProcessor)(nil).Stop)) +} diff --git a/service/worker/indexer/indexer_test.go b/service/worker/indexer/indexer_test.go index c1aea92a575..c049a8ce694 100644 --- a/service/worker/indexer/indexer_test.go +++ b/service/worker/indexer/indexer_test.go @@ -21,17 +21,22 @@ package indexer import ( + "fmt" + "sync/atomic" "testing" "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "go.uber.org/goleak" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/elasticsearch/bulk" mocks2 "github.com/uber/cadence/common/elasticsearch/bulk/mocks" esMocks "github.com/uber/cadence/common/elasticsearch/mocks" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" @@ -39,7 +44,6 @@ import ( func TestNewDualIndexer(t *testing.T) { ctrl := gomock.NewController(t) - defer ctrl.Finish() config := &Config{ ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), @@ -82,3 +86,136 @@ func TestNewIndexer(t *testing.T) { indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient()) assert.NotNil(t, indexer) } + +// TestIndexerStart tests the Start method of Indexer +func TestIndexerStart(t *testing.T) { + ctrl := gomock.NewController(t) + + config := &Config{ + ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), + ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10), + ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), + ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), + IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1), + } + mockConsumer := messaging.NewMockConsumer(ctrl) + mockConsumer.EXPECT().Start().Return(nil).Times(1) + messageChan := make(chan messaging.Message) + mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).Times(1) + mockConsumer.EXPECT().Stop().Return().Times(1) + mockvisibiltyProcessor := NewMockESProcessor(ctrl) + mockvisibiltyProcessor.EXPECT().Start().Return().Times(1) + mockvisibiltyProcessor.EXPECT().Stop().Return().Times(1) + + indexer := &Indexer{ + config: config, + esIndexName: "test-index", + consumer: mockConsumer, + logger: log.NewNoop(), + scope: metrics.NoopScope(metrics.IndexProcessorScope), + shutdownCh: make(chan struct{}), + visibilityProcessor: mockvisibiltyProcessor, + msgEncoder: defaultEncoder, + } + err := indexer.Start() + assert.NoError(t, err) + close(messageChan) + + indexer.Stop() + defer goleak.VerifyNone(t) +} + +// TestIndexerStart_ConsumerError tests the Start method when consumer.Start returns an error +func TestIndexerStart_ConsumerError(t *testing.T) { + ctrl := gomock.NewController(t) + + config := &Config{ + ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), + ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10), + ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), + ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), + IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1), + } + mockConsumer := messaging.NewMockConsumer(ctrl) + mockConsumer.EXPECT().Start().Return(fmt.Errorf("some error")).Times(1) + mockvisibiltyProcessor := NewMockESProcessor(ctrl) + + indexer := &Indexer{ + config: config, + esIndexName: "test-index", + consumer: mockConsumer, + logger: log.NewNoop(), + scope: metrics.NoopScope(metrics.IndexProcessorScope), + shutdownCh: make(chan struct{}), + visibilityProcessor: mockvisibiltyProcessor, + msgEncoder: defaultEncoder, + } + err := indexer.Start() + assert.ErrorContains(t, err, "some error") + +} + +func TestIndexerStop(t *testing.T) { + ctrl := gomock.NewController(t) + + config := &Config{ + ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1), + ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10), + ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), + ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), + IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1), + } + + // Mock the messaging consumer + mockConsumer := messaging.NewMockConsumer(ctrl) + messageChan := make(chan messaging.Message) + mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).AnyTimes() + // No specific expectations for Start or Stop since they're not called in Stop() + + // Mock the visibility processor + mockVisibilityProcessor := NewMockESProcessor(ctrl) + // Create the Indexer instance with mocks + indexer := &Indexer{ + config: config, + esIndexName: "test-index", + consumer: mockConsumer, + logger: log.NewNoop(), + scope: metrics.NoopScope(metrics.IndexProcessorScope), + shutdownCh: make(chan struct{}), + visibilityProcessor: mockVisibilityProcessor, + msgEncoder: defaultEncoder, + } + + // Simulate that the indexer was started + atomic.StoreInt32(&indexer.isStarted, 1) + + // Simulate the processorPump goroutine that waits on shutdownCh + indexer.shutdownWG.Add(1) + go func() { + defer indexer.shutdownWG.Done() + <-indexer.shutdownCh + }() + + // Call Stop and verify behavior + indexer.Stop() + + // Verify that shutdownCh is closed + select { + case <-indexer.shutdownCh: + // Expected: shutdownCh should be closed + default: + t.Error("shutdownCh is not closed") + } + + // Verify that the WaitGroup has completed + success := common.AwaitWaitGroup(&indexer.shutdownWG, time.Second) + assert.True(t, success) + + // Verify that isStopped flag is set + assert.Equal(t, int32(1), atomic.LoadInt32(&indexer.isStopped)) + + // Call Stop again to ensure idempotency + indexer.Stop() + defer goleak.VerifyNone(t) + +}