Skip to content

Commit

Permalink
Merge pull request #880 from openmeterio/refacor-ctx-sink
Browse files Browse the repository at this point in the history
refactor(sink-worker): use contex.Context for fetching namespaces
  • Loading branch information
sagikazarmark authored May 10, 2024
2 parents e2c5228 + 6876878 commit f453098
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func main() {
// Initialize OTel Metrics
otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(ctx, res)
if err != nil {
logger.Error(err.Error())
logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error()))
os.Exit(1)
}
defer func() {
Expand All @@ -134,7 +134,7 @@ func main() {
// Initialize OTel Tracer
otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(ctx, res)
if err != nil {
logger.Error(err.Error())
logger.Error("failed to initialize OpenTelemetry Trace provider", slog.String("error", err.Error()))
os.Exit(1)
}
defer func() {
Expand Down
15 changes: 5 additions & 10 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,7 @@ func (s *Sink) dedupeSet(ctx context.Context, messages []SinkMessage) error {
return nil
}

func (s *Sink) getNamespaces() (*NamespaceStore, error) {
ctx := context.TODO()

func (s *Sink) getNamespaces(ctx context.Context) (*NamespaceStore, error) {
meters, err := s.config.MeterRepository.ListAllMeters(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get meters: %s", err)
Expand All @@ -339,9 +337,9 @@ func (s *Sink) getNamespaces() (*NamespaceStore, error) {
return namespaceStore, nil
}

func (s *Sink) subscribeToNamespaces() error {
func (s *Sink) subscribeToNamespaces(ctx context.Context) error {
logger := s.config.Logger.With("operation", "subscribeToNamespaces")
ns, err := s.getNamespaces()
ns, err := s.getNamespaces(ctx)
if err != nil {
return fmt.Errorf("failed to get namespaces: %s", err)
}
Expand Down Expand Up @@ -370,9 +368,6 @@ func (s *Sink) subscribeToNamespaces() error {
if err != nil {
return fmt.Errorf("failed to subscribe to topics: %s", err)
}
if err != nil {
return fmt.Errorf("failed to subscribe to topics: %s", err)
}
}

return nil
Expand Down Expand Up @@ -411,15 +406,15 @@ func (s *Sink) Run(ctx context.Context) error {
logger.Info("starting sink")

// Fetch namespaces and meters and subscribe to them
err := s.subscribeToNamespaces()
err := s.subscribeToNamespaces(ctx)
if err != nil {
return fmt.Errorf("failed to subscribe to namespaces: %s", err)
}

// Periodically refetch namespaces and meters
var refetch func()
refetch = func() {
err := s.subscribeToNamespaces()
err := s.subscribeToNamespaces(ctx)
if err != nil {
// TODO: should we panic?
logger.Error("failed to subscribe to namespaces", "err", err)
Expand Down

0 comments on commit f453098

Please sign in to comment.