Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task list partition config #6343

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
426 changes: 418 additions & 8 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240904133511-55e4ad437989
github.com/uber/cadence-idl v0.0.0-20241014190105-279b1f70dfb8
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240904133511-55e4ad437989 h1:E1VCUPw2n4e7lt3GchEREliowz+ENsf1pRlhtaA9xIQ=
github.com/uber/cadence-idl v0.0.0-20240904133511-55e4ad437989/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20241014190105-279b1f70dfb8 h1:dFQOA6ldrhQNOSSUZ6EsyUfpUv4Aej/uZ072BHvlBmY=
github.com/uber/cadence-idl v0.0.0-20241014190105-279b1f70dfb8/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ var (
StoreOperationCompleteTask = storeOperation("complete-task")
StoreOperationCompleteTasksLessThan = storeOperation("complete-tasks-less-than")
StoreOperationLeaseTaskList = storeOperation("lease-task-list")
StoreOperationGetTaskList = storeOperation("get-task-list")
StoreOperationUpdateTaskList = storeOperation("update-task-list")
StoreOperationListTaskList = storeOperation("list-task-list")
StoreOperationDeleteTaskList = storeOperation("delete-task-list")
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ const (
PersistenceGetOrphanTasksScope
// PersistenceLeaseTaskListScope tracks LeaseTaskList calls made by service to persistence layer
PersistenceLeaseTaskListScope
// PersistenceGetTaskListScope tracks GetTaskList calls made by service to persistence layer
PersistenceGetTaskListScope
// PersistenceUpdateTaskListScope tracks PersistenceUpdateTaskListScope calls made by service to persistence layer
PersistenceUpdateTaskListScope
// PersistenceListTaskListScope is the metric scope for persistence.TaskManager.ListTaskList API
Expand Down Expand Up @@ -1418,6 +1420,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan"},
PersistenceGetOrphanTasksScope: {operation: "GetOrphanTasks"},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList"},
PersistenceGetTaskListScope: {operation: "GetTaskList"},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"},
PersistenceListTaskListScope: {operation: "ListTaskList"},
PersistenceDeleteTaskListScope: {operation: "DeleteTaskList"},
Expand Down
30 changes: 28 additions & 2 deletions common/mocks/TaskManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions common/persistence/dataManagerInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 28 additions & 8 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,22 @@ type (

// TaskListInfo describes a state of a task list implementation.
TaskListInfo struct {
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
AdaptivePartitionConfig *TaskListPartitionConfig
}

// TaskListPartitionConfig represents the configuration for task list partitions.
TaskListPartitionConfig struct {
Version int64
NumReadPartitions int
NumWritePartitions int
}

// TaskInfo describes either activity or decision task
Expand Down Expand Up @@ -1018,6 +1026,17 @@ type (
TaskListInfo *TaskListInfo
}

GetTaskListRequest struct {
DomainID string
DomainName string
TaskList string
TaskType int
}

GetTaskListResponse struct {
TaskListInfo *TaskListInfo
}

// UpdateTaskListRequest is used to update task list implementation information
UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
Expand Down Expand Up @@ -1577,6 +1596,7 @@ type (
GetName() string
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error)
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error)
Expand Down
1 change: 1 addition & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
Closeable
GetName() string
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error)
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
Expand Down
74 changes: 53 additions & 21 deletions common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,14 @@
currTL.RangeID++

err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}, currTL.RangeID-1)
}
if err != nil {
Expand All @@ -161,31 +162,62 @@
return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
}
tli := &persistence.TaskListInfo{
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}
return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil
}

func (t *nosqlTaskStore) GetTaskList(
ctx context.Context,
request *persistence.GetTaskListRequest,
) (*persistence.GetTaskListResponse, error) {
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
if err != nil {
return nil, err

Check warning on line 183 in common/persistence/nosql/nosql_task_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_task_store.go#L183

Added line #L183 was not covered by tests
}
currTL, err := storeShard.db.SelectTaskList(ctx, &nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
})
if err != nil {
return nil, convertCommonErrors(storeShard.db, "GetTaskList", err)
}
tli := &persistence.TaskListInfo{
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: currTL.TaskListKind,
LastUpdated: currTL.LastUpdatedTime,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}
return &persistence.GetTaskListResponse{TaskListInfo: tli}, nil
}

