-
Notifications
You must be signed in to change notification settings - Fork 0
/
registry.go
211 lines (168 loc) · 5.29 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package tell
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel/metric"
metricNoop "go.opentelemetry.io/otel/metric/noop"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
traceNoop "go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// ErrSetConnetion is a sentinel error carrying the message "grpc connection not set".
// NOTE(review): the identifier is misspelled ("Connetion"); it is exported, so
// renaming it would break callers.
var ErrSetConnetion = errors.New("grpc connection not set")
// defaultShutdownTimeOut is the timeout (2 seconds) that Shutdown falls back to
// when Collector.ShutdownTimeOut is zero.
const defaultShutdownTimeOut = 2 * time.Second
// Collector holds the metric and trace providers together with the grpc
// connection and the bookkeeping needed to shut them down.
type Collector struct {
// Conn is the grpc client connection; Shutdown closes it when it is non-nil.
Conn *grpc.ClientConn
// metrics
// MeterProvider is the meter provider in use; New installs a noop provider
// here when metrics are not exported.
MeterProvider metric.MeterProvider
// MeterProviderSDK is shut down by Shutdown when it is non-nil.
MeterProviderSDK *metricsdk.MeterProvider
// MetricReader is not referenced in the visible part of this file;
// presumably it is set where the SDK meter provider is built (confirm).
MetricReader metricsdk.Reader
// isMetricNoop is set to true by New when the noop meter provider is installed.
isMetricNoop bool
// traces
// TracerProvider is the tracer provider in use; New installs a noop provider
// here when traces are not exported.
TracerProvider trace.TracerProvider
// TracerProviderSDK is shut down by Shutdown when it is non-nil.
TracerProviderSDK *tracesdk.TracerProvider
// isTraceNoop is set to true by New when the noop tracer provider is installed.
isTraceNoop bool
// ShutdownTimeOut for closing providers, default 2 seconds.
ShutdownTimeOut time.Duration
// isUp is the value observed by the "up" gauge: New sets it to 1 and
// Shutdown resets it to 0.
isUp int64
// registered collects the callback registrations that Shutdown unregisters.
registered []metric.Registration
// logger receives the informational and error messages of the collector.
logger Logger
}
// IsMetricNoop reports the collector's isMetricNoop flag, which New sets when
// it installs the noop meter provider.
func (c *Collector) IsMetricNoop() bool {
	return c.isMetricNoop
}
// IsTraceNoop reports the collector's isTraceNoop flag, which New sets when
// it installs the noop tracer provider.
func (c *Collector) IsTraceNoop() bool {
	return c.isTraceNoop
}
// setUpMetric creates the observable "up" gauge on the collector's meter
// provider and registers a callback that observes the current value of c.isUp.
// The resulting registration is stored through AddRegister so that Shutdown
// can unregister it.
//
// Failures are logged rather than returned. When either step fails the
// function stops there: no callback is registered for a gauge that could not
// be created, and a failed (possibly nil) registration is never stored.
func (c *Collector) setUpMetric() {
	meter := c.MeterProvider.Meter("")

	up, err := meter.Int64ObservableGauge("up", metric.WithDescription("application up status"))
	if err != nil {
		c.logger.Error("failed to set up gauge metric", "error", err.Error())

		return
	}

	regUp, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(up, c.isUp)

		return nil
	}, up)
	if err != nil {
		c.logger.Error("failed to register up gauge metric", "error", err.Error())

		return
	}

	c.AddRegister(regUp)
}
// AddRegister stores r so that Shutdown can unregister it.
//
// A nil registration is ignored: Shutdown calls Unregister on every stored
// entry, and calling a method on a nil interface value would panic.
func (c *Collector) AddRegister(r metric.Registration) {
	if r == nil {
		return
	}

	c.registered = append(c.registered, r)
}
// New builds a Collector from cfg and wires up its metric and trace providers.
//
// When cfg.Collector is empty, the OTEL_EXPORTER_OTLP_ENDPOINT environment
// variable is used as the endpoint. cfg.Logger is used for logging; when it is
// nil an adapter around zerolog's global logger is used instead.
//
// If an endpoint is known, a grpc connection is opened through ConnectGRPC
// with opts, preceded by the authority override when cfg.ServerName is set and
// by TLS transport credentials when cfg.TLS.Enabled. The metric and trace
// providers are then initialized unless the respective signal is disabled.
// Without an endpoint, or for a disabled signal, the matching noop provider is
// installed and the IsMetricNoop/IsTraceNoop flag is set. After each provider
// is chosen the corresponding Set*ProviderGlobal method is called, and finally
// the "up" gauge is registered with the value 1.
//
// On failure the returned collector is nil. Anything already started (grpc
// connection, SDK providers) is released through Shutdown before returning, so
// a failed call does not leak the connection.
func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (*Collector, error) {
	if cfg.Collector == "" {
		cfg.Collector = os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
	}

	logger := cfg.Logger
	if logger == nil {
		logger = adapterKV{Log: log.Logger}
	}

	c := &Collector{
		logger: logger,
	}

	// fail releases whatever has been started so far and reports err. The
	// caller never receives the collector on this path, so nobody else could
	// close the connection or stop the providers.
	// NOTE(review): if SetMetricProviderGlobal already ran, the global
	// provider may still reference the provider shut down here - confirm
	// whether it should be reset.
	fail := func(err error) (*Collector, error) {
		if errShutdown := c.Shutdown(); errShutdown != nil {
			c.logger.Error("failed to clean up collector", "error", errShutdown.Error())
		}

		return nil, err
	}

	useCollector := cfg.Collector != ""

	// check grpc need
	if useCollector {
		c.logger.Info(fmt.Sprintf("opentelemetry collector endpoint: [%s]", cfg.Collector))

		if cfg.TLS.Enabled {
			tlsConfig, err := cfg.TLS.Generate()
			if err != nil {
				// nothing has been started yet, no cleanup needed
				return nil, fmt.Errorf("failed to generate tls config; %w", err)
			}

			// prepend so that caller supplied options keep the last word
			opts = append([]grpc.DialOption{
				grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
			}, opts...)
		}

		if cfg.ServerName != "" {
			opts = append([]grpc.DialOption{grpc.WithAuthority(cfg.ServerName)}, opts...)
		}

		if err := c.ConnectGRPC(ctx, cfg.Collector, opts...); err != nil {
			return fail(err)
		}

		c.logger.Info("connected to grpc opentelemetry collector")
	}

	// metric
	if useCollector && !cfg.Metric.Disable {
		if err := c.MetricProvider(ctx, cfg.Metric.Provider); err != nil {
			return fail(fmt.Errorf("failed initialize metric provider; %w", err))
		}

		c.logger.Info("started metric provider for [otel]")

		// add enabled metrics
		// NOTE(review): runtime.Start is called without an explicit meter
		// provider and before SetMetricProviderGlobal below; presumably this
		// relies on the otel global provider delegation - confirm.
		if cfg.Metric.Default.GoRuntime {
			if err := runtime.Start(); err != nil {
				return fail(fmt.Errorf("failed to start runtime metrics; %w", err))
			}

			c.logger.Info("started runtime metrics")
		}
	} else {
		c.MeterProvider = metricNoop.NewMeterProvider()
		c.isMetricNoop = true

		c.logger.Info("started metric provider for [noop]")
	}

	c.SetMetricProviderGlobal()

	// trace
	if useCollector && !cfg.Trace.Disable {
		if err := c.TraceProvider(ctx, cfg.Trace.Provider); err != nil {
			return fail(fmt.Errorf("failed initialize trace provider; %w", err))
		}

		c.logger.Info("started trace provider for [otel]")
	} else {
		c.TracerProvider = traceNoop.NewTracerProvider()
		c.isTraceNoop = true

		c.logger.Info("started trace provider for [noop]")
	}

	c.SetTraceProviderGlobal()

	// everything works fine, publish the up information
	c.isUp = 1
	c.setUpMetric()

	return c, nil
}
// Shutdown flushes and shuts down the SDK providers, unregisters the metric
// callbacks stored through AddRegister and finally closes the grpc connection.
// Providers will not export metrics after shutdown.
//
// Each provider gets its own context bounded by ShutdownTimeOut (2 seconds
// when the field is zero). Every step is attempted even if an earlier one
// fails; all failures are combined with errors.Join and returned, so
// errors.Is and errors.As keep working on each of them. A nil result means
// every step succeeded.
func (c *Collector) Shutdown() (err error) {
	c.isUp = 0

	// set the default context timeout
	if c.ShutdownTimeOut == 0 {
		c.ShutdownTimeOut = defaultShutdownTimeOut
	}

	// Deferred so the connection is closed last, after the providers had
	// their chance to shut down; the named result lets the close error be
	// added to whatever was collected before.
	defer func() {
		if c.Conn == nil {
			return
		}

		if errClose := c.Conn.Close(); errClose != nil {
			err = errors.Join(err, fmt.Errorf("failed to close connection; %w", errClose))
		}
	}()

	if c.MeterProviderSDK != nil {
		ctxMetric, cancelCtxMetric := context.WithTimeout(context.Background(), c.ShutdownTimeOut)
		defer cancelCtxMetric()

		if errShutdown := c.MeterProviderSDK.Shutdown(ctxMetric); errShutdown != nil {
			err = errors.Join(err, fmt.Errorf("failed to shutdown meter provider; %w", errShutdown))
		}
	}

	if c.TracerProviderSDK != nil {
		ctxTrace, cancelCtxTrace := context.WithTimeout(context.Background(), c.ShutdownTimeOut)
		defer cancelCtxTrace()

		if errShutdown := c.TracerProviderSDK.Shutdown(ctxTrace); errShutdown != nil {
			err = errors.Join(err, fmt.Errorf("failed to shutdown trace provider; %w", errShutdown))
		}
	}

	// remove registrations
	for _, r := range c.registered {
		if r == nil {
			// guards against a nil entry, which would panic on Unregister
			continue
		}

		if errUnregister := r.Unregister(); errUnregister != nil {
			err = errors.Join(err, fmt.Errorf("failed to unregister metric callback; %w", errUnregister))
		}
	}

	// drop the entries so a repeated Shutdown does not unregister them twice
	c.registered = nil

	return err
}