diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 2362f9549..02d7d5462 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -262,7 +262,7 @@ func initIngestEventPublisher(logger *slog.Logger, conf config.Configuration, me }) ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{ - MaxEventsInBatch: conf.BalanceWorker.ChunkSize, + MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch, }) if err != nil { return nil, err diff --git a/config/balanceworker.go b/config/balanceworker.go index 58d5c6f00..4717f8548 100644 --- a/config/balanceworker.go +++ b/config/balanceworker.go @@ -11,7 +11,6 @@ import ( type BalanceWorkerConfiguration struct { DLQ DLQConfiguration Retry RetryConfiguration - ChunkSize int ConsumerGroupName string } @@ -28,11 +27,6 @@ func (c BalanceWorkerConfiguration) Validate() error { return errors.New("consumer group name is required") } - // Can be 0 - if c.ChunkSize > 1000 { - return errors.New("chunk size must be less than 1000") - } - return nil } @@ -51,5 +45,4 @@ func ConfigureBalanceWorker(v *viper.Viper) { v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond) v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker") - v.SetDefault("balanceWorker.chunkSize", 500) } diff --git a/config/config_test.go b/config/config_test.go index bc3462780..a6171eb87 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -132,6 +132,9 @@ func TestComplete(t *testing.T) { Expiration: 768 * time.Hour, }, }, + IngestNotifications: IngestNotificationsConfiguration{ + MaxEventsInBatch: 500, + }, }, Dedupe: DedupeConfiguration{ Enabled: true, @@ -211,7 +214,6 @@ func TestComplete(t *testing.T) { InitialInterval: 100 * time.Millisecond, }, ConsumerGroupName: "om_balance_worker", - ChunkSize: 500, }, NotificationService: NotificationServiceConfiguration{ Consumer: NotificationServiceConsumerConfiguration{ diff --git a/config/sink.go b/config/sink.go index 1ed5de805..aec90da21 100644 --- a/config/sink.go +++ b/config/sink.go @@ -2,17 +2,19 @@ package config import ( "errors" + "fmt" "time" "github.com/spf13/viper" ) type SinkConfiguration struct { - GroupId string - Dedupe DedupeConfiguration - MinCommitCount int - MaxCommitWait time.Duration - NamespaceRefetch time.Duration + GroupId string + Dedupe DedupeConfiguration + MinCommitCount int + MaxCommitWait time.Duration + NamespaceRefetch time.Duration + IngestNotifications IngestNotificationsConfiguration } func (c SinkConfiguration) Validate() error { @@ -28,6 +30,26 @@ func (c SinkConfiguration) Validate() error { return errors.New("NamespaceRefetch must be greater than 0") } + if err := c.IngestNotifications.Validate(); err != nil { + return fmt.Errorf("ingest notifications: %w", err) + } + + return nil +} + +type IngestNotificationsConfiguration struct { + MaxEventsInBatch int +} + +func (c IngestNotificationsConfiguration) Validate() error { + if c.MaxEventsInBatch < 0 { + return errors.New("ChunkSize must not be negative") + } + + if c.MaxEventsInBatch > 1000 { + return errors.New("ChunkSize must not be greater than 1000") + } + return nil } @@ -56,4 +78,5 @@ func ConfigureSink(v *viper.Viper) { v.SetDefault("sink.minCommitCount", 500) v.SetDefault("sink.maxCommitWait", "5s") v.SetDefault("sink.namespaceRefetch", "15s") + v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500) } diff --git a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go index 1e8618612..9300474c7 100644 --- a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go +++ b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go @@ -18,6 +18,7 @@ const ( type ( IngestEventData = ingestnotification.IngestEventData EventBatchedIngest = ingestnotification.EventBatchedIngest + HandlerConfig = ingestnotification.HandlerConfig ) // Ingest notification handler diff --git a/openmeter/watermill/driver/kafka/driver.go b/openmeter/watermill/driver/kafka/driver.go index ae9e1214c..3d397d314 100644 --- a/openmeter/watermill/driver/kafka/driver.go +++ b/openmeter/watermill/driver/kafka/driver.go @@ -13,7 +13,8 @@ const ( ) type ( - PublisherOptions = watermillkafka.PublisherOptions + PublisherOptions = watermillkafka.PublisherOptions + AutoProvisionTopic = watermillkafka.AutoProvisionTopic ) func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {