Skip to content

Commit

Permalink
Add more tests to concrete-execution-exists invariant (#6440)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Oct 30, 2024
1 parent 40003fc commit 400f0a0
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 59 deletions.
36 changes: 17 additions & 19 deletions common/reconciliation/invariant/concrete_execution_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,20 @@ import (
"github.com/uber/cadence/common/types"
)

type (
concreteExecutionExists struct {
pr persistence.Retryer
cache cache.DomainCache
}
)
type concreteExecutionExists struct {
pr persistence.Retryer
cache cache.DomainCache
}

// NewConcreteExecutionExists returns a new invariant for checking concrete execution
func NewConcreteExecutionExists(
pr persistence.Retryer, cache cache.DomainCache,
) Invariant {
func NewConcreteExecutionExists(pr persistence.Retryer, cache cache.DomainCache) Invariant {
return &concreteExecutionExists{
pr: pr,
cache: cache,
}
}

func (c *concreteExecutionExists) Check(
ctx context.Context,
execution interface{},
) CheckResult {
func (c *concreteExecutionExists) Check(ctx context.Context, execution interface{}) CheckResult {
if checkResult := validateCheckContext(ctx, c.Name()); checkResult != nil {
return *checkResult
}
Expand Down Expand Up @@ -118,15 +111,20 @@ func (c *concreteExecutionExists) Check(
}
}

func (c *concreteExecutionExists) Fix(
ctx context.Context,
execution interface{},
) FixResult {
func (c *concreteExecutionExists) Fix(ctx context.Context, execution interface{}) FixResult {
if fixResult := validateFixContext(ctx, c.Name()); fixResult != nil {
return *fixResult
}

currentExecution, _ := execution.(*entity.CurrentExecution)
currentExecution, ok := execution.(*entity.CurrentExecution)
if !ok {
return FixResult{
FixResultType: FixResultTypeFailed,
InvariantName: c.Name(),
Info: "failed to fix: expected current execution",
}
}

var runIDCheckResult *CheckResult
if len(currentExecution.CurrentRunID) == 0 {
// this is to set the current run ID prior to the check and fix operations
Expand Down Expand Up @@ -185,7 +183,7 @@ func (c *concreteExecutionExists) validateCurrentRunID(
return nil, &CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: c.Name(),
Info: "Failed to fetch domainName",
Info: "failed to fetch domainName",
InfoDetails: err.Error(),
}
}
Expand Down
233 changes: 193 additions & 40 deletions common/reconciliation/invariant/concrete_execution_exists_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

c "github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
Expand All @@ -42,125 +41,279 @@ import (
"github.com/uber/cadence/common/types"
)

type ConcreteExecutionExistsSuite struct {
*require.Assertions
suite.Suite
}

func TestConcreteExecutionExistsSuite(t *testing.T) {
suite.Run(t, new(ConcreteExecutionExistsSuite))
}

func (s *ConcreteExecutionExistsSuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *ConcreteExecutionExistsSuite) TestCheck() {
existsError := types.EntityNotExistsError{}
func TestConcreteExecutionCheckAndFix(t *testing.T) {
notExistsError := types.EntityNotExistsError{}
unknownError := types.BadRequestError{}
testCases := []struct {
execution *entity.CurrentExecution
getConcreteResp *persistence.IsWorkflowExecutionExistsResponse
getConcreteErr error
getCurrentResp *persistence.GetCurrentExecutionResponse
getCurrentErr error
expectedResult CheckResult
desc string
execution any
ctx context.Context
getConcreteResp *persistence.IsWorkflowExecutionExistsResponse
getConcreteErr error
getCurrentResp *persistence.GetCurrentExecutionResponse
getCurrentErr error
getDomainNameErr error
wantCheckResult CheckResult
wantFixResult FixResult
}{
{
desc: "closed execution with concrete execution",
execution: getClosedCurrentExecution(),
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: true},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: getClosedCurrentExecution().CurrentRunID,
},
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "failed to get concrete execution",
execution: getOpenCurrentExecution(),
getConcreteErr: errors.New("error getting concrete execution"),
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: getOpenCurrentExecution().CurrentRunID,
},
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to check if concrete execution exists",
InfoDetails: "error getting concrete execution",
},
wantFixResult: FixResult{
FixResultType: FixResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed fix because check failed",
},
},
{
desc: "open execution without concrete execution",
execution: getOpenCurrentExecution(),
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: false},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: getOpenCurrentExecution().CurrentRunID,
},
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeCorrupted,
InvariantName: ConcreteExecutionExists,
Info: "execution is open without having concrete execution",
InfoDetails: fmt.Sprintf("concrete execution not found. WorkflowId: %v, RunId: %v",
workflowID, currentRunID),
InfoDetails: fmt.Sprintf("concrete execution not found. WorkflowId: %v, RunId: %v", workflowID, currentRunID),
},
wantFixResult: FixResult{
FixResultType: FixResultTypeFixed,
InvariantName: ConcreteExecutionExists,
},
},
{
desc: "mismatching current runid and concrete execution doesn't exist",
execution: getOpenCurrentExecution(),
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: false},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: uuid.New(),
},
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "open execution with concrete execution",
execution: getOpenCurrentExecution(),
getConcreteErr: nil,
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: true},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: getOpenCurrentExecution().CurrentRunID,
},
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "open execution that is not current",
execution: getOpenCurrentExecution(),
getConcreteErr: nil,
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: true},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: uuid.New(),
},
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "concrete exists but current doesn't",
execution: getOpenCurrentExecution(),
getConcreteErr: nil,
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: true},
getCurrentResp: nil,
getCurrentErr: &existsError,
expectedResult: CheckResult{
getCurrentErr: &notExistsError,
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "concrete exists but failed to get current",
execution: getOpenCurrentExecution(),
getConcreteErr: nil,
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: false},
getCurrentResp: nil,
getCurrentErr: &unknownError,
expectedResult: CheckResult{
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to get current execution.",
InfoDetails: unknownError.Error(),
},
wantFixResult: FixResult{
FixResultType: FixResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed fix because check failed",
},
},
{
desc: "canceled context",
ctx: canceledCtx(),
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to check: context expired or cancelled",
InfoDetails: "context canceled",
},
wantFixResult: FixResult{
FixResultType: FixResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to check: context expired or cancelled",
InfoDetails: "context canceled",
},
},
{
desc: "invalid execution object",
execution: &entity.ConcreteExecution{}, // invalid type
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to check: expected current execution",
},
wantFixResult: FixResult{
FixResultType: FixResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to fix: expected current execution",
},
},
{
desc: "empty run id - found after current lookup",
execution: func() *entity.CurrentExecution {
e := getOpenCurrentExecution()
e.CurrentRunID = ""
return e
}(),
getConcreteResp: &persistence.IsWorkflowExecutionExistsResponse{Exists: true},
getCurrentResp: &persistence.GetCurrentExecutionResponse{
RunID: currentRunID,
},
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
Info: "skipped fix because execution was healthy",
},
},
{
desc: "empty run id - current lookup returns not found",
execution: func() *entity.CurrentExecution {
e := getOpenCurrentExecution()
e.CurrentRunID = ""
return e
}(),
getCurrentErr: &notExistsError,
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeHealthy,
InvariantName: ConcreteExecutionExists,
Info: "current execution does not exist.",
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
},
},
{
desc: "empty run id - domain name lookup failed",
execution: func() *entity.CurrentExecution {
e := getOpenCurrentExecution()
e.CurrentRunID = ""
return e
}(),
getDomainNameErr: errors.New("error getting domain name"),
wantCheckResult: CheckResult{
CheckResultType: CheckResultTypeFailed,
InvariantName: ConcreteExecutionExists,
Info: "failed to fetch domainName",
InfoDetails: "error getting domain name",
},
wantFixResult: FixResult{
FixResultType: FixResultTypeSkipped,
InvariantName: ConcreteExecutionExists,
},
},
}
ctrl := gomock.NewController(s.T())
mockDomainCache := cache.NewMockDomainCache(ctrl)

