Skip to content

Commit

Permalink
[internal] Improve code coverage of internal_task_pollers.go
Browse files Browse the repository at this point in the history
  • Loading branch information
3vilhamster committed Oct 30, 2024
1 parent bcd0462 commit 6427c91
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 100 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
$Q $(BIN_PATH) go generate ./...
$Q touch $@

# ====================================
# other intermediates
Expand Down
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ codecov:
ignore:
- "**/*_generated.go"
- "**/*_mock.go"
- "**/mock_*.go"
- "**/testdata/**"
- "**/*_test.go"
- "**/*_testsuite.go"
Expand Down
6 changes: 4 additions & 2 deletions internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
s "go.uber.org/cadence/.gen/go/shared"
)

//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE

type (
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)

Expand Down Expand Up @@ -71,7 +73,7 @@ type (

// WorkflowTaskHandler represents decision task handlers.
WorkflowTaskHandler interface {
// Processes the workflow task
// ProcessWorkflowTask processes the workflow task
// The response could be:
// - RespondDecisionTaskCompletedRequest
// - RespondDecisionTaskFailedRequest
Expand All @@ -84,7 +86,7 @@ type (

// ActivityTaskHandler represents activity task handlers.
ActivityTaskHandler interface {
// Executes the activity task
// Execute executes the activity task
// The response is one of the types:
// - RespondActivityTaskCompletedRequest
// - RespondActivityTaskFailedRequest
Expand Down
205 changes: 111 additions & 94 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"go.uber.org/cadence/internal/common/serializer"
)

//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE

const (
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta

Expand Down Expand Up @@ -76,7 +78,7 @@ type (
identity string
service workflowserviceclient.Interface
taskHandler WorkflowTaskHandler
ldaTunnel *locallyDispatchedActivityTunnel
ldaTunnel LocalDispatcher
metricsScope *metrics.TaggedScope
logger *zap.Logger

Expand Down Expand Up @@ -159,6 +161,11 @@ type (
stopCh <-chan struct{}
metricsScope *metrics.TaggedScope
}

// LocalDispatcher is an interface to dispatch locally dispatched activities.
LocalDispatcher interface {
SendTask(task *locallyDispatchedActivityTask) bool
}
)

func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
Expand Down Expand Up @@ -214,7 +221,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit
}
}

func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
select {
case ldat.taskCh <- task:
return true
Expand Down Expand Up @@ -349,7 +356,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
func(response interface{}, startTime time.Time) (*workflowTask, error) {
wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId()))
wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1)
heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime)
heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime)
if err != nil {
return nil, err
}
Expand All @@ -365,10 +372,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
if completedRequest == nil && err == nil {
return nil
}
if _, ok := err.(decisionHeartbeatError); ok {
if errors.As(err, new(*decisionHeartbeatError)) {
return err
}
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -397,8 +404,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
return nil
}

func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {
metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
if taskErr != nil {
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
Expand All @@ -416,7 +422,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))

responseStartTime := time.Now()
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
return
}
Expand All @@ -425,103 +431,114 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
return
}

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
ctx := context.Background()
// Respond task completion.
err = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
defer cancel()
var err1 error
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
})
}
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return response, err
}

func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags)
defer cancel()
var (
err error
response *s.RespondDecisionTaskCompletedResponse
operation string
)
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
err = wtp.handleDecisionFailedRequest(ctx, task, request)
operation = "RespondDecisionTaskFailed"
case *s.RespondDecisionTaskCompletedRequest:
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request)
operation = "RespondDecisionTaskCompleted"
case *s.RespondQueryTaskCompletedRequest:
err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
operation = "RespondQueryTaskCompleted"
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
}

traceLog(func() {
wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation))
})

return response, err
}

func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error {
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
}
return nil
}

func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) {
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}

if wtp.ldaTunnel != nil {
var activityTasks []*locallyDispatchedActivityTask
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
case *s.RespondDecisionTaskCompletedRequest:
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
if wtp.ldaTunnel.SendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
var activityTasks []*locallyDispatchedActivityTask
if wtp.ldaTunnel != nil {
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
if wtp.ldaTunnel.sendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
}
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err1 == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}
at.readyCh <- started
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}()
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
})
}

case *s.RespondQueryTaskCompletedRequest:
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
})
}
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
at.readyCh <- started
}
}()
}

return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return
return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
}

func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {
Expand Down
Loading

0 comments on commit 6427c91

Please sign in to comment.