Skip to content

Commit

Permalink
feat: add open-telemetry metrics to http clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Shivaprasad committed Jul 28, 2023
1 parent 1f39c3a commit 31804ba
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 9 deletions.
5 changes: 4 additions & 1 deletion plugins/extractors/grafana/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -60,7 +61,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
}

// build client
e.client = NewClient(&http.Client{}, e.config)
e.client = NewClient(&http.Client{
Transport: telemetry.NewHTTPTransport(nil),
}, e.config)

return nil
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/extractors/http/http_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/MakeNowJust/heredoc"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
)
Expand Down Expand Up @@ -109,7 +110,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
return err
}

e.executeRequest = makeRequestExecutor(e.config.SuccessCodes, &http.Client{})
e.executeRequest = makeRequestExecutor(e.config.SuccessCodes, &http.Client{
Transport: telemetry.NewHTTPTransport(nil),
})
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/extractors/merlin/internal/merlin/merlin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"regexp"
"time"

"github.com/goto/meteor/plugins/internal/telemetry"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
)
Expand All @@ -32,6 +33,7 @@ func NewClient(ctx context.Context, params ClientParams) (Client, error) {
if err != nil {
return Client{}, fmt.Errorf("new Merlin client: %w", err)
}
httpClient.Transport = telemetry.NewHTTPTransport(httpClient.Transport)

urlb, err := NewURLBuilderSource(params.BaseURL)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions plugins/extractors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"net/url"
"strings"

// used to register the postgres driver

"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
Expand Down
4 changes: 3 additions & 1 deletion plugins/extractors/redash/redash.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down Expand Up @@ -67,7 +68,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
}

e.client = &http.Client{
Timeout: 4 * time.Second,
Timeout: 4 * time.Second,
Transport: telemetry.NewHTTPTransport(nil),
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion plugins/extractors/superset/superset.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -71,7 +72,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
return err
}
e.client = &http.Client{
Timeout: 4 * time.Second,
Timeout: 4 * time.Second,
Transport: telemetry.NewHTTPTransport(nil),
}
// get access token for further api calls in superset
var err error
Expand Down
2 changes: 2 additions & 0 deletions plugins/extractors/tableau/tableau.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down Expand Up @@ -62,6 +63,7 @@ type Option func(*Extractor)
// WithHTTPClient assign custom http client to the Extractor constructor
func WithHTTPClient(hcl *http.Client) Option {
return func(e *Extractor) {
hcl.Transport = telemetry.NewHTTPTransport(hcl.Transport)
e.httpClient = hcl
}
}
Expand Down
121 changes: 121 additions & 0 deletions plugins/internal/telemetry/http_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package telemetry

