Skip to content

Commit

Permalink
Add dynamic property Fixer Enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas committed Dec 9, 2020
1 parent 1e057eb commit e046c9b
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 36 deletions.
Empty file.
3 changes: 2 additions & 1 deletion service/worker/scanner/executions/concrete_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func ConcreteExecutionScannerConfig(dc *dynamicconfig.Collection) *shardscanner.
ScannerWFTypeName: ConcreteExecutionsScannerWFTypeName,
FixerWFTypeName: ConcreteExecutionsFixerWFTypeName,
DynamicParams: shardscanner.DynamicParams{
Enabled: dc.GetBoolProperty(dynamicconfig.ConcreteExecutionsScannerEnabled, false),
ScannerEnabled: dc.GetBoolProperty(dynamicconfig.ConcreteExecutionsScannerEnabled, false),
FixerEnabled: dc.GetBoolProperty(dynamicconfig.ConcreteExecutionFixerEnabled, false),
Concurrency: dc.GetIntProperty(dynamicconfig.ConcreteExecutionsScannerConcurrency, 25),
PageSize: dc.GetIntProperty(dynamicconfig.ConcreteExecutionsScannerPersistencePageSize, 1000),
BlobstoreFlushThreshold: dc.GetIntProperty(dynamicconfig.ConcreteExecutionsScannerBlobstoreFlushThreshold, 100),
Expand Down
3 changes: 2 additions & 1 deletion service/worker/scanner/executions/current_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func CurrentExecutionScannerConfig(dc *dynamicconfig.Collection) *shardscanner.S
FixerWFTypeName: CurrentExecutionsFixerWFTypeName,
DynamicCollection: dc,
DynamicParams: shardscanner.DynamicParams{
Enabled: dc.GetBoolProperty(dynamicconfig.CurrentExecutionsScannerEnabled, false),
ScannerEnabled: dc.GetBoolProperty(dynamicconfig.CurrentExecutionsScannerEnabled, false),
FixerEnabled: dc.GetBoolProperty(dynamicconfig.CurrentExecutionFixerEnabled, false),
Concurrency: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerConcurrency, 25),
PageSize: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerPersistencePageSize, 1000),
BlobstoreFlushThreshold: dc.GetIntProperty(dynamicconfig.CurrentExecutionsScannerBlobstoreFlushThreshold, 100),
Expand Down
45 changes: 16 additions & 29 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"go.uber.org/cadence/.gen/go/shared"
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

"github.com/uber/cadence/common"
Expand All @@ -39,7 +38,6 @@ import (
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/worker/scanner/executions"
"github.com/uber/cadence/service/worker/scanner/shardscanner"
)

