
Merge pull request #1309 from openmeterio/fix/use-confluent-kafka-client
fix: confluent cloud connectivity issues
turip authored Aug 5, 2024
2 parents 4ad9628 + 4cf15df commit ff19f05
Showing 8 changed files with 102 additions and 39 deletions.
7 changes: 4 additions & 3 deletions cmd/balance-worker/main.go
@@ -230,7 +230,7 @@ func main() {
}

// Create publisher
publishers, err := initEventPublisher(logger, conf, metricMeter)
publishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
@@ -359,7 +359,7 @@ type eventPublishers struct {
eventPublisher publisher.Publisher
}

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
@@ -368,12 +368,13 @@ func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMe
})
}

eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
7 changes: 4 additions & 3 deletions cmd/notification-service/main.go
@@ -229,7 +229,7 @@ func main() {
}

// Create publisher
publishers, err := initEventPublisher(logger, conf, metricMeter)
publishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
@@ -354,7 +354,7 @@ type eventPublishers struct {
eventPublisher publisher.Publisher
}

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
@@ -363,12 +363,13 @@ func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMe
})
}

eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
7 changes: 4 additions & 3 deletions cmd/server/main.go
@@ -213,7 +213,7 @@ func main() {
os.Exit(1)
}

eventPublishers, err := initEventPublisher(logger, conf, metricMeter)
eventPublishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", "error", err)
os.Exit(1)
@@ -435,7 +435,7 @@ type publishers struct {
driver message.Publisher
}

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) {
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) {
if !conf.Events.Enabled {
publisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: &noop.Publisher{},
@@ -458,12 +458,13 @@ func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMe
})
}

eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
13 changes: 7 additions & 6 deletions cmd/sink-worker/main.go
@@ -161,7 +161,7 @@ func main() {
var group run.Group

// initialize system event producer
ingestEventFlushHandler, err := initIngestEventPublisher(logger, conf, metricMeter)
ingestEventFlushHandler, err := initIngestEventPublisher(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", "error", err)
os.Exit(1)
@@ -220,15 +220,16 @@ func main() {
}
}

func initIngestEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (flushhandler.FlushEventHandler, error) {
func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (flushhandler.FlushEventHandler, error) {
if !conf.Events.Enabled {
return nil, nil
}

eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,

ProvisionTopics: []watermillkafka.AutoProvisionTopic{
{
23 changes: 23 additions & 0 deletions internal/watermill/driver/kafka/logger.go
@@ -0,0 +1,23 @@
package kafka

import (
"fmt"
)

type LoggerFunc func(fmt string, args ...any)

type SaramaLoggerAdaptor struct {
loggerFunc LoggerFunc
}

func (s *SaramaLoggerAdaptor) Print(v ...interface{}) {
s.loggerFunc(fmt.Sprint(v...))
}

func (s *SaramaLoggerAdaptor) Printf(format string, v ...interface{}) {
s.loggerFunc(fmt.Sprintf(format, v...))
}

func (s *SaramaLoggerAdaptor) Println(v ...interface{}) {
s.loggerFunc(fmt.Sprint(v...))
}
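
The adaptor satisfies Sarama's StdLogger interface (Print, Printf, Println). Below is a minimal usage sketch, assumed for illustration and kept in the same package because loggerFunc is unexported; it shows how the adaptor routes Sarama's package-level loggers through slog, which is the wiring applied in publisher.go further down.

package kafka

import (
	"log/slog"

	"github.com/IBM/sarama"
)

// wireSaramaLogging is an illustrative helper (not part of this commit). It
// points Sarama's global loggers at slog via the adaptor above. Because the
// loggers are package-level globals, this affects every Sarama client in the
// process.
func wireSaramaLogging(logger *slog.Logger, debug bool) {
	sarama.Logger = &SaramaLoggerAdaptor{loggerFunc: logger.Info}

	if debug {
		sarama.DebugLogger = &SaramaLoggerAdaptor{loggerFunc: logger.Debug}
	}
}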
37 changes: 28 additions & 9 deletions internal/watermill/driver/kafka/publisher.go
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"crypto/tls"
"errors"
"fmt"
@@ -26,6 +27,7 @@ type PublisherOptions struct {
Logger *slog.Logger
MetricMeter otelmetric.Meter
MeterPrefix string
DebugLogging bool
}

func (o *PublisherOptions) Validate() error {
@@ -47,7 +49,7 @@ func (o *PublisherOptions) Validate() error {
return nil
}

func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
func NewPublisher(ctx context.Context, in PublisherOptions) (*kafka.Publisher, error) {
if err := in.Validate(); err != nil {
return nil, err
}
@@ -66,18 +68,35 @@ func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = in.KafkaConfig.TopicMetadataRefreshInterval.Duration()
wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/balance-worker"

switch in.KafkaConfig.SecurityProtocol {
case "SASL_SSL":
// These are globals, so we cannot append the publisher/subscriber name to them
sarama.Logger = &SaramaLoggerAdaptor{
loggerFunc: in.Logger.Info,
}

if in.DebugLogging {
sarama.DebugLogger = &SaramaLoggerAdaptor{
loggerFunc: in.Logger.Debug,
}
}

if in.KafkaConfig.SecurityProtocol == "SASL_SSL" {
wmConfig.OverwriteSaramaConfig.Net.SASL.Enable = true
wmConfig.OverwriteSaramaConfig.Net.SASL.User = in.KafkaConfig.SaslUsername
wmConfig.OverwriteSaramaConfig.Net.SASL.Password = in.KafkaConfig.SaslPassword
wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(in.KafkaConfig.SecurityProtocol)
wmConfig.OverwriteSaramaConfig.Net.SASL.Handshake = true

wmConfig.OverwriteSaramaConfig.Net.TLS.Enable = true
wmConfig.OverwriteSaramaConfig.Net.TLS.Config = &tls.Config{}
default:

switch in.KafkaConfig.SaslMechanisms {
case "PLAIN":
wmConfig.OverwriteSaramaConfig.Net.SASL.User = in.KafkaConfig.SaslUsername
wmConfig.OverwriteSaramaConfig.Net.SASL.Password = in.KafkaConfig.SaslPassword
wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return nil, fmt.Errorf("unsupported SASL mechanism: %s", in.KafkaConfig.SaslMechanisms)
}
}

// Producer specific settings
wmConfig.OverwriteSaramaConfig.Producer.Retry.Max = 10
wmConfig.OverwriteSaramaConfig.Producer.Return.Successes = true

meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{
@@ -95,7 +114,7 @@ func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
return nil, err
}

if err := provisionTopics(in.KafkaConfig.Broker, wmConfig.OverwriteSaramaConfig, in.ProvisionTopics); err != nil {
if err := provisionTopics(ctx, in.Logger, in.KafkaConfig.CreateKafkaConfig(), in.ProvisionTopics); err != nil {
return nil, err
}

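
Condensed, the SASL_SSL branch above amounts to the following Sarama settings for Confluent Cloud: a SASL handshake with PLAIN credentials over TLS. This is an illustrative sketch only, outside the Watermill wrapping, with the credentials passed in as placeholders.

package example

import (
	"crypto/tls"

	"github.com/IBM/sarama"
)

// confluentCloudSaramaConfig sketches the settings produced by the
// SASL_SSL/PLAIN branch in publisher.go above (illustrative, not part of the
// commit).
func confluentCloudSaramaConfig(username, password string) *sarama.Config {
	cfg := sarama.NewConfig()

	// SASL/PLAIN credentials with an explicit handshake.
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Handshake = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	cfg.Net.SASL.User = username
	cfg.Net.SASL.Password = password

	// TLS with system root CAs, which is what Confluent Cloud expects.
	cfg.Net.TLS.Enable = true
	cfg.Net.TLS.Config = &tls.Config{}

	return cfg
}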
41 changes: 28 additions & 13 deletions internal/watermill/driver/kafka/topic_provision.go
@@ -1,35 +1,50 @@
package kafka

import (
"github.com/IBM/sarama"
"context"
"log/slog"

"github.com/openmeterio/openmeter/pkg/errorsx"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type AutoProvisionTopic struct {
Topic string
NumPartitions int32
}

func provisionTopics(broker string, config *sarama.Config, topics []AutoProvisionTopic) error {
admin, err := sarama.NewClusterAdmin([]string{broker}, config)
// provisionTopics creates the topics if they don't exist. This relies on the confluent kafka lib, as the sarama doesn't seem to
// properly support interacting with the confluent cloud.
func provisionTopics(ctx context.Context, logger *slog.Logger, config kafka.ConfigMap, topics []AutoProvisionTopic) error {
// This is not supported on admin client, so we need to remove it
delete(config, "go.logs.channel.enable")

adminClient, err := kafka.NewAdminClient(&config)
if err != nil {
return err
}
defer admin.Close()

defer adminClient.Close()

for _, topic := range topics {
err := admin.CreateTopic(topic.Topic, &sarama.TopicDetail{
NumPartitions: topic.NumPartitions,
ReplicationFactor: -1, // use default
}, false)
result, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{
{
Topic: topic.Topic,
NumPartitions: int(topic.NumPartitions),
},
})
if err != nil {
if topicError, ok := errorsx.ErrorAs[*sarama.TopicError](err); ok && topicError.Err == sarama.ErrTopicAlreadyExists {
continue
}

return err
}

for _, r := range result {
code := r.Error.Code()

if code == kafka.ErrTopicAlreadyExists {
logger.Debug("topic already exists", slog.String("topic", topic.Topic))
} else if code != kafka.ErrNoError {
return r.Error
}
}
}

return nil
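
For reference, here is the same admin-client flow in standalone form: an illustrative sketch with placeholder broker address, credentials, and topic name, creating a topic on Confluent Cloud with confluent-kafka-go and treating "already exists" as success, as the new provisionTopics does.

package main

import (
	"context"
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Standalone sketch (illustrative only, not part of the commit). Broker
// address, credentials, and topic name are placeholders.
func main() {
	cfg := kafka.ConfigMap{
		"bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     "<api-key>",
		"sasl.password":     "<api-secret>",
	}

	adminClient, err := kafka.NewAdminClient(&cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer adminClient.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// As in the change above, the replication factor is left to the broker.
	results, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{
		{Topic: "example-topic", NumPartitions: 1},
	})
	if err != nil {
		log.Fatal(err)
	}

	for _, r := range results {
		if code := r.Error.Code(); code != kafka.ErrNoError && code != kafka.ErrTopicAlreadyExists {
			log.Fatalf("creating topic %s: %v", r.Topic, r.Error)
		}
	}
}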
6 changes: 4 additions & 2 deletions openmeter/watermill/driver/kafka/driver.go
@@ -1,6 +1,8 @@
package kafka

import (
"context"

"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/cloudevents/sdk-go/v2/event"
@@ -17,8 +19,8 @@ type (
AutoProvisionTopic = watermillkafka.AutoProvisionTopic
)

func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
return watermillkafka.NewPublisher(in)
func NewPublisher(ctx context.Context, in PublisherOptions) (*kafka.Publisher, error) {
return watermillkafka.NewPublisher(ctx, in)
}

func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) {
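
Callers now pass a context (used during topic provisioning) and can enable Sarama debug logging. A caller-side sketch of the updated signature follows, mirroring the cmd entrypoints above; the import paths, config plumbing, and client ID are assumptions, not verified against the repository.

package example

import (
	"context"
	"log/slog"

	"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
	"go.opentelemetry.io/otel/metric"

	"github.com/openmeterio/openmeter/config"
	watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
)

// newEventDriver is an illustrative wrapper (not part of the commit): the
// context is threaded through for topic provisioning and DebugLogging gates
// Sarama's debug logger on the service's log level.
func newEventDriver(ctx context.Context, conf config.Configuration, logger *slog.Logger, meter metric.Meter) (*kafka.Publisher, error) {
	return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
		KafkaConfig:  conf.Ingest.Kafka.KafkaConfiguration,
		ClientID:     "openmeter-example", // placeholder client ID
		Logger:       logger,
		MetricMeter:  meter,
		DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
	})
}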