import (
"io"
"net/http"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Refer OpenTelemetry Semantic Conventions for HTTP Client.
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-metrics.md#http-client
const (
metricClientDuration = "http.client.duration"
metricClientRequestSize = "http.client.request.size"
metricClientResponseSize = "http.client.response.size"

attributeServerPort = "server.port"
attributeServerAddress = "server.address"
attributeRequestMethod = "http.request.method"
attributeResponseStatusCode = "http.response.status_code"
)

func NewHTTPTransport(baseTransport http.RoundTripper) http.RoundTripper {
if _, ok := baseTransport.(*httpTransport); ok {
return baseTransport
}

if baseTransport == nil {
baseTransport = http.DefaultTransport
}

icl := &httpTransport{roundTripper: baseTransport}
icl.createMeasures(otel.Meter("github.com/goto/meteor/plugins/internal/telemetry"))

return icl
}

type httpTransport struct {
roundTripper http.RoundTripper

counters map[string]metric.Int64Counter
valueRecorders map[string]metric.Float64Histogram
}

func (tr *httpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
startAt := time.Now()

var bw bodyWrapper
if req.Body != nil && req.Body != http.NoBody {
bw.ReadCloser = req.Body
req.Body = &bw
}

attribs := metric.WithAttributes(
attribute.String(attributeRequestMethod, req.Method),
attribute.String(attributeServerAddress, req.URL.Hostname()),
attribute.String(attributeServerPort, req.URL.Port()),
)

tr.counters[metricClientRequestSize].Add(ctx, bw.read.Load(), attribs)

resp, err := tr.roundTripper.RoundTrip(req)
elapsedTime := float64(time.Since(startAt)) / float64(time.Millisecond)
tr.valueRecorders[metricClientDuration].Record(ctx, elapsedTime, attribs)

if err == nil {
tr.counters[metricClientResponseSize].Add(ctx, resp.ContentLength, attribs,
metric.WithAttributes(attribute.Int(attributeResponseStatusCode, resp.StatusCode)))
}

return resp, err
}

func (tr *httpTransport) createMeasures(meter metric.Meter) {
tr.counters = make(map[string]metric.Int64Counter)
tr.valueRecorders = make(map[string]metric.Float64Histogram)

requestBytesCounter, err := meter.Int64Counter(metricClientRequestSize)
handleErr(err)

responseBytesCounter, err := meter.Int64Counter(metricClientResponseSize)
handleErr(err)

clientLatencyMeasure, err := meter.Float64Histogram(metricClientDuration)
handleErr(err)

tr.counters[metricClientRequestSize] = requestBytesCounter
tr.counters[metricClientResponseSize] = responseBytesCounter
tr.valueRecorders[metricClientDuration] = clientLatencyMeasure
}

func handleErr(err error) {
if err != nil {
otel.Handle(err)
}
}

// bodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number
// of bytes read and the last error.
type bodyWrapper struct {
io.ReadCloser

read atomic.Int64
err error
}

func (w *bodyWrapper) Read(b []byte) (int, error) {
n, err := w.ReadCloser.Read(b)
w.read.Add(int64(n))
w.err = err
return n, err
}

func (w *bodyWrapper) Close() error {
return w.ReadCloser.Close()
}
17 changes: 17 additions & 0 deletions plugins/internal/telemetry/http_transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package telemetry_test

import (
"testing"

"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/stretchr/testify/assert"
)

func TestNewHTTPTransport(t *testing.T) {
tr := telemetry.NewHTTPTransport(nil)
assert.NotNil(t, tr)
}

func TestHTTPTransport_RoundTrip(t *testing.T) {

Check failure on line 15 in plugins/internal/telemetry/http_transport_test.go

View workflow job for this annotation

GitHub Actions / golangci

unused-parameter: parameter 't' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 16 in plugins/internal/telemetry/http_transport_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gofumpt`-ed with `-extra` (gofumpt)
}
5 changes: 4 additions & 1 deletion plugins/sinks/compass/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down Expand Up @@ -292,7 +293,9 @@ func (*Sink) getLabelValueFromProperties(field1, field2 string, asset *v1beta2.A

func init() {
if err := registry.Sinks.Register("compass", func() plugins.Syncer {
return New(&http.Client{}, plugins.GetLog())
return New(&http.Client{
Transport: telemetry.NewHTTPTransport(nil),
}, plugins.GetLog())
}); err != nil {
panic(err)
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/sinks/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/MakeNowJust/heredoc"
"github.com/goto/meteor/models"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
)
Expand Down Expand Up @@ -128,7 +129,9 @@ func (s *Sink) send(ctx context.Context, payloadBytes []byte) error {

func init() {
if err := registry.Sinks.Register("http", func() plugins.Syncer {
return New(&http.Client{}, plugins.GetLog())
return New(&http.Client{
Transport: telemetry.NewHTTPTransport(nil),
}, plugins.GetLog())
}); err != nil {
panic(err)
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/sinks/stencil/stencil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/internal/telemetry"
"github.com/goto/meteor/registry"
"github.com/goto/salt/log"
)
Expand Down Expand Up @@ -314,7 +315,9 @@ func typeToAvroSchemaType(service, columnType string) AvroType {
// init register the sink to the catalog
func init() {
if err := registry.Sinks.Register("stencil", func() plugins.Syncer {
return New(&http.Client{}, plugins.GetLog())
return New(&http.Client{
Transport: telemetry.NewHTTPTransport(nil),
}, plugins.GetLog())
}); err != nil {
panic(err)
}
Expand Down

0 comments on commit 31804ba

Please sign in to comment.