Skip to content

Commit

Permalink
feat(backend): add ingest event counter metric (#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored May 14, 2024
1 parent 65c73bf commit 55ef0ab
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 25 deletions.
63 changes: 46 additions & 17 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
Expand Down Expand Up @@ -58,6 +59,11 @@ import (
"github.com/openmeterio/openmeter/pkg/slicesx"
)

const (
defaultShutdownTimeout = 5 * time.Second
otelName = "openmeter.io/backend"
)

func main() {
v, flags := viper.New(), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError)
ctx := context.Background()
Expand Down Expand Up @@ -121,35 +127,47 @@ func main() {
telemetryRouter := chi.NewRouter()
telemetryRouter.Mount("/debug", middleware.Profiler())

meterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(context.Background(), res)
// Initialize OTel Metrics
otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(ctx, res)
if err != nil {
logger.Error(err.Error())
logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error()))
os.Exit(1)
}
defer func() {
if err := meterProvider.Shutdown(context.Background()); err != nil {
// Use dedicated context with timeout for shutdown as parent context might be canceled
// by the time the execution reaches this stage.
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

if err := otelMeterProvider.Shutdown(ctx); err != nil {
logger.Error("shutting down meter provider: %v", err)
}
}()

otel.SetMeterProvider(meterProvider)
otel.SetMeterProvider(otelMeterProvider)
metricMeter := otelMeterProvider.Meter(otelName)

if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled {
telemetryRouter.Handle("/metrics", promhttp.Handler())
}

tracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(context.Background(), res)
// Initialize OTel Tracer
otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(ctx, res)
if err != nil {
logger.Error(err.Error())
logger.Error("failed to initialize OpenTelemetry Trace provider", slog.String("error", err.Error()))
os.Exit(1)
}
defer func() {
if err := tracerProvider.Shutdown(context.Background()); err != nil {
// Use dedicated context with timeout for shutdown as parent context might be canceled
// by the time the execution reaches this stage.
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

if err := otelTracerProvider.Shutdown(ctx); err != nil {
logger.Error("shutting down tracer provider", "error", err)
}
}()

otel.SetTracerProvider(tracerProvider)
otel.SetTracerProvider(otelTracerProvider)
otel.SetTextMapPropagator(propagation.TraceContext{})

// Configure health checker
Expand Down Expand Up @@ -184,7 +202,14 @@ func main() {
}

// Initialize Kafka Ingest
ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest(ctx, conf, logger, serializer.NewJSONSerializer(), &group)
ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest(
ctx,
conf,
logger,
metricMeter,
serializer.NewJSONSerializer(),
&group,
)
if err != nil {
logger.Error("failed to initialize kafka ingest", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -325,8 +350,8 @@ func main() {
}
}),
"",
otelhttp.WithMeterProvider(meterProvider),
otelhttp.WithTracerProvider(tracerProvider),
otelhttp.WithMeterProvider(otelMeterProvider),
otelhttp.WithTracerProvider(otelTracerProvider),
)
})
},
Expand Down Expand Up @@ -392,7 +417,7 @@ func main() {
}
}

