Skip to content

Commit

Permalink
Merge pull request #1442 from openmeterio/refactor/metrics
Browse files Browse the repository at this point in the history
refactor: metrics
  • Loading branch information
turip authored Aug 29, 2024
2 parents 985dc4a + 50562c3 commit 492686a
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 59 deletions.
10 changes: 5 additions & 5 deletions openmeter/entitlement/balanceworker/recalculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (

defaultLRUCacheSize = 10_000

metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time"
metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ type Recalculator struct {
featureCache *lru.Cache[string, productcatalog.Feature]
subjectCache *lru.Cache[string, models.Subject]

metricRecalculationTime metric.Float64Histogram
metricRecalculationTime metric.Int64Histogram
}

func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
Expand All @@ -88,7 +88,7 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
return nil, fmt.Errorf("failed to create subject ID cache: %w", err)
}

metricRecalculationTime, err := opts.MetricMeter.Float64Histogram(
metricRecalculationTime, err := opts.MetricMeter.Int64Histogram(
metricNameRecalculationTime,
metric.WithDescription("Entitlement recalculation time"),
metric.WithExplicitBucketBoundaries(metricRecalculationBuckets...),
Expand Down Expand Up @@ -135,7 +135,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeDeleteAttribute))
} else {
err := r.sendEntitlementUpdatedEvent(ctx, ent)
Expand All @@ -144,7 +144,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeUpdateAttribute))
}
}
Expand Down
10 changes: 8 additions & 2 deletions openmeter/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/credit/grant"
"github.com/openmeterio/openmeter/openmeter/entitlement"
Expand Down Expand Up @@ -85,7 +86,11 @@ func New(opts WorkerOptions) (*Worker, error) {

worker.router = router

eventHandler := worker.eventHandler()
eventHandler, err := worker.eventHandler(opts.Router.MetricMeter)
if err != nil {
return nil, err
}

router.AddNoPublisherHandler(
"balance_worker_system_events",
opts.SystemEventsTopic,
Expand All @@ -105,9 +110,10 @@ func New(opts WorkerOptions) (*Worker, error) {
return worker, nil
}

func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
func (w *Worker) eventHandler(metricMeter metric.Meter) (message.NoPublishHandlerFunc, error) {
return grouphandler.NewNoPublishingHandler(
w.opts.EventBus.Marshaler(),
metricMeter,

// Entitlement created event
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error {
Expand Down
23 changes: 14 additions & 9 deletions openmeter/notification/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,24 @@ func New(opts Options) (*Consumer, error) {
balanceThresholdHandler: balanceThresholdEventHandler,
}

handler, err := grouphandler.NewNoPublishingHandler(opts.Marshaler, opts.Router.MetricMeter,
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error {
if event == nil {
return nil
}

return consumer.balanceThresholdHandler.Handle(ctx, *event)
}),
)
if err != nil {
return nil, err
}

_ = router.AddNoPublisherHandler(
"balance_consumer_system_events",
opts.SystemEventsTopic,
opts.Router.Subscriber,
grouphandler.NewNoPublishingHandler(opts.Marshaler,
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error {
if event == nil {
return nil
}

return consumer.balanceThresholdHandler.Handle(ctx, *event)
}),
),
handler,
)

return consumer, nil
Expand Down
2 changes: 1 addition & 1 deletion openmeter/sink/flushhandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (f *flushEventHandler) invokeCallback(ctx context.Context, events []models.
return err
}

f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Seconds())
f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Milliseconds())
f.metrics.eventsProcessed.Add(ctx, 1)

return nil
Expand Down
4 changes: 2 additions & 2 deletions openmeter/sink/flushhandler/meters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type metrics struct {
eventsReceived metric.Int64Counter
eventsProcessed metric.Int64Counter
eventsFailed metric.Int64Counter
eventProcessingTime metric.Float64Histogram
eventProcessingTime metric.Int64Histogram
eventChannelFull metric.Int64Counter
}

Expand All @@ -35,7 +35,7 @@ func newMetrics(handlerName string, meter metric.Meter) (*metrics, error) {
return nil, err
}

if r.eventProcessingTime, err = meter.Float64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time", handlerName)); err != nil {
if r.eventProcessingTime, err = meter.Int64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time_ms", handlerName)); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion openmeter/watermill/driver/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
defaultMeterPrefix = "sarama.publisher."
defaultMeterPrefix = "sarama."
defaultKeepalive = time.Minute
)

Expand Down
75 changes: 73 additions & 2 deletions openmeter/watermill/grouphandler/grouphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,23 @@ package grouphandler

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
meterNameHandlerMessageCount = "watermill.grouphandler.message_count"
meterNameHandlerProcessingTime = "watermill.grouphandler.processing_time_ms"
)

var (
meterAttributeStatusIgnored = attribute.String("status", "ignored")
meterAttributeStatusFailed = attribute.String("status", "failed")
meterAttributeStatusSuccess = attribute.String("status", "success")
)

type GroupEventHandler = cqrs.GroupEventHandler
Expand All @@ -14,7 +28,12 @@ func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T)
}

// NewNoPublishingHandler creates a NoPublishHandlerFunc that will handle events with the provided GroupEventHandlers.
func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers ...GroupEventHandler) message.NoPublishHandlerFunc {
func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, metricMeter metric.Meter, groupHandlers ...GroupEventHandler) (message.NoPublishHandlerFunc, error) {
meters, err := getMeters(metricMeter)
if err != nil {
return nil, err
}

typeHandlerMap := make(map[string]cqrs.GroupEventHandler)
for _, groupHandler := range groupHandlers {
event := groupHandler.NewEvent()
Expand All @@ -24,8 +43,14 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers
return func(msg *message.Message) error {
eventName := marshaler.NameFromMessage(msg)

meterAttributeCEType := attribute.String("ce_type", eventName)

groupHandler, ok := typeHandlerMap[eventName]
if !ok {
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusIgnored,
))
return nil
}

Expand All @@ -35,6 +60,52 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers
return err
}

return groupHandler.Handle(msg.Context(), event)
startedAt := time.Now()
err := groupHandler.Handle(msg.Context(), event)
if err != nil {
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusFailed,
))
meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusFailed,
))

return err
}

meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusSuccess,
))
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusSuccess,
))

return nil
}, nil
}

type meters struct {
handlerMessageCount metric.Int64Counter
handlerProcessingTime metric.Int64Histogram
}

func getMeters(meter metric.Meter) (*meters, error) {
handlerMessageCount, err := meter.Int64Counter(meterNameHandlerMessageCount)
if err != nil {
return nil, err
}

handlerProcessingTime, err := meter.Int64Histogram(meterNameHandlerProcessingTime)
if err != nil {
return nil, err
}

return &meters{
handlerMessageCount: handlerMessageCount,
handlerProcessingTime: handlerProcessingTime,
}, nil
}
Loading

0 comments on commit 492686a

Please sign in to comment.