Skip to content

Commit

Permalink
Merge pull request #1526 from openmeterio/reduce-sarama-metrics
Browse files Browse the repository at this point in the history
fix: reduce the number of sarma metrics
  • Loading branch information
turip authored Sep 17, 2024
2 parents a1a7f07 + 03cb249 commit a43fd8c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 39 deletions.
11 changes: 2 additions & 9 deletions openmeter/watermill/driver/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ import (
)

const (
defaultMeterPrefix = "sarama."
defaultKeepalive = time.Minute
defaultKeepalive = time.Minute
)

type BrokerOptions struct {
KafkaConfig config.KafkaConfiguration
ClientID string
Logger *slog.Logger
MetricMeter otelmetric.Meter
MeterPrefix string
DebugLogging bool
}

Expand Down Expand Up @@ -53,11 +51,6 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) {
if role == "" {
return nil, errors.New("role is required")
}

if o.MeterPrefix == "" {
o.MeterPrefix = defaultMeterPrefix
}

if o.KafkaConfig.SocketKeepAliveEnabled {
config.Net.KeepAlive = defaultKeepalive
}
Expand Down Expand Up @@ -100,7 +93,7 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) {

meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{
MetricMeter: o.MetricMeter,
NameTransformFn: metrics.MetricAddNamePrefix(fmt.Sprintf("%s%s.", o.MeterPrefix, role)),
NameTransformFn: SaramaMetricRenamer(role),
ErrorHandler: metrics.LoggingErrorHandler(o.Logger),
})
if err != nil {
Expand Down
83 changes: 83 additions & 0 deletions openmeter/watermill/driver/kafka/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package kafka

import (
"regexp"
"slices"
"strings"

"go.opentelemetry.io/otel/attribute"

"github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka/metrics"
)

var (
forBrokerMetricRegex = regexp.MustCompile("(.*)-for-broker-(.*)")
forTopicMetricRegex = regexp.MustCompile("(.*)-for-topic-(.*)")

ignoreMetrics = []string{
"batch-size", // we have batch-size-for-topic

"consumer-batch-size", // we have batch-size-for-topic
"consumer-fetch-rate", // we have for topic metric
"incoming-byte-rate", // we have for broker metric
"outgoing-byte-rate", // we have for broker metric
"record-send-rate", // we have for broker metric
"request-latency-in-ms", // we have for broker metric
"request-size", // we have for broker metric
"request-rate-total", // we have for broker metric
"records-per-request", // we have for topic metric
"requests-in-flight", // we have for broker metric
"response-rate", // we have for broker metric
"response-size", // we have for broker metric
}

ingorePrefixes = []string{
"protocol-requests-rate", // too low level, we don't need it for now
"compression-", // don't care
}
)

func SaramaMetricRenamer(role string) metrics.TransformMetricsNameToOtel {
return func(name string) metrics.TransformedMetric {
res := metrics.TransformedMetric{
Name: "sarama." + name,
}

if slices.Contains(ignoreMetrics, name) {
res.Drop = true
return res
}

for _, prefix := range ingorePrefixes {
if strings.HasPrefix(name, prefix) {
res.Drop = true
return res
}
}

attributes := []attribute.KeyValue{
attribute.String("role", role),
}

if matches := forBrokerMetricRegex.FindStringSubmatch(name); len(matches) == 3 {
res.Name = "sarama." + matches[1] + "_for_broker"

attributes = append(attributes, attribute.String("broker_id", matches[2]))

res.Attributes = attribute.NewSet(attributes...)
return res
}

if matches := forTopicMetricRegex.FindStringSubmatch(name); len(matches) == 3 {
res.Name = "sarama." + matches[1] + "_for_topic"

attributes = append(attributes, attribute.String("topic", matches[2]))

res.Attributes = attribute.NewSet(attributes...)
return res
}

res.Attributes = attribute.NewSet(attributes...)
return res
}
}
76 changes: 46 additions & 30 deletions openmeter/watermill/driver/kafka/metrics/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ import (
"sync"

"github.com/rcrowley/go-metrics"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)

type TransformedMetric struct {
Name string
Attributes attribute.Set
Drop bool
}

type (
TransformMetricsNameToOtel func(string) string
TransformMetricsNameToOtel func(string) TransformedMetric
ErrorHandler func(error)
)

Expand All @@ -23,12 +30,6 @@ func LoggingErrorHandler(dest *slog.Logger) ErrorHandler {
}
}

func MetricAddNamePrefix(prefix string) TransformMetricsNameToOtel {
return func(name string) string {
return prefix + name
}
}