Expand Down Expand Up @@ -165,9 +163,10 @@ func (s *Scanner) startShardScanner(
) (context.Context, []string) {
scannerContextKey := shardscanner.ScannerContextKey(config.ScannerWFTypeName)
fixerContextKey := shardscanner.ScannerContextKey(config.FixerWFTypeName)

workerTaskListNames := []string{}
backgroundActivityContext := context.WithValue(ctx, scannerContextKey, s.context)
if config.DynamicParams.Enabled() {

if config.DynamicParams.ScannerEnabled() {
backgroundActivityContext = context.WithValue(
backgroundActivityContext,
scannerContextKey,
Expand All @@ -180,19 +179,21 @@ func (s *Scanner) startShardScanner(
})
}

backgroundActivityContext = context.WithValue(
backgroundActivityContext,
fixerContextKey,
shardscanner.FixerContext{
Resource: s.context.Resource,
Scope: s.context.Resource.GetMetricsClient().Scope(metrics.ExecutionsFixerScope),
ContextKey: fixerContextKey,
Hooks: config.FixerHooks(),
})
if config.DynamicParams.FixerEnabled() {
backgroundActivityContext = context.WithValue(
backgroundActivityContext,
fixerContextKey,
shardscanner.FixerContext{
Resource: s.context.Resource,
Scope: s.context.Resource.GetMetricsClient().Scope(metrics.ExecutionsFixerScope),
ContextKey: fixerContextKey,
Hooks: config.FixerHooks(),
})

workerTaskListNames := []string{config.FixerTLName}
workerTaskListNames = append(workerTaskListNames, config.FixerTLName)
}

if config.DynamicParams.Enabled() {
if config.DynamicParams.ScannerEnabled() {
workerTaskListNames = append(workerTaskListNames, config.StartWorkflowOptions.TaskList)

go s.startWorkflowWithRetry(config.StartWorkflowOptions, config.ScannerWFTypeName, shardscanner.ScannerWorkflowParams{
Expand All @@ -205,20 +206,6 @@ func (s *Scanner) startShardScanner(
})
}

if fixerConfig.Enabled() {
backgroundActivityContext = context.WithValue(backgroundActivityContext, fixerContextKey, executions.FixerContext{
Resource: s.context.Resource,
Scope: s.context.Resource.GetMetricsClient().Scope(metrics.ExecutionsFixerScope),
FixerWorkflowDynamicConfig: fixerConfig,
})
workerTaskListNames = append(workerTaskListNames, fixerTaskListName)
go s.startWorkflowWithRetry(fixerStartWorkflowOptions, fixerWFTypeName, executions.FixerWorkflowParams{
ScanExecution: workflow.Execution{
ID: scannerStartWorkflowOptions.ID,
},
ScanType: scanType,
})
}
return backgroundActivityContext, workerTaskListNames
}

Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/shardscanner/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ScannerConfigActivity(

result := ResolvedScannerWorkflowConfig{
GenericScannerConfig: GenericScannerConfig{
Enabled: dc.Enabled(),
Enabled: dc.ScannerEnabled(),
Concurrency: dc.Concurrency(),
PageSize: dc.PageSize(),
BlobstoreFlushThreshold: dc.BlobstoreFlushThreshold(),
Expand Down
6 changes: 3 additions & 3 deletions service/worker/scanner/shardscanner/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (s *activitiesSuite) TestScannerConfigActivity() {
}{
{
dynamicParams: &DynamicParams{
Enabled: dynamicconfig.GetBoolPropertyFn(true),
ScannerEnabled: dynamicconfig.GetBoolPropertyFn(true),
Concurrency: dynamicconfig.GetIntPropertyFn(10),
PageSize: dynamicconfig.GetIntPropertyFn(100),
ActivityBatchSize: dynamicconfig.GetIntPropertyFn(10),
Expand All @@ -310,7 +310,7 @@ func (s *activitiesSuite) TestScannerConfigActivity() {
},
{
dynamicParams: &DynamicParams{
Enabled: dynamicconfig.GetBoolPropertyFn(true),
ScannerEnabled: dynamicconfig.GetBoolPropertyFn(true),
Concurrency: dynamicconfig.GetIntPropertyFn(10),
PageSize: dynamicconfig.GetIntPropertyFn(100),
ActivityBatchSize: dynamicconfig.GetIntPropertyFn(10),
Expand All @@ -332,7 +332,7 @@ func (s *activitiesSuite) TestScannerConfigActivity() {
},
{
dynamicParams: &DynamicParams{
Enabled: dynamicconfig.GetBoolPropertyFn(true),
ScannerEnabled: dynamicconfig.GetBoolPropertyFn(true),
Concurrency: dynamicconfig.GetIntPropertyFn(10),
ActivityBatchSize: dynamicconfig.GetIntPropertyFn(100),
PageSize: dynamicconfig.GetIntPropertyFn(100),
Expand Down
3 changes: 2 additions & 1 deletion service/worker/scanner/shardscanner/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ type (

// DynamicParams is the dynamic config for scanner workflow.
DynamicParams struct {
Enabled dynamicconfig.BoolPropertyFn
ScannerEnabled dynamicconfig.BoolPropertyFn
FixerEnabled dynamicconfig.BoolPropertyFn
Concurrency dynamicconfig.IntPropertyFn
PageSize dynamicconfig.IntPropertyFn
BlobstoreFlushThreshold dynamicconfig.IntPropertyFn
Expand Down

0 comments on commit e046c9b

Please sign in to comment.