Skip to content

Commit

Permalink
Merge pull request #1730 from openmeterio/telemetry-config
Browse files Browse the repository at this point in the history
feat: add otel log support
  • Loading branch information
sagikazarmark authored Oct 24, 2024
2 parents 8d44ff1 + 15af4e9 commit 4143b1d
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 34 deletions.
51 changes: 44 additions & 7 deletions app/common/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import (
"github.com/go-slog/otelslog"
"github.com/prometheus/client_golang/prometheus/promhttp"
slogmulti "github.com/samber/slog-multi"
realotelslog "go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -61,13 +64,47 @@ func NewTelemetryResource(metadata Metadata) *resource.Resource {
return res
}

func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource) *slog.Logger {
return slog.New(slogmulti.Pipe(
otelslog.ResourceMiddleware(res),
otelslog.NewHandler,
contextx.NewLogHandler,
operation.NewLogHandler,
).Handler(conf.NewHandler(os.Stdout)))
func NewLoggerProvider(ctx context.Context, conf config.LogTelemetryConfig, res *resource.Resource) (*sdklog.LoggerProvider, func(), error) {
loggerProvider, err := conf.NewLoggerProvider(ctx, res)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize OpenTelemetry Trace provider: %w", err)
}

return loggerProvider, func() {
// Use dedicated context with timeout for shutdown as parent context might be canceled
// by the time the execution reaches this stage.
ctx, cancel := context.WithTimeout(context.Background(), DefaultShutdownTimeout)
defer cancel()

if err := loggerProvider.ForceFlush(ctx); err != nil {
// no logger initialized at this point yet, so we are using the global logger
slog.Error("flushing logger provider", slog.String("error", err.Error()))
}

if err := loggerProvider.Shutdown(ctx); err != nil {
// no logger initialized at this point yet, so we are using the global logger
slog.Error("shutting down logger provider", slog.String("error", err.Error()))
}
}, nil
}

func NewLogger(conf config.LogTelemetryConfig, res *resource.Resource, loggerProvider log.LoggerProvider, metadata Metadata) *slog.Logger {
return slog.New(
slogmulti.Pipe(
// Common handlers
contextx.NewLogHandler,
operation.NewLogHandler,
).Handler(slogmulti.Fanout(
// Original logger (with otel context middleware)
slogmulti.Pipe(
otelslog.ResourceMiddleware(res),
otelslog.NewHandler,
).Handler(conf.NewHandler(os.Stdout)),

// Otel logger
realotelslog.NewHandler(metadata.OpenTelemetryName, realotelslog.WithLoggerProvider(loggerProvider)),
)),
)
}

func NewMeterProvider(ctx context.Context, conf config.MetricsTelemetryConfig, res *resource.Resource, logger *slog.Logger) (*sdkmetric.MeterProvider, func(), error) {
Expand Down
4 changes: 4 additions & 0 deletions app/common/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package common

import (
"github.com/google/wire"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -83,6 +85,8 @@ var KafkaTopic = wire.NewSet(
var Telemetry = wire.NewSet(
NewTelemetryResource,

NewLoggerProvider,
wire.Bind(new(log.LoggerProvider), new(*sdklog.LoggerProvider)),
NewLogger,

NewMeterProvider,
Expand Down
76 changes: 76 additions & 0 deletions app/config/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/lmittmann/tint"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -327,6 +329,8 @@ type LogTelemetryConfig struct {
//
// Requires [mapstructure.TextUnmarshallerHookFunc] to be high up in the decode hook chain.
Level slog.Level

Exporters ExportersLogTelemetryConfig
}

// Validate validates the configuration.
Expand Down Expand Up @@ -369,6 +373,76 @@ func (c LogTelemetryConfig) NewHandler(w io.Writer) slog.Handler {
return slog.NewJSONHandler(w, &slog.HandlerOptions{Level: c.Level})
}

func (c LogTelemetryConfig) NewLoggerProvider(ctx context.Context, res *resource.Resource) (*sdklog.LoggerProvider, error) {
options := []sdklog.LoggerProviderOption{
sdklog.WithResource(res),
}

if c.Exporters.OTLP.Enabled {
exporter, err := c.Exporters.OTLP.NewExporter(ctx)
if err != nil {
return nil, err
}

options = append(options, sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)))
}

return sdklog.NewLoggerProvider(options...), nil
}

type ExportersLogTelemetryConfig struct {
OTLP OTLPExportersLogTelemetryConfig
}

// Validate validates the configuration.
func (c ExportersLogTelemetryConfig) Validate() error {
var errs []error

if err := c.OTLP.Validate(); err != nil {
errs = append(errs, errorsx.WithPrefix(err, "otlp"))
}

return errors.Join(errs...)
}

type OTLPExportersLogTelemetryConfig struct {
Enabled bool

OTLPExporterTelemetryConfig `mapstructure:",squash"`
}

// Validate validates the configuration.
func (c OTLPExportersLogTelemetryConfig) Validate() error {
if !c.Enabled {
return nil
}

return c.OTLPExporterTelemetryConfig.Validate()
}

// NewExporter creates a new [sdklog.Exporter].
func (c OTLPExportersLogTelemetryConfig) NewExporter(ctx context.Context) (sdklog.Exporter, error) {
if !c.Enabled {
return nil, errors.New("telemetry: log: exporter: otlp: disabled")
}

// TODO: make this configurable
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

conn, err := c.DialExporter(ctx)
if err != nil {
return nil, fmt.Errorf("telemetry: log: exporter: otlp: %w", err)
}

exporter, err := otlploggrpc.New(ctx, otlploggrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("telemetry: log: exporter: otlp: initializing exporter: %w", err)
}

return exporter, nil
}

// ConfigureTelemetry configures some defaults in the Viper instance.
func ConfigureTelemetry(v *viper.Viper, flags *pflag.FlagSet) {
flags.String("telemetry-address", ":10000", "Telemetry HTTP server address")
Expand All @@ -385,4 +459,6 @@ func ConfigureTelemetry(v *viper.Viper, flags *pflag.FlagSet) {

v.SetDefault("telemetry.log.format", "json")
v.SetDefault("telemetry.log.level", "info")
v.SetDefault("telemetry.log.exporters.otlp.enabled", false)
v.SetDefault("telemetry.log.exporters.otlp.address", "")
}
30 changes: 23 additions & 7 deletions cmd/balance-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4143b1d

Please sign in to comment.