Skip to content

Commit

Permalink
feat: jobs support
Browse files Browse the repository at this point in the history
  • Loading branch information
devthejo committed Sep 6, 2022
1 parent 856232c commit c9df401
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestNotFound(t *testing.T) {
assert.True(t, ok)

assert.False(t, rolloutStatus.Continue)
assert.Equal(t, `Selector "foo=bar" did not match any deployments or statefulsets in namespace "any-ns"`, re.Message)
assert.Equal(t, `Selector "foo=bar" did not match any deployments, statefulsets or jobs in namespace "any-ns"`, re.Message)
}

func TestSuccess(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions cmd/main_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/SocialGouv/rollout-status/pkg/client"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"
)
Expand Down Expand Up @@ -67,6 +68,7 @@ type StaticClient struct {
DeploymentList *appsv1.DeploymentList
ReplicaSetList *appsv1.ReplicaSetList
StatefulSetList *appsv1.StatefulSetList
JobList *batchv1.JobList
PodList *v1.PodList
}

Expand All @@ -82,6 +84,10 @@ func (client StaticClient) ListAppsV1ReplicaSets(deployment *appsv1.Deployment)
return client.ReplicaSetList, nil
}

func (client StaticClient) ListBatchV1Jobs(namespace, selector string) (*batchv1.JobList, error) {
return client.JobList, nil
}

func (client StaticClient) ListV1Pods(replicasSet *appsv1.ReplicaSet) (*v1.PodList, error) {
return client.PodList, nil
}
Expand All @@ -90,6 +96,10 @@ func (client StaticClient) ListV1StsPods(sts *appsv1.StatefulSet) (*v1.PodList,
return client.PodList, nil
}

func (client StaticClient) ListV1JobPods(job *batchv1.Job) (*v1.PodList, error) {
return client.PodList, nil
}

func (client StaticClient) TrailContainerLogs(namespace, pod, container string) ([]byte, error) {
panic("not implemented")
}
3 changes: 3 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package client

import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
)

type Kubernetes interface {
ListAppsV1Deployments(namespace, selector string) (*appsv1.DeploymentList, error)
ListAppsV1StatefulSets(namespace, selector string) (*appsv1.StatefulSetList, error)
ListAppsV1ReplicaSets(deployment *appsv1.Deployment) (*appsv1.ReplicaSetList, error)
ListBatchV1Jobs(namespace, selector string) (*batchv1.JobList, error)
ListV1Pods(replicasSet *appsv1.ReplicaSet) (*v1.PodList, error)
ListV1StsPods(replicasSet *appsv1.StatefulSet) (*v1.PodList, error)
ListV1JobPods(job *batchv1.Job) (*v1.PodList, error)
TrailContainerLogs(namespace, pod, container string) ([]byte, error)
}
22 changes: 22 additions & 0 deletions pkg/client/real.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"io/ioutil"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
selection "k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -34,6 +37,10 @@ func (impl KubernetesImpl) ListAppsV1ReplicaSets(deployment *appsv1.Deployment)
return impl.clientset.AppsV1().ReplicaSets(deployment.Namespace).List(options)
}

func (impl KubernetesImpl) ListBatchV1Jobs(namespace, selector string) (*batchv1.JobList, error) {
return impl.clientset.BatchV1().Jobs(namespace).List(metav1.ListOptions{LabelSelector: selector})
}

