Skip to content

Commit

Permalink
Add mock ESProcessor and unit test to start and stop indexer (#6423)
Browse files Browse the repository at this point in the history
  • Loading branch information
timl3136 authored Oct 28, 2024
1 parent b84679f commit 040e28e
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 1 deletion.
2 changes: 2 additions & 0 deletions service/worker/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination indexer_mock.go github.com/uber/cadence/service/worker/indexer ESProcessor

package indexer

import (
Expand Down
95 changes: 95 additions & 0 deletions service/worker/indexer/indexer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 138 additions & 1 deletion service/worker/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,29 @@
package indexer

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/goleak"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/elasticsearch/bulk"
mocks2 "github.com/uber/cadence/common/elasticsearch/bulk/mocks"
esMocks "github.com/uber/cadence/common/elasticsearch/mocks"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
)

func TestNewDualIndexer(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
Expand Down Expand Up @@ -82,3 +86,136 @@ func TestNewIndexer(t *testing.T) {
indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
assert.NotNil(t, indexer)
}

// TestIndexerStart tests the Start method of Indexer
func TestIndexerStart(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}
mockConsumer := messaging.NewMockConsumer(ctrl)
mockConsumer.EXPECT().Start().Return(nil).Times(1)
messageChan := make(chan messaging.Message)
mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).Times(1)
mockConsumer.EXPECT().Stop().Return().Times(1)
mockvisibiltyProcessor := NewMockESProcessor(ctrl)
mockvisibiltyProcessor.EXPECT().Start().Return().Times(1)
mockvisibiltyProcessor.EXPECT().Stop().Return().Times(1)

indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockvisibiltyProcessor,
msgEncoder: defaultEncoder,
}
err := indexer.Start()
assert.NoError(t, err)
close(messageChan)

indexer.Stop()
defer goleak.VerifyNone(t)
}

// TestIndexerStart_ConsumerError tests the Start method when consumer.Start returns an error
func TestIndexerStart_ConsumerError(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}
mockConsumer := messaging.NewMockConsumer(ctrl)
mockConsumer.EXPECT().Start().Return(fmt.Errorf("some error")).Times(1)
mockvisibiltyProcessor := NewMockESProcessor(ctrl)

indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockvisibiltyProcessor,
msgEncoder: defaultEncoder,
}
err := indexer.Start()
assert.ErrorContains(t, err, "some error")

}

func TestIndexerStop(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}

// Mock the messaging consumer
mockConsumer := messaging.NewMockConsumer(ctrl)
messageChan := make(chan messaging.Message)
mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).AnyTimes()
// No specific expectations for Start or Stop since they're not called in Stop()

// Mock the visibility processor
mockVisibilityProcessor := NewMockESProcessor(ctrl)
// Create the Indexer instance with mocks
indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockVisibilityProcessor,
msgEncoder: defaultEncoder,
}

// Simulate that the indexer was started
atomic.StoreInt32(&indexer.isStarted, 1)

// Simulate the processorPump goroutine that waits on shutdownCh
indexer.shutdownWG.Add(1)
go func() {
defer indexer.shutdownWG.Done()
<-indexer.shutdownCh
}()

// Call Stop and verify behavior
indexer.Stop()

// Verify that shutdownCh is closed
select {
case <-indexer.shutdownCh:
// Expected: shutdownCh should be closed
default:
t.Error("shutdownCh is not closed")
}

// Verify that the WaitGroup has completed
success := common.AwaitWaitGroup(&indexer.shutdownWG, time.Second)
assert.True(t, success)

// Verify that isStopped flag is set
assert.Equal(t, int32(1), atomic.LoadInt32(&indexer.isStopped))

// Call Stop again to ensure idempotency
indexer.Stop()
defer goleak.VerifyNone(t)

}

0 comments on commit 040e28e

Please sign in to comment.