Skip to content

Commit

Permalink
feat(command): add support to delete dependants of command resource (#…
Browse files Browse the repository at this point in the history
…189)

* feat(command): add support to delete dependents of command resource

This PR does the following changes:
- Add support to delete dependent resources when command resource is
  deleted.
- Add support to launch the jobs periodically when command is configured
  for run always

Signed-off-by: mittachaitu <[email protected]>
  • Loading branch information
sai chaithanya authored Feb 17, 2021
1 parent ad823c7 commit a61dfc0
Show file tree
Hide file tree
Showing 13 changed files with 648 additions and 118 deletions.
14 changes: 8 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func main() {

// controller name & corresponding controller reconcile function
var controllers = map[string]generic.InlineInvokeFn{
"sync/recipe": recipe.Sync,
"finalize/recipe": recipe.Finalize,
"sync/http": http.Sync,
"sync/doperator": doperator.Sync,
"sync/run": run.Sync,
"sync/command": command.Sync,
"sync/recipe": recipe.Sync,
"finalize/recipe": recipe.Finalize,
"sync/http": http.Sync,
"sync/doperator": doperator.Sync,
"sync/run": run.Sync,
"sync/command": command.Sync,
"finalize/command": command.Finalize,
}

for name, ctrl := range controllers {
generic.AddToInlineRegistry(name, ctrl)
}
Expand Down
50 changes: 49 additions & 1 deletion config/metac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,52 @@ spec:
hooks:
sync:
inline:
funcName: sync/command
funcName: sync/command
---
apiVersion: dope/v1
kind: GenericController
metadata:
name: finalize-command
namespace: dope
spec:
watch:
apiVersion: dope.mayadata.io/v1
resource: commands
attachments:
# Delete pod
- apiVersion: v1
resource: pods
advancedSelector:
selectorTerms:
# select Pod if its labels has following
- matchReferenceExpressions:
- key: metadata.namespace
operator: EqualsWatchNamespace
- key: metadata.labels.job-name
operator: EqualsWatchName # match this lbl value against watch Name
# Delete job
- apiVersion: batch/v1
resource: jobs
advancedSelector:
selectorTerms:
# select job if its labels has following
- matchLabels:
command.dope.mayadata.io/controller: "true"
matchReferenceExpressions:
- key: metadata.labels.command\.dope\.mayadata\.io/uid
operator: EqualsWatchUID # match this lbl value against watch UID
- apiVersion: v1
resource: configmaps
advancedSelector:
selectorTerms:
# select ConfigMap if its labels has following
- matchLabels:
command.dope.mayadata.io/lock: "true"
matchReferenceExpressions:
- key: metadata.labels.command\.dope\.mayadata\.io/uid
operator: EqualsWatchUID # match this lbl value against watch Name
hooks:
finalize:
inline:
funcName: finalize/command
---
57 changes: 57 additions & 0 deletions controller/command/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2020 The MayaData Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command

import (
"openebs.io/metac/controller/generic"
)

var (
defaultDeletionResyncTime = float64(30)
)

// Finalize implements the idempotent logic that gets executed when
// Command instance is deleted. A Command instance may have child job &
// dedicated lock in form of a ConfigMap.
// Finalize logic tries to delete child pod, job & ConfigMap
//
// NOTE:
// When finalize hook is set in the config metac automatically sets
// a finalizer entry against the Command metadata's finalizers field .
// This finalizer entry is removed when SyncHookResponse's Finalized
// field is set to true
//
// NOTE:
// SyncHookRequest is the payload received as part of finalize
// request. Similarly, SyncHookResponse is the payload sent as a
// response as part of finalize request.
//
// NOTE:
// Returning error will panic this process. We would rather want this
// controller to run continuously. Hence, the errors are handled.
func Finalize(request *generic.SyncHookRequest, response *generic.SyncHookResponse) error {
if request.Attachments.IsEmpty() {
// Since no Dependents found it is safe to delete Command
response.Finalized = true
return nil
}

response.ResyncAfterSeconds = defaultDeletionResyncTime
// Observed attachments will get deleted
response.ExplicitDeletes = request.Attachments.List()
return nil
}
11 changes: 11 additions & 0 deletions controller/command/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ func (r *Reconciler) eval() {
}

func (r *Reconciler) sync() {
// Check for deletion timestamp on command resource
// if set then command is marked for deletion
if !r.observedCommand.DeletionTimestamp.IsZero() {
klog.V(1).Infof(
"Will skip command reconciliation: It is marked for deletion: Command %q / %q",
r.observedCommand.GetNamespace(),
r.observedCommand.GetName(),
)
return
}

// creating / deleting a Kubernetes Job is part of Command reconciliation
jobBuilder := command.NewJobBuilder(
command.JobBuildingConfig{
Expand Down
2 changes: 1 addition & 1 deletion deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@ spec:
versions:
- name: v1
served: true
storage: true
storage: true
106 changes: 95 additions & 11 deletions pkg/command/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package command

import (
"fmt"
"strings"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -57,6 +58,11 @@ type Reconciliation struct {
// client to invoke CRUD operations against k8s Job
jobClient *clientset.ResourceClient

// getChildJob will hold function to fetch the child object
// from k8s cluster
// NOTE: This is helpful to mocking
getChildJob func() (*unstructured.Unstructured, bool, error)

// is Command resource supposed to run Once
isRunOnce bool

Expand Down Expand Up @@ -98,6 +104,10 @@ type Reconciliation struct {
}

func (r *Reconciliation) initChildJobDetails() {
var got *unstructured.Unstructured
var found bool
var err error

if r.childJob == nil || r.childJob.Object == nil {
return
}
Expand All @@ -115,7 +125,12 @@ func (r *Reconciliation) initChildJobDetails() {
)
return
}
got, found, err := r.isChildJobAvailable()

if r.getChildJob != nil {
got, found, err = r.getChildJob()
} else {
got, found, err = r.isChildJobAvailable()
}
if err != nil {
r.err = err
return
Expand Down Expand Up @@ -162,16 +177,17 @@ func (r *Reconciliation) initChildJobDetails() {
return
}

// Extract status.active of this Job
activeCount, found, err := unstructured.NestedInt64(
// Extract status.conditions of this Job to know whether
// job has completed its execution
jobConditions, found, err := unstructured.NestedSlice(
got.Object,
"status",
"active",
"conditions",
)
if err != nil {
r.err = errors.Wrapf(
err,
"Failed to get Job status.active: Kind %q: Job %q / %q",
"Failed to get Job status.conditions: Kind %q: Job %q / %q",
r.childJob.GetKind(),
r.childJob.GetNamespace(),
r.childJob.GetName(),
Expand All @@ -180,21 +196,45 @@ func (r *Reconciliation) initChildJobDetails() {
}
if !found {
klog.V(1).Infof(
"Job status.active is not set: Kind %q: Job %q / %q",
"Job status.conditions is not set: Kind %q: Job %q / %q",
r.childJob.GetKind(),
r.childJob.GetNamespace(),
r.childJob.GetName(),
)
// Job's status.active is not set
// Job's status.conditions is not set
//
// Nothing to do
// Wait for next reconcile
return
}

if activeCount > 0 {
r.isChildJobCompleted = true
// Look for condition type complete
// if found then mark isChildJobCompleted as true
for _, value := range jobConditions {
condition, ok := value.(map[string]interface{})
if !ok {
r.err = errors.Errorf(
"Job status.condition is not map[string]interface{} got %T: "+
"kind %q: Job %q / %q",
value,
r.childJob.GetKind(),
r.childJob.GetNamespace(),
r.childJob.GetName(),
)
return
}
condType := condition["type"].(string)
if condType == types.JobPhaseCompleted {
condStatus := condition["status"].(string)
if strings.ToLower(condStatus) == "true" {
r.isChildJobCompleted = true
}
}
}

// If there is no condtion with complete type then
// nothing to do

// wait for next reconciliation
}

func (r *Reconciliation) initCommandDetails() {
Expand Down Expand Up @@ -356,13 +396,17 @@ func (r *Reconciliation) isChildJobAvailable() (*unstructured.Unstructured, bool
}

func (r *Reconciliation) deleteChildJob() (types.CommandStatus, error) {
// If propagationPolicy is set to background then the garbage collector will
// delete dependents in the background
propagationPolicy := v1.DeletePropagationBackground
err := r.jobClient.
Namespace(r.childJob.GetNamespace()).
Delete(
r.childJob.GetName(),
&v1.DeleteOptions{
// Delete immediately
GracePeriodSeconds: pointer.Int64(0),
PropagationPolicy: &propagationPolicy,
},
)
if err != nil && !apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -445,12 +489,52 @@ func (r *Reconciliation) reconcileRunAlwaysCommand() (types.CommandStatus, error
return r.createChildJob()
}
if r.isStatusSet && r.isChildJobCompleted {
// Since this is for run always we are performing below steps
// 1. Delete Job and wait til it gets deleted from etcd
// 2. Create Job in the same reconciliation
klog.V(1).Infof(
"Will delete command job: Command %q / %q",
r.command.GetNamespace(),
r.command.GetName(),
)
return r.deleteChildJob()
_, err := r.deleteChildJob()
if err != nil {
return types.CommandStatus{}, err
}

// Logic to wait for Job resource deletion from etcd
var message = fmt.Sprintf(
"Waiting for command job: %q / %q deletion",
r.childJob.GetNamespace(),
r.childJob.GetName(),
)
err = r.Retry.Waitf(
func() (bool, error) {
_, err := r.jobClient.
Namespace(r.childJob.GetNamespace()).
Get(r.childJob.GetName(), v1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
}
return false, nil
},
message,
)

klog.V(1).Infof("Deleted command job: Command %q / %q successfully",
r.command.GetNamespace(),
r.command.GetName(),
)

klog.V(1).Infof(
"Will create command job: Command %q / %q",
r.command.GetNamespace(),
r.command.GetName(),
)
return r.createChildJob()
}
return types.CommandStatus{
Phase: types.CommandPhaseInProgress,
Expand Down
Loading

0 comments on commit a61dfc0

Please sign in to comment.