Skip to content

Commit

Permalink
Add the ability to specify namespace to be used for discovering sched…
Browse files Browse the repository at this point in the history
…uler estimator services

Signed-off-by: Joe Nathan Abellard <[email protected]>

Address comments

Signed-off-by: Joe Nathan Abellard <[email protected]>

Address comments

Signed-off-by: Joe Nathan Abellard <[email protected]>
  • Loading branch information
jabellard committed Sep 9, 2024
1 parent 7a2b493 commit b6fbcb4
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 20 deletions.
3 changes: 3 additions & 0 deletions cmd/descheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Options struct {

// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
SchedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand Down Expand Up @@ -129,6 +131,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.SchedulerEstimatorKeyFile, "scheduler-estimator-key-file", "", "SSL key file used to secure scheduler estimator communication.")
fs.StringVar(&o.SchedulerEstimatorCaFile, "scheduler-estimator-ca-file", "", "SSL Certificate Authority file used to secure scheduler estimator communication.")
fs.BoolVar(&o.InsecureSkipEstimatorVerify, "insecure-skip-estimator-verify", false, "Controls whether verifies the scheduler estimator's certificate chain and host name.")
fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.DurationVar(&o.DeschedulingInterval.Duration, "descheduling-interval", defaultDeschedulingInterval, "Time interval between two consecutive descheduler executions. Setting this value instructs the descheduler to run in a continuous loop at the interval specified.")
fs.DurationVar(&o.UnschedulableThreshold.Duration, "unschedulable-threshold", defaultUnschedulableThreshold, "The period of pod unschedulable condition. This value is considered as a classification standard of unschedulable replicas.")
Expand Down
3 changes: 3 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Options struct {
DisableSchedulerEstimatorInPullMode bool
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
SchedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
Expand Down Expand Up @@ -164,6 +166,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.StringVar(&o.SchedulerEstimatorCertFile, "scheduler-estimator-cert-file", "", "SSL certification file used to secure scheduler estimator communication.")
Expand Down
1 change: 1 addition & 0 deletions cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
scheduler.WithOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator),
scheduler.WithDisableSchedulerEstimatorInPullMode(opts.DisableSchedulerEstimatorInPullMode),
scheduler.WithSchedulerEstimatorServiceNamespace(opts.SchedulerEstimatorServiceNamespace),
scheduler.WithSchedulerEstimatorServicePrefix(opts.SchedulerEstimatorServicePrefix),
scheduler.WithSchedulerEstimatorConnection(opts.SchedulerEstimatorPort, opts.SchedulerEstimatorCertFile, opts.SchedulerEstimatorKeyFile, opts.SchedulerEstimatorCaFile, opts.InsecureSkipEstimatorVerify),
scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout),
Expand Down
30 changes: 21 additions & 9 deletions pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ type Descheduler struct {

eventRecorder record.EventRecorder

schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServicePrefix string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
schedulerEstimatorWorker util.AsyncWorker
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
schedulerEstimatorWorker util.AsyncWorker

unschedulableThreshold time.Duration
deschedulingInterval time.Duration
Expand All @@ -93,9 +94,10 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne
KeyFile: opts.SchedulerEstimatorKeyFile,
TargetPort: opts.SchedulerEstimatorPort,
},
schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix,
unschedulableThreshold: opts.UnschedulableThreshold.Duration,
deschedulingInterval: opts.DeschedulingInterval.Duration,
schedulerEstimatorServiceNamespace: opts.SchedulerEstimatorServiceNamespace,
schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix,
unschedulableThreshold: opts.UnschedulableThreshold.Duration,
deschedulingInterval: opts.DeschedulingInterval.Duration,
}
// ignore the error here because the informers haven't been started
_ = desched.bindingInformer.SetTransform(fedinformer.StripUnusedFields)
Expand Down Expand Up @@ -284,7 +286,12 @@ func (d *Descheduler) establishEstimatorConnections() {
return
}
for i := range clusterList.Items {
if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand All @@ -304,7 +311,12 @@ func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error {
}
return err
}
return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig)
}

func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) {
Expand Down
24 changes: 15 additions & 9 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/klog/v2"

estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/grpcconnection"
"github.com/karmada-io/karmada/pkg/util/names"
)
Expand All @@ -37,6 +36,13 @@ type SchedulerEstimatorCache struct {
estimator map[string]*clientWrapper
}

// SchedulerEstimatorServiceInfo contains information needed to discover and connect to a scheduler estimator service.
type SchedulerEstimatorServiceInfo struct {
Name string
NamePrefix string
Namespace string
}

