Skip to content

Commit

Permalink
fix(streaming): query params validator and timezones (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Nov 8, 2023
1 parent 04ef8b5 commit d56f162
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 32 deletions.
2 changes: 2 additions & 0 deletions internal/ingest/httpingest/httpingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func (h Handler) processEvent(ctx context.Context, event event.Event, namespace
if event.Time().IsZero() {
logger.DebugContext(ctx, "event does not have a timestamp")
event.SetTime(time.Now().UTC())
} else {
event.SetTime(event.Time().UTC())
}

err := h.config.Collector.Ingest(ctx, namespace, event)
Expand Down
14 changes: 11 additions & 3 deletions internal/server/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,20 @@ func (a *Router) QueryMeter(w http.ResponseWriter, r *http.Request, meterIDOrSlu
func (a *Router) QueryMeterWithMeter(w http.ResponseWriter, r *http.Request, logger *slog.Logger, meter models.Meter, params api.QueryMeterParams) {
// Query Params
queryParams := &streaming.QueryParams{
From: params.From,
To: params.To,
WindowSize: params.WindowSize,
Aggregation: meter.Aggregation,
}

if params.From != nil {
from := params.From.UTC()
queryParams.From = &from
}

if params.To != nil {
to := params.To.UTC()
queryParams.To = &to
}

if params.Subject != nil {
queryParams.Subject = *params.Subject
}
Expand All @@ -187,7 +195,7 @@ func (a *Router) QueryMeterWithMeter(w http.ResponseWriter, r *http.Request, log
queryParams.GroupBy = *params.GroupBy
}

if err := queryParams.Validate(); err != nil {
if err := queryParams.Validate(meter.WindowSize); err != nil {
logger.Warn("invalid parameters", "error", err)
models.NewStatusProblem(r.Context(), err, http.StatusBadRequest).Respond(w, r)
return
Expand Down
29 changes: 0 additions & 29 deletions internal/streaming/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package streaming

import (
"context"
"errors"
"time"

"github.com/cloudevents/sdk-go/v2/event"
Expand All @@ -14,34 +13,6 @@ type ListEventsParams struct {
Limit int
}

type QueryParams struct {
From *time.Time
To *time.Time
Subject []string
GroupBySubject bool
GroupBy []string
Aggregation models.MeterAggregation
WindowSize *models.WindowSize
}

func (p *QueryParams) Validate() error {
if p.From != nil && p.To != nil && p.From.After(*p.To) {
return errors.New("from must be before to")
}

if p.WindowSize != nil {
windowDuration := p.WindowSize.Duration()
if p.From != nil && p.From.Truncate(windowDuration) != *p.From {
return errors.New("from must be aligned to window size")
}
if p.To != nil && p.To.Truncate(windowDuration) != *p.To {
return errors.New("to must be aligned to window size")
}
}

return nil
}

type QueryResult struct {
WindowSize *models.WindowSize
Values []*models.MeterValue
Expand Down
102 changes: 102 additions & 0 deletions internal/streaming/query_params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package streaming

import (
"errors"
"fmt"
"time"

"github.com/openmeterio/openmeter/pkg/models"
)

type QueryParams struct {
From *time.Time
To *time.Time
Subject []string
GroupBySubject bool
GroupBy []string
Aggregation models.MeterAggregation
WindowSize *models.WindowSize
}

// Validate validates query params focusing on `from` and `to` being aligned with query and meter window sizes
func (p *QueryParams) Validate(meterWindowSize models.WindowSize) error {
if p.From != nil && p.To != nil {
if !p.To.After(*p.From) {
return errors.New("to must be after from")
}
}

// Ensure `from` and `to` aligns with query param window size if any
if p.WindowSize != nil {
err := isRoundedToWindowSize(*p.WindowSize, p.From, p.To)
if err != nil {
return fmt.Errorf("cannot query with %s window size: %w", *p.WindowSize, err)
}

// Ensure query param window size is not smaller than meter window size
switch meterWindowSize {
case models.WindowSizeHour:
if p.WindowSize != nil && *p.WindowSize == models.WindowSizeMinute {
return fmt.Errorf("cannot query meter with window size %s on window size %s", meterWindowSize, *p.WindowSize)
}
case models.WindowSizeDay:
if p.WindowSize != nil && (*p.WindowSize == models.WindowSizeMinute || *p.WindowSize == models.WindowSizeHour) {
return fmt.Errorf("cannot query meter with window size %s on window size %s", meterWindowSize, *p.WindowSize)
}
}
}

// Ensure `from` and `to` aligns with meter aggregation window size
err := isRoundedToWindowSize(meterWindowSize, p.From, p.To)
if err != nil {
return fmt.Errorf("cannot query meter aggregating on %s window size: %w", meterWindowSize, err)
}

return nil
}

// Checks if `from` and `to` are rounded to window size
func isRoundedToWindowSize(windowSize models.WindowSize, from *time.Time, to *time.Time) error {
switch windowSize {
case models.WindowSizeMinute:
if from != nil && !isMinuteRounded(*from) {
return fmt.Errorf("from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00")
}
if to != nil && !isMinuteRounded(*to) {
return fmt.Errorf("to must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00")
}
case models.WindowSizeHour:
if from != nil && !isHourRounded(*from) {
return fmt.Errorf("from must be rounded to HOUR like YYYY-MM-DDTHH:00:00")
}
if to != nil && !isHourRounded(*to) {
return fmt.Errorf("to must be rounded to HOUR like YYYY-MM-DDTHH:00:00")
}
case models.WindowSizeDay:
if from != nil && !isDayRounded(*from) {
return fmt.Errorf("from must be rounded to DAY like YYYY-MM-DDT00:00:00")
}
if to != nil && !isDayRounded(*to) {
return fmt.Errorf("to must be rounded to DAY like YYYY-MM-DDT00:00:00")
}
default:
return fmt.Errorf("unknown window size %s", windowSize)
}

return nil
}

// Is rounded to minute like YYYY-MM-DDTHH:mm:00
func isMinuteRounded(t time.Time) bool {
return t.Second() == 0
}

// Is rounded to hour like YYYY-MM-DDTHH:00:00
func isHourRounded(t time.Time) bool {
return t.Second() == 0 && t.Minute() == 0
}

// Is rounded to day like YYYY-MM-DDT00:00:00
func isDayRounded(t time.Time) bool {
return t.Second() == 0 && t.Minute() == 0 && t.Hour() == 0
}
197 changes: 197 additions & 0 deletions internal/streaming/query_params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package streaming

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/openmeterio/openmeter/pkg/models"
)

func TestQueryParamsValidate(t *testing.T) {
queryWindowSizeMinute := models.WindowSizeMinute
queryWindowSizeHour := models.WindowSizeHour
queryWindowSizeDay := models.WindowSizeDay

tests := []struct {
name string
paramFrom string
paramTo string
paramWindowSize *models.WindowSize
meterWindowSize models.WindowSize
want error
}{
{
name: "should fail when from and to are equal",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T00:00:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: fmt.Errorf("to must be after from"),
},
{
name: "should fail when from is before to",
paramFrom: "2023-01-02T00:00:00Z",
paramTo: "2023-01-01T00:00:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: fmt.Errorf("to must be after from"),
},
{
name: "should fail when querying on minute but meter is hour",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T00:01:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeHour,
want: fmt.Errorf("cannot query meter with window size HOUR on window size MINUTE"),
},
{
name: "should fail when querying on minute but meter is day",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T00:01:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeDay,
want: fmt.Errorf("cannot query meter with window size DAY on window size MINUTE"),
},
{
name: "should fail when querying on hour but meter is day",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:00:00Z",
paramWindowSize: &queryWindowSizeHour,
meterWindowSize: models.WindowSizeDay,
want: fmt.Errorf("cannot query meter with window size DAY on window size HOUR"),
},
{
name: "should be ok to query per hour on minute meter",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:00:00Z",
paramWindowSize: &queryWindowSizeHour,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should be ok to query per day on minute meter",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-02T00:00:00Z",
paramWindowSize: &queryWindowSizeDay,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should be ok to query per day on hour meter",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-02T00:00:00Z",
paramWindowSize: &queryWindowSizeDay,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should be ok with rounded to minute",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T00:01:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should be with rounded to hour",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:00:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should be with rounded to day",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-02T00:01:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: nil,
},
{
name: "should fail with not rounded to minute",
paramFrom: "2023-01-01T00:00:01Z",
paramTo: "2023-01-01T00:01:00Z",
paramWindowSize: &queryWindowSizeMinute,
meterWindowSize: models.WindowSizeMinute,
want: fmt.Errorf("cannot query with MINUTE window size: from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00"),
},
{
name: "should fail with not rounded to minute",
paramFrom: "2023-01-01T00:00:01Z",
paramTo: "2023-01-01T00:01:00Z",
paramWindowSize: nil,
meterWindowSize: models.WindowSizeMinute,
want: fmt.Errorf("cannot query meter aggregating on MINUTE window size: from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00"),
},
{
name: "should fail with not rounded to hour",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:01:00Z",
paramWindowSize: &queryWindowSizeHour,
meterWindowSize: models.WindowSizeHour,
want: fmt.Errorf("cannot query with HOUR window size: to must be rounded to HOUR like YYYY-MM-DDTHH:00:00"),
},
{
name: "should fail with not rounded to hour",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:01:00Z",
paramWindowSize: nil,
meterWindowSize: models.WindowSizeHour,
want: fmt.Errorf("cannot query meter aggregating on HOUR window size: to must be rounded to HOUR like YYYY-MM-DDTHH:00:00"),
},
{
name: "should fail with not rounded to day",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:00:00Z",
paramWindowSize: &queryWindowSizeDay,
meterWindowSize: models.WindowSizeDay,
want: fmt.Errorf("cannot query with DAY window size: to must be rounded to DAY like YYYY-MM-DDT00:00:00"),
},
{
name: "should fail with not rounded to day",
paramFrom: "2023-01-01T00:00:00Z",
paramTo: "2023-01-01T01:00:00Z",
paramWindowSize: nil,
meterWindowSize: models.WindowSizeDay,
want: fmt.Errorf("cannot query meter aggregating on DAY window size: to must be rounded to DAY like YYYY-MM-DDT00:00:00"),
},
}

for _, tt := range tests {
tt := tt
paramWindowSize := "none"
if tt.paramWindowSize != nil {
paramWindowSize = string(*tt.paramWindowSize)
}
name := fmt.Sprintf("%s/%s/%s", tt.meterWindowSize, paramWindowSize, tt.name)
t.Run(name, func(t *testing.T) {
from, err := time.Parse(time.RFC3339, tt.paramFrom)
if err != nil {
t.Fatal(fmt.Errorf("failed to parse from: %w", err))
return
}
to, err := time.Parse(time.RFC3339, tt.paramTo)
if err != nil {
t.Fatal(fmt.Errorf("failed to parse to: %w", err))
return
}

p := QueryParams{
From: &from,
To: &to,
WindowSize: tt.paramWindowSize,
}

got := p.Validate(tt.meterWindowSize)
if tt.want == nil {
assert.NoError(t, got)
} else {
assert.EqualError(t, got, tt.want.Error())
}
})
}
}

0 comments on commit d56f162

Please sign in to comment.