Skip to content

Commit

Permalink
bugfix: mutable state should be reset if the operation is not success… (
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Apr 16, 2018
1 parent 1662b25 commit f57cd0d
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ THRIFTRW_SRCS = \
idl/github.com/uber/cadence/shared.thrift \

PROGS = cadence
TEST_ARG ?= -race -v -timeout 15m
TEST_ARG ?= -race -v -timeout 10m
BUILD := ./build
TOOLS_CMD_ROOT=./cmd/tools
INTEG_TEST_ROOT=./host
Expand Down
10 changes: 7 additions & 3 deletions service/history/historyCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

type (
releaseWorkflowExecutionFunc func()
releaseWorkflowExecutionFunc func(err error)

historyCache struct {
cache.Cache
Expand Down Expand Up @@ -100,7 +100,7 @@ func (c *historyCache) getOrCreateWorkflowExecution(domainID string,

// Test hook for disabling the cache
if c.disabled {
return newWorkflowExecutionContext(domainID, execution, c.shard, c.executionManager, c.logger), func() {}, nil
return newWorkflowExecutionContext(domainID, execution, c.shard, c.executionManager, c.logger), func(error) {}, nil
}

key := execution.GetRunId()
Expand All @@ -118,8 +118,12 @@ func (c *historyCache) getOrCreateWorkflowExecution(domainID string,
// This will create a closure on every request.
// Consider revisiting this if it causes too much GC activity
status := cacheNotReleased
releaseFunc := func() {
releaseFunc := func(err error) {
if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) {
if err != nil {
// TODO see issue #668, there are certain types of errors which can bypass the clear
context.clear()
}
context.Unlock()
c.Release(key)
}
Expand Down
108 changes: 98 additions & 10 deletions service/history/historyCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
package history

import (
"errors"
"os"
"sync"
"testing"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -57,6 +60,16 @@ func TestHistoryCacheSuite(t *testing.T) {
suite.Run(t, s)
}

// SetupSuite is the suite-level setup hook. When the test binary runs in
// verbose mode, the logger's output is pointed at stdout; otherwise nothing
// is changed.
func (s *historyCacheSuite) SetupSuite() {
	if !testing.Verbose() {
		return
	}
	log.SetOutput(os.Stdout)
}

// TearDownSuite is the suite-level teardown hook. It is a no-op: nothing is
// cleaned up here.
func (s *historyCacheSuite) TearDownSuite() {

}

func (s *historyCacheSuite) SetupTest() {
s.logger = bark.NewLoggerFromLogrus(log.New())
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
Expand All @@ -76,37 +89,112 @@ func (s *historyCacheSuite) SetupTest() {
s.cache = newHistoryCache(s.mockShard, s.logger)
}

// TearDownTest is the per-test teardown hook. It asserts the expectations
// recorded on the mock execution manager against the current test.
func (s *historyCacheSuite) TearDownTest() {
s.mockExecutionMgr.AssertExpectations(s.T())
}

// TestHistoryCachePinning rebuilds the cache with HistoryCacheMaxSize = 2 and
// checks the pinning behaviour of getOrCreateWorkflowExecution: while the first
// context has not been released, a request for a second execution is expected
// to fail; once the first is released, the second request is expected to
// succeed, and a later request for the first execution is expected to return a
// different context instance than the one originally handed out.
func (s *historyCacheSuite) TestHistoryCachePinning() {
s.mockShard.GetConfig().HistoryCacheMaxSize = 2
domain := "test_domain"
domainID := "test_domain_id"
s.cache = newHistoryCache(s.mockShard, s.logger)
we := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wf-cache-test"),
WorkflowId: common.StringPtr("wf-cache-test-pinning"),
RunId: common.StringPtr(uuid.New()),
}

context, release, err := s.cache.getOrCreateWorkflowExecution(domain, we)
context, release, err := s.cache.getOrCreateWorkflowExecution(domainID, we)
s.Nil(err)

we2 := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wf-cache-test"),
WorkflowId: common.StringPtr("wf-cache-test-pinning"),
RunId: common.StringPtr(uuid.New()),
}

// Cache is full because context is pinned, should get an error now
_, _, err2 := s.cache.getOrCreateWorkflowExecution(domain, we2)
_, _, err2 := s.cache.getOrCreateWorkflowExecution(domainID, we2)
s.NotNil(err2)

// Now release the context, this should unpin it.
release()
release(err2)

_, release2, err3 := s.cache.getOrCreateWorkflowExecution(domain, we2)
_, release2, err3 := s.cache.getOrCreateWorkflowExecution(domainID, we2)
s.Nil(err3)
release2()
release2(err3)

// Old context should be evicted.
newContext, release, err4 := s.cache.getOrCreateWorkflowExecution(domain, we)
newContext, release, err4 := s.cache.getOrCreateWorkflowExecution(domainID, we)
s.Nil(err4)
s.False(context == newContext)
release()
release(err4)
}

// TestHistoryCacheClear checks how the release function returned by
// getOrCreateWorkflowExecution treats the cached context's msBuilder: a
// release with a nil error is expected to leave it in place, while a release
// with a non-nil error is expected to reset it to nil.
func (s *historyCacheSuite) TestHistoryCacheClear() {
	const domainID = "test_domain_id"

	s.mockShard.GetConfig().HistoryCacheMaxSize = 20
	s.cache = newHistoryCache(s.mockShard, s.logger)

	execution := workflow.WorkflowExecution{
		WorkflowId: common.StringPtr("wf-cache-test-clear"),
		RunId:      common.StringPtr(uuid.New()),
	}

	// Step 1: plant a placeholder builder on the context and release without
	// an error. Only the release path is under test, so an empty builder is
	// sufficient.
	wfContext, releaseFn, err := s.cache.getOrCreateWorkflowExecution(domainID, execution)
	s.Nil(err)
	wfContext.msBuilder = &mutableStateBuilder{}
	releaseFn(nil)

	// Step 2: the previous release carried a nil error, so the builder must
	// still be there. This time release with an error.
	wfContext, releaseFn, err = s.cache.getOrCreateWorkflowExecution(domainID, execution)
	s.Nil(err)
	s.NotNil(wfContext.msBuilder)
	releaseFn(errors.New("some random error message"))

	// Step 3: the previous release carried a non-nil error, so the builder
	// must have been cleared.
	wfContext, releaseFn, err = s.cache.getOrCreateWorkflowExecution(domainID, execution)
	s.Nil(err)
	s.Nil(wfContext.msBuilder)
	releaseFn(nil)
}

// TestHistoryCacheConcurrentAccess starts many goroutines that all request the
// same workflow execution from the cache at once. Each goroutine expects to
// observe a nil msBuilder, installs a placeholder builder, and then releases
// the context with a non-nil error so that the builder is cleared again for
// the next holder. After all goroutines finish, one more acquisition verifies
// that the builder is still nil.
func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
	s.mockShard.GetConfig().HistoryCacheMaxSize = 20
	domainID := "test_domain_id"
	s.cache = newHistoryCache(s.mockShard, s.logger)
	we := workflow.WorkflowExecution{
		WorkflowId: common.StringPtr("wf-cache-test-pinning"),
		RunId:      common.StringPtr(uuid.New()),
	}

	coroutineCount := 50
	waitGroup := &sync.WaitGroup{}
	// startChan is a gate: every goroutine blocks on it until it is closed, so
	// that they all hit the cache at roughly the same time.
	startChan := make(chan struct{})
	testFn := func() {
		// Deferred so the WaitGroup is always released, even if the goroutine
		// exits early (for example when a failing assertion stops it via
		// runtime.Goexit). Otherwise waitGroup.Wait() below would block until
		// the test timeout.
		defer waitGroup.Done()

		<-startChan
		context, release, err := s.cache.getOrCreateWorkflowExecution(domainID, we)
		s.Nil(err)
		// Every previous holder released with an error, so the builder is
		// expected to have been reset to nil.
		s.Nil(context.msBuilder)
		// Only the release path is under test, so a placeholder builder is
		// all that is needed.
		context.msBuilder = &mutableStateBuilder{}
		release(errors.New("some random error message"))
	}

	for i := 0; i < coroutineCount; i++ {
		waitGroup.Add(1)
		go testFn()
	}
	close(startChan)
	waitGroup.Wait()

	context, release, err := s.cache.getOrCreateWorkflowExecution(domainID, we)
	s.Nil(err)
	// The last goroutine also released with an error, so the builder must be
	// nil here as well.
	s.Nil(context.msBuilder)
	release(nil)
}
26 changes: 13 additions & 13 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,13 @@ func (e *historyEngineImpl) GetMutableState(ctx context.Context,
}

func (e *historyEngineImpl) getMutableState(
domainID string, execution workflow.WorkflowExecution) (*h.GetMutableStateResponse, error) {
domainID string, execution workflow.WorkflowExecution) (retResp *h.GetMutableStateResponse, retError error) {

context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, execution)
if err0 != nil {
return nil, err0
}
defer release()
defer func() { release(retError) }()

msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
Expand Down Expand Up @@ -525,7 +525,7 @@ func (e *historyEngineImpl) ResetStickyTaskList(resetRequest *h.ResetStickyTaskL

// DescribeWorkflowExecution returns information about the specified workflow execution.
func (e *historyEngineImpl) DescribeWorkflowExecution(
request *h.DescribeWorkflowExecutionRequest) (*workflow.DescribeWorkflowExecutionResponse, error) {
request *h.DescribeWorkflowExecutionRequest) (retResp *workflow.DescribeWorkflowExecutionResponse, retError error) {
domainID, err := getDomainUUID(request.DomainUUID)
if err != nil {
return nil, err
Expand All @@ -537,7 +537,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
if err0 != nil {
return nil, err0
}
defer release()
defer func() { release(retError) }()

msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
Expand Down Expand Up @@ -593,7 +593,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
}

func (e *historyEngineImpl) RecordDecisionTaskStarted(
request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) {
request *h.RecordDecisionTaskStartedRequest) (retResp *h.RecordDecisionTaskStartedResponse, retError error) {
domainID, err := getDomainUUID(request.DomainUUID)
if err != nil {
return nil, err
Expand All @@ -603,7 +603,7 @@ func (e *historyEngineImpl) RecordDecisionTaskStarted(
if err0 != nil {
return nil, err0
}
defer release()
defer func() { release(retError) }()

scheduleID := request.GetScheduleId()
requestID := request.GetRequestId()
Expand Down Expand Up @@ -776,7 +776,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
}

// RespondDecisionTaskCompleted completes a decision task
func (e *historyEngineImpl) RespondDecisionTaskCompleted(ctx context.Context, req *h.RespondDecisionTaskCompletedRequest) error {
func (e *historyEngineImpl) RespondDecisionTaskCompleted(ctx context.Context, req *h.RespondDecisionTaskCompletedRequest) (retError error) {
domainID, err := getDomainUUID(req.DomainUUID)
if err != nil {
return err
Expand All @@ -801,7 +801,7 @@ func (e *historyEngineImpl) RespondDecisionTaskCompleted(ctx context.Context, re
if err0 != nil {
return err0
}
defer release()
defer func() { release(retError) }()

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
Expand Down Expand Up @@ -1142,7 +1142,7 @@ Update_History_Loop:
runID := uuid.New()
_, newStateBuilder, err := msBuilder.AddContinueAsNewEvent(completedID, domainID, domainName, runID, attributes)
if err != nil {
return nil
return err
}

// add timer task to new workflow
Expand Down Expand Up @@ -1635,7 +1635,7 @@ func (e *historyEngineImpl) SignalWorkflowExecution(signalRequest *h.SignalWorkf
}

func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequest *h.SignalWithStartWorkflowExecutionRequest) (
*workflow.StartWorkflowExecutionResponse, error) {
retResp *workflow.StartWorkflowExecutionResponse, retError error) {

domainID, err := getDomainUUID(signalWithStartRequest.DomainUUID)
if err != nil {
Expand All @@ -1653,7 +1653,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ
context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, execution)

if err0 == nil {
defer release()
defer func() { release(retError) }()
Just_Signal_Loop:
for ; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
Expand Down Expand Up @@ -1961,13 +1961,13 @@ func (e *historyEngineImpl) ReplicateEvents(replicateRequest *h.ReplicateEventsR

func (e *historyEngineImpl) updateWorkflowExecution(domainID string, execution workflow.WorkflowExecution,
createDeletionTask, createDecisionTask bool,
action func(builder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error)) error {
action func(builder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error)) (retError error) {

context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, execution)
if err0 != nil {
return err0
}
defer release()
defer func() { release(retError) }()

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func (s *engine2Suite) getBuilder(domainID string, we workflow.WorkflowExecution
if err != nil {
return nil
}
defer release()
defer release(nil)

return context.msBuilder
}
2 changes: 1 addition & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3340,7 +3340,7 @@ func (s *engineSuite) getBuilder(domainID string, we workflow.WorkflowExecution)
if err != nil {
return nil
}
defer release()
defer release(nil)

return context.msBuilder
}
Expand Down
5 changes: 3 additions & 2 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"fmt"

"github.com/pborman/uuid"
"github.com/uber-common/bark"
h "github.com/uber/cadence/.gen/go/history"
Expand Down Expand Up @@ -56,7 +57,7 @@ func newHistoryReplicator(shard ShardContext, historyCache *historyCache, domain
return replicator
}

func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error {
func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retError error) {
if request == nil || request.History == nil || len(request.History.Events) == 0 {
return nil
}
Expand All @@ -80,7 +81,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error
if err != nil {
return err
}
defer release()
defer func() { release(retError) }()

msBuilder, err = context.loadWorkflowExecution()
if err != nil {
Expand Down
Loading

0 comments on commit f57cd0d

Please sign in to comment.