Skip to content

Commit

Permalink
enforce having larger than one decision poller for sticky execution e…
Browse files Browse the repository at this point in the history
…nabled workers
  • Loading branch information
shijiesheng committed Oct 8, 2024
1 parent 4d4c09f commit 8222a62
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 9 deletions.
3 changes: 2 additions & 1 deletion evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
// so if our worker puts *cacheSize* entries in the cache, it should evict exactly one
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1)

workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
DisableActivityWorker: true,
Logger: zaptest.NewLogger(s.T()),
IsolationGroup: "zone-1",
})
s.Require().NoError(err)
// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
// won't work because the workflow will simply just complete
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
const (
defaultPollerAutoScalerCooldown = time.Minute
defaultPollerAutoScalerTargetUtilization = 0.6
defaultMinConcurrentPollerSize = 1
defaultMinConcurrentActivityPollerSize = 1
defaultMinConcurrentDecisionPollerSize = 2
)

var (
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,10 +1286,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
}
if options.MinConcurrentActivityTaskPollers == 0 {
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize
}
if options.MinConcurrentDecisionTaskPollers == 0 {
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize
}
if options.PollerAutoScalerCooldown == 0 {
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown
Expand Down
3 changes: 2 additions & 1 deletion internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,12 @@ func createWorkerWithThrottle(
workerOptions.EnableSessionWorker = true

// Start Worker.
worker := NewWorker(
worker, err := NewWorker(
service,
domain,
"testGroupName2",
workerOptions)
require.NoError(t, err)
return worker
}

Expand Down
19 changes: 16 additions & 3 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package internal

import (
"context"
"fmt"
"time"

"go.uber.org/cadence/internal/common/debug"
Expand Down Expand Up @@ -108,7 +109,7 @@ type (
// optional: Sets the minimum number of goroutines that will concurrently poll the
// cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true,
// changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list.
// Default value is 1
// Default value is 2
MinConcurrentDecisionTaskPollers int

// optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count
Expand Down Expand Up @@ -333,8 +334,11 @@ func NewWorker(
domain string,
taskList string,
options WorkerOptions,
) *aggregatedWorker {
return newAggregatedWorker(service, domain, taskList, options)
) (*aggregatedWorker, error) {
if err := options.Validate(); err != nil {
return nil, fmt.Errorf("worker options validation error: %w", err)
}
return newAggregatedWorker(service, domain, taskList, options), nil
}

// ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it.
Expand Down Expand Up @@ -383,3 +387,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s
r := NewWorkflowReplayer()
return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
}

// Validate sanity validation of WorkerOptions
func (o WorkerOptions) Validate() error {
// decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) {
return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value")
}
return nil
}
68 changes: 68 additions & 0 deletions internal/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_NewWorker(t *testing.T) {
tests := []struct {
name string
options WorkerOptions
expectErr string
}{
{
name: "happy with default value",
options: WorkerOptions{},
expectErr: "",
},
{
name: "happy with explicit decision task poller set to 1 if sticky task list is disabled",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
DisableStickyExecution: true,
},
expectErr: "",
},
{
name: "invalid worker with explicit decision task poller set to 1",
options: WorkerOptions{
MaxConcurrentDecisionTaskPollers: 1,
},
expectErr: "DecisionTaskPollers must be >= 2 or use default value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options)
if tt.expectErr != "" {
assert.ErrorContains(t, err, tt.expectErr)
assert.Nil(t, w)
} else {
assert.NoError(t, err)
assert.NotNil(t, w)
}
})
}
}
6 changes: 5 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ func New(
taskList string,
options Options,
) Worker {
return internal.NewWorker(service, domain, taskList, options)
w, err := internal.NewWorker(service, domain, taskList, options)
if err != nil {
panic(err)
}
return w
}

// NewWorkflowReplayer creates a WorkflowReplayer instance.
Expand Down

0 comments on commit 8222a62

Please sign in to comment.