Skip to content

Commit

Permalink
Write tests for reconciliation fetcher folder (#6424)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Oct 26, 2024
1 parent 01baea8 commit 3ee0a10
Show file tree
Hide file tree
Showing 10 changed files with 421 additions and 54 deletions.
2 changes: 1 addition & 1 deletion common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// THE SOFTWARE.

// Geneate rate limiter wrappers.
//go:generate mockgen -package $GOPACKAGE -destination dataManagerInterfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
//go:generate mockgen -package $GOPACKAGE -destination data_manager_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/configstore_generated.go
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/domain_generated.go
//go:generate gowrap gen -g -p . -i HistoryManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/history_generated.go
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -destination dataVisibilityManagerInterfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityManager
//go:generate mockgen -package $GOPACKAGE -destination visibility_manager_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityManager
// Generate rate limiter wrapper.
//go:generate gowrap gen -g -p . -i VisibilityManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/visibility_generated.go

Expand Down
315 changes: 282 additions & 33 deletions common/reconciliation/fetcher/concrete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,83 +23,332 @@
package fetcher

import (
"context"
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/pagination"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/entity"
)

const (
testTreeID = "test-tree-id"
testBranchID = "test-branch-id"
)
func TestConcreteExecutionIterator(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)
retryer.EXPECT().ListConcreteExecutions(gomock.Any(), gomock.Any()).
Return(&persistence.ListConcreteExecutionsResponse{}, nil).
Times(1)

var (
validBranchToken = []byte{89, 11, 0, 10, 0, 0, 0, 12, 116, 101, 115, 116, 45, 116, 114, 101, 101, 45, 105, 100, 11, 0, 20, 0, 0, 0, 14, 116, 101, 115, 116, 45, 98, 114, 97, 110, 99, 104, 45, 105, 100, 0}
invalidBranchToken = []byte("invalid")
)
iterator := ConcreteExecutionIterator(
context.Background(),
retryer,
10,
)
require.NotNil(t, iterator)
}

func TestConcreteExecution(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
tests := []struct {
desc string
req ExecutionRequest
mockFn func(retryer *persistence.MockRetryer)
wantEntity entity.Entity
wantErr bool
}{
{
desc: "success",
req: ExecutionRequest{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
DomainName: "test-domain-name",
},
mockFn: func(retryer *persistence.MockRetryer) {
retryer.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(
&persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
State: persistence.WorkflowStateRunning,
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
},
},
},
nil,
).Times(1)

retryer.EXPECT().GetShardID().Return(355).Times(1)
},
wantEntity: &entity.ConcreteExecution{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
TreeID: "test-tree-id",
BranchID: "test-branch-id",
Execution: entity.Execution{
ShardID: 355,
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
State: persistence.WorkflowStateRunning,
},
},
},
{
desc: "GetWorkflowExecution failed",
req: ExecutionRequest{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
DomainName: "test-domain-name",
},
mockFn: func(retryer *persistence.MockRetryer) {
retryer.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil, fmt.Errorf("failed")).Times(1)
},
wantErr: true,
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)

tc.mockFn(retryer)

gotEntity, err := ConcreteExecution(context.Background(), retryer, tc.req)
if (err != nil) != tc.wantErr {
t.Fatalf("ConcreteExecution() err: %v, wantErr %v", err, tc.wantErr)
}

if diff := cmp.Diff(tc.wantEntity, gotEntity); diff != "" {
t.Errorf("ConcreteExecution() mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestGetConcreteExecutions(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
testExecutions := []*persistence.ListConcreteExecutionsEntity{
{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-1", "test-branch-id-1"),
State: persistence.WorkflowStateRunning,
DomainID: "test-domain-id-1",
WorkflowID: "test-workflow-id-1",
RunID: "test-run-id-1",
},
},
{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-2", "test-branch-id-2"),
State: persistence.WorkflowStateCompleted,
DomainID: "test-domain-id-2",
WorkflowID: "test-workflow-id-2",
RunID: "test-run-id-2",
},
},
}

