diff --git a/api/v1/receiver_types.go b/api/v1/receiver_types.go index 046fc0a4a..9861faa32 100644 --- a/api/v1/receiver_types.go +++ b/api/v1/receiver_types.go @@ -63,21 +63,14 @@ type ReceiverSpec struct { // +optional Events []string `json:"events,omitempty"` - // TODO: Validate one or other (or both?) - // A list of resources to be notified about changes. // +required Resources []CrossNamespaceObjectReference `json:"resources"` - // ResourceExpressions is a list of CEL expressions that will be parsed to - // determine resources to be notified about changes. - // The expressions must evaluate to CEL values that contain the keys "name", - // "kind", "apiVersion" and optionally "namespace". - // These values will be parsed to CrossNamespaceObjectReferences. - // e.g. {"name": "test-resource-1", "kind": "Receiver", "apiVersion": - // "notification.toolkit.fluxcd.io/v1"}. - // +optional - ResourceExpressions []string `json:"resourceExpressions,omitempty"` + // ResourceFilter is an expression that is applied to each Resource + // referenced in the Resources. If the expression returns false then the + // Resource is discarded and will not be notified. + ResourceFilter string `json:"resourceFilter,omitempty"` // SecretRef specifies the Secret containing the token used // to validate the payload authenticity. diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 8e0bbd010..81a647ce8 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -126,11 +126,6 @@ func (in *ReceiverSpec) DeepCopyInto(out *ReceiverSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - if in.ResourceExpressions != nil { - in, out := &in.ResourceExpressions, &out.ResourceExpressions - *out = make([]string, len(*in)) - copy(*out, *in) - } out.SecretRef = in.SecretRef } diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml index bb80b7177..f77e7490c 100644 --- a/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml +++ b/config/crd/bases/notification.toolkit.fluxcd.io_receivers.yaml @@ -62,18 +62,12 @@ spec: Secret references. pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$ type: string - resourceExpressions: + resourceFilter: description: |- - ResourceExpressions is a list of CEL expressions that will be parsed to - determine resources to be notified about changes. - The expressions must evaluate to CEL values that contain the keys "name", - "kind", "apiVersion" and optionally "namespace". - These values will be parsed to CrossNamespaceObjectReferences. - e.g. {"name": "test-resource-1", "kind": "Receiver", "apiVersion": - "notification.toolkit.fluxcd.io/v1"}. - items: - type: string - type: array + ResourceFilter is an expression that is applied to each Resource + referenced in the Resources. If the expression returns false then the + Resource is discarded and will not be notified. + type: string resources: description: A list of resources to be notified about changes. items: diff --git a/internal/server/receiver_handler_test.go b/internal/server/receiver_handler_test.go index 19a02de0d..cd01bc433 100644 --- a/internal/server/receiver_handler_test.go +++ b/internal/server/receiver_handler_test.go @@ -27,7 +27,6 @@ import ( "net/http/httptest" "testing" - "github.com/go-logr/logr" "github.com/google/go-github/v64/github" "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -762,7 +761,7 @@ func Test_handlePayload(t *testing.T) { expectedResponseCode: http.StatusOK, }, { - name: "resources determined by CEL expressions", + name: "resources filtered with CEL expressions", headers: map[string]string{ "Content-Type": "application/json; charset=utf-8", }, @@ -775,11 +774,17 @@ func Test_handlePayload(t *testing.T) { SecretRef: meta.LocalObjectReference{ Name: "token", }, - ResourceExpressions: []string{ - `{"name": "test-resource-1", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, - `[{"name": body.image.split(':',2)[0] + '-2', "namespace": "tested", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}]`, - `body.resources.map(r, {"name": r, "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"})`, + Resources: []apiv1.CrossNamespaceObjectReference{ + { + APIVersion: apiv1.GroupVersion.String(), + Kind: apiv1.ReceiverKind, + Name: "*", + MatchLabels: map[string]string{ + "label": "production", + }, + }, }, + ResourceFilter: `resource.metadata.name in ["test-resource-1", "test-resource-2"]`, }, Status: apiv1.ReceiverStatus{ WebhookPath: apiv1.ReceiverWebhookPath, @@ -794,13 +799,6 @@ func Test_handlePayload(t *testing.T) { "token": []byte("token"), }, }, - payload: map[string]interface{}{ - "image": "test-resource:1.2.1", - "resources": []string{ - "test-resource-3", - "test-resource-4", - }, - }, resources: []client.Object{ &apiv1.Receiver{ TypeMeta: metav1.TypeMeta{ @@ -809,6 +807,9 @@ func Test_handlePayload(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-resource-1", + Labels: map[string]string{ + "label": "production", + }, }, }, &apiv1.Receiver{ @@ -819,6 +820,9 @@ func Test_handlePayload(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "test-resource-2", Namespace: "tested", + Labels: map[string]string{ + "label": "production", + }, }, }, &apiv1.Receiver{ @@ -828,6 +832,9 @@ func Test_handlePayload(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-resource-3", + Labels: map[string]string{ + "label": "production", + }, }, }, &apiv1.Receiver{ @@ -837,10 +844,13 @@ func Test_handlePayload(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-resource-4", + Labels: map[string]string{ + "label": "production", + }, }, }, }, - expectedResourcesAnnotated: 4, // TODO: This should really check more than just the count. + expectedResourcesAnnotated: 2, // TODO: This should really check more than just the count. expectedResponseCode: http.StatusOK, }, { @@ -857,9 +867,17 @@ func Test_handlePayload(t *testing.T) { SecretRef: meta.LocalObjectReference{ Name: "token", }, - ResourceExpressions: []string{ - `{"name": ["test-resource-1"], "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, + Resources: []apiv1.CrossNamespaceObjectReference{ + { + APIVersion: apiv1.GroupVersion.String(), + Kind: apiv1.ReceiverKind, + Name: "*", + MatchLabels: map[string]string{ + "label": "production", + }, + }, }, + ResourceFilter: `resource.name == "test-resource-1"`, }, Status: apiv1.ReceiverStatus{ WebhookPath: apiv1.ReceiverWebhookPath, @@ -874,13 +892,6 @@ func Test_handlePayload(t *testing.T) { "token": []byte("token"), }, }, - payload: map[string]interface{}{ - "image": "test-resource:1.2.1", - "resources": []string{ - "test-resource-3", - "test-resource-4", - }, - }, resources: []client.Object{ &apiv1.Receiver{ TypeMeta: metav1.TypeMeta{ @@ -889,11 +900,14 @@ func Test_handlePayload(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "test-resource-1", + Labels: map[string]string{ + "label": "production", + }, }, }, }, expectedResourcesAnnotated: 0, // TODO: This should really check more than just the count. - expectedResponseCode: http.StatusBadRequest, + expectedResponseCode: http.StatusInternalServerError, }, } @@ -920,11 +934,11 @@ func Test_handlePayload(t *testing.T) { } client := builder.Build() - s := newReceiverHandler( - logger.NewLogger(logger.Options{}), - client, - false, - ) + s := ReceiverServer{ + port: "", + logger: logger.NewLogger(logger.Options{}), + kubeClient: client, + } data, err := json.Marshal(tt.payload) if err != nil { @@ -962,71 +976,6 @@ func Test_handlePayload(t *testing.T) { } } -func TestReceiverServer(t *testing.T) { - receiver := &apiv1.Receiver{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-receiver", - Namespace: "default", - }, - Spec: apiv1.ReceiverSpec{ - Type: apiv1.GenericReceiver, - SecretRef: meta.LocalObjectReference{ - Name: "token", - }, - ResourceExpressions: []string{ - `{"name": "test-receiver", "kind": "Receiver", "apiVersion": "notification.toolkit.fluxcd.io/v1"}`, - }, - }, - Status: apiv1.ReceiverStatus{ - WebhookPath: apiv1.ReceiverWebhookPath, - Conditions: []metav1.Condition{ - { - Type: meta.ReadyCondition, - Status: metav1.ConditionTrue, - }, - }, - }, - } - secret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "token", - Namespace: "default", - }, - Data: map[string][]byte{ - "token": []byte("token"), - }, - } - - k8sClient := buildTestClient(receiver, secret) - - rs := newReceiverHandler(logr.Discard(), k8sClient, false) - srv := httptest.NewServer(rs) - defer srv.Close() - - payload := map[string]any{ - "image": "test-resource:1.2.1", - } - - body, err := json.Marshal(payload) - if err != nil { - t.Fatal(err) - } - req, err := http.NewRequest(http.MethodPost, srv.URL+apiv1.ReceiverWebhookPath, bytes.NewBuffer(body)) - if err != nil { - t.Fatal(err) - } - req.Header.Set("Content-Type", "application/json; charset=utf-8") - - resp, err := srv.Client().Do(req) - if err != nil { - t.Fatal(err) - } - - if resp.StatusCode != http.StatusOK { - t.Errorf("got StatusCode %v, want %v", resp.StatusCode, http.StatusOK) - } -} - func buildTestClient(objs ...client.Object) client.Client { scheme := runtime.NewScheme() apiv1.AddToScheme(scheme) @@ -1035,6 +984,5 @@ func buildTestClient(objs ...client.Object) client.Client { return fake.NewClientBuilder(). WithScheme(scheme). WithObjects(objs...). - WithStatusSubresource(&apiv1.Receiver{}). WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath).Build() } diff --git a/internal/server/receiver_handlers.go b/internal/server/receiver_handlers.go index dea1e1b0e..1c8cda170 100644 --- a/internal/server/receiver_handlers.go +++ b/internal/server/receiver_handlers.go @@ -26,10 +26,8 @@ import ( "errors" "fmt" "io" - "mime" "net/http" "net/url" - "reflect" "strings" "time" @@ -40,10 +38,9 @@ import ( "github.com/go-logr/logr" "github.com/google/cel-go/cel" "github.com/google/cel-go/checker/decls" - "github.com/google/cel-go/common/types/traits" + celtypes "github.com/google/cel-go/common/types" celext "github.com/google/cel-go/ext" "github.com/google/go-github/v64/github" - "google.golang.org/protobuf/types/known/structpb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -77,27 +74,8 @@ func IndexReceiverWebhookPath(o client.Object) []string { return nil } -func newReceiverHandler(logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *receiverHandler { - h := &receiverHandler{ - logger: logger.WithName("receiver-server"), - kubeClient: kubeClient, - exportHTTPPathMetrics: exportHTTPPathMetrics, - ServeMux: http.NewServeMux(), - } - h.ServeMux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(h.handlePayload)) - - return h -} - -type receiverHandler struct { - logger logr.Logger - kubeClient client.Client - exportHTTPPathMetrics bool - *http.ServeMux -} - -func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() +func (s *ReceiverServer) handlePayload(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath)) s.logger.Info(fmt.Sprintf("handling request: %s", digest)) @@ -106,7 +84,6 @@ func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{ WebhookPathIndexKey: r.RequestURI, }, client.Limit(1)) - if err != nil { s.logger.Error(err, "unable to list receivers") w.WriteHeader(http.StatusInternalServerError) @@ -143,21 +120,7 @@ func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) var withErrors bool for _, resource := range receiver.Spec.Resources { - if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { - logger.Error(err, "unable to request reconciliation") - withErrors = true - } - } - - evaluatedResources, err := s.evaluateResourceExpressions(r, receiver) - if err != nil { - logger.Error(err, "unable to evaluate resource expressions") - w.WriteHeader(http.StatusBadRequest) - return - } - - for _, resource := range evaluatedResources { - if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { + if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace, receiver.Spec.ResourceFilter); err != nil { logger.Error(err, "unable to request reconciliation") withErrors = true } @@ -170,7 +133,7 @@ func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) } } -func (s *receiverHandler) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error { +func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error { token, err := s.token(ctx, receiver) if err != nil { return fmt.Errorf("unable to read token, error: %w", err) @@ -414,7 +377,7 @@ func (s *receiverHandler) validate(ctx context.Context, receiver apiv1.Receiver, return fmt.Errorf("recevier type '%s' not supported", receiver.Spec.Type) } -func (s *receiverHandler) token(ctx context.Context, receiver apiv1.Receiver) (string, error) { +func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (string, error) { token := "" secretName := types.NamespacedName{ Namespace: receiver.GetNamespace(), @@ -437,7 +400,7 @@ func (s *receiverHandler) token(ctx context.Context, receiver apiv1.Receiver) (s } // requestReconciliation requests reconciliation of all the resources matching the given CrossNamespaceObjectReference by annotating them accordingly. -func (s *receiverHandler) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error { +func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace, resourceFilter string) error { namespace := defaultNamespace if resource.Namespace != "" { namespace = resource.Namespace @@ -482,6 +445,16 @@ func (s *receiverHandler) requestReconciliation(ctx context.Context, logger logr } for i, resource := range resources.Items { + if resourceFilter != "" { + reject, err := evaluate(resourceFilter, resource) + if err != nil { + return err + } + if !*reject { + continue + } + } + if err := s.annotate(ctx, &resources.Items[i]); err != nil { return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, namespace, err) } else { @@ -520,7 +493,7 @@ func (s *receiverHandler) requestReconciliation(ctx context.Context, logger logr return nil } -func (s *receiverHandler) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error { +func (s *ReceiverServer) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error { patch := client.MergeFrom(resource.DeepCopy()) sourceAnnotations := resource.GetAnnotations() @@ -585,43 +558,8 @@ func getGroupVersion(s string) (string, string) { return slice[0], slice[1] } -func (s *receiverHandler) evaluateResourceExpressions(r *http.Request, receiver apiv1.Receiver) ([]apiv1.CrossNamespaceObjectReference, error) { - if len(receiver.Spec.ResourceExpressions) == 0 { - return nil, nil - } - - body := map[string]any{} - // Only decodes the body for the expression if the body is JSON. - // Technically you could generate several resources without any body. - if isJSONContent(r) { - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - return nil, fmt.Errorf("failed to parse request body as JSON: %s", err) - } - } - +func evaluate(expr string, partialMetadata metav1.PartialObjectMetadata) (*bool, error) { env, err := makeCELEnv() - if err != nil { - return nil, fmt.Errorf("failed to setup CEL environment: %s", err) - } - - var combinedResources []apiv1.CrossNamespaceObjectReference - - var returnErr error - for _, expr := range receiver.Spec.ResourceExpressions { - evaluated, evalErr := evaluate(expr, env, map[string]interface{}{ - "body": body, - }) - if evalErr != nil { - returnErr = errors.Join(returnErr, evalErr) - continue - } - combinedResources = append(combinedResources, evaluated...) - } - - return combinedResources, returnErr -} - -func evaluate(expr string, env *cel.Env, data map[string]any) ([]apiv1.CrossNamespaceObjectReference, error) { parsed, issues := env.Parse(expr) if issues != nil && issues.Err() != nil { return nil, fmt.Errorf("failed to parse expression %v: %w", expr, issues.Err()) @@ -637,59 +575,26 @@ func evaluate(expr string, env *cel.Env, data map[string]any) ([]apiv1.CrossName return nil, fmt.Errorf("expression %v failed to create a Program: %w", expr, err) } - out, _, err := prg.Eval(data) + data, err := partialObjectMetadataToMap(partialMetadata) if err != nil { - return nil, fmt.Errorf("expression %v failed to evaluate: %w", expr, err) + return nil, err } - switch v := out.(type) { - case traits.Lister: - return parseList(v) - case traits.Mapper: - ref, err := mapperToReference(v) - if err != nil { - return nil, err - } - return []apiv1.CrossNamespaceObjectReference{*ref}, nil - // TODO Log out other types? + out, _, err := prg.Eval(map[string]any{ + "resource": data, + }) + if err != nil { + return nil, fmt.Errorf("expression %v failed to evaluate: %w", expr, err) } - return nil, nil -} - -func parseList(l traits.Lister) ([]apiv1.CrossNamespaceObjectReference, error) { - var resources []apiv1.CrossNamespaceObjectReference - it := l.Iterator() - for it.HasNext().Value() == true { - switch element := it.Next().(type) { - case traits.Mapper: - cno, err := mapperToReference(element) - if err != nil { - return nil, err - } - resources = append(resources, *cno) - } + v, ok := out.(celtypes.Bool) + if !ok { + return nil, fmt.Errorf("expression %q did not return a boolean value", expr) } - return resources, nil -} + result := v.Value().(bool) -func mapperToReference(v traits.Mapper) (*apiv1.CrossNamespaceObjectReference, error) { - raw, err := v.ConvertToNative(reflect.TypeOf(&structpb.Value{})) - if err != nil { - return nil, fmt.Errorf("failed to convert to resource reference: %s", err) - } - cno := apiv1.CrossNamespaceObjectReference{} - b, err := raw.(*structpb.Value).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("converting object references to JSON: %w", err) - } - err = json.Unmarshal(b, &cno) - if err != nil { - return nil, fmt.Errorf("parsing object reference: %w", err) - } - - return &cno, err + return &result, nil } func makeCELEnv() (*cel.Env, error) { @@ -698,21 +603,20 @@ func makeCELEnv() (*cel.Env, error) { celext.Strings(), celext.Encoders(), cel.Declarations( - decls.NewVar("body", mapStrDyn), + decls.NewVar("resource", mapStrDyn), )) } -func isJSONContent(r *http.Request) bool { - contentType := r.Header.Get("Content-type") - for _, v := range strings.Split(contentType, ",") { - t, _, err := mime.ParseMediaType(v) - if err != nil { - break - } - if t == "application/json" { - return true - } +func partialObjectMetadataToMap(v metav1.PartialObjectMetadata) (map[string]any, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to marshal PartialObjectMetadata from resource for CEL expression: %w", err) + } + + var result map[string]any + if err := json.Unmarshal(b, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal PartialObjectMetadata from resource for CEL expression: %w", err) } - return false + return result, nil } diff --git a/internal/server/receiver_server.go b/internal/server/receiver_server.go index c01c8f8bd..e71fd9b19 100644 --- a/internal/server/receiver_server.go +++ b/internal/server/receiver_server.go @@ -32,25 +32,31 @@ import ( // ReceiverServer handles webhook POST requests type ReceiverServer struct { - port string - *receiverHandler + port string + logger logr.Logger + kubeClient client.Client + exportHTTPPathMetrics bool } // NewReceiverServer returns an HTTP server that handles webhooks func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *ReceiverServer { return &ReceiverServer{ - port: port, - receiverHandler: newReceiverHandler(logger, kubeClient, exportHTTPPathMetrics), + port: port, + logger: logger.WithName("receiver-server"), + kubeClient: kubeClient, + exportHTTPPathMetrics: exportHTTPPathMetrics, } } // ListenAndServe starts the HTTP server on the specified port func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Middleware) { + mux := http.NewServeMux() + mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload)) handlerID := apiv1.ReceiverWebhookPath if s.exportHTTPPathMetrics { handlerID = "" } - h := std.Handler(handlerID, mdlw, s.receiverHandler) + h := std.Handler(handlerID, mdlw, mux) srv := &http.Server{ Addr: s.port, Handler: h,