func (t *nosqlTaskStore) UpdateTaskList(
ctx context.Context,
request *persistence.UpdateTaskListRequest,
) (*persistence.UpdateTaskListResponse, error) {
tli := request.TaskListInfo
var err error
taskListToUpdate := &nosqlplugin.TaskListRow{
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: tli.TaskType,
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: tli.TaskType,
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
AdaptivePartitionConfig: tli.AdaptivePartitionConfig,
}
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
if err != nil {
Expand Down
63 changes: 59 additions & 4 deletions common/persistence/nosql/nosql_task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package nosql

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -154,7 +155,8 @@ func TestLeaseTaskList_selectErrNotFound(t *testing.T) {
// We then expect the tasklist to be inserted
db.EXPECT().InsertTaskList(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, taskList *nosqlplugin.TaskListRow) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
tl := getExpectedTaskListRow()
checkTaskListRowExpected(t, tl, taskList)
return nil
})

Expand Down Expand Up @@ -242,12 +244,34 @@ func TestLeaseTaskList_RenewUpdateFailed_OtherError(t *testing.T) {
assert.ErrorContains(t, err, assert.AnError.Error())
}

func TestGetTaskList_Success(t *testing.T) {
store, db := setupNoSQLStoreMocks(t)

taskListRow := getExpectedTaskListRow()
db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(taskListRow, nil)
resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest())

assert.NoError(t, err)
checkTaskListInfoExpected(t, resp.TaskListInfo)
}

func TestGetTaskList_NotFound(t *testing.T) {
store, db := setupNoSQLStoreMocks(t)

db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(nil, errors.New("not found"))
db.EXPECT().IsNotFoundError(gomock.Any()).Return(true)
resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest())

assert.ErrorAs(t, err, new(*types.EntityNotExistsError))
assert.Nil(t, resp)
}

func TestUpdateTaskList(t *testing.T) {
store, db := setupNoSQLStoreMocks(t)

db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
return nil
},
)
Expand All @@ -265,7 +289,7 @@ func TestUpdateTaskList_Sticky(t *testing.T) {

db.EXPECT().UpdateTaskListWithTTL(gomock.Any(), stickyTaskListTTL, gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, ttlSeconds int64, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
expectedTaskList := getExpectedTaskListRow()
expectedTaskList := getExpectedTaskListRowWithPartitionConfig()
expectedTaskList.TaskListKind = int(types.TaskListKindSticky)
checkTaskListRowExpected(t, expectedTaskList, taskList)
return nil
Expand All @@ -288,7 +312,7 @@ func TestUpdateTaskList_ConditionFailure(t *testing.T) {

db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
return &nosqlplugin.TaskOperationConditionFailure{Details: "test-details"}
},
)
Expand Down Expand Up @@ -431,6 +455,15 @@ func getValidLeaseTaskListRequest() *persistence.LeaseTaskListRequest {
}
}

func getValidGetTaskListRequest() *persistence.GetTaskListRequest {
return &persistence.GetTaskListRequest{
DomainID: TestDomainID,
DomainName: TestDomainName,
TaskList: TestTaskListName,
TaskType: int(types.TaskListTypeDecision),
}
}

func checkTaskListInfoExpected(t *testing.T, taskListInfo *persistence.TaskListInfo) {
assert.Equal(t, TestDomainID, taskListInfo.DomainID)
assert.Equal(t, TestTaskListName, taskListInfo.Name)
Expand Down Expand Up @@ -484,6 +517,23 @@ func getExpectedTaskListRow() *nosqlplugin.TaskListRow {
}
}

func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow {
return &nosqlplugin.TaskListRow{
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: int(types.TaskListTypeDecision),
RangeID: initialRangeID,
TaskListKind: int(types.TaskListKindNormal),
AckLevel: initialAckLevel,
LastUpdatedTime: time.Now(),
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
}
}

func checkTaskListRowExpected(t *testing.T, expectedRow *nosqlplugin.TaskListRow, taskList *nosqlplugin.TaskListRow) {
// Check the duration
assert.WithinDuration(t, expectedRow.LastUpdatedTime, taskList.LastUpdatedTime, time.Second)
Expand All @@ -502,6 +552,11 @@ func getExpectedTaskListInfo() *persistence.TaskListInfo {
AckLevel: initialAckLevel,
Kind: int(types.TaskListKindNormal),
LastUpdated: time.Now(),
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
}
}

Expand Down
Loading
Loading