Skip to content

Commit

Permalink
chore: update exports for openmeter
Browse files Browse the repository at this point in the history
We are also moving the batch size on ingest notification setting to the sink
worker because it doesn't make sense on the consumer side.
  • Loading branch information
turip committed Aug 5, 2024
1 parent 6c66089 commit b5accb9
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func initIngestEventPublisher(logger *slog.Logger, conf config.Configuration, me
})

ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{
MaxEventsInBatch: conf.BalanceWorker.ChunkSize,
MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch,
})
if err != nil {
return nil, err
Expand Down
7 changes: 0 additions & 7 deletions config/balanceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type BalanceWorkerConfiguration struct {
DLQ DLQConfiguration
Retry RetryConfiguration
ChunkSize int
ConsumerGroupName string
}

Expand All @@ -28,11 +27,6 @@ func (c BalanceWorkerConfiguration) Validate() error {
return errors.New("consumer group name is required")
}

// Can be 0
if c.ChunkSize > 1000 {
return errors.New("chunk size must be less than 1000")
}

return nil
}

Expand All @@ -51,5 +45,4 @@ func ConfigureBalanceWorker(v *viper.Viper) {
v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond)

v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker")
v.SetDefault("balanceWorker.chunkSize", 500)
}
4 changes: 3 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func TestComplete(t *testing.T) {
Expiration: 768 * time.Hour,
},
},
IngestNotifications: IngestNotificationsConfiguration{
MaxEventsInBatch: 500,
},
},
Dedupe: DedupeConfiguration{
Enabled: true,
Expand Down Expand Up @@ -211,7 +214,6 @@ func TestComplete(t *testing.T) {
InitialInterval: 100 * time.Millisecond,
},
ConsumerGroupName: "om_balance_worker",
ChunkSize: 500,
},
NotificationService: NotificationServiceConfiguration{
Consumer: NotificationServiceConsumerConfiguration{
Expand Down
33 changes: 28 additions & 5 deletions config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package config

import (
"errors"
"fmt"
"time"

"github.com/spf13/viper"
)

type SinkConfiguration struct {
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
IngestNotifications IngestNotificationsConfiguration
}

func (c SinkConfiguration) Validate() error {
Expand All @@ -28,6 +30,26 @@ func (c SinkConfiguration) Validate() error {
return errors.New("NamespaceRefetch must be greater than 0")
}

if err := c.IngestNotifications.Validate(); err != nil {
return fmt.Errorf("ingest notifications: %w", err)
}

return nil
}

type IngestNotificationsConfiguration struct {
MaxEventsInBatch int
}

func (c IngestNotificationsConfiguration) Validate() error {
if c.MaxEventsInBatch < 0 {
return errors.New("ChunkSize must not be negative")
}

if c.MaxEventsInBatch > 1000 {
return errors.New("ChunkSize must not be greater than 1000")
}

return nil
}

Expand Down Expand Up @@ -56,4 +78,5 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.minCommitCount", 500)
v.SetDefault("sink.maxCommitWait", "5s")
v.SetDefault("sink.namespaceRefetch", "15s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
type (
IngestEventData = ingestnotification.IngestEventData
EventBatchedIngest = ingestnotification.EventBatchedIngest
HandlerConfig = ingestnotification.HandlerConfig
)

// Ingest notification handler
Expand Down
3 changes: 2 additions & 1 deletion openmeter/watermill/driver/kafka/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const (
)

type (
PublisherOptions = watermillkafka.PublisherOptions
PublisherOptions = watermillkafka.PublisherOptions
AutoProvisionTopic = watermillkafka.AutoProvisionTopic
)

func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
Expand Down

0 comments on commit b5accb9

Please sign in to comment.