Skip to content

Commit

Permalink
bugfix: parent workflow, when signaling child workflow, can experienc… (
Browse files Browse the repository at this point in the history
#607)

* bugfix: parent workflow, when signaling child workflow, can experience deadlock, if child workflow, at the same time, is completed.

* bugfix: handling workflow using decision to cancel itself.
  • Loading branch information
wxing1292 authored Mar 12, 2018
1 parent 0a4abbe commit 35bf3de
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 43 deletions.
14 changes: 12 additions & 2 deletions service/history/historyCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package history

import (
"sync/atomic"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
Expand All @@ -44,6 +46,11 @@ type (
}
)

const (
cacheNotReleased int32 = 0
cacheReleased int32 = 1
)

var (
// ErrTryLock is a temporary error that is thrown by the API
// when it loses the race to create workflow execution context
Expand Down Expand Up @@ -107,9 +114,12 @@ func (c *historyCache) getOrCreateWorkflowExecution(domainID string,

// This will create a closure on every request.
// Consider revisiting this if it causes too much GC activity
status := cacheNotReleased
releaseFunc := func() {
context.Unlock()
c.Release(key)
if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) {
context.Unlock()
c.Release(key)
}
}

context.Lock()
Expand Down
131 changes: 90 additions & 41 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,26 +328,28 @@ func (t *transferQueueProcessorImpl) processActivityTask(task *persistence.Trans
if err != nil {
return err
}
defer release()

var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
timeout := int32(0)
if err != nil {
release()
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}

if ai, found := mb.GetActivityInfo(task.ScheduleID); found {
if ai, found := msBuilder.GetActivityInfo(task.ScheduleID); found {
timeout = ai.ScheduleToStartTimeout
} else {
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID)
}
release()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
if timeout != 0 {
err = t.matchingClient.AddActivityTask(nil, &m.AddActivityTaskRequest{
DomainUUID: common.StringPtr(targetDomainID),
Expand Down Expand Up @@ -382,27 +384,30 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans
if err != nil {
return err
}
defer release()

var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
release()
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}
timeout := mb.executionInfo.WorkflowTimeout
wfTypeName := mb.executionInfo.WorkflowTypeName
startTimestamp := mb.executionInfo.StartTimestamp
if mb.isStickyTaskListEnabled() {
taskList.Name = common.StringPtr(mb.executionInfo.StickyTaskList)

timeout := msBuilder.executionInfo.WorkflowTimeout
wfTypeName := msBuilder.executionInfo.WorkflowTypeName
startTimestamp := msBuilder.executionInfo.StartTimestamp
if msBuilder.isStickyTaskListEnabled() {
taskList.Name = common.StringPtr(msBuilder.executionInfo.StickyTaskList)
taskList.Kind = common.TaskListKindPtr(workflow.TaskListKindSticky)
timeout = mb.executionInfo.StickyScheduleToStartTimeout
timeout = msBuilder.executionInfo.StickyScheduleToStartTimeout
}
release()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
err = t.matchingClient.AddDecisionTask(nil, &m.AddDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
Execution: &execution,
Expand Down Expand Up @@ -444,8 +449,8 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
}
defer release()

var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, but the mutable state was
Expand All @@ -455,16 +460,34 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
return err
}

replyToParentWorkflow := msBuilder.hasParentExecution() && msBuilder.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew
var completionEvent *workflow.HistoryEvent
if replyToParentWorkflow {
completionEvent, _ = msBuilder.GetCompletionEvent()
}
parentDomainID := msBuilder.executionInfo.ParentDomainID
parentWorkflowID := msBuilder.executionInfo.ParentWorkflowID
parentRunID := msBuilder.executionInfo.ParentRunID
initiatedID := msBuilder.executionInfo.InitiatedID

workflowTypeName := msBuilder.executionInfo.WorkflowTypeName
workflowStartTimestamp := msBuilder.executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.getLastUpdatedTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(msBuilder.executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
// Communicate the result to parent execution if this is Child Workflow execution
if mb.hasParentExecution() && mb.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew {
completionEvent, _ := mb.GetCompletionEvent()
if replyToParentWorkflow {
err = t.historyClient.RecordChildExecutionCompleted(nil, &history.RecordChildExecutionCompletedRequest{
DomainUUID: common.StringPtr(mb.executionInfo.ParentDomainID),
DomainUUID: common.StringPtr(parentDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(mb.executionInfo.ParentWorkflowID),
RunId: common.StringPtr(mb.executionInfo.ParentRunID),
WorkflowId: common.StringPtr(parentWorkflowID),
RunId: common.StringPtr(parentRunID),
},
InitiatedId: common.Int64Ptr(mb.executionInfo.InitiatedID),
InitiatedId: common.Int64Ptr(initiatedID),
CompletedExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
Expand All @@ -476,11 +499,10 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
switch err.(type) {
case *workflow.EntityNotExistsError:
err = nil
default:
return err
}
}
if err != nil {
return err
}

// Record closing in visibility store
retentionSeconds := int64(0)
Expand All @@ -498,11 +520,11 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
return t.visibilityManager.RecordWorkflowExecutionClosed(&persistence.RecordWorkflowExecutionClosedRequest{
DomainUUID: task.DomainID,
Execution: execution,
WorkflowTypeName: mb.executionInfo.WorkflowTypeName,
StartTimestamp: mb.executionInfo.StartTimestamp.UnixNano(),
CloseTimestamp: mb.getLastUpdatedTimestamp(),
Status: getWorkflowExecutionCloseStatus(mb.executionInfo.CloseStatus),
HistoryLength: mb.GetNextEventID(),
WorkflowTypeName: workflowTypeName,
StartTimestamp: workflowStartTimestamp,
CloseTimestamp: workflowCloseTimestamp,
Status: workflowCloseStatus,
HistoryLength: workflowHistoryLength,
RetentionSeconds: retentionSeconds,
})
}
Expand Down Expand Up @@ -545,6 +567,29 @@ func (t *transferQueueProcessorImpl) processCancelExecution(task *persistence.Tr
// No pending request cancellation for this initiatedID, complete this transfer task
return nil
}

// handle workflow cancel itself
if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
cancelRequest := &history.RequestCancelWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{
Domain: common.StringPtr(targetDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.TargetWorkflowID),
RunId: common.StringPtr(task.TargetRunID),
},
Identity: common.StringPtr(identityHistoryService),
},
}
err = t.requestCancelFailed(task, context, cancelRequest)
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}

cancelRequest := &history.RequestCancelWorkflowExecutionRequest{
DomainUUID: common.StringPtr(targetDomainID),
CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{
Expand Down Expand Up @@ -643,13 +688,14 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr

// handle workflow signal itself
if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
signalRequest := &history.SignalWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
DomainUUID: common.StringPtr(targetDomainID),
SignalRequest: &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
Domain: common.StringPtr(targetDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
WorkflowId: common.StringPtr(task.TargetWorkflowID),
RunId: common.StringPtr(task.TargetRunID),
},
Identity: common.StringPtr(identityHistoryService),
Control: ri.Control,
Expand Down Expand Up @@ -711,6 +757,9 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr
return nil
}

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
// remove signalRequestedID from target workflow, after Signal detail is removed from source workflow
removeRequest := &history.RemoveSignalMutableStateRequest{
DomainUUID: common.StringPtr(targetDomainID),
Expand Down Expand Up @@ -929,9 +978,9 @@ func (t *transferQueueProcessorImpl) requestCancelCompleted(task *persistence.Tr

msBuilder.AddExternalWorkflowExecutionCancelRequested(
initiatedEventID,
*request.DomainUUID,
*request.CancelRequest.WorkflowExecution.WorkflowId,
common.StringDefault(request.CancelRequest.WorkflowExecution.RunId),
request.GetDomainUUID(),
request.CancelRequest.WorkflowExecution.GetWorkflowId(),
request.CancelRequest.WorkflowExecution.GetRunId(),
)

return nil
Expand Down Expand Up @@ -983,9 +1032,9 @@ func (t *transferQueueProcessorImpl) requestCancelFailed(task *persistence.Trans
msBuilder.AddRequestCancelExternalWorkflowExecutionFailedEvent(
emptyEventID,
initiatedEventID,
*request.DomainUUID,
*request.CancelRequest.WorkflowExecution.WorkflowId,
common.StringDefault(request.CancelRequest.WorkflowExecution.RunId),
request.GetDomainUUID(),
request.CancelRequest.WorkflowExecution.GetWorkflowId(),
request.CancelRequest.WorkflowExecution.GetRunId(),
workflow.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution)

return nil
Expand Down

0 comments on commit 35bf3de

Please sign in to comment.