diff --git a/cmd/jobs/entitlement/recalculatesnapshots.go b/cmd/jobs/entitlement/recalculatesnapshots.go index 09e4705d7..f243b8edc 100644 --- a/cmd/jobs/entitlement/recalculatesnapshots.go +++ b/cmd/jobs/entitlement/recalculatesnapshots.go @@ -26,11 +26,13 @@ func NewRecalculateBalanceSnapshotsCommand() *cobra.Command { logger := slog.Default() + metricMeter := otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot) + entitlementConnectors, err := initEntitlements( cmd.Context(), conf, logger, - otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot), + metricMeter, otelNameRecalculateBalanceSnapshot, ) if err != nil { @@ -43,6 +45,7 @@ func NewRecalculateBalanceSnapshotsCommand() *cobra.Command { Entitlement: entitlementConnectors.Registry, Namespace: "default", EventBus: entitlementConnectors.EventBus, + MetricMeter: metricMeter, }) if err != nil { return err diff --git a/openmeter/entitlement/balanceworker/recalculate.go b/openmeter/entitlement/balanceworker/recalculate.go index 3d9cd7ab9..acfc77437 100644 --- a/openmeter/entitlement/balanceworker/recalculate.go +++ b/openmeter/entitlement/balanceworker/recalculate.go @@ -7,6 +7,8 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/openmeter/entitlement" entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver" @@ -26,6 +28,17 @@ const ( DefaultIncludeDeletedDuration = 24 * time.Hour defaultLRUCacheSize = 10_000 + + metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time" +) + +var ( + // metricRecalculationBuckets are the default buckets for the recalculation time histogram. It is geared towards + // representing smaller values. + metricRecalculationBuckets = []float64{0.001, 0.005, 0.01, 0.02, 0.05, 0.075, 0.1, 0.2, 0.5, 1, 5, 10, 30, 60} + + recalculationTimeUpdateAttribute = attribute.String("operation", "update") + recalculationTimeDeleteAttribute = attribute.String("operation", "delete") ) type RecalculatorOptions struct { @@ -33,6 +46,27 @@ type RecalculatorOptions struct { Namespace string SubjectResolver SubjectResolver EventBus eventbus.Publisher + MetricMeter metric.Meter +} + +func (o RecalculatorOptions) Validate() error { + if o.Entitlement == nil { + return errors.New("missing entitlement registry") + } + + if o.Namespace == "" { + return errors.New("missing namespace") + } + + if o.EventBus == nil { + return errors.New("missing event bus") + } + + if o.MetricMeter == nil { + return errors.New("missing metric meter") + } + + return nil } type Recalculator struct { @@ -40,9 +74,15 @@ type Recalculator struct { featureCache *lru.Cache[string, productcatalog.Feature] subjectCache *lru.Cache[string, models.Subject] + + metricRecalculationTime metric.Float64Histogram } func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { + if err := opts.Validate(); err != nil { + return nil, fmt.Errorf("invalid options: %w", err) + } + featureCache, err := lru.New[string, productcatalog.Feature](defaultLRUCacheSize) if err != nil { return nil, fmt.Errorf("failed to create feature cache: %w", err) @@ -53,10 +93,20 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { return nil, fmt.Errorf("failed to create subject ID cache: %w", err) } + metricRecalculationTime, err := opts.MetricMeter.Float64Histogram( + metricNameRecalculationTime, + metric.WithDescription("Entitlement recalculation time"), + metric.WithExplicitBucketBoundaries(metricRecalculationBuckets...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err) + } + return &Recalculator{ - opts: opts, - featureCache: featureCache, - subjectCache: subjectCache, + opts: opts, + featureCache: featureCache, + subjectCache: subjectCache, + metricRecalculationTime: metricRecalculationTime, }, nil } @@ -78,16 +128,25 @@ func (r *Recalculator) Recalculate(ctx context.Context) error { func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []entitlement.Entitlement) error { var errs error for _, ent := range entitlements { + start := time.Now() if ent.DeletedAt != nil { err := r.sendEntitlementDeletedEvent(ctx, ent) if err != nil { errs = errors.Join(errs, err) } + + r.metricRecalculationTime.Record(ctx, + time.Since(start).Seconds(), + metric.WithAttributes(recalculationTimeDeleteAttribute)) } else { err := r.sendEntitlementUpdatedEvent(ctx, ent) if err != nil { errs = errors.Join(errs, err) } + + r.metricRecalculationTime.Record(ctx, + time.Since(start).Seconds(), + metric.WithAttributes(recalculationTimeUpdateAttribute)) } }