Skip to content

Commit

Permalink
feat: instrument Kafka extractor with OpenTelemetry (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudo-suhas authored Jul 27, 2023
1 parent 76fdf80 commit 1f39c3a
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions plugins/extractors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"crypto/tls"
"crypto/x509"
_ "embed" // used to print the embedded assets
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/goto/meteor/models"
Expand All @@ -15,6 +17,9 @@ import (
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)
Expand Down Expand Up @@ -69,9 +74,10 @@ var info = plugins.Info{
type Extractor struct {
plugins.BaseExtractor
// internal states
conn *kafka.Conn
logger log.Logger
config Config
conn *kafka.Conn
logger log.Logger
config Config
clientDurn metric.Int64Histogram
}

// New returns a pointer to an initialized Extractor Object
Expand All @@ -86,6 +92,14 @@ func New(logger log.Logger) *Extractor {

// Init initializes the extractor
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
clientDurn, err := otel.Meter("").
Int64Histogram("meteor.kafka.client.duration", metric.WithUnit("ms"))
if err != nil {
return fmt.Errorf("create client duration histogram: %w", err)
}

e.clientDurn = clientDurn

if err := e.BaseExtractor.Init(ctx, config); err != nil {
return err
}
Expand All @@ -106,7 +120,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
}

// create connection
var err error
e.conn, err = dialer.DialContext(ctx, "tcp", e.config.Broker)
if err != nil {
return fmt.Errorf("create connection: %w", err)
Expand All @@ -117,10 +130,10 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {

// Extract checks if the extractor is ready to extract
// if so, then extracts metadata from the kafka broker
func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
defer e.conn.Close()

partitions, err := e.conn.ReadPartitions()
partitions, err := e.readPartitions(ctx)
if err != nil {
return fmt.Errorf("fetch partitions: %w", err)
}
Expand Down Expand Up @@ -202,6 +215,31 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.
}, nil
}

func (e *Extractor) readPartitions(ctx context.Context) (partitions []kafka.Partition, err error) {
defer func(start time.Time) {
attributes := []attribute.KeyValue{
attribute.String("kafka.broker", e.config.Broker),
attribute.Bool("success", err == nil),
}
if err != nil {
errorCode := "UNKNOWN"
var kErr kafka.Error
if errors.As(err, &kErr) {
errorCode = strings.ReplaceAll(
strings.ToUpper(kErr.Title()), " ", "_",
)
}
attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
}

e.clientDurn.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
)
}(time.Now())

return e.conn.ReadPartitions()
}

func init() {
if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
return New(plugins.GetLog())
Expand Down

0 comments on commit 1f39c3a

Please sign in to comment.