Skip to content

Commit

Permalink
Merge pull request #146 from AmitKumarDas/code-39
Browse files Browse the repository at this point in the history
feat(command): add command cr that creates a k8s job
  • Loading branch information
amitbhatt818 authored Oct 8, 2020
2 parents 4540187 + 0aff669 commit 090d341
Show file tree
Hide file tree
Showing 20 changed files with 3,040 additions and 411 deletions.
292 changes: 209 additions & 83 deletions cmd/commander/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"

"github.com/pkg/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -39,33 +40,38 @@ import (
var (
commandKind = flag.String(
"command-kind",
"Command",
"Kind of Command custom resource",
"Command", // default
"Kubernetes custom resource kind",
)

commandResource = flag.String(
"command-resource",
"commands",
"Resource name of Command custom resource",
"commands", // default
"Kubernetes custom resource name",
)

commandGroup = flag.String(
"command-group",
"dope.metacontroller.io",
"Group of Command custom resource",
"dope.metacontroller.io", // default
"Kubernetes custom resource group",
)

commandVersion = flag.String(
"command-version",
"v1",
"Version of Command custom resource",
"command-api-version",
"v1", // default
"Kubernetes custom resource api version",
)

commandName = flag.String(
"command-name",
"",
"Name of Command custom resource",
"Name of the command",
)

commandNamespace = flag.String(
"command-ns",
"",
"Namespace of Command custom resource",
"Namespace of the command",
)

kubeAPIServerURL = flag.String(
Expand All @@ -74,24 +80,29 @@ var (
`Kubernetes api server url (same format as used by kubectl).
If not specified, uses in-cluster config`,
)

kubeconfig *string

clientGoQPS = flag.Float64(
"client-go-qps",
5,
5, // default
"Number of queries per second client-go is allowed to make (default 5)",
)

clientGoBurst = flag.Int(
"client-go-burst",
10,
10, // default
"Allowed burst queries for client-go (default 10)",
)
)

// main function is the entry point of this binary.
// This is the entry point of this binary
//
// This binary is meant to be run to completion. In other
// words this does not expose any long running service.
// This executes the commands or scripts specified in the
// custom resource and updates this resource post execution.
//
//
// NOTE:
// A kubernetes **Job** can make use of this binary
Expand Down Expand Up @@ -127,45 +138,71 @@ func main() {
defer klog.Flush()

if *commandName == "" {
klog.Fatal("Invalid arguments: Flag 'command-name' must be set")
klog.Exit("Invalid arguments: Flag 'command-name' must be set")
}
if *commandNamespace == "" {
klog.Exit("Invalid arguments: Flag 'command-ns' must be set")
}

klog.V(1).Infof("Command custom resource: kind %s", *commandKind)
klog.V(1).Infof("Command custom resource: resource %s", *commandResource)
klog.V(1).Infof("Command custom resource: group %s", *commandGroup)
klog.V(1).Infof("Command custom resource: version %s", *commandVersion)
klog.V(1).Infof("Command custom resource: name %s", *commandName)
klog.V(1).Infof("Command custom resource: namespace %s", *commandNamespace)
klog.V(1).Infof("Command custom resource: group %q", *commandGroup)
klog.V(1).Infof("Command custom resource: version %q", *commandVersion)
klog.V(1).Infof("Command custom resource: kind %q", *commandKind)
klog.V(1).Infof("Command custom resource: resource %q", *commandResource)
klog.V(1).Infof("Command custom resource: namespace %q", *commandNamespace)
klog.V(1).Infof("Command custom resource: name %q", *commandName)

runCommand(getRestConfig())
r, err := NewRunner()
if err != nil {
// This should lead to crashloopback if this
// is running from within a Kubernetes pod
klog.Exit(err)
}
err = r.Run()
if err != nil {
// This should lead to crashloopback if this
// is running from within a Kubernetes pod
klog.Exit(err)
}
os.Exit(0)
}

func getRestConfig() *rest.Config {
// Runnable helps in executing the Kubernetes command resource.
// It does so by executing the commands or scripts specified in
// the resource and updating this resource post execution.
type Runnable struct {
Client dynamic.Interface
GVR schema.GroupVersionResource

commandStatus *types.CommandStatus
}

// NewRunner returns a new instance of Runnable
func NewRunner() (*Runnable, error) {
var config *rest.Config
var err error

if *kubeconfig != "" {
klog.V(1).Infof("Using kubeconfig %s", *kubeconfig)
klog.V(2).Infof("Using kubeconfig %q", *kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else if *kubeAPIServerURL != "" {
klog.V(1).Infof("Using kubernetes api server url %s", *kubeAPIServerURL)
klog.V(2).Infof("Using kubernetes api server url %q", *kubeAPIServerURL)
config, err = clientcmd.BuildConfigFromFlags(*kubeAPIServerURL, "")
} else {
klog.V(1).Info("Using in-cluster kubeconfig")
klog.V(2).Info("Using in-cluster kubeconfig")
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatal(err)
return nil, err
}

// configure kubernetes client config with additional settings
// to manage deluge of requests to kubernetes API server
config.QPS = float32(*clientGoQPS)
config.Burst = *clientGoBurst
return config
}

func runCommand(config *rest.Config) {
client, err := dynamic.NewForConfig(config)
if err != nil {
klog.Fatal(err)
return nil, err
}

gvr := schema.GroupVersionResource{
Expand All @@ -174,77 +211,166 @@ func runCommand(config *rest.Config) {
Resource: *commandResource,
}

got, err := client.Resource(gvr).
return &Runnable{
Client: client,
GVR: gvr,
}, nil
}

func (a *Runnable) updateWithRetries() error {
var statusNew interface{}
err := unstruct.MarshalThenUnmarshal(a.commandStatus, &statusNew)
if err != nil {
return errors.Wrapf(
err,
"Marshal unmarshal failed: Command %q %q",
*commandNamespace,
*commandName,
)
}

// Command is updated with latest labels
labels := map[string]string{
// this label key is set with same value as that of status.phase
types.LblKeyCommandPhase: string(a.commandStatus.Phase),
}

var runtimeErr error

// This uses exponential backoff to avoid exhausting
// the apiserver
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
// Retrieve the latest version of Command
cmd, err := a.Client.
Resource(a.GVR).
Namespace(*commandNamespace).
Get(*commandName, v1.GetOptions{})
if err != nil {
// Retry this error since this might be a temporary
return errors.Wrapf(
err,
"Failed to get command: %q %q",
*commandNamespace,
*commandName,
)
}

// Mutate command resource's status field
err = unstructured.SetNestedField(
cmd.Object,
statusNew,
"status",
)
if err != nil {
runtimeErr = errors.Wrapf(
err,
"Set unstruct failed: Command %q %q",
*commandNamespace,
*commandName,
)
// Return nil to avoid retry
//
// NOTE:
// Setting unstructured instance should not be
// retried since every retry will result in the
// same error
return nil
}

// Merge existing labels with desired pair(s)
unstruct.SetLabels(cmd, labels)

updated, err := a.Client.
Resource(a.GVR).
Namespace(*commandNamespace).
Update(cmd, v1.UpdateOptions{})

if err != nil {
// Update error is returned to be retried since this
// might be temporary
return errors.Wrapf(
err,
"Update failed: Command %q %q",
*commandNamespace,
*commandName,
)
}

// Mutate command instance with latest resource version
// before trying update status. This is done since previous
// update would have modified resource version.
cmd.SetResourceVersion(updated.GetResourceVersion())
// Update command status as a **sub resource** update
_, err = a.Client.
Resource(a.GVR).
Namespace(*commandNamespace).
UpdateStatus(cmd, v1.UpdateOptions{})

// If update status resulted in an error it will be
// returned so that update can be retried
return errors.Wrapf(
err,
"Update status failed: Command %q %q",
*commandNamespace,
*commandName,
)
})

if runtimeErr != nil {
return errors.Wrapf(
runtimeErr,
"Update failed: Runtime error: Command: %q %q",
*commandNamespace,
*commandName,
)
}
return retryErr
}

// Run executes the command resource
func (a *Runnable) Run() error {
got, err := a.Client.
Resource(a.GVR).
Namespace(*commandNamespace).
Get(
*commandName,
v1.GetOptions{},
)
if err != nil {
klog.Fatal(err)
return errors.Wrapf(
err,
"Failed to get command: %q %q",
*commandNamespace,
*commandName,
)
}

var c types.Command
// convert from unstructured instance to typed instance
err = unstruct.ToTyped(got, &c)
if err != nil {
klog.Fatal(err)
return errors.Wrapf(
err,
"Failed to convert unstructured to typed instance: %q %q",
*commandNamespace,
*commandName,
)
}

cmder, err := command.NewCommander(
command.CommandableConfig{
Command: &c,
cmdRunner, err := command.NewRunner(
command.RunnableConfig{
Command: c,
},
)
if err != nil {
klog.Fatal(err)
return err
}
status, err := cmder.Run()
a.commandStatus, err = cmdRunner.Run()
if err != nil {
klog.Fatal(err)
return err
}

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Command before attempting update
// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
got, err = client.Resource(gvr).
Namespace(*commandNamespace).
Get(
*commandName,
v1.GetOptions{},
)
if err != nil {
klog.Fatal(err)
}

// update labels
lbls := got.GetLabels()
if len(lbls) == 0 {
lbls = make(map[string]string)
}
lbls["command.dope.metacontroller.io/phase"] = string(status.Phase)
got.SetLabels(lbls)

// update status
err = unstructured.SetNestedField(
got.Object,
status,
"status",
)
if err != nil {
klog.Fatal(err)
}

// update command resource
_, updateErr := client.Resource(gvr).
Namespace(*commandNamespace).
Update(
got,
v1.UpdateOptions{},
)
return updateErr
})
if retryErr != nil {
klog.Fatal(retryErr)
}
return a.updateWithRetries()
}
Loading

0 comments on commit 090d341

Please sign in to comment.