Skip to content

Commit

Permalink
Merge pull request #200 from Surax98/stateless_VK
Browse files Browse the repository at this point in the history
Introducing stateless vk - Fix for clean restart
  • Loading branch information
dciangot authored Apr 9, 2024
2 parents 6ba5f61 + 2fbf196 commit de37bc8
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 78 deletions.
1 change: 1 addition & 0 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func main() {
defer cancel()
nodename := flag.String("nodename", "", "The name of the node")
configpath := flag.String("configpath", "", "Path to the VK config")
flag.Parse()
interLinkConfig, err := commonIL.LoadConfig(*configpath, *nodename, ctx)
if err != nil {
panic(err)
Expand Down
20 changes: 13 additions & 7 deletions pkg/interlink/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,21 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request)

}

for _, pod := range pods {
PodStatuses.mu.Lock()
for _, cached := range PodStatuses.Statuses {
if cached.PodUID == string(pod.UID) {
returnPods = append(returnPods, cached)
break
if len(pods) > 0 {
for _, pod := range pods {
PodStatuses.mu.Lock()
for _, cached := range PodStatuses.Statuses {
if cached.PodUID == string(pod.UID) {
returnPods = append(returnPods, cached)
break
}
}
PodStatuses.mu.Unlock()
}
} else {
for _, pod := range PodStatuses.Statuses {
returnPods = append(returnPods, pod)
}
PodStatuses.mu.Unlock()
}

returnValue, err := json.Marshal(returnPods)
Expand Down
74 changes: 23 additions & 51 deletions pkg/virtualkubelet/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@ import (
"github.com/containerd/containerd/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

commonIL "github.com/intertwin-eu/interlink/pkg/interlink"
)

var ClientSet *kubernetes.Clientset

// PingInterLink pings the InterLink API and returns true if there's an answer. The second return value is given by the answer provided by the API.
func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, error) {
log.G(ctx).Info("Pinging: " + config.Interlinkurl + ":" + config.Interlinkport + "/pinglink")
Expand Down Expand Up @@ -123,7 +119,6 @@ func createRequest(config VirtualKubeletConfig, pod commonIL.PodCreateRequests,
log.L.Error(err)
return nil, err
}
log.G(context.Background()).Info(string(returnValue))
}

return returnValue, nil
Expand Down Expand Up @@ -179,10 +174,6 @@ func deleteRequest(config VirtualKubeletConfig, pod *v1.Pod, token string) ([]by
func statusRequest(config VirtualKubeletConfig, podsList []*v1.Pod, token string) ([]byte, error) {
var returnValue []byte

if len(podsList) == 0 {
log.L.Info("No PODs to monitor")
return nil, nil
}
bodyBytes, err := json.Marshal(podsList)
if err != nil {
log.L.Error(err)
Expand Down Expand Up @@ -281,33 +272,19 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua
for {
timeNow := time.Now()
if timeNow.Sub(startTime).Seconds() < time.Hour.Minutes()*5 {
if ClientSet == nil {
kubeconfig := os.Getenv("KUBECONFIG")

config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.G(ctx).Error(err)
return err
}

ClientSet, err = kubernetes.NewForConfig(config)
if err != nil {
log.G(ctx).Error(err)
return err
}
}

_, err := ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
_, err := p.clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return errors.New("deleted pod before actual creation")
log.G(ctx).Warning("Deleted Pod before actual creation")
return nil
}

var failed bool

for _, volume := range pod.Spec.Volumes {

if volume.ConfigMap != nil {
cfgmap, err := ClientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{})
cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
failed = true
log.G(ctx).Warning("Unable to find ConfigMap " + volume.ConfigMap.Name + " for pod " + pod.Name + ". Waiting for it to be initialized")
Expand All @@ -320,7 +297,7 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua
req.ConfigMaps = append(req.ConfigMaps, *cfgmap)
}
} else if volume.Secret != nil {
scrt, err := ClientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{})
scrt, err := p.clientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{})
if err != nil {
failed = true
log.G(ctx).Warning("Unable to find Secret " + volume.Secret.SecretName + " for pod " + pod.Name + ". Waiting for it to be initialized")
Expand Down Expand Up @@ -356,7 +333,6 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua

returnVal, err := createRequest(config, req, token)
if err != nil {
log.G(ctx).Error(err)
return err
}
log.G(ctx).Info(string(returnVal))
Expand All @@ -366,7 +342,6 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua
if pod.Status.Phase != "Initializing" {
returnVal, err := deleteRequest(config, req, token)
if err != nil {
log.G(ctx).Error(err)
return err
}
log.G(ctx).Info(string(returnVal))
Expand All @@ -378,37 +353,30 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua
// checkPodsStatus is called by the VK at regular intervals to query InterLink for Pods' status.
// It passes the given list of Pods to the statusRequest function.
// After statusRequest returns a response, this function uses that response to update every Pod and Container status.
func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, token string, config VirtualKubeletConfig) error {
if len(p.pods) == 0 {
return nil
}
func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, podsList []*v1.Pod, token string, config VirtualKubeletConfig) ([]commonIL.PodStatus, error) {
var returnVal []byte
var ret []commonIL.PodStatus
var PodsList []*v1.Pod
var err error

for _, pod := range p.pods {
PodsList = append(PodsList, pod)
}
//log.G(ctx).Debug(p.pods) //commented out because it's too verbose. uncomment to see all registered pods

if PodsList != nil {
returnVal, err = statusRequest(config, PodsList, token)
if err != nil {
return err
} else if returnVal != nil {
err = json.Unmarshal(returnVal, &ret)
if err != nil {
return err
}
returnVal, err = statusRequest(config, podsList, token)

if err != nil {
return nil, err
} else if returnVal != nil {
err = json.Unmarshal(returnVal, &ret)
if err != nil {
return nil, err
}
if podsList != nil {
for _, podStatus := range ret {

pod, err := p.GetPod(ctx, podStatus.PodNamespace, podStatus.PodName)
if err != nil {
updateCacheRequest(config, podStatus.PodUID, token)
log.G(ctx).Warning("Error: " + err.Error() + "while getting statuses. Updating InterLink cache")
return err
return nil, err
}

if podStatus.PodUID == string(pod.UID) {
Expand Down Expand Up @@ -468,15 +436,19 @@ func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, token strin
err = p.UpdatePod(ctx, pod)
if err != nil {
log.G(ctx).Error(err)
return err
return nil, err
}
}
}

log.G(ctx).Info("No errors while getting statuses")
log.G(ctx).Debug(ret)
return nil
return nil, nil
} else {
return ret, err
}

}
return err

return nil, err
}
108 changes: 88 additions & 20 deletions pkg/virtualkubelet/virtualkubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package virtualkubelet

import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand All @@ -15,9 +14,12 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/node/api"
stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

commonIL "github.com/intertwin-eu/interlink/pkg/interlink"
)
Expand Down Expand Up @@ -61,6 +63,7 @@ type VirtualKubeletProvider struct {
startTime time.Time
notifier func(*v1.Pod)
onNodeChangeCallback func(*v1.Node)
clientSet *kubernetes.Clientset
}

func NewProviderConfig(
Expand Down Expand Up @@ -166,25 +169,24 @@ func LoadConfig(providerConfig, nodeName string, ctx context.Context) (config Vi
if err != nil {
return config, err
}
configMap := map[string]VirtualKubeletConfig{}
err = json.Unmarshal(data, &configMap)
config = VirtualKubeletConfig{}
err = yaml.Unmarshal(data, &config)
if err != nil {
return config, err
}
if _, exist := configMap[nodeName]; exist {
config = configMap[nodeName]
if config.CPU == "" {
config.CPU = DefaultCPUCapacity
}
if config.Memory == "" {
config.Memory = DefaultMemoryCapacity
}
if config.Pods == "" {
config.Pods = DefaultPodCapacity
}
if config.GPU == "" {
config.GPU = DefaultGPUCapacity
}

//config = configMap
if config.CPU == "" {
config.CPU = DefaultCPUCapacity
}
if config.Memory == "" {
config.Memory = DefaultMemoryCapacity
}
if config.Pods == "" {
config.Pods = DefaultPodCapacity
}
if config.GPU == "" {
config.GPU = DefaultGPUCapacity
}

if _, err = resource.ParseQuantity(config.CPU); err != nil {
Expand Down Expand Up @@ -464,6 +466,9 @@ func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error)

log.G(ctx).Info("receive GetPods")

p.InitClientSet(ctx)
p.RetrievePodsFromInterlink(ctx)

var pods []*v1.Pod

for _, pod := range p.pods {
Expand Down Expand Up @@ -554,10 +559,19 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) {
if err != nil {
fmt.Print(err)
}
err = checkPodsStatus(ctx, p, string(b), p.config)
if err != nil {
log.G(ctx).Error(err)

var podsList []*v1.Pod
for _, pod := range p.pods {
podsList = append(podsList, pod)
}

if podsList != nil {
_, err = checkPodsStatus(ctx, p, podsList, string(b), p.config)
if err != nil {
log.G(ctx).Error(err)
}
}

log.G(ctx).Info("statusLoop=end")
}
}
Expand Down Expand Up @@ -679,3 +693,57 @@ func (p *VirtualKubeletProvider) GetStatsSummary(ctx context.Context) (*stats.Su
// Return the dummy stats.
return res, nil
}

// RetrievePodsFromInterlink asks InterLink for every Pod status it has cached
// and re-registers the matching cluster Pods with this provider.
//
// It reads the token from p.config.VKTokenFile, calls checkPodsStatus with a
// nil pod list, and for every returned status fetches the Pod from the cluster
// through p.clientSet. Each Pod that is found is stored in p.pods under the key
// produced by BuildKey and passed to p.notifier.
//
// An error is returned when checkPodsStatus fails, or when there are statuses
// to process but p.clientSet has not been initialised. Failures on a single Pod
// are logged and that Pod is skipped.
func (p *VirtualKubeletProvider) RetrievePodsFromInterlink(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "RetrievePodsFromInterlink")
	defer span.End()

	log.G(ctx).Info("Retrieving ALL cached InterLink Pods")

	// Reading the token is best-effort: on failure the error is logged and the
	// request is still attempted with an empty token.
	b, err := os.ReadFile(p.config.VKTokenFile) // just pass the file name
	if err != nil {
		log.G(ctx).Error(err)
	}

	cachedPods, err := checkPodsStatus(ctx, p, nil, string(b), p.config)
	if err != nil {
		return err
	}

	// The clientset is only needed when there is something to look up; guard it
	// so an earlier, ignored initialisation failure does not turn into a panic.
	if len(cachedPods) > 0 && p.clientSet == nil {
		return fmt.Errorf("kubernetes clientset is not initialized")
	}

	for _, pod := range cachedPods {
		retrievedPod, err := p.clientSet.CoreV1().Pods(pod.PodNamespace).Get(ctx, pod.PodName, metav1.GetOptions{})
		if err != nil {
			// Use the name from the cached status: the object returned by a
			// failed Get carries no usable metadata.
			log.G(ctx).Warning("Unable to retrieve pod " + pod.PodName + " from the cluster: " + err.Error())
			continue
		}

		key, err := BuildKey(retrievedPod)
		if err != nil {
			// Without a valid key the Pod cannot be tracked; skip it instead of
			// storing it under a bogus key.
			log.G(ctx).Error(err)
			continue
		}

		p.pods[key] = retrievedPod
		p.notifier(retrievedPod)
	}

	return nil
}

// InitClientSet makes sure the provider has a Kubernetes clientset.
//
// If p.clientSet is already set the call does nothing. Otherwise it builds a
// client configuration from the path held in the KUBECONFIG environment
// variable and creates the clientset from it. Any failure is logged and
// returned to the caller.
func (p *VirtualKubeletProvider) InitClientSet(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "InitClientSet")
	defer span.End()

	// Already initialised: nothing to do.
	if p.clientSet != nil {
		return nil
	}

	restConfig, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.G(ctx).Error(err)
		return err
	}

	if p.clientSet, err = kubernetes.NewForConfig(restConfig); err != nil {
		log.G(ctx).Error(err)
		return err
	}

	return nil
}

0 comments on commit de37bc8

Please sign in to comment.