Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Commit

Permalink
Stream gateway (#295)
Browse files Browse the repository at this point in the history
Adding new Gateway and *Gateway resources along side the existing *Providers. Updated the Stream resource to be able to specify and track the Gateway in addition to the provisioner. Deprecated all providers.

Refs #86 and RFC 0011

Co-Authored-By: Eric Bottard <[email protected]>
  • Loading branch information
scothis and ericbottard authored Jan 17, 2020
1 parent 05dd172 commit 1655773
Show file tree
Hide file tree
Showing 65 changed files with 10,492 additions and 42 deletions.
83 changes: 76 additions & 7 deletions cmd/managers/streaming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (

buildv1alpha1 "github.com/projectriff/system/pkg/apis/build/v1alpha1"
streamingv1alpha1 "github.com/projectriff/system/pkg/apis/streaming/v1alpha1"
controllers "github.com/projectriff/system/pkg/controllers/streaming"
"github.com/projectriff/system/pkg/controllers"
streamingcontrollers "github.com/projectriff/system/pkg/controllers/streaming"
"github.com/projectriff/system/pkg/tracker"
// +kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -78,7 +79,7 @@ func main() {
os.Exit(1)
}

if err = (&controllers.KafkaProviderReconciler{
if err = (&streamingcontrollers.KafkaProviderReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("KafkaProvider"),
Log: ctrl.Log.WithName("controllers").WithName("KafkaProvider"),
Expand All @@ -93,7 +94,7 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "KafkaProvider")
os.Exit(1)
}
if err = (&controllers.PulsarProviderReconciler{
if err = (&streamingcontrollers.PulsarProviderReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("PulsarProvider"),
Log: ctrl.Log.WithName("controllers").WithName("PulsarProvider"),
Expand All @@ -108,7 +109,7 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "PulsarProvider")
os.Exit(1)
}
if err = (&controllers.InMemoryProviderReconciler{
if err = (&streamingcontrollers.InMemoryProviderReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("InMemoryProvider"),
Log: ctrl.Log.WithName("controllers").WithName("InMemoryProvider"),
Expand All @@ -124,12 +125,13 @@ func main() {
os.Exit(1)
}
streamControllerLogger := ctrl.Log.WithName("controllers").WithName("Stream")
if err = (&controllers.StreamReconciler{
if err = (&streamingcontrollers.StreamReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("Stream"),
Log: streamControllerLogger,
Scheme: mgr.GetScheme(),
StreamProvisionerClient: controllers.NewStreamProvisionerClient(http.DefaultClient, streamControllerLogger),
Tracker: tracker.New(syncPeriod, ctrl.Log.WithName("controllers").WithName("Stream").WithName("tracker")),
StreamProvisionerClient: streamingcontrollers.NewStreamProvisionerClient(http.DefaultClient, streamControllerLogger),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Stream")
os.Exit(1)
Expand All @@ -138,7 +140,7 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "Stream")
os.Exit(1)
}
if err = (&controllers.ProcessorReconciler{
if err = (&streamingcontrollers.ProcessorReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("Processor"),
Log: ctrl.Log.WithName("controllers").WithName("Processor"),
Expand All @@ -153,6 +155,73 @@ func main() {
setupLog.Error(err, "unable to create webhook", "webhook", "Processor")
os.Exit(1)
}
if err = streamingcontrollers.GatewayReconciler(
controllers.Config{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("Gateway"),
Log: ctrl.Log.WithName("controllers").WithName("Gateway"),
Scheme: mgr.GetScheme(),
Tracker: tracker.New(syncPeriod, ctrl.Log.WithName("controllers").WithName("Gateway").WithName("tracker")),
},
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Gateway")
os.Exit(1)
}
if err = ctrl.NewWebhookManagedBy(mgr).For(&streamingv1alpha1.Gateway{}).Complete(); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "Gateway")
os.Exit(1)
}
if err = streamingcontrollers.KafkaGatewayReconciler(
controllers.Config{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("KafkaGateway"),
Log: ctrl.Log.WithName("controllers").WithName("KafkaGateway"),
Scheme: mgr.GetScheme(),
Tracker: tracker.New(syncPeriod, ctrl.Log.WithName("controllers").WithName("KafkaGateway").WithName("tracker")),
},
namespace,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KafkaGateway")
os.Exit(1)
}
if err = ctrl.NewWebhookManagedBy(mgr).For(&streamingv1alpha1.KafkaGateway{}).Complete(); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "KafkaGateway")
os.Exit(1)
}
if err = streamingcontrollers.PulsarGatewayReconciler(
controllers.Config{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("PulsarGateway"),
Log: ctrl.Log.WithName("controllers").WithName("PulsarGateway"),
Scheme: mgr.GetScheme(),
Tracker: tracker.New(syncPeriod, ctrl.Log.WithName("controllers").WithName("PulsarGateway").WithName("tracker")),
},
namespace,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PulsarGateway")
os.Exit(1)
}
if err = ctrl.NewWebhookManagedBy(mgr).For(&streamingv1alpha1.PulsarGateway{}).Complete(); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "PulsarGateway")
os.Exit(1)
}
if err = streamingcontrollers.InMemoryGatewayReconciler(
controllers.Config{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("InMemoryGateway"),
Log: ctrl.Log.WithName("controllers").WithName("InMemoryGateway"),
Scheme: mgr.GetScheme(),
Tracker: tracker.New(syncPeriod, ctrl.Log.WithName("controllers").WithName("InMemoryGateway").WithName("tracker")),
},
namespace,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "InMemoryGateway")
os.Exit(1)
}
if err = ctrl.NewWebhookManagedBy(mgr).For(&streamingv1alpha1.InMemoryGateway{}).Complete(); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "InMemoryGateway")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("default", func(_ *http.Request) error { return nil }); err != nil {
Expand Down
Loading

0 comments on commit 1655773

Please sign in to comment.