for _, tc := range testCases {
execManager := &mocks.ExecutionManager{}
execManager.On("IsWorkflowExecutionExists", mock.Anything, mock.Anything).Return(tc.getConcreteResp, tc.getConcreteErr)
execManager.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(tc.getCurrentResp, tc.getCurrentErr)
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain-name", nil).AnyTimes()
o := NewConcreteExecutionExists(persistence.NewPersistenceRetryer(execManager, nil, c.CreatePersistenceRetryPolicy()), mockDomainCache)
s.Equal(tc.expectedResult, o.Check(context.Background(), tc.execution))
t.Run(tc.desc, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockDomainCache := cache.NewMockDomainCache(ctrl)
execManager := &mocks.ExecutionManager{}
execManager.On("IsWorkflowExecutionExists", mock.Anything, mock.Anything).Return(tc.getConcreteResp, tc.getConcreteErr)
execManager.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(tc.getCurrentResp, tc.getCurrentErr)
execManager.On("DeleteCurrentWorkflowExecution", mock.Anything, mock.Anything).Return(nil)
mockDomainCache.EXPECT().GetDomainName(domainID).Return(domainName, tc.getDomainNameErr).AnyTimes()
o := NewConcreteExecutionExists(persistence.NewPersistenceRetryer(execManager, nil, c.CreatePersistenceRetryPolicy()), mockDomainCache)
ctx := tc.ctx
if ctx == nil {
ctx = context.Background()
}
require.Equal(t, tc.wantCheckResult, o.Check(ctx, tc.execution))

gotFixResult := o.Fix(ctx, tc.execution)
gotFixResult.CheckResult = CheckResult{}
require.Equal(t, tc.wantFixResult, gotFixResult)
})
}
}

func canceledCtx() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}

0 comments on commit 400f0a0

Please sign in to comment.