Skip to content

Commit

Permalink
Extract pubsub.go
Browse files Browse the repository at this point in the history
  • Loading branch information
int128 committed Aug 3, 2023
1 parent ba3c3b2 commit d9902b4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
20 changes: 0 additions & 20 deletions internal/controller/error.go

This file was deleted.

40 changes: 40 additions & 0 deletions internal/controller/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package controller

import (
"context"
"errors"

"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type newPubSubClientFunc func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error)

func isPubSubNotFoundError(err error) bool {
if gs, ok := gRPCStatusFromError(err); ok {
return gs.Code() == codes.NotFound
}
return false
}

func isPubSubAlreadyExistsError(err error) bool {
if gs, ok := gRPCStatusFromError(err); ok {
return gs.Code() == codes.AlreadyExists
}
return false
}

func gRPCStatusFromError(err error) (*status.Status, bool) {
type gRPCError interface {
GRPCStatus() *status.Status
}

var se gRPCError
if errors.As(err, &se) {
return se.GRPCStatus(), true
}

return nil, false
}
14 changes: 6 additions & 8 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"time"

"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -37,9 +35,9 @@ const subscriptionFinalizerName = "subscription.googlecloudpubsuboperator.quippe

// SubscriptionReconciler reconciles a Subscription object
type SubscriptionReconciler struct {
client.Client
crclient.Client
Scheme *runtime.Scheme
NewClient func(ctx context.Context, projectID string, opts ...option.ClientOption) (c *pubsub.Client, err error)
NewClient newPubSubClientFunc
}

//+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -55,7 +53,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, crclient.IgnoreNotFound(err)
}

logger.Info("Found the subscription", "subscription", subscription)
Expand Down Expand Up @@ -94,7 +92,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request

s, err := r.createSubscription(ctx, subscription)
if err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.AlreadyExists {
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("PubSub subscription already exists")
return ctrl.Result{}, nil
Expand Down Expand Up @@ -144,7 +142,7 @@ func (r *SubscriptionReconciler) deleteSubscription(ctx context.Context, subscri

err = c.Subscription(subscription.Spec.SubscriptionID).Delete(ctx)
if err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.NotFound {
if isPubSubNotFoundError(err) {
// for idempotent
return nil
}
Expand Down
13 changes: 5 additions & 8 deletions internal/controller/topic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ import (
"fmt"

"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -35,9 +32,9 @@ import (

// TopicReconciler reconciles a Topic object
type TopicReconciler struct {
client.Client
crclient.Client
Scheme *runtime.Scheme
NewClient func(ctx context.Context, projectID string, opts ...option.ClientOption) (c *pubsub.Client, err error)
NewClient newPubSubClientFunc
}

const topicFinalizerName = "topic.googlecloudpubsuboperator.quipper.github.io/finalizer"
Expand All @@ -55,7 +52,7 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, crclient.IgnoreNotFound(err)
}
logger.Info("Found Topic resource")

Expand Down Expand Up @@ -93,7 +90,7 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

t, err := r.createTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID)
if err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.AlreadyExists {
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("Topic already exists in Cloud Pub/Sub", "error", err)
topicPatch := crclient.MergeFrom(topic.DeepCopy())
Expand Down Expand Up @@ -156,7 +153,7 @@ func (r *TopicReconciler) deleteTopic(ctx context.Context, projectID, topicID st
defer c.Close()

if err := c.Topic(topicID).Delete(ctx); err != nil {
if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.NotFound {
if isPubSubNotFoundError(err) {
// for idempotent
return nil
}
Expand Down

0 comments on commit d9902b4

Please sign in to comment.