Skip to content

Commit

Permalink
Merge pull request #1307 from openmeterio/chore/expose-other-publishi…
Browse files Browse the repository at this point in the history
…ng-stuff

chore: update exports for openmeter
  • Loading branch information
turip authored Aug 5, 2024
2 parents 5b9411a + b5accb9 commit cb6c8ce
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func initIngestEventPublisher(logger *slog.Logger, conf config.Configuration, me
})

ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{
MaxEventsInBatch: conf.BalanceWorker.ChunkSize,
MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch,
})
if err != nil {
return nil, err
Expand Down
7 changes: 0 additions & 7 deletions config/balanceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type BalanceWorkerConfiguration struct {
DLQ DLQConfiguration
Retry RetryConfiguration
ChunkSize int
ConsumerGroupName string
}

Expand All @@ -28,11 +27,6 @@ func (c BalanceWorkerConfiguration) Validate() error {
return errors.New("consumer group name is required")
}

// Can be 0
if c.ChunkSize > 1000 {
return errors.New("chunk size must be less than 1000")
}

return nil
}

Expand All @@ -51,5 +45,4 @@ func ConfigureBalanceWorker(v *viper.Viper) {
v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond)

v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker")
v.SetDefault("balanceWorker.chunkSize", 500)
}
4 changes: 3 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func TestComplete(t *testing.T) {
Expiration: 768 * time.Hour,
},
},
IngestNotifications: IngestNotificationsConfiguration{
MaxEventsInBatch: 500,
},
},
Dedupe: DedupeConfiguration{
Enabled: true,
Expand Down Expand Up @@ -211,7 +214,6 @@ func TestComplete(t *testing.T) {
InitialInterval: 100 * time.Millisecond,
},
ConsumerGroupName: "om_balance_worker",
ChunkSize: 500,
},
NotificationService: NotificationServiceConfiguration{
Consumer: NotificationServiceConsumerConfiguration{
Expand Down
33 changes: 28 additions & 5 deletions config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package config

import (
"errors"
"fmt"
"time"

"github.com/spf13/viper"
)

type SinkConfiguration struct {
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
IngestNotifications IngestNotificationsConfiguration
}

func (c SinkConfiguration) Validate() error {
Expand All @@ -28,6 +30,26 @@ func (c SinkConfiguration) Validate() error {
return errors.New("NamespaceRefetch must be greater than 0")
}

if err := c.IngestNotifications.Validate(); err != nil {
return fmt.Errorf("ingest notifications: %w", err)
}

return nil
}

type IngestNotificationsConfiguration struct {
MaxEventsInBatch int
}

func (c IngestNotificationsConfiguration) Validate() error {
if c.MaxEventsInBatch < 0 {
return errors.New("ChunkSize must not be negative")
}

if c.MaxEventsInBatch > 1000 {
return errors.New("ChunkSize must not be greater than 1000")
}

return nil
}

Expand Down Expand Up @@ -56,4 +78,5 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.minCommitCount", 500)
v.SetDefault("sink.maxCommitWait", "5s")
v.SetDefault("sink.namespaceRefetch", "15s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
type (
IngestEventData = ingestnotification.IngestEventData
EventBatchedIngest = ingestnotification.EventBatchedIngest
HandlerConfig = ingestnotification.HandlerConfig
)

// Ingest notification handler
Expand Down
3 changes: 2 additions & 1 deletion openmeter/watermill/driver/kafka/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const (
)

type (
PublisherOptions = watermillkafka.PublisherOptions
PublisherOptions = watermillkafka.PublisherOptions
AutoProvisionTopic = watermillkafka.AutoProvisionTopic
)

func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) {
Expand Down

0 comments on commit cb6c8ce

Please sign in to comment.