Skip to content

Commit

Permalink
Introduce partition config provider to matching client
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Oct 27, 2024
1 parent 3ee0a10 commit 9fa60ac
Show file tree
Hide file tree
Showing 14 changed files with 864 additions and 439 deletions.
8 changes: 5 additions & 3 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(

peerResolver := matching.NewPeerResolver(cf.resolver, namedPort)

defaultLoadBalancer := matching.NewLoadBalancer(domainIDToName, cf.dynConfig)
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(domainIDToName, cf.dynConfig)
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, domainIDToName, cf.dynConfig, cf.logger)
partitionConfigProvider := matching.NewPartitionConfigProvider(cf.logger, domainIDToName, cf.dynConfig)
defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
loadBalancers := map[string]matching.LoadBalancer{
"random": defaultLoadBalancer,
"round-robin": roundRobinLoadBalancer,
Expand All @@ -163,6 +164,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
rawClient,
peerResolver,
matching.NewMultiLoadBalancer(defaultLoadBalancer, loadBalancers, domainIDToName, cf.dynConfig, cf.logger),
partitionConfigProvider,
)
client = timeoutwrapper.NewMatchingClient(client, longPollTimeout, timeout)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.MatchingErrorInjectionRate)(); errorRate != 0 {
Expand Down
9 changes: 9 additions & 0 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,21 @@ type clientImpl struct {
client Client
peerResolver PeerResolver
loadBalancer LoadBalancer
provider PartitionConfigProvider
}

// NewClient creates a new history service TChannel client
func NewClient(
client Client,
peerResolver PeerResolver,
lb LoadBalancer,
provider PartitionConfigProvider,
) Client {
return &clientImpl{
client: client,
peerResolver: peerResolver,
loadBalancer: lb,
provider: provider,
}
}

Expand Down Expand Up @@ -131,6 +134,12 @@ func (c *clientImpl) PollForDecisionTask(
return nil, err
}
request.PollRequest.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeDecision,
resp.PartitionConfig,
)
c.loadBalancer.UpdateWeight(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
Expand Down
60 changes: 32 additions & 28 deletions client/matching/client_test.go

Large diffs are not rendered by default.

29 changes: 5 additions & 24 deletions client/matching/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"strings"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -74,22 +73,17 @@ type (
}

defaultLoadBalancer struct {
nReadPartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
nWritePartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
domainIDToName func(string) (string, error)
provider PartitionConfigProvider
}
)

// NewLoadBalancer returns an instance of matching load balancer that
// can help distribute api calls across task list partitions
func NewLoadBalancer(
domainIDToName func(string) (string, error),
dc *dynamicconfig.Collection,
provider PartitionConfigProvider,
) LoadBalancer {
return &defaultLoadBalancer{
domainIDToName: domainIDToName,
nReadPartitions: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistReadPartitions),
nWritePartitions: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistWritePartitions),
provider: provider,

Check warning on line 86 in client/matching/loadbalancer.go

View check run for this annotation

Codecov / codecov/patch

client/matching/loadbalancer.go#L86

Added line #L86 was not covered by tests
}
}

Expand All @@ -99,16 +93,7 @@ func (lb *defaultLoadBalancer) PickWritePartition(
taskListType int,
forwardedFrom string,
) string {
domainName, err := lb.domainIDToName(domainID)
if err != nil {
return taskList.GetName()
}
nPartitions := lb.nWritePartitions(domainName, taskList.GetName(), taskListType)

// checks to make sure number of writes never exceeds number of reads
if nRead := lb.nReadPartitions(domainName, taskList.GetName(), taskListType); nPartitions > nRead {
nPartitions = nRead
}
nPartitions := lb.provider.GetNumberOfWritePartitions(domainID, taskList, taskListType)
return lb.pickPartition(taskList, forwardedFrom, nPartitions)

}
Expand All @@ -119,11 +104,7 @@ func (lb *defaultLoadBalancer) PickReadPartition(
taskListType int,
forwardedFrom string,
) string {
domainName, err := lb.domainIDToName(domainID)
if err != nil {
return taskList.GetName()
}
n := lb.nReadPartitions(domainName, taskList.GetName(), taskListType)
n := lb.provider.GetNumberOfReadPartitions(domainID, taskList, taskListType)
return lb.pickPartition(taskList, forwardedFrom, n)

}
Expand Down
Loading

0 comments on commit 9fa60ac

Please sign in to comment.