func (impl KubernetesImpl) ListV1Pods(replicasSet *appsv1.ReplicaSet) (*v1.PodList, error) {
selector, err := metav1.LabelSelectorAsSelector(replicasSet.Spec.Selector)
if err != nil {
Expand All @@ -52,6 +59,21 @@ func (impl KubernetesImpl) ListV1StsPods(sts *appsv1.StatefulSet) (*v1.PodList,
return impl.clientset.CoreV1().Pods(sts.Namespace).List(options)
}

func (impl KubernetesImpl) ListV1JobPods(job *batchv1.Job) (*v1.PodList, error) {
selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
return nil, err
}
jobNameSelector := []string{job.ObjectMeta.Name}
jobNameRequirement, err := labels.NewRequirement("job-name", selection.DoubleEquals, jobNameSelector)
if err != nil {
return nil, err
}
selector.Add(*jobNameRequirement)
options := metav1.ListOptions{LabelSelector: selector.String()}
return impl.clientset.CoreV1().Pods(job.Namespace).List(options)
}

func (impl KubernetesImpl) TrailContainerLogs(namespace, pod, container string) ([]byte, error) {
// https://github.com/kubernetes/kubernetes/blob/c2e90cd1549dff87db7941544ce15f4c8ad0ba4c/pkg/kubectl/cmd/log.go#L188
req := impl.clientset.CoreV1().RESTClient().Get().
Expand Down
31 changes: 24 additions & 7 deletions pkg/status/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@ func TestRollout(wrapper client.Kubernetes, namespace, selector string) RolloutS
return RolloutFatal(err)
}

if len(deployments.Items) == 0 && len(statefulsets.Items) == 0 {
err = MakeRolloutError(FailureNotFound, "Selector %q did not match any deployments or statefulsets in namespace %q", selector, namespace)
jobs, err := wrapper.ListBatchV1Jobs(namespace, selector)
if err != nil {
return RolloutFatal(err)
}

if (deployments == nil || len(deployments.Items) == 0) && (statefulsets == nil || len(statefulsets.Items) == 0) && (jobs == nil || len(jobs.Items) == 0) {
err = MakeRolloutError(FailureNotFound, "Selector %q did not match any deployments, statefulsets or jobs in namespace %q", selector, namespace)
return RolloutFatal(err)
}

aggr := Aggregator{}
//https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kubectl/pkg/cmd/rollout/rollout_status.go
//https://github.com/kubernetes/kubernetes/blob/47daccb272c1a98c7b005dc1c19a88dbb643a3ee/staging/src/k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go#L59
for _, deployment := range deployments.Items {
status := DeploymentStatus(wrapper, &deployment)
aggr.Add(status)
if fatal := aggr.Fatal(); fatal != nil {
return *fatal
if deployments != nil {
for _, deployment := range deployments.Items {
status := DeploymentStatus(wrapper, &deployment)
aggr.Add(status)
if fatal := aggr.Fatal(); fatal != nil {
return *fatal
}
}
}

Expand All @@ -41,5 +48,15 @@ func TestRollout(wrapper client.Kubernetes, namespace, selector string) RolloutS
}
}

if jobs != nil {
for _, job := range jobs.Items {
status := JobStatus(wrapper, &job)
aggr.Add(status)
if fatal := aggr.Fatal(); fatal != nil {
return *fatal
}
}
}

return aggr.Resolve()
}
48 changes: 48 additions & 0 deletions pkg/status/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package status

import (
"github.com/SocialGouv/rollout-status/pkg/client"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
)

func TestJobStatus(wrapper client.Kubernetes, job batchv1.Job) RolloutStatus {
podList, err := wrapper.ListV1JobPods(&job)
if err != nil {
return RolloutFatal(err)
}

aggr := Aggregator{}
for _, pod := range podList.Items {
status := TestPodStatus(&pod)
aggr.Add(status)
if fatal := aggr.Fatal(); fatal != nil {
return *fatal
}
}
return aggr.Resolve()
}

func JobStatus(wrapper client.Kubernetes, job *batchv1.Job) RolloutStatus {

aggr := Aggregator{}

for _, condition := range job.Status.Conditions {
if condition.Type == batchv1.JobComplete && condition.Status == v1.ConditionTrue {
aggr.Add(RolloutOk())
return aggr.Resolve()
}
}

status := TestJobStatus(wrapper, *job)
if job.Status.Failed >= (*job.Spec.BackoffLimit + 1) {
aggr.Add(status)
} else if status.Error != nil {
aggr.Add(RolloutErrorProgressing(status.Error))
}
if fatal := aggr.Fatal(); fatal != nil {
return *fatal
}

return aggr.Resolve()
}
5 changes: 3 additions & 2 deletions pkg/status/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func (agg *Aggregator) Resolve() RolloutStatus {
for _, status := range agg.statuses {
if status.Error != nil {
hasErrors = true
break
// break
}
if status.Continue {
hasContinue = true
break
// break
}
}
if !hasErrors && !hasContinue {
Expand All @@ -46,5 +46,6 @@ func (agg *Aggregator) Resolve() RolloutStatus {
return status
}
}
// fmt.Printf("agg: %v\n", agg)
panic("invalid aggregator state")
}

0 comments on commit c9df401

Please sign in to comment.