diff --git a/internal/ingest/httpingest/httpingest.go b/internal/ingest/httpingest/httpingest.go index f198d99ca..8c12ab293 100644 --- a/internal/ingest/httpingest/httpingest.go +++ b/internal/ingest/httpingest/httpingest.go @@ -116,6 +116,8 @@ func (h Handler) processEvent(ctx context.Context, event event.Event, namespace if event.Time().IsZero() { logger.DebugContext(ctx, "event does not have a timestamp") event.SetTime(time.Now().UTC()) + } else { + event.SetTime(event.Time().UTC()) } err := h.config.Collector.Ingest(ctx, namespace, event) diff --git a/internal/server/router/router.go b/internal/server/router/router.go index 703af6333..bd8494a41 100644 --- a/internal/server/router/router.go +++ b/internal/server/router/router.go @@ -173,12 +173,20 @@ func (a *Router) QueryMeter(w http.ResponseWriter, r *http.Request, meterIDOrSlu func (a *Router) QueryMeterWithMeter(w http.ResponseWriter, r *http.Request, logger *slog.Logger, meter models.Meter, params api.QueryMeterParams) { // Query Params queryParams := &streaming.QueryParams{ - From: params.From, - To: params.To, WindowSize: params.WindowSize, Aggregation: meter.Aggregation, } + if params.From != nil { + from := params.From.UTC() + queryParams.From = &from + } + + if params.To != nil { + to := params.To.UTC() + queryParams.To = &to + } + if params.Subject != nil { queryParams.Subject = *params.Subject } @@ -187,7 +195,7 @@ func (a *Router) QueryMeterWithMeter(w http.ResponseWriter, r *http.Request, log queryParams.GroupBy = *params.GroupBy } - if err := queryParams.Validate(); err != nil { + if err := queryParams.Validate(meter.WindowSize); err != nil { logger.Warn("invalid parameters", "error", err) models.NewStatusProblem(r.Context(), err, http.StatusBadRequest).Respond(w, r) return diff --git a/internal/streaming/connector.go b/internal/streaming/connector.go index c89579c24..f6186b0c8 100644 --- a/internal/streaming/connector.go +++ b/internal/streaming/connector.go @@ -2,7 +2,6 @@ package streaming import ( "context" - "errors" "time" "github.com/cloudevents/sdk-go/v2/event" @@ -14,34 +13,6 @@ type ListEventsParams struct { Limit int } -type QueryParams struct { - From *time.Time - To *time.Time - Subject []string - GroupBySubject bool - GroupBy []string - Aggregation models.MeterAggregation - WindowSize *models.WindowSize -} - -func (p *QueryParams) Validate() error { - if p.From != nil && p.To != nil && p.From.After(*p.To) { - return errors.New("from must be before to") - } - - if p.WindowSize != nil { - windowDuration := p.WindowSize.Duration() - if p.From != nil && p.From.Truncate(windowDuration) != *p.From { - return errors.New("from must be aligned to window size") - } - if p.To != nil && p.To.Truncate(windowDuration) != *p.To { - return errors.New("to must be aligned to window size") - } - } - - return nil -} - type QueryResult struct { WindowSize *models.WindowSize Values []*models.MeterValue diff --git a/internal/streaming/query_params.go b/internal/streaming/query_params.go new file mode 100644 index 000000000..160665454 --- /dev/null +++ b/internal/streaming/query_params.go @@ -0,0 +1,102 @@ +package streaming + +import ( + "errors" + "fmt" + "time" + + "github.com/openmeterio/openmeter/pkg/models" +) + +type QueryParams struct { + From *time.Time + To *time.Time + Subject []string + GroupBySubject bool + GroupBy []string + Aggregation models.MeterAggregation + WindowSize *models.WindowSize +} + +// Validate validates query params focusing on `from` and `to` being aligned with query and meter window sizes +func (p *QueryParams) Validate(meterWindowSize models.WindowSize) error { + if p.From != nil && p.To != nil { + if !p.To.After(*p.From) { + return errors.New("to must be after from") + } + } + + // Ensure `from` and `to` aligns with query param window size if any + if p.WindowSize != nil { + err := isRoundedToWindowSize(*p.WindowSize, p.From, p.To) + if err != nil { + return fmt.Errorf("cannot query with %s window size: %w", *p.WindowSize, err) + } + + // Ensure query param window size is not smaller than meter window size + switch meterWindowSize { + case models.WindowSizeHour: + if p.WindowSize != nil && *p.WindowSize == models.WindowSizeMinute { + return fmt.Errorf("cannot query meter with window size %s on window size %s", meterWindowSize, *p.WindowSize) + } + case models.WindowSizeDay: + if p.WindowSize != nil && (*p.WindowSize == models.WindowSizeMinute || *p.WindowSize == models.WindowSizeHour) { + return fmt.Errorf("cannot query meter with window size %s on window size %s", meterWindowSize, *p.WindowSize) + } + } + } + + // Ensure `from` and `to` aligns with meter aggregation window size + err := isRoundedToWindowSize(meterWindowSize, p.From, p.To) + if err != nil { + return fmt.Errorf("cannot query meter aggregating on %s window size: %w", meterWindowSize, err) + } + + return nil +} + +// Checks if `from` and `to` are rounded to window size +func isRoundedToWindowSize(windowSize models.WindowSize, from *time.Time, to *time.Time) error { + switch windowSize { + case models.WindowSizeMinute: + if from != nil && !isMinuteRounded(*from) { + return fmt.Errorf("from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00") + } + if to != nil && !isMinuteRounded(*to) { + return fmt.Errorf("to must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00") + } + case models.WindowSizeHour: + if from != nil && !isHourRounded(*from) { + return fmt.Errorf("from must be rounded to HOUR like YYYY-MM-DDTHH:00:00") + } + if to != nil && !isHourRounded(*to) { + return fmt.Errorf("to must be rounded to HOUR like YYYY-MM-DDTHH:00:00") + } + case models.WindowSizeDay: + if from != nil && !isDayRounded(*from) { + return fmt.Errorf("from must be rounded to DAY like YYYY-MM-DDT00:00:00") + } + if to != nil && !isDayRounded(*to) { + return fmt.Errorf("to must be rounded to DAY like YYYY-MM-DDT00:00:00") + } + default: + return fmt.Errorf("unknown window size %s", windowSize) + } + + return nil +} + +// Is rounded to minute like YYYY-MM-DDTHH:mm:00 +func isMinuteRounded(t time.Time) bool { + return t.Second() == 0 +} + +// Is rounded to hour like YYYY-MM-DDTHH:00:00 +func isHourRounded(t time.Time) bool { + return t.Second() == 0 && t.Minute() == 0 +} + +// Is rounded to day like YYYY-MM-DDT00:00:00 +func isDayRounded(t time.Time) bool { + return t.Second() == 0 && t.Minute() == 0 && t.Hour() == 0 +} diff --git a/internal/streaming/query_params_test.go b/internal/streaming/query_params_test.go new file mode 100644 index 000000000..00cf37d61 --- /dev/null +++ b/internal/streaming/query_params_test.go @@ -0,0 +1,197 @@ +package streaming + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/pkg/models" +) + +func TestQueryParamsValidate(t *testing.T) { + queryWindowSizeMinute := models.WindowSizeMinute + queryWindowSizeHour := models.WindowSizeHour + queryWindowSizeDay := models.WindowSizeDay + + tests := []struct { + name string + paramFrom string + paramTo string + paramWindowSize *models.WindowSize + meterWindowSize models.WindowSize + want error + }{ + { + name: "should fail when from and to are equal", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T00:00:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: fmt.Errorf("to must be after from"), + }, + { + name: "should fail when from is before to", + paramFrom: "2023-01-02T00:00:00Z", + paramTo: "2023-01-01T00:00:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: fmt.Errorf("to must be after from"), + }, + { + name: "should fail when querying on minute but meter is hour", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T00:01:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeHour, + want: fmt.Errorf("cannot query meter with window size HOUR on window size MINUTE"), + }, + { + name: "should fail when querying on minute but meter is day", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T00:01:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeDay, + want: fmt.Errorf("cannot query meter with window size DAY on window size MINUTE"), + }, + { + name: "should fail when querying on hour but meter is day", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:00:00Z", + paramWindowSize: &queryWindowSizeHour, + meterWindowSize: models.WindowSizeDay, + want: fmt.Errorf("cannot query meter with window size DAY on window size HOUR"), + }, + { + name: "should be ok to query per hour on minute meter", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:00:00Z", + paramWindowSize: &queryWindowSizeHour, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should be ok to query per day on minute meter", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-02T00:00:00Z", + paramWindowSize: &queryWindowSizeDay, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should be ok to query per day on hour meter", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-02T00:00:00Z", + paramWindowSize: &queryWindowSizeDay, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should be ok with rounded to minute", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T00:01:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should be with rounded to hour", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:00:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should be with rounded to day", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-02T00:01:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: nil, + }, + { + name: "should fail with not rounded to minute", + paramFrom: "2023-01-01T00:00:01Z", + paramTo: "2023-01-01T00:01:00Z", + paramWindowSize: &queryWindowSizeMinute, + meterWindowSize: models.WindowSizeMinute, + want: fmt.Errorf("cannot query with MINUTE window size: from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00"), + }, + { + name: "should fail with not rounded to minute", + paramFrom: "2023-01-01T00:00:01Z", + paramTo: "2023-01-01T00:01:00Z", + paramWindowSize: nil, + meterWindowSize: models.WindowSizeMinute, + want: fmt.Errorf("cannot query meter aggregating on MINUTE window size: from must be rounded to MINUTE like YYYY-MM-DDTHH:mm:00"), + }, + { + name: "should fail with not rounded to hour", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:01:00Z", + paramWindowSize: &queryWindowSizeHour, + meterWindowSize: models.WindowSizeHour, + want: fmt.Errorf("cannot query with HOUR window size: to must be rounded to HOUR like YYYY-MM-DDTHH:00:00"), + }, + { + name: "should fail with not rounded to hour", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:01:00Z", + paramWindowSize: nil, + meterWindowSize: models.WindowSizeHour, + want: fmt.Errorf("cannot query meter aggregating on HOUR window size: to must be rounded to HOUR like YYYY-MM-DDTHH:00:00"), + }, + { + name: "should fail with not rounded to day", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:00:00Z", + paramWindowSize: &queryWindowSizeDay, + meterWindowSize: models.WindowSizeDay, + want: fmt.Errorf("cannot query with DAY window size: to must be rounded to DAY like YYYY-MM-DDT00:00:00"), + }, + { + name: "should fail with not rounded to day", + paramFrom: "2023-01-01T00:00:00Z", + paramTo: "2023-01-01T01:00:00Z", + paramWindowSize: nil, + meterWindowSize: models.WindowSizeDay, + want: fmt.Errorf("cannot query meter aggregating on DAY window size: to must be rounded to DAY like YYYY-MM-DDT00:00:00"), + }, + } + + for _, tt := range tests { + tt := tt + paramWindowSize := "none" + if tt.paramWindowSize != nil { + paramWindowSize = string(*tt.paramWindowSize) + } + name := fmt.Sprintf("%s/%s/%s", tt.meterWindowSize, paramWindowSize, tt.name) + t.Run(name, func(t *testing.T) { + from, err := time.Parse(time.RFC3339, tt.paramFrom) + if err != nil { + t.Fatal(fmt.Errorf("failed to parse from: %w", err)) + return + } + to, err := time.Parse(time.RFC3339, tt.paramTo) + if err != nil { + t.Fatal(fmt.Errorf("failed to parse to: %w", err)) + return + } + + p := QueryParams{ + From: &from, + To: &to, + WindowSize: tt.paramWindowSize, + } + + got := p.Validate(tt.meterWindowSize) + if tt.want == nil { + assert.NoError(t, got) + } else { + assert.EqualError(t, got, tt.want.Error()) + } + }) + } +}