Skip to content

Commit

Permalink
feat(connector): add ingest timestamps (#1436)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: add ingested_at and created_at events to om_events, requires clickhouse migration

---------
Signed-off-by: Peter Marton <[email protected]>
Co-authored-by: Márk Sági-Kazár <[email protected]>
  • Loading branch information
hekike authored Sep 3, 2024
1 parent a00e4a6 commit 2975e54
Show file tree
Hide file tree
Showing 9 changed files with 701 additions and 616 deletions.
605 changes: 306 additions & 299 deletions api/api.gen.go

Large diffs are not rendered by default.

605 changes: 306 additions & 299 deletions api/client/go/client.gen.go

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ paths:
tokens: "1234"
model: "gpt-4-turbo"
type: "input"
ingestedAt: "2023-01-01T01:01:01.001Z"
storedAt: "2023-01-01T01:01:01.001Z"
- event:
id: 912d1d87-09ad-4bcc-a57d-5b3552493ed2
source: service-name
Expand All @@ -102,6 +104,8 @@ paths:
tokens: "2345"
model: "gpt-4-turbo"
type: "output"
ingestedAt: "2023-01-01T01:01:02.001Z"
storedAt: "2023-01-01T01:01:02.001Z"
"400":
$ref: "#/components/responses/BadRequestProblemResponse"
"401":
Expand Down Expand Up @@ -1958,29 +1962,43 @@ components:
type: "input"
IngestedEvent:
description: An ingested event with optional validation error.
readOnly: true
type: object
additionalProperties: false
required:
- event
- ingestedAt
- storedAt
properties:
event:
$ref: "#/components/schemas/Event"
validationError:
type: string
readOnly: true
example: "invalid event"
ingestedAt:
type: string
format: date-time
description: The date and time the event was ingested.
example: "2024-01-01T00:00:00Z"
storedAt:
type: string
format: date-time
description: The date and time the event was stored.
example: "2024-01-01T00:00:00Z"
example:
event:
id: 5c10fade-1c9e-4d6c-8275-c52c36731d3d
source: service-name
specversion: "1.0"
type: prompt
subject: customer-id
time: "2023-01-01T01:01:01.001Z"
time: "2024-01-01T01:01:01.001Z"
data:
tokens: "1234"
model: "gpt-4-turbo"
validationError: "meter not found for event"
ingestedAt: "2024-01-01T00:00:00Z"
storedAt: "2024-01-01T00:00:00Z"
FeatureCreateInputs:
type: object
description: |
Expand Down
1 change: 1 addition & 0 deletions openmeter/ingest/kafkaingest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event)
Headers: []kafka.Header{
{Key: "namespace", Value: []byte(namespace)},
{Key: "specversion", Value: []byte(ev.SpecVersion())},
{Key: "ingested_at", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
},
Key: key,
Value: value,
Expand Down
37 changes: 36 additions & 1 deletion openmeter/sink/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sink
import (
"context"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/huandu/go-sqlbuilder"
Expand Down Expand Up @@ -32,6 +33,7 @@ type ClickHouseStorage struct {

func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error {
query := InsertEventsQuery{
Clock: realClock{},
Database: c.config.Database,
Messages: messages,
}
Expand All @@ -49,6 +51,7 @@ func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmode
}

type InsertEventsQuery struct {
Clock Clock
Database string
Messages []sinkmodels.SinkMessage
}
Expand All @@ -58,14 +61,31 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {

query := sqlbuilder.ClickHouse.NewInsertBuilder()
query.InsertInto(tableName)
query.Cols("namespace", "validation_error", "id", "type", "source", "subject", "time", "data")
query.Cols("namespace", "validation_error", "id", "type", "source", "subject", "time", "data", "ingested_at", "stored_at")

for _, message := range q.Messages {
var eventErr string
if message.Status.Error != nil {
eventErr = message.Status.Error.Error()
}

storedAt := q.Clock.Now()
ingestedAt := storedAt

if message.KafkaMessage != nil {
for _, header := range message.KafkaMessage.Headers {
// Parse ingested_at header
if header.Key == "ingested_at" {
var err error

ingestedAt, err = time.Parse(time.RFC3339, string(header.Value))
if err != nil {
eventErr = fmt.Sprintf("failed to parse ingested_at header: %s", err)
}
}
}
}

query.Values(
message.Namespace,
eventErr,
Expand All @@ -75,9 +95,24 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {
message.Serialized.Subject,
message.Serialized.Time,
message.Serialized.Data,
ingestedAt,
storedAt,
)
}

sql, args := query.Build()
return sql, args, nil
}

// Clock is an interface for getting the current time.
// It is used to make the code testable.
type Clock interface {
Now() time.Time
}

// realClock implements Clock using the system clock.
type realClock struct{}

func (realClock) Now() time.Time {
return time.Now()
}
22 changes: 16 additions & 6 deletions openmeter/sink/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestInsertEventsQuery(t *testing.T) {
now := time.Now()

query := sink.InsertEventsQuery{
Clock: mockClock{now: now},
Database: "database",
Messages: []sinkmodels.SinkMessage{
{
Expand Down Expand Up @@ -60,10 +61,19 @@ func TestInsertEventsQuery(t *testing.T) {

sql, args, err := query.ToSQL()
assert.NoError(t, err)
assert.Equal(t, args, []interface{}{
"my_namespace", "", "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`,
"my_namespace", "", "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`,
"my_namespace", "event data value cannot be parsed as float64: not a number", "3", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": "foo", "method": "GET", "path": "/api/v1"}`,
})
assert.Equal(t, `INSERT INTO database.om_events (namespace, validation_error, id, type, source, subject, time, data) VALUES (?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?)`, sql)

assert.Equal(t, []interface{}{
"my_namespace", "", "1", "api-calls", "source", "subject-1", now.UnixMilli(), `{"duration_ms": 100, "method": "GET", "path": "/api/v1"}`, now, now,
"my_namespace", "", "2", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": 80, "method": "GET", "path": "/api/v1"}`, now, now,
"my_namespace", "event data value cannot be parsed as float64: not a number", "3", "api-calls", "source", "subject-2", now.UnixMilli(), `{"duration_ms": "foo", "method": "GET", "path": "/api/v1"}`, now, now,
}, args)
assert.Equal(t, `INSERT INTO database.om_events (namespace, validation_error, id, type, source, subject, time, data, ingested_at, stored_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, sql)
}

type mockClock struct {
now time.Time
}

func (m mockClock) Now() time.Time {
return m.now
}
11 changes: 8 additions & 3 deletions openmeter/streaming/clickhouse_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,13 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st
var eventType string
var subject string
var source string
var time time.Time
var eventTime time.Time
var dataStr string
var validationError string
var ingestedAt time.Time
var storedAt time.Time

if err = rows.Scan(&id, &eventType, &subject, &source, &time, &dataStr, &validationError); err != nil {
if err = rows.Scan(&id, &eventType, &subject, &source, &eventTime, &dataStr, &validationError, &ingestedAt, &storedAt); err != nil {
return nil, err
}

Expand All @@ -263,7 +265,7 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st
event.SetType(eventType)
event.SetSubject(subject)
event.SetSource(source)
event.SetTime(time)
event.SetTime(eventTime)
err = event.SetData("application/json", data)
if err != nil {
return nil, fmt.Errorf("query events set data: %w", err)
Expand All @@ -277,6 +279,9 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st
ingestedEvent.ValidationError = &validationError
}

ingestedEvent.IngestedAt = ingestedAt
ingestedEvent.StoredAt = storedAt

events = append(events, ingestedEvent)
}

Expand Down
4 changes: 3 additions & 1 deletion openmeter/streaming/clickhouse_connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (d createEventsTable) toSQL() string {
sb.Define("source", "String")
sb.Define("time", "DateTime")
sb.Define("data", "String")
sb.Define("ingested_at", "DateTime")
sb.Define("stored_at", "DateTime")
sb.SQL("ENGINE = MergeTree")
sb.SQL("PARTITION BY toYYYYMM(time)")
sb.SQL("ORDER BY (namespace, time, type, subject)")
Expand All @@ -58,7 +60,7 @@ func (d queryEventsTable) toSQL() (string, []interface{}) {
where := []string{}

query := sqlbuilder.ClickHouse.NewSelectBuilder()
query.Select("id", "type", "subject", "source", "time", "data", "validation_error")
query.Select("id", "type", "subject", "source", "time", "data", "validation_error", "ingested_at", "stored_at")
query.From(tableName)

where = append(where, query.Equal("namespace", d.Namespace))
Expand Down
10 changes: 5 additions & 5 deletions openmeter/streaming/clickhouse_connector/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestCreateEventsTable(t *testing.T) {
data: createEventsTable{
Database: "openmeter",
},
want: "CREATE TABLE IF NOT EXISTS openmeter.om_events (namespace String, validation_error String, id String, type LowCardinality(String), subject String, source String, time DateTime, data String) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (namespace, time, type, subject)",
want: "CREATE TABLE IF NOT EXISTS openmeter.om_events (namespace String, validation_error String, id String, type LowCardinality(String), subject String, source String, time DateTime, data String, ingested_at DateTime, stored_at DateTime) ENGINE = MergeTree PARTITION BY toYYYYMM(time) ORDER BY (namespace, time, type, subject)",
},
}

Expand All @@ -43,7 +43,7 @@ func TestQueryEventsTable(t *testing.T) {
Namespace: "my_namespace",
Limit: 100,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? ORDER BY time DESC LIMIT 100",
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? ORDER BY time DESC LIMIT 100",
wantArgs: []interface{}{"my_namespace"},
},
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestQueryEvents(t *testing.T) {
To: &toTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", fromTime.Unix(), toTime.Unix()},
},
{
Expand All @@ -464,7 +464,7 @@ func TestQueryEvents(t *testing.T) {
From: &fromTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time >= ? ORDER BY time DESC LIMIT 10",
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND time >= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", fromTime.Unix()},
},
{
Expand All @@ -474,7 +474,7 @@ func TestQueryEvents(t *testing.T) {
To: &toTime,
Limit: 10,
},
wantSQL: "SELECT id, type, subject, source, time, data, validation_error FROM openmeter.om_events WHERE namespace = ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantSQL: "SELECT id, type, subject, source, time, data, validation_error, ingested_at, stored_at FROM openmeter.om_events WHERE namespace = ? AND time <= ? ORDER BY time DESC LIMIT 10",
wantArgs: []interface{}{"my_namespace", toTime.Unix()},
},
}
Expand Down

0 comments on commit 2975e54

Please sign in to comment.