Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue-278] Waiter uses watchTools.UntilWithSync instead of Until to handle resource version old error #359

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/commands/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package _import

import (
"io"
"io/ioutil"
"os"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -170,7 +169,7 @@ func readDescriptor(cmd *cobra.Command, filename string) (string, error) {
)

if filename == "-" {
reader = ioutil.NopCloser(cmd.InOrStdin())
reader = io.NopCloser(cmd.InOrStdin())
} else {
reader, err = os.Open(filename)
if err != nil {
Expand All @@ -179,7 +178,7 @@ func readDescriptor(cmd *cobra.Command, filename string) (string, error) {
}
defer reader.Close()

buf, err := ioutil.ReadAll(reader)
buf, err := io.ReadAll(reader)
if err != nil {
return "", err
}
Expand Down
32 changes: 22 additions & 10 deletions pkg/commands/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
watchTools "k8s.io/client-go/tools/watch"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
Expand Down Expand Up @@ -59,18 +60,24 @@ func (w *Waiter) wait(ctx context.Context, ob runtime.Object, condition watchToo
return err
}

if !done {
refable, ok := ob.(kmeta.OwnerRefable)
if !ok {
return errors.New("unexpected type")
}
refable, ok := ob.(kmeta.OwnerRefable)
if !ok {
return errors.New("unexpected type")
}

rv := refable.GetObjectMeta().GetResourceVersion()
watchOne := newWatchOneWatcher(ctx, refable, w.dynamicClient)
listerWatcherOne := newWatchOneWatcher(ctx, refable, w.dynamicClient)

ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
e, err = watchTools.Until(ctx, rv, watchOne, filterErrors(cfs)...)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()

if !done {
e, err = watchTools.UntilWithSync(ctx,
&listerWatcherOne,
&unstructured.Unstructured{},
func(store cache.Store) (bool, error) {
return false, nil
tomkennedy513 marked this conversation as resolved.
Show resolved Hide resolved
},
filterErrors(cfs)...)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,6 +165,11 @@ func (w watchOneWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
return w.dynamicClient.Resource(w.gvr).Namespace(w.namespace).Watch(w.ctx, options)
}

func (w watchOneWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fmt.Sprintf("metadata.name=%s", w.name)
return w.dynamicClient.Resource(w.gvr).Namespace(w.namespace).List(w.ctx, options)
}

func filterErrors(conditions []watchTools.ConditionFunc) []watchTools.ConditionFunc {
cfs := []watchTools.ConditionFunc{}
for _, c := range conditions {
Expand Down
75 changes: 58 additions & 17 deletions pkg/commands/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -81,13 +83,19 @@ func testWaiter(t *testing.T, when spec.G, it spec.S) {
Status: conditionReady(corev1.ConditionFalse, generation-1),
}

builderObj := &v1alpha2.Builder{
TypeMeta: resourceToWatch.TypeMeta,
ObjectMeta: resourceToWatch.ObjectMeta,
Status: v1alpha2.BuilderStatus{Status: conditionReady(corev1.ConditionTrue, generation)},
}

content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(builderObj)
if err != nil {
panic(err)
}
watcher.addEvent(watch.Event{
Type: watch.Modified,
Object: &v1alpha2.Builder{
TypeMeta: resourceToWatch.TypeMeta,
ObjectMeta: resourceToWatch.ObjectMeta,
Status: v1alpha2.BuilderStatus{Status: conditionReady(corev1.ConditionTrue, generation)},
},
Type: watch.Modified,
Object: &unstructured.Unstructured{Object: content},
})

require.NoError(t, waiter.Wait(context.Background(), resourceToWatch))
Expand All @@ -99,18 +107,56 @@ func testWaiter(t *testing.T, when spec.G, it spec.S) {
Status: conditionReady(corev1.ConditionFalse, generation-1),
}

builderObj := &v1alpha2.Builder{
TypeMeta: resourceToWatch.TypeMeta,
ObjectMeta: resourceToWatch.ObjectMeta,
Status: v1alpha2.BuilderStatus{Status: conditionReady(corev1.ConditionTrue, generation)},
}

content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(builderObj)
if err != nil {
panic(err)
}
watcher.addEvent(watch.Event{
Type: watch.Modified,
Object: &v1alpha2.Builder{
TypeMeta: resourceToWatch.TypeMeta,
ObjectMeta: resourceToWatch.ObjectMeta,
Status: v1alpha2.BuilderStatus{Status: conditionReady(corev1.ConditionTrue, generation)},
},
Type: watch.Modified,
Object: &unstructured.Unstructured{Object: content},
})

require.NoError(t, waiter.Wait(context.Background(), resourceToWatch, fakeConditionChecker.conditionCheck))
require.True(t, fakeConditionChecker.called)
})

it("recovers from too old resource version error", func() {
watcher.addEvent(watch.Event{
Type: watch.Error,
Object: &v1.Status{
TypeMeta: v1.TypeMeta{
APIVersion: "v1",
Kind: "Status",
},
Status: "Failure",
Message: "too old resource version: 23358 (23360)",
Reason: "Expired",
Code: 410,
},
})

builderObj := &v1alpha2.Builder{
TypeMeta: resourceToWatch.TypeMeta,
ObjectMeta: resourceToWatch.ObjectMeta,
Status: v1alpha2.BuilderStatus{Status: conditionReady(corev1.ConditionTrue, generation)},
}
content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(builderObj)
if err != nil {
panic(err)
}
watcher.addEvent(watch.Event{
Type: watch.Modified,
Object: &unstructured.Unstructured{Object: content},
})

require.NoError(t, waiter.Wait(context.Background(), resourceToWatch))
})
Comment on lines +129 to +159
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test will fail with the cachetools.Until version with the error message that was first reported. Couldn't force the too old resource version error while testing locally in my cluster :(

})
}

Expand All @@ -121,7 +167,6 @@ type fakeConditionChecker struct {
func (cc *fakeConditionChecker) conditionCheck(_ watch.Event) (bool, error) {
cc.called = true
return true, nil

}

func conditionReady(status corev1.ConditionStatus, generation int64) corev1alpha1.Status {
Expand Down Expand Up @@ -159,10 +204,6 @@ func (t *TestWatcher) watchReactor(action clientgotesting.Action) (handled bool,
}

watchAction := action.(clientgotesting.WatchAction)
if watchAction.GetWatchRestrictions().ResourceVersion != t.expectedResource.GetObjectMeta().GetResourceVersion() {
return true, nil, errors.New("expected watch on resource version")
}

if watchAction.GetNamespace() != t.expectedResource.GetObjectMeta().GetNamespace() {
return true, nil, errors.New("expected watch on namespace")
}
Expand Down