tests := []struct {
desc string
pageSize int
pageToken pagination.PageToken
mockFn func(*testing.T, *persistence.MockRetryer)
wantPage pagination.Page
wantErr bool
}{
{
desc: "success",
pageSize: 2,
pageToken: []byte("test-page-token"),
mockFn: func(t *testing.T, retryer *persistence.MockRetryer) {
retryer.EXPECT().ListConcreteExecutions(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error) {
wantReq := &persistence.ListConcreteExecutionsRequest{
PageSize: 2,
PageToken: []byte("test-page-token"),
}
if diff := cmp.Diff(wantReq, req); diff != "" {
t.Errorf("Request mismatch (-want +got):\n%s", diff)
}
return &persistence.ListConcreteExecutionsResponse{
PageToken: []byte("test-next-page-token"),
Executions: testExecutions,
}, nil
}).Times(1)

// will be called for each execution in the response
retryer.EXPECT().GetShardID().Return(355).Times(2)
},
wantPage: pagination.Page{
CurrentToken: []byte("test-page-token"),
NextToken: []byte("test-next-page-token"),
Entities: concreteExecutionsToEntities(testExecutions, 355, encoder),
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)

tc.mockFn(t, retryer)

fetchFn := getConcreteExecutions(retryer, tc.pageSize, encoder)
gotPage, err := fetchFn(context.Background(), tc.pageToken)
if (err != nil) != tc.wantErr {
t.Fatalf("ConcreteExecution() err: %v, wantErr %v", err, tc.wantErr)
}

if diff := cmp.Diff(tc.wantPage, gotPage); diff != "" {
t.Errorf("ConcreteExecution() mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestGetBranchToken(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
testCases := []struct {
name string
entity *persistence.ListConcreteExecutionsEntity
expectError bool
branchToken []byte
treeID string
branchID string
name string
entity *persistence.ListConcreteExecutionsEntity
wantErr bool
wantBranchToken []byte
wantHistoryBranch shared.HistoryBranch
}{
{
name: "ValidBranchToken",
name: "valid branch token - no version histories",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
},
wantBranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
wantHistoryBranch: shared.HistoryBranch{
TreeID: common.StringPtr("test-tree-id"),
BranchID: common.StringPtr("test-branch-id"),
},
},
{
name: "valid branch token - with version histories",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: getValidBranchToken(t, encoder),
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
VersionHistories: &persistence.VersionHistories{
CurrentVersionHistoryIndex: 1,
Histories: []*persistence.VersionHistory{
{}, // this will be ignored because index is 1
{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
},
},
},
},
wantBranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
wantHistoryBranch: shared.HistoryBranch{
TreeID: common.StringPtr("test-tree-id-from-versionhistory"),
BranchID: common.StringPtr("test-branch-id-from-versionhistory"),
},
expectError: false,
branchToken: validBranchToken,
treeID: testTreeID,
branchID: testBranchID,
},
{
name: "InvalidBranchToken",
name: "version history index out of bound",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: invalidBranchToken,
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
VersionHistories: &persistence.VersionHistories{
CurrentVersionHistoryIndex: 2,
Histories: []*persistence.VersionHistory{
{},
{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
},
},
},
},
expectError: true,
wantErr: true,
},
{
name: "invalid branch token",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: []byte("invalid"),
},
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
branchToken, branch, err := getBranchToken(tc.entity.ExecutionInfo.BranchToken, tc.entity.VersionHistories, encoder)
if tc.expectError {
branchToken, branch, err := getBranchToken(
tc.entity.ExecutionInfo.BranchToken,
tc.entity.VersionHistories,
encoder,
)

if tc.wantErr {
require.Error(t, err)
require.Nil(t, branchToken)
require.Empty(t, branch.GetTreeID())
require.Empty(t, branch.GetBranchID())
} else {
require.NoError(t, err)
require.Equal(t, tc.branchToken, branchToken)
require.Equal(t, tc.treeID, branch.GetTreeID())
require.Equal(t, tc.branchID, branch.GetBranchID())
if diff := cmp.Diff(tc.wantHistoryBranch, branch); diff != "" {
t.Fatalf("HistoryBranch mismatch (-want +got):\n%s", diff)
}
require.Equal(t, tc.wantBranchToken, branchToken)
}
})
}
}

func getValidBranchToken(t *testing.T, encoder *codec.ThriftRWEncoder) []byte {
func mustGetValidBranchToken(t *testing.T, encoder *codec.ThriftRWEncoder, treeID, branchID string) []byte {
hb := &shared.HistoryBranch{
TreeID: common.StringPtr(testTreeID),
BranchID: common.StringPtr(testBranchID),
TreeID: common.StringPtr(treeID),
BranchID: common.StringPtr(branchID),
}
bytes, err := encoder.Encode(hb)
require.NoError(t, err)
if err != nil {
t.Fatalf("failed to encode branch token: %v", err)
}

return bytes
}

func concreteExecutionsToEntities(execs []*persistence.ListConcreteExecutionsEntity, shardID int, encoder *codec.ThriftRWEncoder) []pagination.Entity {
entities := make([]pagination.Entity, len(execs))
for i, e := range execs {
branchToken, branch, err := getBranchToken(e.ExecutionInfo.BranchToken, e.VersionHistories, encoder)
if err != nil {
return nil
}
concreteExec := &entity.ConcreteExecution{
BranchToken: branchToken,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
Execution: entity.Execution{
ShardID: shardID,
DomainID: e.ExecutionInfo.DomainID,
WorkflowID: e.ExecutionInfo.WorkflowID,
RunID: e.ExecutionInfo.RunID,
State: e.ExecutionInfo.State,
},
}
entities[i] = concreteExec
}
return entities
}
Loading

0 comments on commit 3ee0a10

Please sign in to comment.