Skip to content

Commit

Permalink
添加context参数以支持取消操作和传递请求特定值
Browse files Browse the repository at this point in the history
在此更改中,将context.Context参数添加到多个函数中,以便于取消操作和传递请求特定值。主要修改包括:

- 在ApplyYAML和DeleteYAML函数中添加context参数- 更新CRD相关函数以使用context参数
- 修改资源操作函数如CreateResource、UpdateResource和DeleteResource以支持context
- 在控制器和处理器函数中传递context参数这些更改提高了系统的灵活性和响应能力,特别是在处理长时间运行的操作和需要传递用户特定信息的场景中。
  • Loading branch information
weibaohui committed Oct 18, 2024
1 parent b925963 commit fb14c7d
Show file tree
Hide file tree
Showing 23 changed files with 223 additions and 198 deletions.
21 changes: 11 additions & 10 deletions internal/kubectl/apply.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubectl

import (
"context"
"fmt"
"strings"

Expand All @@ -10,7 +11,7 @@ import (
)

// ApplyYAML 解析并应用 YAML 字符串到 Kubernetes
func (k8s *Kubectl) ApplyYAML(yamlStr string) (result []string) {
func (k8s *Kubectl) ApplyYAML(ctx context.Context, yamlStr string) (result []string) {
docs := splitYAML(yamlStr)

for _, doc := range docs {
Expand All @@ -34,11 +35,11 @@ func (k8s *Kubectl) ApplyYAML(yamlStr string) (result []string) {

builtin := k8s.IsBuiltinResource(gvk.Kind)
if !builtin {
crd, err := k8s.GetCRD(gvk.Kind, gvk.Group)
crd, err := k8s.GetCRD(ctx, gvk.Kind, gvk.Group)
if err != nil {
result = append(result, fmt.Sprintf("%v", err))
} else {
crdResult := k8s.ApplyCRD(crd, &obj)
crdResult := k8s.ApplyCRD(ctx, crd, &obj)
result = append(result, crdResult)
}
continue
Expand All @@ -54,11 +55,11 @@ func (k8s *Kubectl) ApplyYAML(yamlStr string) (result []string) {
}

// 使用 CreateOrUpdateResource 应用资源
_, err := k8s.CreateResource(kind, ns, &obj)
_, err := k8s.CreateResource(ctx, kind, ns, &obj)
if err != nil {
if errors.IsAlreadyExists(err) {
// 已经存在,更新
existingResource, err := k8s.GetResource(kind, ns, obj.GetName())
existingResource, err := k8s.GetResource(ctx, kind, ns, obj.GetName())
if err != nil {
result = append(result, fmt.Sprintf("获取应用失败%v", err.Error()))
continue
Expand All @@ -67,7 +68,7 @@ func (k8s *Kubectl) ApplyYAML(yamlStr string) (result []string) {
obj.SetResourceVersion(existingResource.GetResourceVersion())
}

_, err = k8s.UpdateResource(kind, ns, &obj)
_, err = k8s.UpdateResource(ctx, kind, ns, &obj)
if err != nil {
result = append(result, fmt.Sprintf("更新应用失败:%v", err.Error()))
continue
Expand All @@ -86,7 +87,7 @@ func (k8s *Kubectl) ApplyYAML(yamlStr string) (result []string) {

return result
}
func (k8s *Kubectl) DeleteYAML(yamlStr string) (result []string) {
func (k8s *Kubectl) DeleteYAML(ctx context.Context, yamlStr string) (result []string) {
docs := splitYAML(yamlStr)

for _, doc := range docs {
Expand All @@ -112,12 +113,12 @@ func (k8s *Kubectl) DeleteYAML(yamlStr string) (result []string) {
builtIn := k8s.IsBuiltinResource(gvk.Kind)
if !builtIn {
// CRD 类型资源
crd, err := k8s.GetCRD(gvk.Kind, gvk.Group)
crd, err := k8s.GetCRD(ctx, gvk.Kind, gvk.Group)
if err != nil {
result = append(result, fmt.Sprintf("%v", err))
} else {
// 确认为 CRD
crdResult := k8s.DeleteCRD(crd, &obj)
crdResult := k8s.DeleteCRD(ctx, crd, &obj)
result = append(result, crdResult)
}
continue
Expand All @@ -129,7 +130,7 @@ func (k8s *Kubectl) DeleteYAML(yamlStr string) (result []string) {
ns = "default" // 默认命名空间
}

err := k8s.DeleteResource(gvk.Kind, ns, obj.GetName())
err := k8s.DeleteResource(ctx, gvk.Kind, ns, obj.GetName())
if err != nil {
result = append(result, fmt.Sprintf("%s/%s deleted error:%v", obj.GetKind(), obj.GetName(), err))
} else {
Expand Down
21 changes: 11 additions & 10 deletions internal/kubectl/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubectl

import (
"context"
"fmt"
"sort"

Expand All @@ -26,7 +27,7 @@ type callbacks struct {

type processor struct {
kubectl *Kubectl
fns []func(*Kubectl) error
fns []func(context.Context, *Kubectl) error
callbacks []*callback
}
type callback struct {
Expand All @@ -35,7 +36,7 @@ type callback struct {
after string
remove bool
replace bool
handler func(*Kubectl) error
handler func(context.Context, *Kubectl) error
processor *processor
}

Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *callback) Remove(name string) error {
return c.processor.compile()
}

func (c *callback) Replace(name string, fn func(*Kubectl) error) error {
func (c *callback) Replace(name string, fn func(context.Context, *Kubectl) error) error {
klog.V(4).Infof("replacing callback `%s` \n", name)
c.name = name
c.handler = fn
Expand All @@ -84,14 +85,14 @@ func (c *callback) After(name string) *callback {
return c
}

func (c *callback) Register(name string, fn func(*Kubectl) error) error {
func (c *callback) Register(name string, fn func(context.Context, *Kubectl) error) error {
c.name = name
c.handler = fn
c.processor.callbacks = append(c.processor.callbacks, c)
return c.processor.compile()
}

func (p *processor) Get(name string) func(*Kubectl) error {
func (p *processor) Get(name string) func(context.Context, *Kubectl) error {
for i := len(p.callbacks) - 1; i >= 0; i-- {
if v := p.callbacks[i]; v.name == name && !v.remove {
return v.handler
Expand All @@ -104,13 +105,13 @@ func (p *processor) Remove(name string) error {
return (&callback{processor: p}).Remove(name)
}

func (p *processor) Replace(name string, fn func(*Kubectl) error) error {
func (p *processor) Replace(name string, fn func(context.Context, *Kubectl) error) error {
return (&callback{processor: p}).Replace(name, fn)
}

func (p *processor) Execute(k8s *Kubectl) error {
func (p *processor) Execute(ctx context.Context, k8s *Kubectl) error {
for _, f := range p.fns {
err := f(k8s)
err := f(ctx, k8s)
if err != nil {
return err
}
Expand All @@ -126,7 +127,7 @@ func (p *processor) After(name string) *callback {
return &callback{after: name, processor: p}
}

func (p *processor) Register(name string, fn func(*Kubectl) error) error {
func (p *processor) Register(name string, fn func(context.Context, *Kubectl) error) error {
return (&callback{processor: p}).Register(name, fn)
}

Expand All @@ -150,7 +151,7 @@ func (p *processor) compile() (err error) {
}
return
}
func sortCallbacks(cs []*callback) (fns []func(*Kubectl) error, err error) {
func sortCallbacks(cs []*callback) (fns []func(context.Context, *Kubectl) error, err error) {
var (
names, sorted []string
sortCallback func(*callback) error
Expand Down
10 changes: 1 addition & 9 deletions internal/kubectl/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kubectl

import (
"context"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -79,9 +78,7 @@ func InitConnection(path string) {

// 注册回调参数
kubectl.callbacks = initializeCallbacks(kubectl)
kubectl.Stmt = &Statement{
Context: context.TODO(),
}
kubectl.Stmt = &Statement{}
}

func getKubeConfig(path string) (*rest.Config, error) {
Expand All @@ -106,8 +103,3 @@ func getKubeConfig(path string) (*rest.Config, error) {
func (k8s *Kubectl) Callback() *callbacks {
return k8s.callbacks
}

func (k8s *Kubectl) WithContext(c context.Context) *Kubectl {
k8s.Stmt.Context = c
return kubectl
}
17 changes: 9 additions & 8 deletions internal/kubectl/configmap.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package kubectl

import (
"context"
"sort"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (k8s *Kubectl) ListConfigMap(ns string) ([]v1.ConfigMap, error) {
list, err := k8s.client.CoreV1().ConfigMaps(ns).List(k8s.Stmt.Context, metav1.ListOptions{})
func (k8s *Kubectl) ListConfigMap(ctx context.Context, ns string) ([]v1.ConfigMap, error) {
list, err := k8s.client.CoreV1().ConfigMaps(ns).List(ctx, metav1.ListOptions{})
if err == nil && list != nil && list.Items != nil && len(list.Items) > 0 {
// 按创建时间倒序排序 Pods 列表
sort.Slice(list.Items, func(i, j int) bool {
Expand All @@ -19,15 +20,15 @@ func (k8s *Kubectl) ListConfigMap(ns string) ([]v1.ConfigMap, error) {
return nil, err
}

func (k8s *Kubectl) GetConfigMap(ns, name string) (*v1.ConfigMap, error) {
ConfigMap, err := k8s.client.CoreV1().ConfigMaps(ns).Get(k8s.Stmt.Context, name, metav1.GetOptions{})
func (k8s *Kubectl) GetConfigMap(ctx context.Context, ns, name string) (*v1.ConfigMap, error) {
ConfigMap, err := k8s.client.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
return ConfigMap, err
}
func (k8s *Kubectl) RemoveConfigMap(ns, name string) error {
err := k8s.client.CoreV1().ConfigMaps(ns).Delete(k8s.Stmt.Context, name, metav1.DeleteOptions{})
func (k8s *Kubectl) RemoveConfigMap(ctx context.Context, ns, name string) error {
err := k8s.client.CoreV1().ConfigMaps(ns).Delete(ctx, name, metav1.DeleteOptions{})
return err
}
func (k8s *Kubectl) CreateConfigMap(cm *v1.ConfigMap) (*v1.ConfigMap, error) {
cm, err := k8s.client.CoreV1().ConfigMaps(cm.Namespace).Create(k8s.Stmt.Context, cm, metav1.CreateOptions{})
func (k8s *Kubectl) CreateConfigMap(ctx context.Context, cm *v1.ConfigMap) (*v1.ConfigMap, error) {
cm, err := k8s.client.CoreV1().ConfigMaps(cm.Namespace).Create(ctx, cm, metav1.CreateOptions{})
return cm, err
}
21 changes: 11 additions & 10 deletions internal/kubectl/deploy.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package kubectl

import (
"context"
"strings"
"time"

"k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (k8s *Kubectl) GetDeploy(ns, name string) (*v1.Deployment, error) {
deployment, err := k8s.client.AppsV1().Deployments(ns).Get(k8s.Stmt.Context, name, metav1.GetOptions{})
func (k8s *Kubectl) GetDeploy(ctx context.Context, ns, name string) (*v1.Deployment, error) {
deployment, err := k8s.client.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
return deployment, err
}

func (k8s *Kubectl) CreateDeploy(deploy *v1.Deployment) (*v1.Deployment, error) {
deployment, err := k8s.client.AppsV1().Deployments(deploy.Namespace).Create(k8s.Stmt.Context, deploy, metav1.CreateOptions{})
func (k8s *Kubectl) CreateDeploy(ctx context.Context, deploy *v1.Deployment) (*v1.Deployment, error) {
deployment, err := k8s.client.AppsV1().Deployments(deploy.Namespace).Create(ctx, deploy, metav1.CreateOptions{})
return deployment, err
}

func (k8s *Kubectl) RestartDeploy(ns string, name string) (*v1.Deployment, error) {
deployment, err := k8s.GetDeploy(ns, name)
func (k8s *Kubectl) RestartDeploy(ctx context.Context, ns string, name string) (*v1.Deployment, error) {
deployment, err := k8s.GetDeploy(ctx, ns, name)
if err != nil {
return nil, err
}
Expand All @@ -30,14 +31,14 @@ func (k8s *Kubectl) RestartDeploy(ns string, name string) (*v1.Deployment, error
deployment.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)

// 更新 Deployment
updatedDeployment, err := k8s.client.AppsV1().Deployments(deployment.Namespace).Update(k8s.Stmt.Context, deployment, metav1.UpdateOptions{})
updatedDeployment, err := k8s.client.AppsV1().Deployments(deployment.Namespace).Update(ctx, deployment, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
return updatedDeployment, nil
}
func (k8s *Kubectl) UpdateDeployImageTag(ns string, name string, containerName string, tag string) (*v1.Deployment, error) {
deploy, err := k8s.GetDeploy(ns, name)
func (k8s *Kubectl) UpdateDeployImageTag(ctx context.Context, ns string, name string, containerName string, tag string) (*v1.Deployment, error) {
deploy, err := k8s.GetDeploy(ctx, ns, name)
if err != nil {
return nil, err
}
Expand All @@ -49,7 +50,7 @@ func (k8s *Kubectl) UpdateDeployImageTag(ns string, name string, containerName s
c.Image = replaceImageTag(c.Image, tag)
}
}
deployment, err := k8s.client.AppsV1().Deployments(deploy.Namespace).Update(k8s.Stmt.Context, deploy, metav1.UpdateOptions{})
deployment, err := k8s.client.AppsV1().Deployments(deploy.Namespace).Update(ctx, deploy, metav1.UpdateOptions{})
return deployment, err
}

Expand Down
Loading

0 comments on commit fb14c7d

Please sign in to comment.