diff --git a/.gen/go/sqlblobs/sqlblobs.go b/.gen/go/sqlblobs/sqlblobs.go index 6f8d7396e4a..6ed30164fff 100644 --- a/.gen/go/sqlblobs/sqlblobs.go +++ b/.gen/go/sqlblobs/sqlblobs.go @@ -10427,10 +10427,11 @@ func (v *TaskInfo) IsSetPartitionConfig() bool { } type TaskListInfo struct { - Kind *int16 `json:"kind,omitempty"` - AckLevel *int64 `json:"ackLevel,omitempty"` - ExpiryTimeNanos *int64 `json:"expiryTimeNanos,omitempty"` - LastUpdatedNanos *int64 `json:"lastUpdatedNanos,omitempty"` + Kind *int16 `json:"kind,omitempty"` + AckLevel *int64 `json:"ackLevel,omitempty"` + ExpiryTimeNanos *int64 `json:"expiryTimeNanos,omitempty"` + LastUpdatedNanos *int64 `json:"lastUpdatedNanos,omitempty"` + AdaptivePartitionConfig *TaskListPartitionConfig `json:"adaptivePartitionConfig,omitempty"` } // ToWire translates a TaskListInfo struct into a Thrift-level intermediate @@ -10450,7 +10451,7 @@ type TaskListInfo struct { // } func (v *TaskListInfo) ToWire() (wire.Value, error) { var ( - fields [4]wire.Field + fields [5]wire.Field i int = 0 w wire.Value err error @@ -10488,10 +10489,24 @@ func (v *TaskListInfo) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 16, Value: w} i++ } + if v.AdaptivePartitionConfig != nil { + w, err = v.AdaptivePartitionConfig.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 18, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } +func _TaskListPartitionConfig_Read(w wire.Value) (*TaskListPartitionConfig, error) { + var v TaskListPartitionConfig + err := v.FromWire(w) + return &v, err +} + // FromWire deserializes a TaskListInfo struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -10553,6 +10568,14 @@ func (v *TaskListInfo) FromWire(w wire.Value) error { return err } + } + case 18: + if field.Value.Type() == wire.TStruct { + v.AdaptivePartitionConfig, err = _TaskListPartitionConfig_Read(field.Value) + if err != nil { + return err + } + } } } @@ -10617,9 +10640,27 @@ func (v *TaskListInfo) Encode(sw stream.Writer) error { } } + if v.AdaptivePartitionConfig != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 18, Type: wire.TStruct}); err != nil { + return err + } + if err := v.AdaptivePartitionConfig.Encode(sw); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + return sw.WriteStructEnd() } +func _TaskListPartitionConfig_Decode(sr stream.Reader) (*TaskListPartitionConfig, error) { + var v TaskListPartitionConfig + err := v.Decode(sr) + return &v, err +} + // Decode deserializes a TaskListInfo struct directly from its Thrift-level // representation, without going through an intemediary type. // @@ -10670,6 +10711,12 @@ func (v *TaskListInfo) Decode(sr stream.Reader) error { return err } + case fh.ID == 18 && fh.Type == wire.TStruct: + v.AdaptivePartitionConfig, err = _TaskListPartitionConfig_Decode(sr) + if err != nil { + return err + } + default: if err := sr.Skip(fh.Type); err != nil { return err @@ -10699,7 +10746,7 @@ func (v *TaskListInfo) String() string { return "" } - var fields [4]string + var fields [5]string i := 0 if v.Kind != nil { fields[i] = fmt.Sprintf("Kind: %v", *(v.Kind)) @@ -10717,6 +10764,10 @@ func (v *TaskListInfo) String() string { fields[i] = fmt.Sprintf("LastUpdatedNanos: %v", *(v.LastUpdatedNanos)) i++ } + if v.AdaptivePartitionConfig != nil { + fields[i] = fmt.Sprintf("AdaptivePartitionConfig: %v", v.AdaptivePartitionConfig) + i++ + } return fmt.Sprintf("TaskListInfo{%v}", strings.Join(fields[:i], ", ")) } @@ -10743,6 +10794,9 @@ func (v *TaskListInfo) Equals(rhs *TaskListInfo) bool { if !_I64_EqualsPtr(v.LastUpdatedNanos, rhs.LastUpdatedNanos) { return false } + if !((v.AdaptivePartitionConfig == nil && rhs.AdaptivePartitionConfig == nil) || (v.AdaptivePartitionConfig != nil && rhs.AdaptivePartitionConfig != nil && v.AdaptivePartitionConfig.Equals(rhs.AdaptivePartitionConfig))) { + return false + } return true } @@ -10765,6 +10819,9 @@ func (v *TaskListInfo) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { if v.LastUpdatedNanos != nil { enc.AddInt64("lastUpdatedNanos", *v.LastUpdatedNanos) } + if v.AdaptivePartitionConfig != nil { + err = multierr.Append(err, enc.AddObject("adaptivePartitionConfig", v.AdaptivePartitionConfig)) + } return err } @@ -10828,6 +10885,359 @@ func (v *TaskListInfo) IsSetLastUpdatedNanos() bool { return v != nil && v.LastUpdatedNanos != nil } +// GetAdaptivePartitionConfig returns the value of AdaptivePartitionConfig if it is set or its +// zero value if it is unset. +func (v *TaskListInfo) GetAdaptivePartitionConfig() (o *TaskListPartitionConfig) { + if v != nil && v.AdaptivePartitionConfig != nil { + return v.AdaptivePartitionConfig + } + + return +} + +// IsSetAdaptivePartitionConfig returns true if AdaptivePartitionConfig is not nil. +func (v *TaskListInfo) IsSetAdaptivePartitionConfig() bool { + return v != nil && v.AdaptivePartitionConfig != nil +} + +type TaskListPartitionConfig struct { + Version *int64 `json:"version,omitempty"` + NumReadPartitions *int32 `json:"numReadPartitions,omitempty"` + NumWritePartitions *int32 `json:"numWritePartitions,omitempty"` +} + +// ToWire translates a TaskListPartitionConfig struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *TaskListPartitionConfig) ToWire() (wire.Value, error) { + var ( + fields [3]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.Version != nil { + w, err = wire.NewValueI64(*(v.Version)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.NumReadPartitions != nil { + w, err = wire.NewValueI32(*(v.NumReadPartitions)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 12, Value: w} + i++ + } + if v.NumWritePartitions != nil { + w, err = wire.NewValueI32(*(v.NumWritePartitions)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 14, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +// FromWire deserializes a TaskListPartitionConfig struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a TaskListPartitionConfig struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v TaskListPartitionConfig +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *TaskListPartitionConfig) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Version = &x + if err != nil { + return err + } + + } + case 12: + if field.Value.Type() == wire.TI32 { + var x int32 + x, err = field.Value.GetI32(), error(nil) + v.NumReadPartitions = &x + if err != nil { + return err + } + + } + case 14: + if field.Value.Type() == wire.TI32 { + var x int32 + x, err = field.Value.GetI32(), error(nil) + v.NumWritePartitions = &x + if err != nil { + return err + } + + } + } + } + + return nil +} + +// Encode serializes a TaskListPartitionConfig struct directly into bytes, without going +// through an intermediary type. +// +// An error is returned if a TaskListPartitionConfig struct could not be encoded. +func (v *TaskListPartitionConfig) Encode(sw stream.Writer) error { + if err := sw.WriteStructBegin(); err != nil { + return err + } + + if v.Version != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 10, Type: wire.TI64}); err != nil { + return err + } + if err := sw.WriteInt64(*(v.Version)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.NumReadPartitions != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 12, Type: wire.TI32}); err != nil { + return err + } + if err := sw.WriteInt32(*(v.NumReadPartitions)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.NumWritePartitions != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 14, Type: wire.TI32}); err != nil { + return err + } + if err := sw.WriteInt32(*(v.NumWritePartitions)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + return sw.WriteStructEnd() +} + +// Decode deserializes a TaskListPartitionConfig struct directly from its Thrift-level +// representation, without going through an intemediary type. +// +// An error is returned if a TaskListPartitionConfig struct could not be generated from the wire +// representation. +func (v *TaskListPartitionConfig) Decode(sr stream.Reader) error { + + if err := sr.ReadStructBegin(); err != nil { + return err + } + + fh, ok, err := sr.ReadFieldBegin() + if err != nil { + return err + } + + for ok { + switch { + case fh.ID == 10 && fh.Type == wire.TI64: + var x int64 + x, err = sr.ReadInt64() + v.Version = &x + if err != nil { + return err + } + + case fh.ID == 12 && fh.Type == wire.TI32: + var x int32 + x, err = sr.ReadInt32() + v.NumReadPartitions = &x + if err != nil { + return err + } + + case fh.ID == 14 && fh.Type == wire.TI32: + var x int32 + x, err = sr.ReadInt32() + v.NumWritePartitions = &x + if err != nil { + return err + } + + default: + if err := sr.Skip(fh.Type); err != nil { + return err + } + } + + if err := sr.ReadFieldEnd(); err != nil { + return err + } + + if fh, ok, err = sr.ReadFieldBegin(); err != nil { + return err + } + } + + if err := sr.ReadStructEnd(); err != nil { + return err + } + + return nil +} + +// String returns a readable string representation of a TaskListPartitionConfig +// struct. +func (v *TaskListPartitionConfig) String() string { + if v == nil { + return "" + } + + var fields [3]string + i := 0 + if v.Version != nil { + fields[i] = fmt.Sprintf("Version: %v", *(v.Version)) + i++ + } + if v.NumReadPartitions != nil { + fields[i] = fmt.Sprintf("NumReadPartitions: %v", *(v.NumReadPartitions)) + i++ + } + if v.NumWritePartitions != nil { + fields[i] = fmt.Sprintf("NumWritePartitions: %v", *(v.NumWritePartitions)) + i++ + } + + return fmt.Sprintf("TaskListPartitionConfig{%v}", strings.Join(fields[:i], ", ")) +} + +// Equals returns true if all the fields of this TaskListPartitionConfig match the +// provided TaskListPartitionConfig. +// +// This function performs a deep comparison. +func (v *TaskListPartitionConfig) Equals(rhs *TaskListPartitionConfig) bool { + if v == nil { + return rhs == nil + } else if rhs == nil { + return false + } + if !_I64_EqualsPtr(v.Version, rhs.Version) { + return false + } + if !_I32_EqualsPtr(v.NumReadPartitions, rhs.NumReadPartitions) { + return false + } + if !_I32_EqualsPtr(v.NumWritePartitions, rhs.NumWritePartitions) { + return false + } + + return true +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of TaskListPartitionConfig. +func (v *TaskListPartitionConfig) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + if v == nil { + return nil + } + if v.Version != nil { + enc.AddInt64("version", *v.Version) + } + if v.NumReadPartitions != nil { + enc.AddInt32("numReadPartitions", *v.NumReadPartitions) + } + if v.NumWritePartitions != nil { + enc.AddInt32("numWritePartitions", *v.NumWritePartitions) + } + return err +} + +// GetVersion returns the value of Version if it is set or its +// zero value if it is unset. +func (v *TaskListPartitionConfig) GetVersion() (o int64) { + if v != nil && v.Version != nil { + return *v.Version + } + + return +} + +// IsSetVersion returns true if Version is not nil. +func (v *TaskListPartitionConfig) IsSetVersion() bool { + return v != nil && v.Version != nil +} + +// GetNumReadPartitions returns the value of NumReadPartitions if it is set or its +// zero value if it is unset. +func (v *TaskListPartitionConfig) GetNumReadPartitions() (o int32) { + if v != nil && v.NumReadPartitions != nil { + return *v.NumReadPartitions + } + + return +} + +// IsSetNumReadPartitions returns true if NumReadPartitions is not nil. +func (v *TaskListPartitionConfig) IsSetNumReadPartitions() bool { + return v != nil && v.NumReadPartitions != nil +} + +// GetNumWritePartitions returns the value of NumWritePartitions if it is set or its +// zero value if it is unset. +func (v *TaskListPartitionConfig) GetNumWritePartitions() (o int32) { + if v != nil && v.NumWritePartitions != nil { + return *v.NumWritePartitions + } + + return +} + +// IsSetNumWritePartitions returns true if NumWritePartitions is not nil. +func (v *TaskListPartitionConfig) IsSetNumWritePartitions() bool { + return v != nil && v.NumWritePartitions != nil +} + type TimerInfo struct { Version *int64 `json:"version,omitempty"` StartedID *int64 `json:"startedID,omitempty"` @@ -17199,11 +17609,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "sqlblobs", Package: "github.com/uber/cadence/.gen/go/sqlblobs", FilePath: "sqlblobs.thrift", - SHA1: "a03e6c0aa72213ea16c9c27d55445dc5849210f6", + SHA1: "9c854b8a522aa30664f56e3bf0e1c54c413d84ff", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n 42: optional binary pendingFailoverMarkers\n 44: optional string pendingFailoverMarkersEncoding\n 46: optional map replicationDlqAckLevel\n 50: optional binary transferProcessingQueueStates\n 51: optional string transferProcessingQueueStatesEncoding\n 55: optional binary timerProcessingQueueStates\n 56: optional string timerProcessingQueueStatesEncoding\n 60: optional binary crossClusterProcessingQueueStates\n 61: optional string crossClusterProcessingQueueStatesEncoding\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n 52: optional i64 (js.type = \"Long\") previousFailoverVersion\n 54: optional i64 (js.type = \"Long\") lastUpdatedTime\n 56: optional binary isolationGroupsConfiguration\n 58: optional string isolationGroupsConfigurationEncoding\n 60: optional binary asyncWorkflowConfiguration\n 62: optional string asyncWorkflowConfigurationEncoding\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n 126: optional binary firstExecutionRunID\n 128: optional map partitionConfig\n 130: optional binary checksum\n 132: optional string checksumEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 29: optional string domainID\n 30: optional string domainName // deprecated\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n 17: optional map partitionConfig\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n 34: optional set targetDomainIDs\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 34: optional binary newRunBranchToken\n 38: optional i64 (js.type = \"Long\") creationTime\n}\n\nenum AsyncRequestType {\n StartWorkflowExecutionAsyncRequest\n SignalWithStartWorkflowExecutionAsyncRequest\n}\n\nstruct AsyncRequestMessage {\n 10: optional string partitionKey\n 12: optional AsyncRequestType type\n 14: optional shared.Header header\n 16: optional string encoding\n 18: optional binary payload\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n 42: optional binary pendingFailoverMarkers\n 44: optional string pendingFailoverMarkersEncoding\n 46: optional map replicationDlqAckLevel\n 50: optional binary transferProcessingQueueStates\n 51: optional string transferProcessingQueueStatesEncoding\n 55: optional binary timerProcessingQueueStates\n 56: optional string timerProcessingQueueStatesEncoding\n 60: optional binary crossClusterProcessingQueueStates\n 61: optional string crossClusterProcessingQueueStatesEncoding\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n 52: optional i64 (js.type = \"Long\") previousFailoverVersion\n 54: optional i64 (js.type = \"Long\") lastUpdatedTime\n 56: optional binary isolationGroupsConfiguration\n 58: optional string isolationGroupsConfigurationEncoding\n 60: optional binary asyncWorkflowConfiguration\n 62: optional string asyncWorkflowConfigurationEncoding\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n 126: optional binary firstExecutionRunID\n 128: optional map partitionConfig\n 130: optional binary checksum\n 132: optional string checksumEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 29: optional string domainID\n 30: optional string domainName // deprecated\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n 17: optional map partitionConfig\n}\n\nstruct TaskListPartitionConfig {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i32 numReadPartitions\n 14: optional i32 numWritePartitions\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n 18: optional TaskListPartitionConfig adaptivePartitionConfig\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n 34: optional set targetDomainIDs\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 34: optional binary newRunBranchToken\n 38: optional i64 (js.type = \"Long\") creationTime\n}\n\nenum AsyncRequestType {\n StartWorkflowExecutionAsyncRequest\n SignalWithStartWorkflowExecutionAsyncRequest\n}\n\nstruct AsyncRequestMessage {\n 10: optional string partitionKey\n 12: optional AsyncRequestType type\n 14: optional shared.Header header\n 16: optional string encoding\n 18: optional binary payload\n}\n" diff --git a/common/log/tag/values.go b/common/log/tag/values.go index f212c435604..ee12f3f318f 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -225,6 +225,7 @@ var ( StoreOperationCompleteTask = storeOperation("complete-task") StoreOperationCompleteTasksLessThan = storeOperation("complete-tasks-less-than") StoreOperationLeaseTaskList = storeOperation("lease-task-list") + StoreOperationGetTaskList = storeOperation("get-task-list") StoreOperationUpdateTaskList = storeOperation("update-task-list") StoreOperationListTaskList = storeOperation("list-task-list") StoreOperationDeleteTaskList = storeOperation("delete-task-list") diff --git a/common/metrics/defs.go b/common/metrics/defs.go index bdcd38796d5..e83959ba3f5 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -213,6 +213,8 @@ const ( PersistenceGetOrphanTasksScope // PersistenceLeaseTaskListScope tracks LeaseTaskList calls made by service to persistence layer PersistenceLeaseTaskListScope + // PersistenceGetTaskListScope tracks GetTaskList calls made by service to persistence layer + PersistenceGetTaskListScope // PersistenceUpdateTaskListScope tracks PersistenceUpdateTaskListScope calls made by service to persistence layer PersistenceUpdateTaskListScope // PersistenceListTaskListScope is the metric scope for persistence.TaskManager.ListTaskList API @@ -1418,6 +1420,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan"}, PersistenceGetOrphanTasksScope: {operation: "GetOrphanTasks"}, PersistenceLeaseTaskListScope: {operation: "LeaseTaskList"}, + PersistenceGetTaskListScope: {operation: "GetTaskList"}, PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"}, PersistenceListTaskListScope: {operation: "ListTaskList"}, PersistenceDeleteTaskListScope: {operation: "DeleteTaskList"}, diff --git a/common/mocks/TaskManager.go b/common/mocks/TaskManager.go index 2a49b3446d7..0005436c4d5 100644 --- a/common/mocks/TaskManager.go +++ b/common/mocks/TaskManager.go @@ -1,6 +1,6 @@ -// Modifications Copyright (c) 2020 Uber Technologies Inc. +// The MIT License (MIT) -// Copyright (c) 2020 Temporal Technologies, Inc. +// Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -162,6 +162,32 @@ func (_m *TaskManager) GetOrphanTasks(ctx context.Context, request *persistence. return r0, r1 } +// GetTaskList provides a mock function with given fields: ctx, request +func (_m *TaskManager) GetTaskList(ctx context.Context, request *persistence.GetTaskListRequest) (*persistence.GetTaskListResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *persistence.GetTaskListResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTaskListRequest) (*persistence.GetTaskListResponse, error)); ok { + return rf(ctx, request) + } + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTaskListRequest) *persistence.GetTaskListResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*persistence.GetTaskListResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *persistence.GetTaskListRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTaskListSize provides a mock function with given fields: ctx, request func (_m *TaskManager) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) { ret := _m.Called(ctx, request) diff --git a/common/persistence/dataManagerInterfaces_mock.go b/common/persistence/dataManagerInterfaces_mock.go index 1eb808e0087..fb1d39c89aa 100644 --- a/common/persistence/dataManagerInterfaces_mock.go +++ b/common/persistence/dataManagerInterfaces_mock.go @@ -843,6 +843,21 @@ func (mr *MockTaskManagerMockRecorder) GetOrphanTasks(arg0, arg1 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrphanTasks", reflect.TypeOf((*MockTaskManager)(nil).GetOrphanTasks), arg0, arg1) } +// GetTaskList mocks base method. +func (m *MockTaskManager) GetTaskList(arg0 context.Context, arg1 *GetTaskListRequest) (*GetTaskListResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTaskList", arg0, arg1) + ret0, _ := ret[0].(*GetTaskListResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTaskList indicates an expected call of GetTaskList. +func (mr *MockTaskManagerMockRecorder) GetTaskList(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTaskList", reflect.TypeOf((*MockTaskManager)(nil).GetTaskList), arg0, arg1) +} + // GetTaskListSize mocks base method. func (m *MockTaskManager) GetTaskListSize(arg0 context.Context, arg1 *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/data_manager_interfaces.go b/common/persistence/data_manager_interfaces.go index 173dcdd91c2..f1763d1d9c5 100644 --- a/common/persistence/data_manager_interfaces.go +++ b/common/persistence/data_manager_interfaces.go @@ -448,14 +448,22 @@ type ( // TaskListInfo describes a state of a task list implementation. TaskListInfo struct { - DomainID string - Name string - TaskType int - RangeID int64 - AckLevel int64 - Kind int - Expiry time.Time - LastUpdated time.Time + DomainID string + Name string + TaskType int + RangeID int64 + AckLevel int64 + Kind int + Expiry time.Time + LastUpdated time.Time + AdaptivePartitionConfig *TaskListPartitionConfig + } + + // TaskListPartitionConfig represents the configuration for task list partitions. + TaskListPartitionConfig struct { + Version int64 + NumReadPartitions int + NumWritePartitions int } // TaskInfo describes either activity or decision task @@ -1018,6 +1026,17 @@ type ( TaskListInfo *TaskListInfo } + GetTaskListRequest struct { + DomainID string + DomainName string + TaskList string + TaskType int + } + + GetTaskListResponse struct { + TaskListInfo *TaskListInfo + } + // UpdateTaskListRequest is used to update task list implementation information UpdateTaskListRequest struct { TaskListInfo *TaskListInfo @@ -1577,6 +1596,7 @@ type ( GetName() string LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error) UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) + GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error) ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error) DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error) diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index c05a7c9c58e..4ead2bdd1b2 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -57,6 +57,7 @@ type ( Closeable GetName() string LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error) + GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error) UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error) DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error diff --git a/common/persistence/nosql/nosql_task_store.go b/common/persistence/nosql/nosql_task_store.go index 395294f6dc1..49f974f404b 100644 --- a/common/persistence/nosql/nosql_task_store.go +++ b/common/persistence/nosql/nosql_task_store.go @@ -141,13 +141,14 @@ func (t *nosqlTaskStore) LeaseTaskList( currTL.RangeID++ err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{ - DomainID: request.DomainID, - TaskListName: request.TaskList, - TaskListType: request.TaskType, - RangeID: currTL.RangeID, - TaskListKind: currTL.TaskListKind, - AckLevel: currTL.AckLevel, - LastUpdatedTime: now, + DomainID: request.DomainID, + TaskListName: request.TaskList, + TaskListType: request.TaskType, + RangeID: currTL.RangeID, + TaskListKind: currTL.TaskListKind, + AckLevel: currTL.AckLevel, + LastUpdatedTime: now, + AdaptivePartitionConfig: currTL.AdaptivePartitionConfig, }, currTL.RangeID-1) } if err != nil { @@ -161,17 +162,47 @@ func (t *nosqlTaskStore) LeaseTaskList( return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err) } tli := &persistence.TaskListInfo{ - DomainID: request.DomainID, - Name: request.TaskList, - TaskType: request.TaskType, - RangeID: currTL.RangeID, - AckLevel: currTL.AckLevel, - Kind: request.TaskListKind, - LastUpdated: now, + DomainID: request.DomainID, + Name: request.TaskList, + TaskType: request.TaskType, + RangeID: currTL.RangeID, + AckLevel: currTL.AckLevel, + Kind: request.TaskListKind, + LastUpdated: now, + AdaptivePartitionConfig: currTL.AdaptivePartitionConfig, } return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil } +func (t *nosqlTaskStore) GetTaskList( + ctx context.Context, + request *persistence.GetTaskListRequest, +) (*persistence.GetTaskListResponse, error) { + storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType) + if err != nil { + return nil, err + } + currTL, err := storeShard.db.SelectTaskList(ctx, &nosqlplugin.TaskListFilter{ + DomainID: request.DomainID, + TaskListName: request.TaskList, + TaskListType: request.TaskType, + }) + if err != nil { + return nil, convertCommonErrors(storeShard.db, "GetTaskList", err) + } + tli := &persistence.TaskListInfo{ + DomainID: request.DomainID, + Name: request.TaskList, + TaskType: request.TaskType, + RangeID: currTL.RangeID, + AckLevel: currTL.AckLevel, + Kind: currTL.TaskListKind, + LastUpdated: currTL.LastUpdatedTime, + AdaptivePartitionConfig: currTL.AdaptivePartitionConfig, + } + return &persistence.GetTaskListResponse{TaskListInfo: tli}, nil +} + func (t *nosqlTaskStore) UpdateTaskList( ctx context.Context, request *persistence.UpdateTaskListRequest, @@ -179,13 +210,14 @@ func (t *nosqlTaskStore) UpdateTaskList( tli := request.TaskListInfo var err error taskListToUpdate := &nosqlplugin.TaskListRow{ - DomainID: tli.DomainID, - TaskListName: tli.Name, - TaskListType: tli.TaskType, - RangeID: tli.RangeID, - TaskListKind: tli.Kind, - AckLevel: tli.AckLevel, - LastUpdatedTime: time.Now(), + DomainID: tli.DomainID, + TaskListName: tli.Name, + TaskListType: tli.TaskType, + RangeID: tli.RangeID, + TaskListKind: tli.Kind, + AckLevel: tli.AckLevel, + LastUpdatedTime: time.Now(), + AdaptivePartitionConfig: tli.AdaptivePartitionConfig, } storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType) if err != nil { diff --git a/common/persistence/nosql/nosql_task_store_test.go b/common/persistence/nosql/nosql_task_store_test.go index 96ed23e3912..fcb0d351436 100644 --- a/common/persistence/nosql/nosql_task_store_test.go +++ b/common/persistence/nosql/nosql_task_store_test.go @@ -24,6 +24,7 @@ package nosql import ( "context" + "errors" "testing" "time" @@ -154,7 +155,8 @@ func TestLeaseTaskList_selectErrNotFound(t *testing.T) { // We then expect the tasklist to be inserted db.EXPECT().InsertTaskList(gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, taskList *nosqlplugin.TaskListRow) error { - checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList) + tl := getExpectedTaskListRow() + checkTaskListRowExpected(t, tl, taskList) return nil }) @@ -242,12 +244,34 @@ func TestLeaseTaskList_RenewUpdateFailed_OtherError(t *testing.T) { assert.ErrorContains(t, err, assert.AnError.Error()) } +func TestGetTaskList_Success(t *testing.T) { + store, db := setupNoSQLStoreMocks(t) + + taskListRow := getExpectedTaskListRow() + db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(taskListRow, nil) + resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest()) + + assert.NoError(t, err) + checkTaskListInfoExpected(t, resp.TaskListInfo) +} + +func TestGetTaskList_NotFound(t *testing.T) { + store, db := setupNoSQLStoreMocks(t) + + db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(nil, errors.New("not found")) + db.EXPECT().IsNotFoundError(gomock.Any()).Return(true) + resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest()) + + assert.ErrorAs(t, err, new(*types.EntityNotExistsError)) + assert.Nil(t, resp) +} + func TestUpdateTaskList(t *testing.T) { store, db := setupNoSQLStoreMocks(t) db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn( func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error { - checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList) + checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList) return nil }, ) @@ -265,7 +289,7 @@ func TestUpdateTaskList_Sticky(t *testing.T) { db.EXPECT().UpdateTaskListWithTTL(gomock.Any(), stickyTaskListTTL, gomock.Any(), int64(1)).DoAndReturn( func(ctx context.Context, ttlSeconds int64, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error { - expectedTaskList := getExpectedTaskListRow() + expectedTaskList := getExpectedTaskListRowWithPartitionConfig() expectedTaskList.TaskListKind = int(types.TaskListKindSticky) checkTaskListRowExpected(t, expectedTaskList, taskList) return nil @@ -288,7 +312,7 @@ func TestUpdateTaskList_ConditionFailure(t *testing.T) { db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn( func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error { - checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList) + checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList) return &nosqlplugin.TaskOperationConditionFailure{Details: "test-details"} }, ) @@ -431,6 +455,15 @@ func getValidLeaseTaskListRequest() *persistence.LeaseTaskListRequest { } } +func getValidGetTaskListRequest() *persistence.GetTaskListRequest { + return &persistence.GetTaskListRequest{ + DomainID: TestDomainID, + DomainName: TestDomainName, + TaskList: TestTaskListName, + TaskType: int(types.TaskListTypeDecision), + } +} + func checkTaskListInfoExpected(t *testing.T, taskListInfo *persistence.TaskListInfo) { assert.Equal(t, TestDomainID, taskListInfo.DomainID) assert.Equal(t, TestTaskListName, taskListInfo.Name) @@ -484,6 +517,23 @@ func getExpectedTaskListRow() *nosqlplugin.TaskListRow { } } +func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow { + return &nosqlplugin.TaskListRow{ + DomainID: TestDomainID, + TaskListName: TestTaskListName, + TaskListType: int(types.TaskListTypeDecision), + RangeID: initialRangeID, + TaskListKind: int(types.TaskListKindNormal), + AckLevel: initialAckLevel, + LastUpdatedTime: time.Now(), + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 2, + NumWritePartitions: 2, + }, + } +} + func checkTaskListRowExpected(t *testing.T, expectedRow *nosqlplugin.TaskListRow, taskList *nosqlplugin.TaskListRow) { // Check the duration assert.WithinDuration(t, expectedRow.LastUpdatedTime, taskList.LastUpdatedTime, time.Second) @@ -502,6 +552,11 @@ func getExpectedTaskListInfo() *persistence.TaskListInfo { AckLevel: initialAckLevel, Kind: int(types.TaskListKindNormal), LastUpdated: time.Now(), + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 2, + NumWritePartitions: 2, + }, } } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go index 2c81a31af84..77f9af0acad 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go @@ -70,13 +70,43 @@ func (db *cdb) SelectTaskList(ctx context.Context, filter *nosqlplugin.TaskListF TaskListName: filter.TaskListName, TaskListType: filter.TaskListType, - TaskListKind: taskListKind, - LastUpdatedTime: lastUpdatedTime, - AckLevel: ackLevel, - RangeID: rangeID, + TaskListKind: taskListKind, + LastUpdatedTime: lastUpdatedTime, + AckLevel: ackLevel, + RangeID: rangeID, + AdaptivePartitionConfig: toTaskListPartitionConfig(tlDB["adaptive_partition_config"]), }, nil } +func toTaskListPartitionConfig(v interface{}) *persistence.TaskListPartitionConfig { + if v == nil { + return nil + } + partition := v.(map[string]interface{}) + if len(partition) == 0 { + return nil + } + version := partition["version"].(int64) + numRead := partition["num_read_partitions"].(int) + numWrite := partition["num_write_partitions"].(int) + return &persistence.TaskListPartitionConfig{ + Version: version, + NumReadPartitions: numRead, + NumWritePartitions: numWrite, + } +} + +func fromTaskListPartitionConfig(config *persistence.TaskListPartitionConfig) map[string]interface{} { + if config == nil { + return nil + } + return map[string]interface{}{ + "version": config.Version, + "num_read_partitions": config.NumReadPartitions, + "num_write_partitions": config.NumWritePartitions, + } +} + // InsertTaskList insert a single tasklist row // Return TaskOperationConditionFailure if the condition doesn't meet func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error { @@ -93,6 +123,7 @@ func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) 0, row.TaskListKind, row.LastUpdatedTime, + fromTaskListPartitionConfig(row.AdaptivePartitionConfig), ).WithContext(ctx) previous := make(map[string]interface{}) @@ -119,6 +150,7 @@ func (db *cdb) UpdateTaskList( row.AckLevel, row.TaskListKind, row.LastUpdatedTime, + fromTaskListPartitionConfig(row.AdaptivePartitionConfig), row.DomainID, row.TaskListName, row.TaskListType, @@ -182,6 +214,7 @@ func (db *cdb) UpdateTaskListWithTTL( row.AckLevel, row.TaskListKind, db.timeSrc.Now(), + fromTaskListPartitionConfig(row.AdaptivePartitionConfig), row.DomainID, row.TaskListName, row.TaskListType, diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go index a91654b5834..b2bf9edf706 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go @@ -28,7 +28,8 @@ const ( `type: ?, ` + `ack_level: ?, ` + `kind: ?, ` + - `last_updated: ? ` + + `last_updated: ?, ` + + `adaptive_partition_config: ? ` + `}` templateTaskType = `{` + diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go index 5a824afcf2f..f3418dc0bb6 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go @@ -66,6 +66,11 @@ func TestSelectTaskList(t *testing.T) { (*tlDB)["ack_level"] = int64(1000) (*tlDB)["kind"] = 2 (*tlDB)["last_updated"] = now + (*tlDB)["adaptive_partition_config"] = map[string]interface{}{ + "version": int64(0), + "num_read_partitions": int(1), + "num_write_partitions": int(1), + } return nil }).Times(1) }, @@ -77,6 +82,11 @@ func TestSelectTaskList(t *testing.T) { AckLevel: 1000, RangeID: 25, LastUpdatedTime: now, + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, }, wantQueries: []string{ `SELECT range_id, task_list FROM tasks WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345`, @@ -149,7 +159,7 @@ func TestInsertTaskList(t *testing.T) { wantErr bool }{ { - name: "successfully applied", + name: "successfully applied - nil partition_config", row: &nosqlplugin.TaskListRow{ DomainID: "domain1", TaskListName: "tasklist1", @@ -168,7 +178,36 @@ func TestInsertTaskList(t *testing.T) { wantQueries: []string{ `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, range_id, task_list ) ` + `VALUES (domain1, tasklist1, 1, 1, -12345, 1, ` + - `{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z }` + + `{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] }` + + `) IF NOT EXISTS`, + }, + }, + { + name: "successfully applied - non-nil partition_config", + row: &nosqlplugin.TaskListRow{ + DomainID: "domain1", + TaskListName: "tasklist1", + TaskListType: 1, + TaskListKind: 2, + AckLevel: 1000, + RangeID: 25, + LastUpdatedTime: ts, + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, + }, + queryMockFn: func(query *gocql.MockQuery) { + query.EXPECT().WithContext(gomock.Any()).Return(query).Times(1) + query.EXPECT().MapScanCAS(gomock.Any()).DoAndReturn(func(prev map[string]interface{}) (bool, error) { + return true, nil + }).Times(1) + }, + wantQueries: []string{ + `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, range_id, task_list ) ` + + `VALUES (domain1, tasklist1, 1, 1, -12345, 1, ` + + `{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[num_read_partitions:1 num_write_partitions:1 version:1] }` + `) IF NOT EXISTS`, }, }, @@ -278,7 +317,7 @@ func TestUpdateTaskList(t *testing.T) { }).Times(1) }, wantQueries: []string{ - `UPDATE tasks SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`, + `UPDATE tasks SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`, }, }, { @@ -386,7 +425,7 @@ func TestUpdateTaskListWithTTL(t *testing.T) { mapExecuteBatchCASApplied: true, wantQueries: []string{ ` INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id ) VALUES (domain1, tasklist1, 1, 1, -12345) USING TTL 180`, - `UPDATE tasks USING TTL 180 SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`, + `UPDATE tasks USING TTL 180 SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`, }, }, { diff --git a/common/persistence/nosql/nosqlplugin/types.go b/common/persistence/nosql/nosqlplugin/types.go index ffbadb53148..bea5380c820 100644 --- a/common/persistence/nosql/nosqlplugin/types.go +++ b/common/persistence/nosql/nosqlplugin/types.go @@ -185,10 +185,11 @@ type ( TaskListName string TaskListType int - RangeID int64 - TaskListKind int - AckLevel int64 - LastUpdatedTime time.Time + RangeID int64 + TaskListKind int + AckLevel int64 + LastUpdatedTime time.Time + AdaptivePartitionConfig *persistence.TaskListPartitionConfig } // ListTaskListResult is the result of list tasklists diff --git a/common/persistence/persistence-tests/matchingPersistenceTest.go b/common/persistence/persistence-tests/matchingPersistenceTest.go index 726ba0a85a6..e7bb4166119 100644 --- a/common/persistence/persistence-tests/matchingPersistenceTest.go +++ b/common/persistence/persistence-tests/matchingPersistenceTest.go @@ -348,6 +348,8 @@ func (s *MatchingPersistenceSuite) TestLeaseAndUpdateTaskList() { s.EqualValues(1, tli.RangeID) s.EqualValues(0, tli.AckLevel) s.True(tli.LastUpdated.After(leaseTime) || tli.LastUpdated.Equal(leaseTime)) + s.EqualValues(p.TaskListKindNormal, tli.Kind) + s.Nil(tli.AdaptivePartitionConfig) leaseTime = time.Now() response, err = s.TaskMgr.LeaseTaskList(ctx, &p.LeaseTaskListRequest{ @@ -360,6 +362,8 @@ func (s *MatchingPersistenceSuite) TestLeaseAndUpdateTaskList() { s.EqualValues(2, tli.RangeID) s.EqualValues(0, tli.AckLevel) s.True(tli.LastUpdated.After(leaseTime) || tli.LastUpdated.Equal(leaseTime)) + s.EqualValues(p.TaskListKindNormal, tli.Kind) + s.Nil(tli.AdaptivePartitionConfig) _, err = s.TaskMgr.LeaseTaskList(ctx, &p.LeaseTaskListRequest{ DomainID: domainID, @@ -378,17 +382,41 @@ func (s *MatchingPersistenceSuite) TestLeaseAndUpdateTaskList() { RangeID: 2, AckLevel: 0, Kind: p.TaskListKindNormal, + AdaptivePartitionConfig: &p.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 2, + NumWritePartitions: 2, + }, } _, err = s.TaskMgr.UpdateTaskList(ctx, &p.UpdateTaskListRequest{ TaskListInfo: taskListInfo, }) s.NoError(err) + var resp *p.GetTaskListResponse + resp, err = s.TaskMgr.GetTaskList(ctx, &p.GetTaskListRequest{ + DomainID: domainID, + TaskList: taskList, + TaskType: p.TaskListTypeActivity, + }) + s.NoError(err) + tli = resp.TaskListInfo + s.EqualValues(2, tli.RangeID) + s.EqualValues(0, tli.AckLevel) + s.True(tli.LastUpdated.After(leaseTime) || tli.LastUpdated.Equal(leaseTime)) + s.EqualValues(p.TaskListKindNormal, tli.Kind) + s.NotNil(tli.AdaptivePartitionConfig) + s.EqualValues(1, tli.AdaptivePartitionConfig.Version) + s.EqualValues(2, tli.AdaptivePartitionConfig.NumReadPartitions) + s.EqualValues(2, tli.AdaptivePartitionConfig.NumWritePartitions) + taskListInfo.RangeID = 3 _, err = s.TaskMgr.UpdateTaskList(ctx, &p.UpdateTaskListRequest{ TaskListInfo: taskListInfo, }) s.Error(err) + _, ok = err.(*p.ConditionFailedError) + s.True(ok) } // TestLeaseAndUpdateTaskListSticky test @@ -410,6 +438,7 @@ func (s *MatchingPersistenceSuite) TestLeaseAndUpdateTaskListSticky() { s.EqualValues(1, tli.RangeID) s.EqualValues(0, tli.AckLevel) s.EqualValues(p.TaskListKindSticky, tli.Kind) + s.Nil(tli.AdaptivePartitionConfig) taskListInfo := &p.TaskListInfo{ DomainID: domainID, diff --git a/common/persistence/serialization/interfaces.go b/common/persistence/serialization/interfaces.go index eff61c54311..9a17cc29b91 100644 --- a/common/persistence/serialization/interfaces.go +++ b/common/persistence/serialization/interfaces.go @@ -261,12 +261,18 @@ type ( PartitionConfig map[string]string } + TaskListPartitionConfig struct { + Version int64 + NumReadPartitions int32 + NumWritePartitions int32 + } // TaskListInfo blob in a serialization agnostic format TaskListInfo struct { - Kind int16 - AckLevel int64 - ExpiryTimestamp time.Time - LastUpdated time.Time + Kind int16 + AckLevel int64 + ExpiryTimestamp time.Time + LastUpdated time.Time + AdaptivePartitionConfig *TaskListPartitionConfig } // TransferTaskInfo blob in a serialization agnostic format diff --git a/common/persistence/serialization/thrift_mapper.go b/common/persistence/serialization/thrift_mapper.go index affa3703a93..2e30a9eb206 100644 --- a/common/persistence/serialization/thrift_mapper.go +++ b/common/persistence/serialization/thrift_mapper.go @@ -556,15 +556,38 @@ func taskInfoFromThrift(info *sqlblobs.TaskInfo) *TaskInfo { } } +func taskListPartitionConfigToThrift(info *TaskListPartitionConfig) *sqlblobs.TaskListPartitionConfig { + if info == nil { + return nil + } + return &sqlblobs.TaskListPartitionConfig{ + Version: &info.Version, + NumReadPartitions: &info.NumReadPartitions, + NumWritePartitions: &info.NumWritePartitions, + } +} + +func taskListParititionConfigFromThrift(info *sqlblobs.TaskListPartitionConfig) *TaskListPartitionConfig { + if info == nil { + return nil + } + return &TaskListPartitionConfig{ + Version: info.GetVersion(), + NumReadPartitions: info.GetNumReadPartitions(), + NumWritePartitions: info.GetNumWritePartitions(), + } +} + func taskListInfoToThrift(info *TaskListInfo) *sqlblobs.TaskListInfo { if info == nil { return nil } return &sqlblobs.TaskListInfo{ - Kind: &info.Kind, - AckLevel: &info.AckLevel, - ExpiryTimeNanos: timeToUnixNanoPtr(info.ExpiryTimestamp), - LastUpdatedNanos: timeToUnixNanoPtr(info.LastUpdated), + Kind: &info.Kind, + AckLevel: &info.AckLevel, + ExpiryTimeNanos: timeToUnixNanoPtr(info.ExpiryTimestamp), + LastUpdatedNanos: timeToUnixNanoPtr(info.LastUpdated), + AdaptivePartitionConfig: taskListPartitionConfigToThrift(info.AdaptivePartitionConfig), } } @@ -573,10 +596,11 @@ func taskListInfoFromThrift(info *sqlblobs.TaskListInfo) *TaskListInfo { return nil } return &TaskListInfo{ - Kind: info.GetKind(), - AckLevel: info.GetAckLevel(), - ExpiryTimestamp: timeFromUnixNano(info.GetExpiryTimeNanos()), - LastUpdated: timeFromUnixNano(info.GetLastUpdatedNanos()), + Kind: info.GetKind(), + AckLevel: info.GetAckLevel(), + ExpiryTimestamp: timeFromUnixNano(info.GetExpiryTimeNanos()), + LastUpdated: timeFromUnixNano(info.GetLastUpdatedNanos()), + AdaptivePartitionConfig: taskListParititionConfigFromThrift(info.AdaptivePartitionConfig), } } diff --git a/common/persistence/serialization/thrift_mapper_test.go b/common/persistence/serialization/thrift_mapper_test.go index 6766efef1f6..684f99a9409 100644 --- a/common/persistence/serialization/thrift_mapper_test.go +++ b/common/persistence/serialization/thrift_mapper_test.go @@ -449,12 +449,18 @@ func TestTaskListInfo(t *testing.T) { AckLevel: int64(rand.Intn(1000)), ExpiryTimestamp: time.Now(), LastUpdated: time.Now(), + AdaptivePartitionConfig: &TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 2, + }, } actual := taskListInfoFromThrift(taskListInfoToThrift(expected)) assert.Equal(t, expected.Kind, actual.Kind) assert.Equal(t, expected.AckLevel, actual.AckLevel) assert.Equal(t, expected.LastUpdated.Sub(actual.LastUpdated), time.Duration(0)) assert.Equal(t, expected.ExpiryTimestamp.Sub(actual.ExpiryTimestamp), time.Duration(0)) + assert.Equal(t, expected.AdaptivePartitionConfig, actual.AdaptivePartitionConfig) } func TestTransferTaskInfo(t *testing.T) { diff --git a/common/persistence/sql/sql_task_store.go b/common/persistence/sql/sql_task_store.go index edaccebc351..01430468874 100644 --- a/common/persistence/sql/sql_task_store.go +++ b/common/persistence/sql/sql_task_store.go @@ -189,31 +189,92 @@ func (m *sqlTaskStore) LeaseTaskList( if rowsAffected == 0 { return fmt.Errorf("%v rows affected instead of 1", rowsAffected) } + var c *persistence.TaskListPartitionConfig + if tlInfo.AdaptivePartitionConfig != nil { + c = &persistence.TaskListPartitionConfig{ + Version: tlInfo.AdaptivePartitionConfig.Version, + NumReadPartitions: int(tlInfo.AdaptivePartitionConfig.NumReadPartitions), + NumWritePartitions: int(tlInfo.AdaptivePartitionConfig.NumWritePartitions), + } + } resp = &persistence.LeaseTaskListResponse{TaskListInfo: &persistence.TaskListInfo{ - DomainID: request.DomainID, - Name: request.TaskList, - TaskType: request.TaskType, - RangeID: rangeID + 1, - AckLevel: ackLevel, - Kind: request.TaskListKind, - LastUpdated: now, + DomainID: request.DomainID, + Name: request.TaskList, + TaskType: request.TaskType, + RangeID: rangeID + 1, + AckLevel: ackLevel, + Kind: request.TaskListKind, + LastUpdated: now, + AdaptivePartitionConfig: c, }} return nil }) return resp, err } +func (m *sqlTaskStore) GetTaskList( + ctx context.Context, + request *persistence.GetTaskListRequest, +) (*persistence.GetTaskListResponse, error) { + dbShardID := sqlplugin.GetDBShardIDFromDomainIDAndTasklist(request.DomainID, request.TaskList, m.db.GetTotalNumDBShards()) + + domainID := serialization.MustParseUUID(request.DomainID) + rows, err := m.db.SelectFromTaskLists(ctx, &sqlplugin.TaskListsFilter{ + ShardID: dbShardID, + DomainID: &domainID, + Name: &request.TaskList, + TaskType: common.Int64Ptr(int64(request.TaskType))}) + if err != nil { + return nil, convertCommonErrors(m.db, "GetTaskList", "", err) + } + row := rows[0] + tlInfo, err := m.parser.TaskListInfoFromBlob(row.Data, row.DataEncoding) + if err != nil { + return nil, err + } + var c *persistence.TaskListPartitionConfig + if tlInfo.AdaptivePartitionConfig != nil { + c = &persistence.TaskListPartitionConfig{ + Version: tlInfo.AdaptivePartitionConfig.Version, + NumReadPartitions: int(tlInfo.AdaptivePartitionConfig.NumReadPartitions), + NumWritePartitions: int(tlInfo.AdaptivePartitionConfig.NumWritePartitions), + } + } + return &persistence.GetTaskListResponse{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: request.DomainID, + Name: request.TaskList, + TaskType: request.TaskType, + RangeID: row.RangeID, + AckLevel: tlInfo.AckLevel, + Kind: int(tlInfo.Kind), + Expiry: tlInfo.ExpiryTimestamp, + LastUpdated: tlInfo.LastUpdated, + AdaptivePartitionConfig: c, + }, + }, nil +} + func (m *sqlTaskStore) UpdateTaskList( ctx context.Context, request *persistence.UpdateTaskListRequest, ) (*persistence.UpdateTaskListResponse, error) { dbShardID := sqlplugin.GetDBShardIDFromDomainIDAndTasklist(request.TaskListInfo.DomainID, request.TaskListInfo.Name, m.db.GetTotalNumDBShards()) domainID := serialization.MustParseUUID(request.TaskListInfo.DomainID) + var c *serialization.TaskListPartitionConfig + if request.TaskListInfo.AdaptivePartitionConfig != nil { + c = &serialization.TaskListPartitionConfig{ + Version: request.TaskListInfo.AdaptivePartitionConfig.Version, + NumReadPartitions: int32(request.TaskListInfo.AdaptivePartitionConfig.NumReadPartitions), + NumWritePartitions: int32(request.TaskListInfo.AdaptivePartitionConfig.NumWritePartitions), + } + } tlInfo := &serialization.TaskListInfo{ - AckLevel: request.TaskListInfo.AckLevel, - Kind: int16(request.TaskListInfo.Kind), - ExpiryTimestamp: time.Unix(0, 0), - LastUpdated: time.Now(), + AckLevel: request.TaskListInfo.AckLevel, + Kind: int16(request.TaskListInfo.Kind), + ExpiryTimestamp: time.Unix(0, 0), + LastUpdated: time.Now(), + AdaptivePartitionConfig: c, } if request.TaskListInfo.Kind == persistence.TaskListKindSticky { tlInfo.ExpiryTimestamp = stickyTaskListExpiry() diff --git a/common/persistence/sql/sql_task_store_test.go b/common/persistence/sql/sql_task_store_test.go index b932149c0bf..bd16ad05c70 100644 --- a/common/persistence/sql/sql_task_store_test.go +++ b/common/persistence/sql/sql_task_store_test.go @@ -137,27 +137,116 @@ func TestLeaseTaskList(t *testing.T) { Name: common.StringPtr("tl"), TaskType: common.Int64Ptr(0), }).Return(nil, sql.ErrNoRows) + mockParser.EXPECT().TaskListInfoToBlob(gomock.Any()).DoAndReturn(func(info *serialization.TaskListInfo) (persistence.DataBlob, error) { + assert.Equal(t, int16(persistence.TaskListKindSticky), info.Kind) + assert.Equal(t, int64(0), info.AckLevel) + return persistence.DataBlob{ + Data: []byte(`tl`), + Encoding: common.EncodingType("tl"), + }, nil + }) + mockDB.EXPECT().SupportsTTL().Return(true) + mockDB.EXPECT().InsertIntoTaskListsWithTTL(gomock.Any(), &sqlplugin.TaskListsRowWithTTL{ + TaskListsRow: sqlplugin.TaskListsRow{ + ShardID: 0, + DomainID: serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), + Name: "tl", + TaskType: 0, + Data: []byte(`tl`), + DataEncoding: "tl", + }, + TTL: stickyTasksListsTTL, + }).Return(nil, nil) + mockParser.EXPECT().TaskListInfoFromBlob([]byte(`tl`), "tl").Return(&serialization.TaskListInfo{ + AckLevel: 0, + Kind: int16(persistence.TaskListKindSticky), + ExpiryTimestamp: time.Unix(0, 0), + LastUpdated: time.Unix(0, 1), + }, nil) + mockDB.EXPECT().BeginTx(gomock.Any(), 0).Return(mockTx, nil) + mockTx.EXPECT().LockTaskLists(gomock.Any(), &sqlplugin.TaskListsFilter{ + ShardID: 0, + DomainID: serialization.UUIDPtr(serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0")), + Name: common.StringPtr("tl"), + TaskType: common.Int64Ptr(0), + }).Return(int64(0), nil) mockParser.EXPECT().TaskListInfoToBlob(gomock.Any()).Return(persistence.DataBlob{ Data: []byte(`tl`), Encoding: common.EncodingType("tl"), }, nil) mockDB.EXPECT().SupportsTTL().Return(true) - mockDB.EXPECT().InsertIntoTaskListsWithTTL(gomock.Any(), &sqlplugin.TaskListsRowWithTTL{ + mockTx.EXPECT().UpdateTaskListsWithTTL(gomock.Any(), &sqlplugin.TaskListsRowWithTTL{ TaskListsRow: sqlplugin.TaskListsRow{ ShardID: 0, DomainID: serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), Name: "tl", TaskType: 0, + RangeID: 1, Data: []byte(`tl`), DataEncoding: "tl", }, TTL: stickyTasksListsTTL, - }).Return(nil, nil) + }).Return(&sqlResult{rowsAffected: 1}, nil) + mockTx.EXPECT().Commit().Return(nil) + }, + want: &persistence.LeaseTaskListResponse{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + Name: "tl", + TaskType: 0, + RangeID: 1, + AckLevel: 0, + Kind: persistence.TaskListKindSticky, + }, + }, + wantErr: false, + }, + { + name: "Success case - first lease - normal tasklist", + req: &persistence.LeaseTaskListRequest{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + TaskList: "tl", + TaskType: 0, + TaskListKind: persistence.TaskListKindNormal, + RangeID: 0, + }, + mockSetup: func(mockDB *sqlplugin.MockDB, mockTx *sqlplugin.MockTx, mockParser *serialization.MockParser) { + mockDB.EXPECT().GetTotalNumDBShards().Return(1) + mockDB.EXPECT().SelectFromTaskLists(gomock.Any(), &sqlplugin.TaskListsFilter{ + ShardID: 0, + DomainID: serialization.UUIDPtr(serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0")), + Name: common.StringPtr("tl"), + TaskType: common.Int64Ptr(0), + }).Return(nil, sql.ErrNoRows) + mockParser.EXPECT().TaskListInfoToBlob(gomock.Any()).DoAndReturn(func(info *serialization.TaskListInfo) (persistence.DataBlob, error) { + assert.Equal(t, int16(persistence.TaskListKindNormal), info.Kind) + assert.Equal(t, int64(0), info.AckLevel) + return persistence.DataBlob{ + Data: []byte(`tl`), + Encoding: common.EncodingType("tl"), + }, nil + }) + mockDB.EXPECT().SupportsTTL().Return(true) + mockDB.EXPECT().InsertIntoTaskLists(gomock.Any(), + &sqlplugin.TaskListsRow{ + ShardID: 0, + DomainID: serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), + Name: "tl", + TaskType: 0, + Data: []byte(`tl`), + DataEncoding: "tl", + }, + ).Return(nil, nil) mockParser.EXPECT().TaskListInfoFromBlob([]byte(`tl`), "tl").Return(&serialization.TaskListInfo{ AckLevel: 0, Kind: int16(persistence.TaskListKindSticky), ExpiryTimestamp: time.Unix(0, 0), LastUpdated: time.Unix(0, 1), + AdaptivePartitionConfig: &serialization.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, }, nil) mockDB.EXPECT().BeginTx(gomock.Any(), 0).Return(mockTx, nil) mockTx.EXPECT().LockTaskLists(gomock.Any(), &sqlplugin.TaskListsFilter{ @@ -192,7 +281,12 @@ func TestLeaseTaskList(t *testing.T) { TaskType: 0, RangeID: 1, AckLevel: 0, - Kind: persistence.TaskListKindSticky, + Kind: persistence.TaskListKindNormal, + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, }, }, wantErr: false, @@ -434,6 +528,111 @@ func TestLeaseTaskList(t *testing.T) { } } +func TestGetTaskList(t *testing.T) { + testCases := []struct { + name string + req *persistence.GetTaskListRequest + mockSetup func(*sqlplugin.MockDB, *serialization.MockParser) + want *persistence.GetTaskListResponse + wantErr bool + }{ + { + name: "Success case", + req: &persistence.GetTaskListRequest{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + TaskList: "tl", + TaskType: 1, + }, + mockSetup: func(mockDB *sqlplugin.MockDB, mockParser *serialization.MockParser) { + mockDB.EXPECT().GetTotalNumDBShards().Return(1) + mockDB.EXPECT().SelectFromTaskLists(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) { + assert.Equal(t, serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), *filter.DomainID) + assert.Equal(t, "tl", *filter.Name) + assert.Equal(t, int64(1), *filter.TaskType) + return []sqlplugin.TaskListsRow{ + { + ShardID: 11, + DomainID: serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), + Name: "tl", + TaskType: 1, + RangeID: 123, + Data: []byte(`tl`), + DataEncoding: "tl", + }, + }, nil + }) + mockParser.EXPECT().TaskListInfoFromBlob([]byte(`tl`), "tl").Return(&serialization.TaskListInfo{ + Kind: 1, + AckLevel: 2, + ExpiryTimestamp: time.Unix(1, 4), + LastUpdated: time.Unix(10, 0), + AdaptivePartitionConfig: &serialization.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, + }, nil) + }, + want: &persistence.GetTaskListResponse{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + Name: "tl", + TaskType: 1, + RangeID: 123, + Kind: 1, + AckLevel: 2, + Expiry: time.Unix(1, 4), + LastUpdated: time.Unix(10, 0), + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, + }, + }, + wantErr: false, + }, + { + name: "Error case", + req: &persistence.GetTaskListRequest{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + TaskList: "tl", + TaskType: 0, + }, + mockSetup: func(mockDB *sqlplugin.MockDB, mockParser *serialization.MockParser) { + err := errors.New("some error") + mockDB.EXPECT().GetTotalNumDBShards().Return(1) + mockDB.EXPECT().SelectFromTaskLists(gomock.Any(), gomock.Any()).Return(nil, err) + mockDB.EXPECT().IsNotFoundError(err).Return(true) + }, + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockDB := sqlplugin.NewMockDB(ctrl) + mockParser := serialization.NewMockParser(ctrl) + store := &sqlTaskStore{ + sqlStore: sqlStore{db: mockDB, parser: mockParser}, + nShards: 1000, + } + + tc.mockSetup(mockDB, mockParser) + + got, err := store.GetTaskList(context.Background(), tc.req) + if tc.wantErr { + assert.Error(t, err, "Expected an error for test case") + } else { + assert.NoError(t, err, "Did not expect an error for test case") + assert.Equal(t, tc.want, got, "Unexpected result for test case") + } + }) + } +} + func TestUpdateTaskList(t *testing.T) { testCases := []struct { name string @@ -485,6 +684,60 @@ func TestUpdateTaskList(t *testing.T) { want: &persistence.UpdateTaskListResponse{}, wantErr: false, }, + { + name: "Success case - normal tasklist", + req: &persistence.UpdateTaskListRequest{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: "c9488dc7-20b2-44c3-b2e4-bfea5af62ac0", + Name: "tl", + TaskType: 0, + RangeID: 1, + AckLevel: 0, + Kind: persistence.TaskListKindNormal, + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 0, + NumReadPartitions: 1, + NumWritePartitions: 1, + }, + }, + }, + mockSetup: func(mockDB *sqlplugin.MockDB, mockTx *sqlplugin.MockTx, mockParser *serialization.MockParser) { + mockDB.EXPECT().GetTotalNumDBShards().Return(1) + mockParser.EXPECT().TaskListInfoToBlob(gomock.Any()).DoAndReturn(func(info *serialization.TaskListInfo) (persistence.DataBlob, error) { + assert.Equal(t, int16(persistence.TaskListKindNormal), info.Kind) + assert.Equal(t, int64(0), info.AckLevel) + assert.Equal(t, int64(0), info.AdaptivePartitionConfig.Version) + assert.Equal(t, int32(1), info.AdaptivePartitionConfig.NumReadPartitions) + assert.Equal(t, int32(1), info.AdaptivePartitionConfig.NumWritePartitions) + return persistence.DataBlob{ + Data: []byte(`tl`), + Encoding: common.EncodingType("tl"), + }, nil + }) + mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil) + mockTx.EXPECT().LockTaskLists(gomock.Any(), &sqlplugin.TaskListsFilter{ + ShardID: 0, + DomainID: serialization.UUIDPtr(serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0")), + Name: common.StringPtr("tl"), + TaskType: common.Int64Ptr(0), + }).Return(int64(1), nil) + mockDB.EXPECT().SupportsTTL().Return(true) + mockTx.EXPECT().UpdateTaskLists(gomock.Any(), + &sqlplugin.TaskListsRow{ + ShardID: 0, + DomainID: serialization.MustParseUUID("c9488dc7-20b2-44c3-b2e4-bfea5af62ac0"), + Name: "tl", + TaskType: 0, + RangeID: 1, + Data: []byte(`tl`), + DataEncoding: "tl", + }, + ).Return(&sqlResult{rowsAffected: 1}, nil) + mockTx.EXPECT().Commit().Return(nil) + }, + want: &persistence.UpdateTaskListResponse{}, + wantErr: false, + }, { name: "Error case - failed to lock task list", req: &persistence.UpdateTaskListRequest{ diff --git a/common/persistence/taskManager.go b/common/persistence/taskManager.go index f0e22445bc0..cab366aea26 100644 --- a/common/persistence/taskManager.go +++ b/common/persistence/taskManager.go @@ -57,6 +57,10 @@ func (t *taskManager) LeaseTaskList(ctx context.Context, request *LeaseTaskListR return t.persistence.LeaseTaskList(ctx, request) } +func (t *taskManager) GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error) { + return t.persistence.GetTaskList(ctx, request) +} + func (t *taskManager) UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) { return t.persistence.UpdateTaskList(ctx, request) } diff --git a/common/persistence/wrappers/errorinjectors/injectors_test.go b/common/persistence/wrappers/errorinjectors/injectors_test.go index b21f640dda4..a75f72bdb0f 100644 --- a/common/persistence/wrappers/errorinjectors/injectors_test.go +++ b/common/persistence/wrappers/errorinjectors/injectors_test.go @@ -241,6 +241,7 @@ func builderForPassThrough(t *testing.T, injector any, errorRate float64, logger mocked.EXPECT().GetOrphanTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetOrphanTasksResponse{}, expectedErr) mocked.EXPECT().GetTaskListSize(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListSizeResponse{}, expectedErr) mocked.EXPECT().LeaseTaskList(gomock.Any(), gomock.Any()).Return(&persistence.LeaseTaskListResponse{}, expectedErr) + mocked.EXPECT().GetTaskList(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListResponse{}, expectedErr) mocked.EXPECT().ListTaskList(gomock.Any(), gomock.Any()).Return(&persistence.ListTaskListResponse{}, expectedErr) mocked.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any()).Return(&persistence.UpdateTaskListResponse{}, expectedErr) } diff --git a/common/persistence/wrappers/errorinjectors/task_generated.go b/common/persistence/wrappers/errorinjectors/task_generated.go index 49b76e8d82a..e0ee540328d 100644 --- a/common/persistence/wrappers/errorinjectors/task_generated.go +++ b/common/persistence/wrappers/errorinjectors/task_generated.go @@ -137,6 +137,21 @@ func (c *injectorTaskManager) GetOrphanTasks(ctx context.Context, request *persi return } +func (c *injectorTaskManager) GetTaskList(ctx context.Context, request *persistence.GetTaskListRequest) (gp1 *persistence.GetTaskListResponse, err error) { + fakeErr := generateFakeError(c.errorRate) + var forwardCall bool + if forwardCall = shouldForwardCallToPersistence(fakeErr); forwardCall { + gp1, err = c.wrapped.GetTaskList(ctx, request) + } + + if fakeErr != nil { + logErr(c.logger, "TaskManager.GetTaskList", fakeErr, forwardCall, err) + err = fakeErr + return + } + return +} + func (c *injectorTaskManager) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (gp1 *persistence.GetTaskListSizeResponse, err error) { fakeErr := generateFakeError(c.errorRate) var forwardCall bool diff --git a/common/persistence/wrappers/errorinjectors/utils.go b/common/persistence/wrappers/errorinjectors/utils.go index c741a82dd2b..7f16d1153cb 100644 --- a/common/persistence/wrappers/errorinjectors/utils.go +++ b/common/persistence/wrappers/errorinjectors/utils.go @@ -300,6 +300,8 @@ func taskManagerTags(op string) *tag.Tag { switch op { case "TaskManager.LeaseTaskList": return &tag.StoreOperationLeaseTaskList + case "TaskManager.GetTaskList": + return &tag.StoreOperationGetTaskList case "TaskManager.UpdateTaskList": return &tag.StoreOperationUpdateTaskList case "TaskManager.CreateTasks": diff --git a/common/persistence/wrappers/metered/metered_test.go b/common/persistence/wrappers/metered/metered_test.go index 9232b68d229..a342b44dff3 100644 --- a/common/persistence/wrappers/metered/metered_test.go +++ b/common/persistence/wrappers/metered/metered_test.go @@ -235,6 +235,7 @@ func prepareMockForTest(t *testing.T, input interface{}, expectedErr error) { mocked.EXPECT().GetOrphanTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetOrphanTasksResponse{}, expectedErr).Times(1) mocked.EXPECT().GetTaskListSize(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListSizeResponse{}, expectedErr).Times(1) mocked.EXPECT().LeaseTaskList(gomock.Any(), gomock.Any()).Return(&persistence.LeaseTaskListResponse{}, expectedErr).Times(1) + mocked.EXPECT().GetTaskList(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListResponse{}, expectedErr).Times(1) mocked.EXPECT().ListTaskList(gomock.Any(), gomock.Any()).Return(&persistence.ListTaskListResponse{}, expectedErr).Times(1) mocked.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any()).Return(&persistence.UpdateTaskListResponse{}, expectedErr).Times(1) case *persistence.MockVisibilityManager: diff --git a/common/persistence/wrappers/metered/task_generated.go b/common/persistence/wrappers/metered/task_generated.go index 69c8eb681af..d8cef308b0f 100644 --- a/common/persistence/wrappers/metered/task_generated.go +++ b/common/persistence/wrappers/metered/task_generated.go @@ -122,6 +122,17 @@ func (c *meteredTaskManager) GetOrphanTasks(ctx context.Context, request *persis return } +func (c *meteredTaskManager) GetTaskList(ctx context.Context, request *persistence.GetTaskListRequest) (gp1 *persistence.GetTaskListResponse, err error) { + op := func() error { + gp1, err = c.wrapped.GetTaskList(ctx, request) + c.emptyMetric("TaskManager.GetTaskList", request, gp1, err) + return err + } + + err = c.call(metrics.PersistenceGetTaskListScope, op, getCustomMetricTags(request)...) + return +} + func (c *meteredTaskManager) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (gp1 *persistence.GetTaskListSizeResponse, err error) { op := func() error { gp1, err = c.wrapped.GetTaskListSize(ctx, request) diff --git a/common/persistence/wrappers/ratelimited/task_generated.go b/common/persistence/wrappers/ratelimited/task_generated.go index 1e4c25dbeb8..457b4bcada6 100644 --- a/common/persistence/wrappers/ratelimited/task_generated.go +++ b/common/persistence/wrappers/ratelimited/task_generated.go @@ -99,6 +99,14 @@ func (c *ratelimitedTaskManager) GetOrphanTasks(ctx context.Context, request *pe return c.wrapped.GetOrphanTasks(ctx, request) } +func (c *ratelimitedTaskManager) GetTaskList(ctx context.Context, request *persistence.GetTaskListRequest) (gp1 *persistence.GetTaskListResponse, err error) { + if ok := c.rateLimiter.Allow(); !ok { + err = ErrPersistenceLimitExceeded + return + } + return c.wrapped.GetTaskList(ctx, request) +} + func (c *ratelimitedTaskManager) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (gp1 *persistence.GetTaskListSizeResponse, err error) { if ok := c.rateLimiter.Allow(); !ok { err = ErrPersistenceLimitExceeded diff --git a/common/persistence/wrappers/ratelimited/wrappers_test.go b/common/persistence/wrappers/ratelimited/wrappers_test.go index 93c3f0ad76d..8afb06fb944 100644 --- a/common/persistence/wrappers/ratelimited/wrappers_test.go +++ b/common/persistence/wrappers/ratelimited/wrappers_test.go @@ -226,6 +226,7 @@ func builderForPassThrough(t *testing.T, injector any, limiter quotas.Limiter, e mocked.EXPECT().GetOrphanTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetOrphanTasksResponse{}, expectedErr) mocked.EXPECT().GetTaskListSize(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListSizeResponse{}, expectedErr) mocked.EXPECT().LeaseTaskList(gomock.Any(), gomock.Any()).Return(&persistence.LeaseTaskListResponse{}, expectedErr) + mocked.EXPECT().GetTaskList(gomock.Any(), gomock.Any()).Return(&persistence.GetTaskListResponse{}, expectedErr) mocked.EXPECT().ListTaskList(gomock.Any(), gomock.Any()).Return(&persistence.ListTaskListResponse{}, expectedErr) mocked.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any()).Return(&persistence.UpdateTaskListResponse{}, expectedErr) } diff --git a/idls b/idls index 55e4ad43798..279b1f70dfb 160000 --- a/idls +++ b/idls @@ -1 +1 @@ -Subproject commit 55e4ad437989730379baf570f00a11ddcb7d3a75 +Subproject commit 279b1f70dfb8aa54bc26fa22436eb0d6b93efabc diff --git a/schema/cassandra/cadence/schema.cql b/schema/cassandra/cadence/schema.cql index 768e71d153f..907c7681160 100644 --- a/schema/cassandra/cadence/schema.cql +++ b/schema/cassandra/cadence/schema.cql @@ -260,13 +260,21 @@ CREATE TYPE task ( partition_config map ); +CREATE TYPE task_list_partition_config ( + version bigint, + num_read_partitions int, + num_write_partitions int +); + + CREATE TYPE task_list ( domain_id uuid, name text, type int, -- enum TaskRowType {ActivityTask, DecisionTask} ack_level bigint, -- task_id of the last acknowledged message kind int, -- enum TaskListKind {Normal, Sticky} - last_updated timestamp + last_updated timestamp, + adaptive_partition_config frozen ); CREATE TYPE domain ( diff --git a/schema/cassandra/cadence/versioned/v0.38/manifest.json b/schema/cassandra/cadence/versioned/v0.38/manifest.json new file mode 100644 index 00000000000..9a0feb10b90 --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.38/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.38", + "MinCompatibleVersion": "0.38", + "Description": "Adding the task list partition config to task list type", + "SchemaUpdateCqlFiles": [ + "task_list_partition_config.cql" + ] +} diff --git a/schema/cassandra/cadence/versioned/v0.38/task_list_partition_config.cql b/schema/cassandra/cadence/versioned/v0.38/task_list_partition_config.cql new file mode 100644 index 00000000000..019bb66ed1f --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.38/task_list_partition_config.cql @@ -0,0 +1,7 @@ +CREATE TYPE task_list_partition_config ( + version bigint, + num_read_partitions int, + num_write_partitions int +); + +ALTER TYPE task_list ADD adaptive_partition_config frozen; diff --git a/schema/cassandra/version.go b/schema/cassandra/version.go index 7fd89de7826..1c6bbc66b04 100644 --- a/schema/cassandra/version.go +++ b/schema/cassandra/version.go @@ -23,7 +23,7 @@ package cassandra // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the Cassandra database release version -const Version = "0.37" +const Version = "0.38" // VisibilityVersion is the Cassandra visibility database release version const VisibilityVersion = "0.9" diff --git a/service/matching/tasklist/testing.go b/service/matching/tasklist/testing.go index fe88e450fe2..87c3a8f7cad 100644 --- a/service/matching/tasklist/testing.go +++ b/service/matching/tasklist/testing.go @@ -100,6 +100,13 @@ func (m *TestTaskManager) LeaseTaskList( }, nil } +func (m *TestTaskManager) GetTaskList( + _ context.Context, + request *persistence.GetTaskListRequest, +) (*persistence.GetTaskListResponse, error) { + return nil, fmt.Errorf("not implemented") +} + // UpdateTaskList provides a mock function with given fields: ctx, request func (m *TestTaskManager) UpdateTaskList( _ context.Context, diff --git a/tools/common/schema/updatetask_test.go b/tools/common/schema/updatetask_test.go index 602fc249093..dba2c61e7a0 100644 --- a/tools/common/schema/updatetask_test.go +++ b/tools/common/schema/updatetask_test.go @@ -108,7 +108,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() { s.NoError(err) ans, err := readSchemaDir(fsys, "0.30", "") s.NoError(err) - s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37"}, ans) + s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37", "v0.38"}, ans) fsys, err = fs.Sub(cassandra.SchemaFS, "visibility/versioned") s.NoError(err)