Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mock ESProcessor and unit test to start and stop indexer #6423

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions service/worker/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination indexer_mock.go github.com/uber/cadence/service/worker/indexer ESProcessor

package indexer

import (
Expand Down
95 changes: 95 additions & 0 deletions service/worker/indexer/indexer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 131 additions & 1 deletion service/worker/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,28 @@
package indexer

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/elasticsearch/bulk"
mocks2 "github.com/uber/cadence/common/elasticsearch/bulk/mocks"
esMocks "github.com/uber/cadence/common/elasticsearch/mocks"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
)

func TestNewDualIndexer(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
Expand Down Expand Up @@ -82,3 +85,130 @@ func TestNewIndexer(t *testing.T) {
indexer := NewIndexer(config, mockMessagingClient, mockESClient, processorName, testlogger.New(t), metrics.NewNoopMetricsClient())
assert.NotNil(t, indexer)
}

// TestIndexerStart tests the Start method of Indexer
func TestIndexerStart(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}
mockConsumer := messaging.NewMockConsumer(ctrl)
mockConsumer.EXPECT().Start().Return(nil).Times(1)
messageChan := make(chan messaging.Message)
defer close(messageChan)
mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).AnyTimes()
mockvisibiltyProcessor := NewMockESProcessor(ctrl)
mockvisibiltyProcessor.EXPECT().Start().Return().Times(1)

indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockvisibiltyProcessor,
msgEncoder: defaultEncoder,
}
err := indexer.Start()
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, err)
}

// TestIndexerStart_ConsumerError tests the Start method when consumer.Start returns an error
func TestIndexerStart_ConsumerError(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}
mockConsumer := messaging.NewMockConsumer(ctrl)
mockConsumer.EXPECT().Start().Return(fmt.Errorf("some error")).Times(1)
mockvisibiltyProcessor := NewMockESProcessor(ctrl)

indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockvisibiltyProcessor,
msgEncoder: defaultEncoder,
}
err := indexer.Start()
assert.ErrorContains(t, err, "some error")

}

func TestIndexerStop(t *testing.T) {
ctrl := gomock.NewController(t)

config := &Config{
ESProcessorNumOfWorkers: dynamicconfig.GetIntPropertyFn(1),
ESProcessorBulkActions: dynamicconfig.GetIntPropertyFn(10),
ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20),
ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute),
IndexerConcurrency: dynamicconfig.GetIntPropertyFn(1),
}

// Mock the messaging consumer
mockConsumer := messaging.NewMockConsumer(ctrl)
messageChan := make(chan messaging.Message)
mockConsumer.EXPECT().Messages().Return((<-chan messaging.Message)(messageChan)).AnyTimes()
// No specific expectations for Start or Stop since they're not called in Stop()

// Mock the visibility processor
mockVisibilityProcessor := NewMockESProcessor(ctrl)
// Create the Indexer instance with mocks
indexer := &Indexer{
config: config,
esIndexName: "test-index",
consumer: mockConsumer,
logger: log.NewNoop(),
scope: metrics.NoopScope(metrics.IndexProcessorScope),
shutdownCh: make(chan struct{}),
visibilityProcessor: mockVisibilityProcessor,
msgEncoder: defaultEncoder,
}

// Simulate that the indexer was started
atomic.StoreInt32(&indexer.isStarted, 1)

// Simulate the processorPump goroutine that waits on shutdownCh
indexer.shutdownWG.Add(1)
go func() {
defer indexer.shutdownWG.Done()
<-indexer.shutdownCh
}()

// Call Stop and verify behavior
indexer.Stop()

// Verify that shutdownCh is closed
select {
case <-indexer.shutdownCh:
// Expected: shutdownCh should be closed
default:
t.Error("shutdownCh is not closed")
}

// Verify that the WaitGroup has completed
success := common.AwaitWaitGroup(&indexer.shutdownWG, time.Second)
assert.True(t, success)

// Verify that isStopped flag is set
assert.Equal(t, int32(1), atomic.LoadInt32(&indexer.isStopped))

// Call Stop again to ensure idempotency
indexer.Stop()
// No further expectations since the indexer is already stopped
}
Loading