Skip to content

Commit

Permalink
feat: add recalculation metrics
Browse files Browse the repository at this point in the history
We can use these metrics to determine the average time required to recalculate
an entitlement in balance worker.
  • Loading branch information
turip committed Aug 28, 2024
1 parent a016d9a commit c97e178
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cmd/jobs/entitlement/recalculatesnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ func NewRecalculateBalanceSnapshotsCommand() *cobra.Command {

logger := slog.Default()

metricMeter := otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot)

entitlementConnectors, err := initEntitlements(
cmd.Context(),
conf,
logger,
otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot),
metricMeter,
otelNameRecalculateBalanceSnapshot,
)
if err != nil {
Expand All @@ -43,6 +45,7 @@ func NewRecalculateBalanceSnapshotsCommand() *cobra.Command {
Entitlement: entitlementConnectors.Registry,
Namespace: "default",
EventBus: entitlementConnectors.EventBus,
MetricMeter: metricMeter,
})
if err != nil {
return err
Expand Down
65 changes: 62 additions & 3 deletions openmeter/entitlement/balanceworker/recalculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/entitlement"
entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver"
Expand All @@ -26,23 +28,61 @@ const (
DefaultIncludeDeletedDuration = 24 * time.Hour

defaultLRUCacheSize = 10_000

metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time"
)

var (
// metricRecalculationBuckets are the default buckets for the recalculation time histogram. It is geared towards
// representing smaller values.
metricRecalculationBuckets = []float64{0.001, 0.005, 0.01, 0.02, 0.05, 0.075, 0.1, 0.2, 0.5, 1, 5, 10, 30, 60}

recalculationTimeUpdateAttribute = attribute.String("operation", "update")
recalculationTimeDeleteAttribute = attribute.String("operation", "delete")
)

type RecalculatorOptions struct {
Entitlement *registry.Entitlement
Namespace string
SubjectResolver SubjectResolver
EventBus eventbus.Publisher
MetricMeter metric.Meter
}

func (o RecalculatorOptions) Validate() error {
if o.Entitlement == nil {
return errors.New("missing entitlement registry")
}

if o.Namespace == "" {
return errors.New("missing namespace")
}

if o.EventBus == nil {
return errors.New("missing event bus")
}

if o.MetricMeter == nil {
return errors.New("missing metric meter")
}

return nil
}

type Recalculator struct {
opts RecalculatorOptions

featureCache *lru.Cache[string, productcatalog.Feature]
subjectCache *lru.Cache[string, models.Subject]

metricRecalculationTime metric.Float64Histogram
}

func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
if err := opts.Validate(); err != nil {
return nil, fmt.Errorf("invalid options: %w", err)
}

featureCache, err := lru.New[string, productcatalog.Feature](defaultLRUCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create feature cache: %w", err)
Expand All @@ -53,10 +93,20 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
return nil, fmt.Errorf("failed to create subject ID cache: %w", err)
}

metricRecalculationTime, err := opts.MetricMeter.Float64Histogram(
metricNameRecalculationTime,
metric.WithDescription("Entitlement recalculation time"),
metric.WithExplicitBucketBoundaries(metricRecalculationBuckets...),
)
if err != nil {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

return &Recalculator{
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
metricRecalculationTime: metricRecalculationTime,
}, nil
}

Expand All @@ -78,16 +128,25 @@ func (r *Recalculator) Recalculate(ctx context.Context) error {
func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []entitlement.Entitlement) error {
var errs error
for _, ent := range entitlements {
start := time.Now()
if ent.DeletedAt != nil {
err := r.sendEntitlementDeletedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
metric.WithAttributes(recalculationTimeDeleteAttribute))
} else {
err := r.sendEntitlementUpdatedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
metric.WithAttributes(recalculationTimeUpdateAttribute))
}
}

Expand Down

0 comments on commit c97e178

Please sign in to comment.