// NewSchedulerEstimatorCache returns an accurate scheduler estimator cache.
func NewSchedulerEstimatorCache() *SchedulerEstimatorCache {
return &SchedulerEstimatorCache{
Expand Down Expand Up @@ -97,25 +103,25 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim
}

// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(name) {
func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerEstimatorServiceInfo, estimatorCache *SchedulerEstimatorCache, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(serviceInfo.Name) {
return nil
}

serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace,
names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort))
if err != nil {
return err
}

klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, name)
klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name)
cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second)
if err != nil {
klog.Errorf("Failed to dial cluster(%s): %v.", name, err)
klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err)
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
estimatorCache.AddCluster(serviceInfo.Name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name)
return nil
}
25 changes: 23 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type Scheduler struct {
enableSchedulerEstimator bool
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerEstimatorWorker util.AsyncWorker
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
Expand All @@ -121,6 +122,8 @@ type schedulerOptions struct {
disableSchedulerEstimatorInPullMode bool
// schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
schedulerEstimatorTimeout metav1.Duration
// schedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services.
schedulerEstimatorServiceNamespace string
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
schedulerEstimatorServicePrefix string
// schedulerName is the name of the scheduler. Default is "default-scheduler".
Expand Down Expand Up @@ -174,6 +177,13 @@ func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Op
}
}

// WithSchedulerEstimatorServiceNamespace sets the schedulerEstimatorServiceNamespace for the scheduler
func WithSchedulerEstimatorServiceNamespace(schedulerEstimatorServiceNamespace string) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorServiceNamespace = schedulerEstimatorServiceNamespace
}
}

// WithSchedulerEstimatorServicePrefix sets the schedulerEstimatorServicePrefix for scheduler
func WithSchedulerEstimatorServicePrefix(schedulerEstimatorServicePrefix string) Option {
return func(o *schedulerOptions) {
Expand Down Expand Up @@ -262,6 +272,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
sched.schedulerEstimatorServicePrefix = options.schedulerEstimatorServicePrefix
sched.schedulerEstimatorServiceNamespace = options.schedulerEstimatorServiceNamespace
sched.schedulerEstimatorClientConfig = options.schedulerEstimatorClientConfig
sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache()
schedulerEstimatorWorkerOptions := util.Options{
Expand Down Expand Up @@ -776,7 +787,12 @@ func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
return nil
}

return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig)
}

func (s *Scheduler) establishEstimatorConnections() {
Expand All @@ -789,7 +805,12 @@ func (s *Scheduler) establishEstimatorConnections() {
if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
continue
}
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestCreateScheduler(t *testing.T) {
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
port := 10025
serviceNamespace := "tenant1"
servicePrefix := "test-service-prefix"
schedulerName := "test-scheduler"
timeout := metav1.Duration{Duration: 5 * time.Second}
Expand All @@ -51,6 +52,7 @@ func TestCreateScheduler(t *testing.T) {
schedulerEstimatorPort int
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorTimeout metav1.Duration
schedulerEstimatorServiceNamespace string
schedulerEstimatorServicePrefix string
schedulerName string
schedulerEstimatorClientConfig *grpcconnection.ClientConfig
Expand Down Expand Up @@ -101,6 +103,17 @@ func TestCreateScheduler(t *testing.T) {
schedulerEstimatorPort: port,
schedulerEstimatorServicePrefix: servicePrefix,
},
{
name: "scheduler with custom SchedulerEstimatorServiceNamespace set",
opts: []Option{
WithEnableSchedulerEstimator(true),
WithSchedulerEstimatorConnection(port, "", "", "", false),
WithSchedulerEstimatorServiceNamespace(serviceNamespace),
},
enableSchedulerEstimator: true,
schedulerEstimatorPort: port,
schedulerEstimatorServiceNamespace: serviceNamespace,
},
{
name: "scheduler with SchedulerName enabled",
opts: []Option{
Expand Down Expand Up @@ -147,6 +160,10 @@ func TestCreateScheduler(t *testing.T) {
t.Errorf("unexpected disableSchedulerEstimatorInPullMode want %v, got %v", tc.disableSchedulerEstimatorInPullMode, sche.disableSchedulerEstimatorInPullMode)
}

if tc.schedulerEstimatorServiceNamespace != sche.schedulerEstimatorServiceNamespace {
t.Errorf("unexpected schedulerEstimatorServiceNamespace want %v, got %v", tc.schedulerEstimatorServiceNamespace, sche.schedulerEstimatorServiceNamespace)
}

if tc.schedulerEstimatorServicePrefix != sche.schedulerEstimatorServicePrefix {
t.Errorf("unexpected schedulerEstimatorServicePrefix want %v, got %v", tc.schedulerEstimatorServicePrefix, sche.schedulerEstimatorServicePrefix)
}
Expand Down

0 comments on commit b6fbcb4

Please sign in to comment.