Skip to content

Commit

Permalink
fix: meters weren't filtered by subject
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Jul 2, 2024
1 parent 90e517c commit bba2190
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
19 changes: 15 additions & 4 deletions internal/credit/balance_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ func (m *balanceConnector) GetBalanceOfOwner(ctx context.Context, owner Namespac
// This is only possible in case the grant becomes active exactly at the start of the current period
m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, balance.At)

ownerSubjectKey, err := m.ownerConnector.GetOwnerSubjectKey(ctx, owner)
if err != nil {
return nil, fmt.Errorf("failed to get owner subject key for owner %s: %w", owner.ID, err)
}

// run engine and calculate grantbalance
engineParams, err := m.getQueryUsageFn(ctx, owner)
engineParams, err := m.getQueryUsageFn(ctx, owner, ownerSubjectKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,6 +173,11 @@ func (m *balanceConnector) GetBalanceHistoryOfOwner(ctx context.Context, owner N
periods := SortedPeriodsFromDedupedTimes(times)
historySegments := make([]GrantBurnDownHistorySegment, 0, len(periods))

ownerSubjecKey, err := m.ownerConnector.GetOwnerSubjectKey(ctx, owner)
if err != nil {
return GrantBurnDownHistory{}, fmt.Errorf("failed to get owner subject key for owner %s: %w", owner.ID, err)
}

// collect al history segments through all periods
for _, period := range periods {
// get last valid grantbalances at start of period (eq balance at start of period)
Expand All @@ -194,7 +204,7 @@ func (m *balanceConnector) GetBalanceHistoryOfOwner(ctx context.Context, owner N
return GrantBurnDownHistory{}, err
}
// run engine and calculate grantbalance
engineParams, err := m.getQueryUsageFn(ctx, owner)
engineParams, err := m.getQueryUsageFn(ctx, owner, ownerSubjecKey)
if err != nil {
return GrantBurnDownHistory{}, err
}
Expand Down Expand Up @@ -270,7 +280,7 @@ func (m *balanceConnector) ResetUsageForOwner(ctx context.Context, owner Namespa
}
m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, balance.At)

engineParams, err := m.getQueryUsageFn(ctx, owner)
engineParams, err := m.getQueryUsageFn(ctx, owner, ownerMeter.SubjectKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -404,7 +414,7 @@ type engineParams struct {
}

// returns owner specific QueryUsageFn
func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner NamespacedGrantOwner) (*engineParams, error) {
func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner NamespacedGrantOwner, subjectKey string) (*engineParams, error) {
ownerMeter, err := m.ownerConnector.GetMeter(ctx, owner)
if err != nil {
return nil, fmt.Errorf("failed to get query params for owner %v: %w", owner, err)
Expand All @@ -415,6 +425,7 @@ func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner Namespaced
params := ownerMeter.DefaultParams
params.From = &from
params.To = &to
params.FilterSubject = []string{subjectKey}
rows, err := m.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.MeterSlug, params)
if err != nil {
return 0.0, fmt.Errorf("failed to query meter %s: %w", ownerMeter.MeterSlug, err)
Expand Down
2 changes: 2 additions & 0 deletions internal/credit/owner_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ type OwnerMeter struct {
MeterSlug string
DefaultParams *streaming.QueryParams
WindowSize models.WindowSize
SubjectKey string
}

type OwnerConnector interface {
GetMeter(ctx context.Context, owner NamespacedGrantOwner) (*OwnerMeter, error)
GetStartOfMeasurement(ctx context.Context, owner NamespacedGrantOwner) (time.Time, error)
GetPeriodStartTimesBetween(ctx context.Context, owner NamespacedGrantOwner, from, to time.Time) ([]time.Time, error)
GetUsagePeriodStartAt(ctx context.Context, owner NamespacedGrantOwner, at time.Time) (time.Time, error)
GetOwnerSubjectKey(ctx context.Context, owner NamespacedGrantOwner) (string, error)

//FIXME: this is a terrible hack
EndCurrentUsagePeriodTx(ctx context.Context, tx *entutils.TxDriver, owner NamespacedGrantOwner, params EndCurrentUsagePeriodParams) error
Expand Down
12 changes: 12 additions & 0 deletions internal/entitlement/metered/grant_owner_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (e *entitlementGrantOwner) GetMeter(ctx context.Context, owner credit.Names
MeterSlug: meter.Slug,
DefaultParams: queryParams,
WindowSize: meter.WindowSize,
SubjectKey: entitlement.SubjectKey,
}, nil
}

Expand Down Expand Up @@ -114,6 +115,17 @@ func (e *entitlementGrantOwner) GetUsagePeriodStartAt(ctx context.Context, owner
return lastUsageReset.ResetTime, nil
}

func (e *entitlementGrantOwner) GetOwnerSubjectKey(ctx context.Context, owner credit.NamespacedGrantOwner) (string, error) {
entitlement, err := getRepoMaybeInTx(ctx, e.entitlementRepo, e.entitlementRepo).GetEntitlement(ctx, owner.NamespacedID())
if err != nil {
return "", &credit.OwnerNotFoundError{
Owner: owner,
AttemptedOwner: "entitlement",
}
}
return entitlement.SubjectKey, nil
}

func (e *entitlementGrantOwner) GetPeriodStartTimesBetween(ctx context.Context, owner credit.NamespacedGrantOwner, from, to time.Time) ([]time.Time, error) {
times := []time.Time{}
usageResets, err := getRepoMaybeInTx(ctx, e.usageResetRepo, e.usageResetRepo).GetBetween(ctx, owner.NamespacedID(), from, to)
Expand Down

0 comments on commit bba2190

Please sign in to comment.