Skip to content

Commit

Permalink
Refactor to split handler and server.
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin McDermott <[email protected]>
  • Loading branch information
bigkevmcd committed Oct 2, 2024
1 parent a6d0852 commit be66d3c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
10 changes: 5 additions & 5 deletions internal/server/receiver_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,11 @@ func Test_handlePayload(t *testing.T) {
}

client := builder.Build()
s := ReceiverServer{
port: "",
logger: logger.NewLogger(logger.Options{}),
kubeClient: client,
}
s := newReceiverHandler(
logger.NewLogger(logger.Options{}),
client,
false,
)

data, err := json.Marshal(tt.payload)
if err != nil {
Expand Down
33 changes: 26 additions & 7 deletions internal/server/receiver_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,27 @@ func IndexReceiverWebhookPath(o client.Object) []string {
return nil
}

func (s *ReceiverServer) handlePayload(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
func newReceiverHandler(logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *receiverHandler {
h := &receiverHandler{
logger: logger.WithName("receiver-server"),
kubeClient: kubeClient,
exportHTTPPathMetrics: exportHTTPPathMetrics,
ServeMux: http.NewServeMux(),
}
h.ServeMux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(h.handlePayload))

return h
}

type receiverHandler struct {
logger logr.Logger
kubeClient client.Client
exportHTTPPathMetrics bool
*http.ServeMux
}

func (s *receiverHandler) handlePayload(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath))

s.logger.Info(fmt.Sprintf("handling request: %s", digest))
Expand Down Expand Up @@ -150,7 +169,7 @@ func (s *ReceiverServer) handlePayload(w http.ResponseWriter, r *http.Request) {
}
}

func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error {
func (s *receiverHandler) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error {
token, err := s.token(ctx, receiver)
if err != nil {
return fmt.Errorf("unable to read token, error: %w", err)
Expand Down Expand Up @@ -394,7 +413,7 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
return fmt.Errorf("recevier type '%s' not supported", receiver.Spec.Type)
}

func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (string, error) {
func (s *receiverHandler) token(ctx context.Context, receiver apiv1.Receiver) (string, error) {
token := ""
secretName := types.NamespacedName{
Namespace: receiver.GetNamespace(),
Expand All @@ -417,7 +436,7 @@ func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (st
}

// requestReconciliation requests reconciliation of all the resources matching the given CrossNamespaceObjectReference by annotating them accordingly.
func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error {
func (s *receiverHandler) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error {
namespace := defaultNamespace
if resource.Namespace != "" {
namespace = resource.Namespace
Expand Down Expand Up @@ -500,7 +519,7 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
return nil
}

func (s *ReceiverServer) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error {
func (s *receiverHandler) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error {
patch := client.MergeFrom(resource.DeepCopy())
sourceAnnotations := resource.GetAnnotations()

Expand Down Expand Up @@ -565,7 +584,7 @@ func getGroupVersion(s string) (string, string) {
return slice[0], slice[1]
}

func (s *ReceiverServer) evaluateResourceExpressions(r *http.Request, receiver apiv1.Receiver) ([]apiv1.CrossNamespaceObjectReference, error) {
func (s *receiverHandler) evaluateResourceExpressions(r *http.Request, receiver apiv1.Receiver) ([]apiv1.CrossNamespaceObjectReference, error) {
if len(receiver.Spec.ResourceExpressions) == 0 {
return nil, nil
}
Expand Down
16 changes: 5 additions & 11 deletions internal/server/receiver_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,25 @@ import (

// ReceiverServer handles webhook POST requests
type ReceiverServer struct {
port string
logger logr.Logger
kubeClient client.Client
exportHTTPPathMetrics bool
port string
*receiverHandler
}

// NewReceiverServer returns an HTTP server that handles webhooks
func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client, exportHTTPPathMetrics bool) *ReceiverServer {
return &ReceiverServer{
port: port,
logger: logger.WithName("receiver-server"),
kubeClient: kubeClient,
exportHTTPPathMetrics: exportHTTPPathMetrics,
port: port,
receiverHandler: newReceiverHandler(logger, kubeClient, exportHTTPPathMetrics),
}
}

// ListenAndServe starts the HTTP server on the specified port
func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Middleware) {
mux := http.NewServeMux()
mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload))
handlerID := apiv1.ReceiverWebhookPath
if s.exportHTTPPathMetrics {
handlerID = ""
}
h := std.Handler(handlerID, mdlw, mux)
h := std.Handler(handlerID, mdlw, s.receiverHandler)
srv := &http.Server{
Addr: s.port,
Handler: h,
Expand Down

0 comments on commit be66d3c

Please sign in to comment.