From f2e73521edb68d8d53050660f6cdc508a5ab80cc Mon Sep 17 00:00:00 2001 From: Surax98 Date: Tue, 9 Apr 2024 09:53:06 +0000 Subject: [PATCH 1/2] VK is now able to retrieve cached pods from InterLink Signed-off-by: Surax98 --- cmd/virtual-kubelet/main.go | 5 +- pkg/interlink/api/status.go | 20 +++-- pkg/virtualkubelet/execute.go | 74 ++++++------------ pkg/virtualkubelet/virtualkubelet.go | 108 ++++++++++++++++++++++----- 4 files changed, 127 insertions(+), 80 deletions(-) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 92059397..65b2c6a1 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -169,6 +169,7 @@ func main() { defer cancel() nodename := flag.String("nodename", "", "The name of the node") configpath := flag.String("configpath", "", "Path to the VK config") + flag.Parse() interLinkConfig, err := commonIL.LoadConfig(*configpath, *nodename, ctx) if err != nil { panic(err) @@ -185,7 +186,7 @@ func main() { } log.L = logruslogger.FromLogrus(logrus.NewEntry(logger)) - shutdown, err := initProvider() + /*shutdown, err := initProvider() if err != nil { log.G(ctx).Fatal(err) } @@ -193,7 +194,7 @@ func main() { if err := shutdown(ctx); err != nil { log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err) } - }() + }()*/ log.G(ctx).Info("Tracer setup succeeded") diff --git a/pkg/interlink/api/status.go b/pkg/interlink/api/status.go index d2ffb461..6e7bf6e1 100644 --- a/pkg/interlink/api/status.go +++ b/pkg/interlink/api/status.go @@ -91,15 +91,21 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) } - for _, pod := range pods { - PodStatuses.mu.Lock() - for _, cached := range PodStatuses.Statuses { - if cached.PodUID == string(pod.UID) { - returnPods = append(returnPods, cached) - break + if len(pods) > 0 { + for _, pod := range pods { + PodStatuses.mu.Lock() + for _, cached := range PodStatuses.Statuses { + if cached.PodUID == string(pod.UID) { + returnPods = append(returnPods, 
cached) + break + } } + PodStatuses.mu.Unlock() + } + } else { + for _, pod := range PodStatuses.Statuses { + returnPods = append(returnPods, pod) } - PodStatuses.mu.Unlock() } returnValue, err := json.Marshal(returnPods) diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 6850186c..fe80f565 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -15,14 +15,10 @@ import ( "github.com/containerd/containerd/log" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" ) -var ClientSet *kubernetes.Clientset - // PingInterLink pings the InterLink API and returns true if there's an answer. The second return value is given by the answer provided by the API. func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, error) { log.G(ctx).Info("Pinging: " + config.Interlinkurl + ":" + config.Interlinkport + "/pinglink") @@ -123,7 +119,6 @@ func createRequest(config VirtualKubeletConfig, pod commonIL.PodCreateRequests, log.L.Error(err) return nil, err } - log.G(context.Background()).Info(string(returnValue)) } return returnValue, nil @@ -179,10 +174,6 @@ func deleteRequest(config VirtualKubeletConfig, pod *v1.Pod, token string) ([]by func statusRequest(config VirtualKubeletConfig, podsList []*v1.Pod, token string) ([]byte, error) { var returnValue []byte - if len(podsList) == 0 { - log.L.Info("No PODs to monitor") - return nil, nil - } bodyBytes, err := json.Marshal(podsList) if err != nil { log.L.Error(err) @@ -281,25 +272,11 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua for { timeNow := time.Now() if timeNow.Sub(startTime).Seconds() < time.Hour.Minutes()*5 { - if ClientSet == nil { - kubeconfig := os.Getenv("KUBECONFIG") - - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - 
log.G(ctx).Error(err) - return err - } - ClientSet, err = kubernetes.NewForConfig(config) - if err != nil { - log.G(ctx).Error(err) - return err - } - } - - _, err := ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + _, err := p.clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) if err != nil { - return errors.New("deleted pod before actual creation") + log.G(ctx).Warning("Deleted Pod before actual creation") + return nil } var failed bool @@ -307,7 +284,7 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua for _, volume := range pod.Spec.Volumes { if volume.ConfigMap != nil { - cfgmap, err := ClientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{}) + cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{}) if err != nil { failed = true log.G(ctx).Warning("Unable to find ConfigMap " + volume.ConfigMap.Name + " for pod " + pod.Name + ". Waiting for it to be initialized") @@ -320,7 +297,7 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua req.ConfigMaps = append(req.ConfigMaps, *cfgmap) } } else if volume.Secret != nil { - scrt, err := ClientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{}) + scrt, err := p.clientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{}) if err != nil { failed = true log.G(ctx).Warning("Unable to find Secret " + volume.Secret.SecretName + " for pod " + pod.Name + ". 
Waiting for it to be initialized") @@ -356,7 +333,6 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua returnVal, err := createRequest(config, req, token) if err != nil { - log.G(ctx).Error(err) return err } log.G(ctx).Info(string(returnVal)) @@ -366,7 +342,6 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua if pod.Status.Phase != "Initializing" { returnVal, err := deleteRequest(config, req, token) if err != nil { - log.G(ctx).Error(err) return err } log.G(ctx).Info(string(returnVal)) @@ -378,37 +353,30 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua // checkPodsStatus is regularly called by the VK itself at regular intervals of time to query InterLink for Pods' status. // It basically append all available pods registered to the VK to a slice and passes this slice to the statusRequest function. // After the statusRequest returns a response, this function uses that response to update every Pod and Container status. -func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, token string, config VirtualKubeletConfig) error { - if len(p.pods) == 0 { - return nil - } +func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, podsList []*v1.Pod, token string, config VirtualKubeletConfig) ([]commonIL.PodStatus, error) { var returnVal []byte var ret []commonIL.PodStatus - var PodsList []*v1.Pod var err error - for _, pod := range p.pods { - PodsList = append(PodsList, pod) - } //log.G(ctx).Debug(p.pods) //commented out because it's too verbose. 
uncomment to see all registered pods - if PodsList != nil { - returnVal, err = statusRequest(config, PodsList, token) - if err != nil { - return err - } else if returnVal != nil { - err = json.Unmarshal(returnVal, &ret) - if err != nil { - return err - } + returnVal, err = statusRequest(config, podsList, token) + if err != nil { + return nil, err + } else if returnVal != nil { + err = json.Unmarshal(returnVal, &ret) + if err != nil { + return nil, err + } + if podsList != nil { for _, podStatus := range ret { pod, err := p.GetPod(ctx, podStatus.PodNamespace, podStatus.PodName) if err != nil { updateCacheRequest(config, podStatus.PodUID, token) log.G(ctx).Warning("Error: " + err.Error() + "while getting statuses. Updating InterLink cache") - return err + return nil, err } if podStatus.PodUID == string(pod.UID) { @@ -468,15 +436,19 @@ func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, token strin err = p.UpdatePod(ctx, pod) if err != nil { log.G(ctx).Error(err) - return err + return nil, err } } } log.G(ctx).Info("No errors while getting statuses") log.G(ctx).Debug(ret) - return nil + return nil, nil + } else { + return ret, err } + } - return err + + return nil, err } diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 32bf7a35..12c3a76a 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -2,7 +2,6 @@ package virtualkubelet import ( "context" - "encoding/json" "fmt" "io" "math/rand" @@ -15,9 +14,12 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/node/api" stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "github.com/virtual-kubelet/virtual-kubelet/trace" + "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" ) @@ -61,6 +63,7 @@ type 
VirtualKubeletProvider struct { startTime time.Time notifier func(*v1.Pod) onNodeChangeCallback func(*v1.Node) + clientSet *kubernetes.Clientset } func NewProviderConfig( @@ -166,25 +169,24 @@ func LoadConfig(providerConfig, nodeName string, ctx context.Context) (config Vi if err != nil { return config, err } - configMap := map[string]VirtualKubeletConfig{} - err = json.Unmarshal(data, &configMap) + config = VirtualKubeletConfig{} + err = yaml.Unmarshal(data, &config) if err != nil { return config, err } - if _, exist := configMap[nodeName]; exist { - config = configMap[nodeName] - if config.CPU == "" { - config.CPU = DefaultCPUCapacity - } - if config.Memory == "" { - config.Memory = DefaultMemoryCapacity - } - if config.Pods == "" { - config.Pods = DefaultPodCapacity - } - if config.GPU == "" { - config.GPU = DefaultGPUCapacity - } + + //config = configMap + if config.CPU == "" { + config.CPU = DefaultCPUCapacity + } + if config.Memory == "" { + config.Memory = DefaultMemoryCapacity + } + if config.Pods == "" { + config.Pods = DefaultPodCapacity + } + if config.GPU == "" { + config.GPU = DefaultGPUCapacity } if _, err = resource.ParseQuantity(config.CPU); err != nil { @@ -464,6 +466,9 @@ func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) log.G(ctx).Info("receive GetPods") + p.InitClientSet(ctx) + p.RetrievePodsFromInterlink(ctx) + var pods []*v1.Pod for _, pod := range p.pods { @@ -554,10 +559,19 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) { if err != nil { fmt.Print(err) } - err = checkPodsStatus(ctx, p, string(b), p.config) - if err != nil { - log.G(ctx).Error(err) + + var podsList []*v1.Pod + for _, pod := range p.pods { + podsList = append(podsList, pod) } + + if podsList != nil { + _, err = checkPodsStatus(ctx, p, podsList, string(b), p.config) + if err != nil { + log.G(ctx).Error(err) + } + } + log.G(ctx).Info("statusLoop=end") } } @@ -679,3 +693,57 @@ func (p *VirtualKubeletProvider) GetStatsSummary(ctx 
context.Context) (*stats.Su // Return the dummy stats. return res, nil } + +// RetrievePodsFromInterlink fetches every pod status cached by InterLink, looks each pod up in the cluster and registers the ones found with the provider. +func (p *VirtualKubeletProvider) RetrievePodsFromInterlink(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, "RetrievePodsFromInterlink") + defer span.End() + + log.G(ctx).Info("Retrieving ALL cached InterLink Pods") + + b, err := os.ReadFile(p.config.VKTokenFile) // just pass the file name + if err != nil { + log.G(ctx).Error(err) + } + + cached_pods, err := checkPodsStatus(ctx, p, nil, string(b), p.config) + + for _, pod := range cached_pods { + retrievedPod, err := p.clientSet.CoreV1().Pods(pod.PodNamespace).Get(ctx, pod.PodName, metav1.GetOptions{}) + if err != nil { + log.G(ctx).Warning("Unable to retrieve pod " + retrievedPod.Name + " from the cluster") + } else { + key, err := BuildKey(retrievedPod) + if err != nil { + log.G(ctx).Error(err) + } + p.pods[key] = retrievedPod + p.notifier(retrievedPod) + } + } + + return err +} + +func (p *VirtualKubeletProvider) InitClientSet(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, "InitClientSet") + defer span.End() + + if p.clientSet == nil { + kubeconfig := os.Getenv("KUBECONFIG") + + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.G(ctx).Error(err) + return err + } + + p.clientSet, err = kubernetes.NewForConfig(config) + if err != nil { + log.G(ctx).Error(err) + return err + } + } + + return nil +} From 2fbf196a321c5ea55ddf091ec6451e2b0a7c151f Mon Sep 17 00:00:00 2001 From: Surax98 Date: Tue, 9 Apr 2024 10:03:29 +0000 Subject: [PATCH 2/2] forgot to uncomment tracer Signed-off-by: Surax98 --- cmd/virtual-kubelet/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 65b2c6a1..2ee9c1e1 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -186,7 +186,7 @@ func main() { } log.L = 
logruslogger.FromLogrus(logrus.NewEntry(logger)) - /*shutdown, err := initProvider() + shutdown, err := initProvider() if err != nil { log.G(ctx).Fatal(err) } @@ -194,7 +194,7 @@ func main() { if err := shutdown(ctx); err != nil { log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err) } - }()*/ + }() log.G(ctx).Info("Tracer setup succeeded")