Skip to content

Commit

Permalink
Implement new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Nov 5, 2024
1 parent 982348b commit 7df40de
Show file tree
Hide file tree
Showing 19 changed files with 1,529 additions and 104 deletions.
12 changes: 10 additions & 2 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,21 @@ func (c *clientImpl) UpdateTaskListPartitionConfig(
request *types.MatchingUpdateTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingUpdateTaskListPartitionConfigResponse, error) {
return nil, &types.BadRequestError{}
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.UpdateTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}

func (c *clientImpl) RefreshTaskListPartitionConfig(
ctx context.Context,
request *types.MatchingRefreshTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingRefreshTaskListPartitionConfigResponse, error) {
return nil, &types.BadRequestError{}
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.RefreshTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}
92 changes: 92 additions & 0 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,74 @@ func TestClient_withResponse(t *testing.T) {
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingUpdateTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingUpdateTaskListPartitionConfigResponse{},
},
{
name: "UpdateTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingRefreshTaskListPartitionConfigResponse{},
},
{
name: "RefreshTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -526,3 +594,27 @@ func testGetTaskListsByDomainRequest() *types.GetTaskListsByDomainRequest {
Domain: _testDomain,
}
}

func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTaskListPartitionConfigRequest {
return &types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}

func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshTaskListPartitionConfigRequest {
return &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
32 changes: 20 additions & 12 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,10 @@ const (
MatchingListTaskListPartitionsScope
// MatchingGetTaskListsByDomainScope tracks GetTaskListsByDomain API calls received by service
MatchingGetTaskListsByDomainScope
// MatchingUpdateTaskListPartitionConfigScope tracks UpdateTaskListPartitionConfig API calls received by service
MatchingUpdateTaskListPartitionConfigScope
// MatchingRefreshTaskListPartitionConfigScope tracks RefreshTaskListPartitionConfig API calls received by service
MatchingRefreshTaskListPartitionConfigScope

NumMatchingScopes
)
Expand Down Expand Up @@ -1976,18 +1980,20 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
},
// Matching Scope Names
Matching: {
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingAddTaskScope: {operation: "AddTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingQueryWorkflowScope: {operation: "QueryWorkflow"},
MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
MatchingCancelOutstandingPollScope: {operation: "CancelOutstandingPoll"},
MatchingDescribeTaskListScope: {operation: "DescribeTaskList"},
MatchingListTaskListPartitionsScope: {operation: "ListTaskListPartitions"},
MatchingGetTaskListsByDomainScope: {operation: "GetTaskListsByDomain"},
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingAddTaskScope: {operation: "AddTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingQueryWorkflowScope: {operation: "QueryWorkflow"},
MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
MatchingCancelOutstandingPollScope: {operation: "CancelOutstandingPoll"},
MatchingDescribeTaskListScope: {operation: "DescribeTaskList"},
MatchingListTaskListPartitionsScope: {operation: "ListTaskListPartitions"},
MatchingGetTaskListsByDomainScope: {operation: "GetTaskListsByDomain"},
MatchingUpdateTaskListPartitionConfigScope: {operation: "UpdateTaskListPartitionConfig"},
MatchingRefreshTaskListPartitionConfigScope: {operation: "RefreshTaskListPartitionConfig"},
},
// Worker Scope Names
Worker: {
Expand Down Expand Up @@ -2575,6 +2581,7 @@ const (
IsolationTaskMatchPerTaskListCounter
PollerPerTaskListCounter
PollerInvalidIsolationGroupCounter
TaskListPartitionUpdateFailedCounter
TaskListManagersGauge
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
Expand Down Expand Up @@ -3257,6 +3264,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListPartitionUpdateFailedCounter: {metricName: "tasklist_partition_update_failed_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
Expand Down
24 changes: 24 additions & 0 deletions common/types/mapper/proto/matching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,27 @@ func TestMatchingGetTaskListsByDomainResponse(t *testing.T) {
assert.Equal(t, item, ToMatchingGetTaskListsByDomainResponse(FromMatchingGetTaskListsByDomainResponse(item)))
}
}

func TestMatchingUpdateTaskListPartitionConfigRequest(t *testing.T) {
for _, item := range []*types.MatchingUpdateTaskListPartitionConfigRequest{nil, {}, &testdata.MatchingUpdateTaskListPartitionConfigRequest} {
assert.Equal(t, item, ToMatchingUpdateTaskListPartitionConfigRequest(FromMatchingUpdateTaskListPartitionConfigRequest(item)))
}
}

func TestMatchingUpdateTaskListPartitionConfigResponse(t *testing.T) {
for _, item := range []*types.MatchingUpdateTaskListPartitionConfigResponse{nil, {}} {
assert.Equal(t, item, ToMatchingUpdateTaskListPartitionConfigResponse(FromMatchingUpdateTaskListPartitionConfigResponse(item)))
}
}

func TestMatchingRefreshTaskListPartitionConfigRequest(t *testing.T) {
for _, item := range []*types.MatchingRefreshTaskListPartitionConfigRequest{nil, {}, &testdata.MatchingRefreshTaskListPartitionConfigRequest} {
assert.Equal(t, item, ToMatchingRefreshTaskListPartitionConfigRequest(FromMatchingRefreshTaskListPartitionConfigRequest(item)))
}
}

func TestMatchingRefreshTaskListPartitionConfigResponse(t *testing.T) {
for _, item := range []*types.MatchingRefreshTaskListPartitionConfigResponse{nil, {}} {
assert.Equal(t, item, ToMatchingRefreshTaskListPartitionConfigResponse(FromMatchingRefreshTaskListPartitionConfigResponse(item)))
}
}
14 changes: 14 additions & 0 deletions common/types/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,13 @@ type MatchingUpdateTaskListPartitionConfigRequest struct {
PartitionConfig *TaskListPartitionConfig
}

func (v *MatchingUpdateTaskListPartitionConfigRequest) GetTaskListType() (o TaskListType) {
if v != nil && v.TaskListType != nil {
return *v.TaskListType
}
return
}

type MatchingUpdateTaskListPartitionConfigResponse struct{}

type MatchingRefreshTaskListPartitionConfigRequest struct {
Expand All @@ -663,4 +670,11 @@ type MatchingRefreshTaskListPartitionConfigRequest struct {
PartitionConfig *TaskListPartitionConfig
}

func (v *MatchingRefreshTaskListPartitionConfigRequest) GetTaskListType() (o TaskListType) {
if v != nil && v.TaskListType != nil {
return *v.TaskListType
}
return
}

type MatchingRefreshTaskListPartitionConfigResponse struct{}
60 changes: 60 additions & 0 deletions common/types/matching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1721,3 +1721,63 @@ func TestTaskSource_MarshalText(t *testing.T) {
})
}
}

func TestMatchingUpdateTaskListPartitionConfigRequest_GetTaskListType(t *testing.T) {
tests := []struct {
name string
req *MatchingUpdateTaskListPartitionConfigRequest
want TaskListType
}{
{
name: "nil request",
req: nil,
want: TaskListTypeDecision,
},
{
name: "empty request",
req: &MatchingUpdateTaskListPartitionConfigRequest{},
want: TaskListTypeDecision,
},
{
name: "non empty request",
req: &MatchingUpdateTaskListPartitionConfigRequest{TaskListType: TaskListTypeActivity.Ptr()},
want: TaskListTypeActivity,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.req.GetTaskListType()
assert.Equal(t, tt.want, got)
})
}
}

func TestMatchingRefreshTaskListPartitionConfigRequest_GetTaskListType(t *testing.T) {
tests := []struct {
name string
req *MatchingRefreshTaskListPartitionConfigRequest
want TaskListType
}{
{
name: "nil request",
req: nil,
want: TaskListTypeDecision,
},
{
name: "empty request",
req: &MatchingRefreshTaskListPartitionConfigRequest{},
want: TaskListTypeDecision,
},
{
name: "non empty request",
req: &MatchingRefreshTaskListPartitionConfigRequest{TaskListType: TaskListTypeActivity.Ptr()},
want: TaskListTypeActivity,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.req.GetTaskListType()
assert.Equal(t, tt.want, got)
})
}
}
14 changes: 14 additions & 0 deletions common/types/testdata/service_matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,18 @@ var (
WorkflowType: &WorkflowType,
WorkflowDomain: DomainName,
}

MatchingUpdateTaskListPartitionConfigRequest = types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: DomainID,
TaskList: &TaskList,
TaskListType: &TaskListType,
PartitionConfig: &TaskListPartitionConfig,
}

MatchingRefreshTaskListPartitionConfigRequest = types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: DomainID,
TaskList: &TaskList,
TaskListType: &TaskListType,
PartitionConfig: &TaskListPartitionConfig,
}
)
Loading

0 comments on commit 7df40de

Please sign in to comment.