type NewRegistryOptions struct {
MetricMeter otelmetric.Meter
NameTransformFn TransformMetricsNameToOtel
Expand All @@ -41,8 +42,11 @@ func NewRegistry(opts NewRegistryOptions) (metrics.Registry, error) {
}

if opts.NameTransformFn == nil {
opts.NameTransformFn = func(name string) string {
return name
opts.NameTransformFn = func(name string) TransformedMetric {
return TransformedMetric{
Name: name,
Attributes: attribute.NewSet(),
}
}
}

Expand Down Expand Up @@ -110,43 +114,50 @@ func (r *registry) getWrappedMeter(name string, def interface{}) (interface{}, e
def = v.Call(nil)[0].Interface()
}

transfomedMetric := r.nameTransformFn(name)

if transfomedMetric.Drop {
// If we are not interested in the metric, let's just return the original metric
return def, nil
}

switch meterDef := def.(type) {
case metrics.Meter:
otelMeter, err := r.meticMeter.Int64Counter(r.nameTransformFn(name))
otelMeter, err := r.meticMeter.Int64Counter(transfomedMetric.Name)
if err != nil {
return def, err
}

return &wrappedMeter{Meter: meterDef, otelMeter: otelMeter}, nil
return &wrappedMeter{Meter: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil
case metrics.Counter:
otelMeter, err := r.meticMeter.Int64UpDownCounter(r.nameTransformFn(name))
otelMeter, err := r.meticMeter.Int64UpDownCounter(transfomedMetric.Name)
if err != nil {
return def, err
}

return &wrappedCounter{Counter: meterDef, otelMeter: otelMeter}, nil
return &wrappedCounter{Counter: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil
case metrics.GaugeFloat64:
otelMeter, err := r.meticMeter.Float64Gauge(r.nameTransformFn(name))
otelMeter, err := r.meticMeter.Float64Gauge(transfomedMetric.Name)
if err != nil {
return def, err
}

return &wrappedGaugeFloat64{GaugeFloat64: meterDef, otelMeter: otelMeter}, nil
return &wrappedGaugeFloat64{GaugeFloat64: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil
case metrics.Gauge:
otelMeter, err := r.meticMeter.Int64Gauge(r.nameTransformFn(name))
otelMeter, err := r.meticMeter.Int64Gauge(transfomedMetric.Name)
if err != nil {
return def, err
}

return &wrappedGauge{Gauge: meterDef, otelMeter: otelMeter}, nil
return &wrappedGauge{Gauge: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil
case metrics.Histogram:
otelMeter, err := r.meticMeter.Int64Histogram(r.nameTransformFn(name))
otelMeter, err := r.meticMeter.Int64Histogram(transfomedMetric.Name)
if err != nil {
r.errorHandler(err)
break
}

return &wrappedHistogram{Histogram: meterDef, otelMeter: otelMeter}, nil
return &wrappedHistogram{Histogram: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil
default:
// this is just a safety net, as we should have handled all the cases above (based on the lib)
r.errorHandler(fmt.Errorf("unsupported metric type (name=%s): %v", name, def))
Expand All @@ -157,55 +168,60 @@ func (r *registry) getWrappedMeter(name string, def interface{}) (interface{}, e

type wrappedMeter struct {
metrics.Meter
otelMeter otelmetric.Int64Counter
otelMeter otelmetric.Int64Counter
attributes attribute.Set
}

func (m *wrappedMeter) Mark(n int64) {
m.otelMeter.Add(context.Background(), n)
m.otelMeter.Add(context.Background(), n, otelmetric.WithAttributeSet(m.attributes))
m.Meter.Mark(n)
}

type wrappedCounter struct {
metrics.Counter
otelMeter otelmetric.Int64UpDownCounter
otelMeter otelmetric.Int64UpDownCounter
attributes attribute.Set
}

func (m *wrappedCounter) Inc(n int64) {
m.otelMeter.Add(context.Background(), n)
m.otelMeter.Add(context.Background(), n, otelmetric.WithAttributeSet(m.attributes))
m.Counter.Inc(n)
}

func (m *wrappedCounter) Dec(n int64) {
m.otelMeter.Add(context.Background(), -n)
m.otelMeter.Add(context.Background(), -n, otelmetric.WithAttributeSet(m.attributes))
m.Counter.Dec(n)
}

type wrappedGaugeFloat64 struct {
metrics.GaugeFloat64
otelMeter otelmetric.Float64Gauge
otelMeter otelmetric.Float64Gauge
attributes attribute.Set
}

func (m *wrappedGaugeFloat64) Update(newVal float64) {
m.otelMeter.Record(context.Background(), newVal)
m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes))
m.GaugeFloat64.Update(newVal)
}

type wrappedGauge struct {
metrics.Gauge
otelMeter otelmetric.Int64Gauge
otelMeter otelmetric.Int64Gauge
attributes attribute.Set
}

func (m *wrappedGauge) Update(newVal int64) {
m.otelMeter.Record(context.Background(), newVal)
m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes))
m.Gauge.Update(newVal)
}

type wrappedHistogram struct {
metrics.Histogram
otelMeter otelmetric.Int64Histogram
otelMeter otelmetric.Int64Histogram
attributes attribute.Set
}

func (m *wrappedHistogram) Update(newVal int64) {
m.otelMeter.Record(context.Background(), newVal)
m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes))
m.Histogram.Update(newVal)
}

0 comments on commit a43fd8c

Please sign in to comment.