func initKafkaIngest(ctx context.Context, config config.Configuration, logger *slog.Logger, serializer serializer.Serializer, group *run.Group) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) {
func initKafkaIngest(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer, group *run.Group) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) {
// Initialize Kafka Admin Client
kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()

Expand All @@ -409,10 +434,14 @@ func initKafkaIngest(ctx context.Context, config config.Configuration, logger *s

slog.Debug("connected to Kafka")

collector := &kafkaingest.Collector{
Producer: producer,
NamespacedTopicTemplate: config.Ingest.Kafka.EventsTopicTemplate,
Serializer: serializer,
collector, err := kafkaingest.NewCollector(
producer,
serializer,
config.Ingest.Kafka.EventsTopicTemplate,
metricMeter,
)
if err != nil {
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer)
Expand Down
6 changes: 3 additions & 3 deletions collector/benthos/internal/message/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Transaction struct {
// indicates whether the message has been propagated successfully.
responseFunc func(context.Context, error) error

// Used for cancelling transactions. When cancelled it is up to the receiver
// Used for canceling transactions. When canceled it is up to the receiver
// of this transaction to abort any attempt to deliver the transaction
// message.
ctx context.Context
Expand Down Expand Up @@ -60,7 +60,7 @@ func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Tr
}

// Context returns a context that indicates the cancellation of a transaction.
// It is optional for receivers of a transaction to honour this context, and is
// It is optional for receivers of a transaction to honor this context, and is
// worth doing in cases where the transaction is blocked (on reconnect loops,
// etc) as it is often used as a fail-fast mechanism.
//
Expand All @@ -71,7 +71,7 @@ func (t *Transaction) Context() context.Context {
}

// WithContext returns a copy of the transaction associated with a context used
// for cancellation. When cancelled it is up to the receiver of this transaction
// for cancellation. When canceled it is up to the receiver of this transaction
// to abort any attempt to deliver the transaction message.
func (t *Transaction) WithContext(ctx context.Context) *Transaction {
newT := *t
Expand Down
8 changes: 4 additions & 4 deletions collector/benthos/internal/shutdown/signaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Signaller) CloseAtLeisureChan() <-chan struct{} {
}

// CloseAtLeisureCtx returns a context.Context that will be terminated when
// either the provided context is cancelled or the signal to shut down
// either the provided context is canceled or the signal to shut down
// either at leisure or immediately has been made.
func (s *Signaller) CloseAtLeisureCtx(ctx context.Context) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc
Expand Down Expand Up @@ -119,7 +119,7 @@ func (s *Signaller) CloseNowChan() <-chan struct{} {
}

// CloseNowCtx returns a context.Context that will be terminated when either the
// provided context is cancelled or the signal to shut down immediately has been
// provided context is canceled or the signal to shut down immediately has been
// made.
func (s *Signaller) CloseNowCtx(ctx context.Context) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc
Expand Down Expand Up @@ -151,8 +151,8 @@ func (s *Signaller) HasClosedChan() <-chan struct{} {
return s.hasClosedChan
}

// HasClosedCtx returns a context.Context that will be cancelled when either the
// provided context is cancelled or the signal that the component has shut down
// HasClosedCtx returns a context.Context that will be canceled when either the
// provided context is canceled or the signal that the component has shut down
// has been made.
func (s *Signaller) HasClosedCtx(ctx context.Context) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc
Expand Down
47 changes: 46 additions & 1 deletion internal/ingest/kafkaingest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer"
)
Expand All @@ -19,10 +21,49 @@ type Collector struct {
// NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf.
// For example: "om_%s_events"
NamespacedTopicTemplate string

ingestEventCounter metric.Int64Counter
}

func NewCollector(
producer *kafka.Producer,
serializer serializer.Serializer,
namespacedTopicTemplate string,
metricMeter metric.Meter,
) (*Collector, error) {
if producer == nil {
return nil, fmt.Errorf("producer is required")
}
if serializer == nil {
return nil, fmt.Errorf("serializer is required")
}
if namespacedTopicTemplate == "" {
return nil, fmt.Errorf("namespaced topic template is required")
}
if metricMeter == nil {
return nil, fmt.Errorf("metric meter is required")
}

// Initialize OTel metrics
ingestEventCounter, err := metricMeter.Int64Counter(
"ingest.events",
metric.WithDescription("The number of events ingested"),
metric.WithUnit("{event}"),
)
if err != nil {
return nil, fmt.Errorf("failed to create events counter: %w", err)
}

return &Collector{
Producer: producer,
Serializer: serializer,
NamespacedTopicTemplate: namespacedTopicTemplate,
ingestEventCounter: ingestEventCounter,
}, nil
}

// Ingest produces an event to a Kafka topic.
func (s Collector) Ingest(_ context.Context, namespace string, ev event.Event) error {
func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event) error {
topic := fmt.Sprintf(s.NamespacedTopicTemplate, namespace)
key, err := s.Serializer.SerializeKey(topic, ev)
if err != nil {
Expand Down Expand Up @@ -50,6 +91,10 @@ func (s Collector) Ingest(_ context.Context, namespace string, ev event.Event) e
return fmt.Errorf("producing kafka message: %w", err)
}

// Increment the ingest event counter metric
namespaceAttr := attribute.String("namespace", namespace)
s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr))

return nil
}

Expand Down

0 comments on commit 55ef0ab

Please sign in to comment.