Skip to content

Commit

Permalink
feat(connector): subject list from to (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Oct 29, 2023
1 parent 646132f commit 43f7b23
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 10 deletions.
2 changes: 1 addition & 1 deletion internal/server/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (a *Router) ListMeterSubjects(w http.ResponseWriter, r *http.Request, meter
namespace = *params.NamespaceInput
}

subjects, err := a.config.StreamingConnector.ListMeterSubjects(r.Context(), namespace, meterIDOrSlug)
subjects, err := a.config.StreamingConnector.ListMeterSubjects(r.Context(), namespace, meterIDOrSlug, nil, nil)
if err != nil {
if _, ok := err.(*models.MeterNotFoundError); ok {
logger.Warn("meter not found", "error", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *MockConnector) QueryMeter(ctx context.Context, namespace string, meterS
}, nil
}

func (c *MockConnector) ListMeterSubjects(ctx context.Context, namespace string, meterSlug string) ([]string, error) {
func (c *MockConnector) ListMeterSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error) {
return []string{"s1"}, nil
}

Expand Down
8 changes: 5 additions & 3 deletions internal/streaming/clickhouse_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ func (c *ClickhouseConnector) QueryMeter(ctx context.Context, namespace string,
}, nil
}

func (c *ClickhouseConnector) ListMeterSubjects(ctx context.Context, namespace string, meterSlug string) ([]string, error) {
func (c *ClickhouseConnector) ListMeterSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error) {
if namespace == "" {
return nil, fmt.Errorf("namespace is required")
}
if meterSlug == "" {
return nil, fmt.Errorf("slug is required")
}

subjects, err := c.listMeterViewSubjects(ctx, namespace, meterSlug)
subjects, err := c.listMeterViewSubjects(ctx, namespace, meterSlug, from, to)
if err != nil {
if _, ok := err.(*models.MeterNotFoundError); ok {
return nil, err
Expand Down Expand Up @@ -417,10 +417,12 @@ func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace stri
return values, nil
}

func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespace string, meterSlug string) ([]string, error) {
func (c *ClickhouseConnector) listMeterViewSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error) {
query := listMeterViewSubjects{
Database: c.config.Database,
MeterViewName: getMeterViewNameBySlug(namespace, meterSlug),
From: from,
To: to,
}

sql, args, err := query.toSQL()
Expand Down
16 changes: 16 additions & 0 deletions internal/streaming/clickhouse_connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,30 @@ func sortedKeys(m map[string]string) []string {
type listMeterViewSubjects struct {
Database string
MeterViewName string
From *time.Time
To *time.Time
}

func (d listMeterViewSubjects) toSQL() (string, []interface{}, error) {
viewName := fmt.Sprintf("%s.%s", sqlbuilder.Escape(d.Database), sqlbuilder.Escape(d.MeterViewName))

var where []string
sb := sqlbuilder.ClickHouse.NewSelectBuilder()
sb.Select("DISTINCT subject")
sb.From(viewName)

if d.From != nil {
where = append(where, sb.GreaterEqualThan("windowstart", d.From.Unix()))
}

if d.To != nil {
where = append(where, sb.LessEqualThan("windowend", d.To.Unix()))
}

if len(where) > 0 {
sb.Where(where...)
}

sb.OrderBy("subject")

sql, args := sb.Build()
Expand Down
61 changes: 57 additions & 4 deletions internal/streaming/clickhouse_connector/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestQueryMeterView(t *testing.T) {
WindowSize: &windowSize,
},
wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1)) AS windowstart, tumbleEnd(windowstart, toIntervalHour(1)) AS windowend, subject, sumMerge(value) AS value, group1, group2 FROM openmeter.meter_meter1 WHERE (subject = ?) AND windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart",
wantArgs: []interface{}{"subject1", int64(1672531200), int64(1672617600)},
wantArgs: []interface{}{"subject1", from.Unix(), to.Unix()},
},
{ // Aggregate all available data
query: queryMeterView{
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestQueryMeterView(t *testing.T) {
From: &from,
},
wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ?",
wantArgs: []interface{}{int64(1672531200)},
wantArgs: []interface{}{from.Unix()},
},
{ // Aggregate data between interval
query: queryMeterView{
Expand All @@ -255,7 +255,7 @@ func TestQueryMeterView(t *testing.T) {
To: &to,
},
wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ?",
wantArgs: []interface{}{int64(1672531200), int64(1672617600)},
wantArgs: []interface{}{from.Unix(), to.Unix()},
},
{ // Aggregate data between interval, groupped by window size
query: queryMeterView{
Expand All @@ -267,7 +267,7 @@ func TestQueryMeterView(t *testing.T) {
WindowSize: &windowSize,
},
wantSQL: "SELECT tumbleStart(windowstart, toIntervalHour(1)) AS windowstart, tumbleEnd(windowstart, toIntervalHour(1)) AS windowend, sumMerge(value) AS value FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? GROUP BY windowstart, windowend ORDER BY windowstart",
wantArgs: []interface{}{int64(1672531200), int64(1672617600)},
wantArgs: []interface{}{from.Unix(), to.Unix()},
},
{ // Aggregate data for a single subject
query: queryMeterView{
Expand Down Expand Up @@ -316,3 +316,56 @@ func TestQueryMeterView(t *testing.T) {
})
}
}

func TestListMeterViewSubjects(t *testing.T) {
from, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00.001Z")
to, _ := time.Parse(time.RFC3339, "2023-01-02T00:00:00Z")

tests := []struct {
query listMeterViewSubjects
wantSQL string
wantArgs []interface{}
}{
{
query: listMeterViewSubjects{
Database: "openmeter",
MeterViewName: "meter_meter1",
},
wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 ORDER BY subject",
wantArgs: nil,
},
{
query: listMeterViewSubjects{
Database: "openmeter",
MeterViewName: "meter_meter1",
From: &from,
},
wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? ORDER BY subject",
wantArgs: []interface{}{from.Unix()},
},
{
query: listMeterViewSubjects{
Database: "openmeter",
MeterViewName: "meter_meter1",
From: &from,
To: &to,
},
wantSQL: "SELECT DISTINCT subject FROM openmeter.meter_meter1 WHERE windowstart >= ? AND windowend <= ? ORDER BY subject",
wantArgs: []interface{}{from.Unix(), to.Unix()},
},
}

for _, tt := range tests {
tt := tt
t.Run("", func(t *testing.T) {
gotSql, gotArgs, err := tt.query.toSQL()
if err != nil {
t.Error(err)
return
}

assert.Equal(t, tt.wantArgs, gotArgs)
assert.Equal(t, tt.wantSQL, gotSql)
})
}
}
2 changes: 1 addition & 1 deletion internal/streaming/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ type Connector interface {
CreateMeter(ctx context.Context, namespace string, meter *models.Meter) error
DeleteMeter(ctx context.Context, namespace string, meterSlug string) error
QueryMeter(ctx context.Context, namespace string, meterSlug string, params *QueryParams) (*QueryResult, error)
ListMeterSubjects(ctx context.Context, namespace string, meterSlug string) ([]string, error)
ListMeterSubjects(ctx context.Context, namespace string, meterSlug string, from *time.Time, to *time.Time) ([]string, error)
// Add more methods as needed ...
}

0 comments on commit 43f7b23

Please sign in to comment.