diff --git a/plugins/extractors/bigtable/bigtable.go b/plugins/extractors/bigtable/bigtable.go index 7a5735990..314444153 100644 --- a/plugins/extractors/bigtable/bigtable.go +++ b/plugins/extractors/bigtable/bigtable.go @@ -110,6 +110,12 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { if err != nil { return err } + + client, err = WithInstancesAdminClientMW(client, e.config.ProjectID) + if err != nil { + return err + } + e.instanceNames, err = instanceInfoGetter(ctx, client) if err != nil { return err @@ -143,6 +149,12 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) error if err != nil { return err } + + adminClient, err = WithAdminClientMW(adminClient, e.config.ProjectID, instance) + if err != nil { + return err + } + tables, _ := adminClient.Tables(ctx) var wg sync.WaitGroup for _, table := range tables { diff --git a/plugins/extractors/bigtable/middleware.go b/plugins/extractors/bigtable/middleware.go new file mode 100644 index 000000000..6d283c8c6 --- /dev/null +++ b/plugins/extractors/bigtable/middleware.go @@ -0,0 +1,120 @@ +package bigtable + +import ( + "context" + "errors" + "time" + + "cloud.google.com/go/bigtable" + "github.com/googleapis/gax-go/v2/apierror" + "github.com/goto/meteor/utils" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type AdminClientMW struct { + tableDuration metric.Int64Histogram + tablesDuration metric.Int64Histogram + next AdminClient + attributes []attribute.KeyValue +} + +type InstancesAdminClientMW struct { + instancesDuration metric.Int64Histogram + next InstanceAdminClient + attributes []attribute.KeyValue +} + +func WithAdminClientMW(next AdminClient, projectID, instanceName string) (AdminClient, error) { + meter := otel.Meter("") + + tablesDuration, err := meter.Int64Histogram("meteor.bigtable.client.tables.duration", metric.WithUnit("ms")) + if err != nil { + return nil, err + } + + tableDuration, err := meter.Int64Histogram("meteor.bigtable.client.table.duration", metric.WithUnit("ms")) + if err != nil { + return nil, err + } + + return &AdminClientMW{ + tableDuration: tableDuration, + tablesDuration: tablesDuration, + next: next, + attributes: []attribute.KeyValue{ + attribute.String("bt.project_id", projectID), + attribute.String("bt.instance_name", instanceName), + }, + }, nil +} + +func WithInstancesAdminClientMW(next InstanceAdminClient, projectID string) (InstanceAdminClient, error) { + meter := otel.Meter("") + + instancesDuration, err := meter.Int64Histogram("meteor.bigtable.client.instances.duration", metric.WithUnit("ms")) + if err != nil { + return nil, err + } + + return &InstancesAdminClientMW{ + instancesDuration: instancesDuration, + next: next, + attributes: []attribute.KeyValue{ + attribute.String("bt.project_id", projectID), + }, + }, nil +} + +func (o *AdminClientMW) Tables(ctx context.Context) (res []string, err error) { + defer func(start time.Time) { + attrs := o.attributes + if err != nil { + attrs = append(attrs, attribute.String("bt.error_code", getAPIErrReason(err))) + } + o.tablesDuration.Record(ctx, + time.Since(start).Milliseconds(), + metric.WithAttributes(attrs...)) + }(time.Now()) + + return o.next.Tables(ctx) +} + +func (o *AdminClientMW) TableInfo(ctx context.Context, table string) (res *bigtable.TableInfo, err error) { + defer func(start time.Time) { + attrs := append(o.attributes, attribute.String("bt.table_name", table)) + if err != nil { + attrs = append(attrs, attribute.String("bt.error_code", getAPIErrReason(err))) + } + o.tableDuration.Record(ctx, + time.Since(start).Milliseconds(), + metric.WithAttributes(attrs...)) + }(time.Now()) + return o.next.TableInfo(ctx, table) +} + +func (o *InstancesAdminClientMW) Instances(ctx context.Context) (res []*bigtable.InstanceInfo, err error) { + defer func(start time.Time) { + attrs := o.attributes + if err != nil { + attrs = append(o.attributes, attribute.String("bt.error_code", getAPIErrReason(err))) + } + + o.instancesDuration.Record(ctx, + time.Since(start).Milliseconds(), + metric.WithAttributes(attrs...)) + }(time.Now()) + + return o.next.Instances(ctx) +} + +func getAPIErrReason(err error) string { + reason := utils.StatusCode(err).String() + var apiErr *apierror.APIError + if errors.As(err, &apiErr) { + reason = apiErr.Reason() + } + + return reason +}