diff --git a/internal/credit/balance_connector.go b/internal/credit/balance_connector.go index df1ee3d8c..f6bcdab3e 100644 --- a/internal/credit/balance_connector.go +++ b/internal/credit/balance_connector.go @@ -98,8 +98,13 @@ func (m *balanceConnector) GetBalanceOfOwner(ctx context.Context, owner Namespac // This is only possible in case the grant becomes active exactly at the start of the current period m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, balance.At) + ownerSubjectKey, err := m.ownerConnector.GetOwnerSubjectKey(ctx, owner) + if err != nil { + return nil, fmt.Errorf("failed to get owner subject key for owner %s: %w", owner.ID, err) + } + // run engine and calculate grantbalance - engineParams, err := m.getQueryUsageFn(ctx, owner) + engineParams, err := m.getQueryUsageFn(ctx, owner, ownerSubjectKey) if err != nil { return nil, err } @@ -168,6 +173,11 @@ func (m *balanceConnector) GetBalanceHistoryOfOwner(ctx context.Context, owner N periods := SortedPeriodsFromDedupedTimes(times) historySegments := make([]GrantBurnDownHistorySegment, 0, len(periods)) + ownerSubjecKey, err := m.ownerConnector.GetOwnerSubjectKey(ctx, owner) + if err != nil { + return GrantBurnDownHistory{}, fmt.Errorf("failed to get owner subject key for owner %s: %w", owner.ID, err) + } + // collect al history segments through all periods for _, period := range periods { // get last valid grantbalances at start of period (eq balance at start of period) @@ -194,7 +204,7 @@ func (m *balanceConnector) GetBalanceHistoryOfOwner(ctx context.Context, owner N return GrantBurnDownHistory{}, err } // run engine and calculate grantbalance - engineParams, err := m.getQueryUsageFn(ctx, owner) + engineParams, err := m.getQueryUsageFn(ctx, owner, ownerSubjecKey) if err != nil { return GrantBurnDownHistory{}, err } @@ -270,7 +280,7 @@ func (m *balanceConnector) ResetUsageForOwner(ctx context.Context, owner Namespa } m.populateBalanceSnapshotWithMissingGrantsActiveAt(&balance, grants, balance.At) - engineParams, err := m.getQueryUsageFn(ctx, owner) + engineParams, err := m.getQueryUsageFn(ctx, owner, ownerMeter.SubjectKey) if err != nil { return nil, err } @@ -404,7 +414,7 @@ type engineParams struct { } // returns owner specific QueryUsageFn -func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner NamespacedGrantOwner) (*engineParams, error) { +func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner NamespacedGrantOwner, subjectKey string) (*engineParams, error) { ownerMeter, err := m.ownerConnector.GetMeter(ctx, owner) if err != nil { return nil, fmt.Errorf("failed to get query params for owner %v: %w", owner, err) @@ -415,6 +425,7 @@ func (m *balanceConnector) getQueryUsageFn(ctx context.Context, owner Namespaced params := ownerMeter.DefaultParams params.From = &from params.To = &to + params.FilterSubject = []string{subjectKey} rows, err := m.streamingConnector.QueryMeter(ctx, owner.Namespace, ownerMeter.MeterSlug, params) if err != nil { return 0.0, fmt.Errorf("failed to query meter %s: %w", ownerMeter.MeterSlug, err) diff --git a/internal/credit/owner_connector.go b/internal/credit/owner_connector.go index 5dfc42146..a3e61776f 100644 --- a/internal/credit/owner_connector.go +++ b/internal/credit/owner_connector.go @@ -19,6 +19,7 @@ type OwnerMeter struct { MeterSlug string DefaultParams *streaming.QueryParams WindowSize models.WindowSize + SubjectKey string } type OwnerConnector interface { @@ -26,6 +27,7 @@ type OwnerConnector interface { GetStartOfMeasurement(ctx context.Context, owner NamespacedGrantOwner) (time.Time, error) GetPeriodStartTimesBetween(ctx context.Context, owner NamespacedGrantOwner, from, to time.Time) ([]time.Time, error) GetUsagePeriodStartAt(ctx context.Context, owner NamespacedGrantOwner, at time.Time) (time.Time, error) + GetOwnerSubjectKey(ctx context.Context, owner NamespacedGrantOwner) (string, error) //FIXME: this is a terrible hack EndCurrentUsagePeriodTx(ctx context.Context, tx *entutils.TxDriver, owner NamespacedGrantOwner, params EndCurrentUsagePeriodParams) error diff --git a/internal/entitlement/metered/grant_owner_adapter.go b/internal/entitlement/metered/grant_owner_adapter.go index 93f97d041..d02dc2c90 100644 --- a/internal/entitlement/metered/grant_owner_adapter.go +++ b/internal/entitlement/metered/grant_owner_adapter.go @@ -79,6 +79,7 @@ func (e *entitlementGrantOwner) GetMeter(ctx context.Context, owner credit.Names MeterSlug: meter.Slug, DefaultParams: queryParams, WindowSize: meter.WindowSize, + SubjectKey: entitlement.SubjectKey, }, nil } @@ -114,6 +115,17 @@ func (e *entitlementGrantOwner) GetUsagePeriodStartAt(ctx context.Context, owner return lastUsageReset.ResetTime, nil } +func (e *entitlementGrantOwner) GetOwnerSubjectKey(ctx context.Context, owner credit.NamespacedGrantOwner) (string, error) { + entitlement, err := getRepoMaybeInTx(ctx, e.entitlementRepo, e.entitlementRepo).GetEntitlement(ctx, owner.NamespacedID()) + if err != nil { + return "", &credit.OwnerNotFoundError{ + Owner: owner, + AttemptedOwner: "entitlement", + } + } + return entitlement.SubjectKey, nil +} + func (e *entitlementGrantOwner) GetPeriodStartTimesBetween(ctx context.Context, owner credit.NamespacedGrantOwner, from, to time.Time) ([]time.Time, error) { times := []time.Time{} usageResets, err := getRepoMaybeInTx(ctx, e.usageResetRepo, e.usageResetRepo).GetBetween(ctx, owner.NamespacedID(), from, to)