Skip to content

Commit

Permalink
Introduce Config Provider as an option to set the initial configurati…
Browse files Browse the repository at this point in the history
…on and update it. (open-telemetry#1010)

* Add config.Provider to load/unload probes

* Added comments, improve log message and unit test

* update test file

* changleog entry

* rename config fields to be traces specific

* Hadle potential races between applying config and closing

* clarigy shutdown function

---------

Co-authored-by: Eden Federman <[email protected]>
  • Loading branch information
2 people authored and wilsonxscai committed Sep 9, 2024
1 parent d08c146 commit 5a3cb70
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
- Support `golang.org/x/net` `v0.28.0`. ([#988](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/988))
- Support `google.golang.org/grpc` `1.67.0-dev`. ([#1007](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1007))
- Support Go `1.23.0`. ([#1007](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1007))
- Introduce `config.Provider` as an option to set the initial configuration and update it in runtime. ([#1010](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1010))

### Fixed

Expand Down
74 changes: 74 additions & 0 deletions config/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package config

import (
"context"

"go.opentelemetry.io/otel/trace"
)

// InstrumentationLibraryID is used to identify an instrumentation library.
type InstrumentationLibraryID struct {
// Name of the instrumentation pkg (e.g. "net/http").
InstrumentedPkg string
// SpanKind is the relevant span kind for the instrumentation.
// This can be used to configure server-only, client-only spans.
// If not set, the identifier is assumed to be applicable to all span kinds relevant to the instrumented package.
SpanKind trace.SpanKind
}

// InstrumentationLibrary is used to configure instrumentation for a specific library.
type InstrumentationLibrary struct {
// TracesEnabled determines whether traces are enabled for the instrumentation library.
// if nil - take DefaultTracesDisabled value.
TracesEnabled *bool
}

// InstrumentationConfig is used to configure instrumentation.
type InstrumentationConfig struct {
// InstrumentationLibraryConfigs defines library-specific configuration.
// If a package is referenced by more than one key, the most specific key is used.
// For example, if ("net/http", unspecified) and ("net/http", client) are both present,
// the configuration for ("net/http", client) is used for client spans and the configuration for ("net/http", unspecified) is used for server spans.
InstrumentationLibraryConfigs map[InstrumentationLibraryID]InstrumentationLibrary

// DefaultTracesDisabled determines whether traces are disabled by default.
// If set to true, traces are disabled by default for all libraries, unless the library is explicitly enabled.
// If set to false, traces are enabled by default for all libraries, unless the library is explicitly disabled.
// default is false - traces are enabled by default.
DefaultTracesDisabled bool
}

// Provider provides the initial configuration and updates to the instrumentation configuration.
type Provider interface {
// InitialConfig returns the initial instrumentation configuration.
InitialConfig(ctx context.Context) InstrumentationConfig
// Watch returns a channel that receives updates to the instrumentation configuration.
Watch() <-chan InstrumentationConfig
// Shutdown releases any resources held by the provider.
// It is an error to send updates after Shutdown is called.
Shutdown(ctx context.Context) error
}

type noopProvider struct{}

// NewNoopProvider returns a provider that does not provide any updates and provide the default configuration as the initial one.
func NewNoopProvider() Provider {
return &noopProvider{}
}

func (p *noopProvider) InitialConfig(_ context.Context) InstrumentationConfig {
return InstrumentationConfig{}
}

func (p *noopProvider) Watch() <-chan InstrumentationConfig {
c := make(chan InstrumentationConfig)
close(c)
return c
}

func (p *noopProvider) Shutdown(_ context.Context) error {
return nil
}
25 changes: 21 additions & 4 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

"go.opentelemetry.io/auto/config"
"go.opentelemetry.io/auto/internal/pkg/instrumentation"
"go.opentelemetry.io/auto/internal/pkg/opentelemetry"
"go.opentelemetry.io/auto/internal/pkg/process"
Expand Down Expand Up @@ -74,11 +75,11 @@ func newLogger(logLevel LogLevel) logr.Logger {
level, _ = zap.ParseAtomicLevel(LogLevelInfo.String())
}

config := zap.NewProductionConfig()
c := zap.NewProductionConfig()

config.Level.SetLevel(level.Level())
c.Level.SetLevel(level.Level())

zapLog, err := config.Build()
zapLog, err := c.Build()

var logger logr.Logger
if err != nil {
Expand Down Expand Up @@ -130,7 +131,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
return nil, err
}

mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator)
mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator, c.cp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,6 +222,7 @@ type instConfig struct {
globalImpl bool
loadIndicator chan struct{}
logLevel LogLevel
cp config.Provider
}

func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfig, error) {
Expand Down Expand Up @@ -255,6 +257,10 @@ func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfi
c.logLevel = LogLevelInfo
}

if c.cp == nil {
c.cp = config.NewNoopProvider()
}

return c, err
}

Expand Down Expand Up @@ -562,3 +568,14 @@ func WithLogLevel(level LogLevel) InstrumentationOption {
return c, nil
})
}

// WithConfigProvider returns an [InstrumentationOption] that will configure
// an [Instrumentation] to use the provided config.Provider. The config.Provider
// is used to provide the initial configuration and update the configuration of
// the instrumentation in runtime.
func WithConfigProvider(cp config.Provider) InstrumentationOption {
return fnOpt(func(_ context.Context, c instConfig) (instConfig, error) {
c.cp = cp
return c, nil
})
}
166 changes: 149 additions & 17 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/cilium/ebpf/rlimit"
"github.com/go-logr/logr"

"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/auto/config"
dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql"
kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer"
kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer"
Expand All @@ -35,24 +38,41 @@ var (
bpffsCleanup = bpffs.Cleanup
)

type managerState int

const (
managerStateRunning managerState = iota
managerStateStopped
)

// Manager handles the management of [probe.Probe] instances.
type Manager struct {
logger logr.Logger
probes map[probe.ID]probe.Probe
otelController *opentelemetry.Controller
globalImpl bool
loadedIndicator chan struct{}
cp config.Provider
exe *link.Executable
td *process.TargetDetails
runningProbesWG sync.WaitGroup
eventCh chan *probe.Event
currentConfig config.InstrumentationConfig
probeMu sync.Mutex
state managerState
}

// NewManager returns a new [Manager].
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}) (*Manager, error) {
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}, cp config.Provider) (*Manager, error) {
logger = logger.WithName("Manager")
m := &Manager{
logger: logger,
probes: make(map[probe.ID]probe.Probe),
otelController: otelController,
globalImpl: globalImpl,
loadedIndicator: loadIndicator,
cp: cp,
eventCh: make(chan *probe.Event),
}

err := m.registerProbes()
Expand Down Expand Up @@ -133,43 +153,147 @@ func (m *Manager) FilterUnusedProbes(target *process.TargetDetails) {
}
}

func getProbeConfig(id probe.ID, c config.InstrumentationConfig) (config.InstrumentationLibrary, bool) {
libKindID := config.InstrumentationLibraryID{
InstrumentedPkg: id.InstrumentedPkg,
SpanKind: id.SpanKind,
}

if lib, ok := c.InstrumentationLibraryConfigs[libKindID]; ok {
return lib, true
}

libID := config.InstrumentationLibraryID{
InstrumentedPkg: id.InstrumentedPkg,
SpanKind: trace.SpanKindUnspecified,
}

if lib, ok := c.InstrumentationLibraryConfigs[libID]; ok {
return lib, true
}

return config.InstrumentationLibrary{}, false
}

func isProbeEnabled(id probe.ID, c config.InstrumentationConfig) bool {
if pc, ok := getProbeConfig(id, c); ok && pc.TracesEnabled != nil {
return *pc.TracesEnabled
}
return !c.DefaultTracesDisabled
}

func (m *Manager) applyConfig(c config.InstrumentationConfig) error {
if m.td == nil {
return errors.New("failed to apply config: target details not set")
}
if m.exe == nil {
return errors.New("failed to apply config: executable not set")
}

var err error
m.probeMu.Lock()
defer m.probeMu.Unlock()

if m.state != managerStateRunning {
return nil
}

for id, p := range m.probes {
currentlyEnabled := isProbeEnabled(id, m.currentConfig)
newEnabled := isProbeEnabled(id, c)

if currentlyEnabled && !newEnabled {
m.logger.Info("Disabling probe", "id", id)
err = errors.Join(err, p.Close())
continue
}

if !currentlyEnabled && newEnabled {
m.logger.Info("Enabling probe", "id", id)
err = errors.Join(err, p.Load(m.exe, m.td))
if err == nil {
m.runProbe(p)
}
continue
}
}

return nil
}

func (m *Manager) runProbe(p probe.Probe) {
m.runningProbesWG.Add(1)
go func(ap probe.Probe) {
defer m.runningProbesWG.Done()
ap.Run(m.eventCh)
}(p)
}

func (m *Manager) ConfigLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case c, ok := <-m.cp.Watch():
if !ok {
m.logger.Info("Configuration provider closed, configuration updates will no longer be received")
return
}
err := m.applyConfig(c)
if err != nil {
m.logger.Error(err, "Failed to apply config")
continue
}
m.currentConfig = c
}
}
}

// Run runs the event processing loop for all managed probes.
func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error {
if len(m.probes) == 0 {
return errors.New("no instrumentation for target process")
}
if m.cp == nil {
return errors.New("no config provider set")
}

m.currentConfig = m.cp.InitialConfig(ctx)

err := m.load(target)
if err != nil {
return err
}

eventCh := make(chan *probe.Event)
var wg sync.WaitGroup
for _, i := range m.probes {
wg.Add(1)
go func(p probe.Probe) {
defer wg.Done()
p.Run(eventCh)
}(i)
for id, p := range m.probes {
if isProbeEnabled(id, m.currentConfig) {
m.runProbe(p)
}
}

if m.loadedIndicator != nil {
close(m.loadedIndicator)
}
m.state = managerStateRunning

go m.ConfigLoop(ctx)

for {
select {
case <-ctx.Done():
m.probeMu.Lock()

m.logger.V(1).Info("Shutting down all probes")
err := m.cleanup(target)

// Wait for all probes to stop before closing the chan they send on.
wg.Wait()
close(eventCh)
m.runningProbesWG.Wait()
close(m.eventCh)

m.state = managerStateStopped
m.probeMu.Unlock()
return errors.Join(err, ctx.Err())
case e := <-eventCh:
case e := <-m.eventCh:
m.otelController.Trace(e)
}
}
Expand All @@ -185,18 +309,25 @@ func (m *Manager) load(target *process.TargetDetails) error {
if err != nil {
return err
}
m.exe = exe

if m.td == nil {
m.td = target
}

if err := m.mount(target); err != nil {
return err
}

// Load probes
for name, i := range m.probes {
m.logger.V(0).Info("loading probe", "name", name)
err := i.Load(exe, target)
if err != nil {
m.logger.Error(err, "error while loading probes, cleaning up", "name", name)
return errors.Join(err, m.cleanup(target))
if isProbeEnabled(name, m.currentConfig) {
m.logger.V(0).Info("loading probe", "name", name)
err := i.Load(exe, target)
if err != nil {
m.logger.Error(err, "error while loading probes, cleaning up", "name", name)
return errors.Join(err, m.cleanup(target))
}
}
}

Expand All @@ -215,6 +346,7 @@ func (m *Manager) mount(target *process.TargetDetails) error {

func (m *Manager) cleanup(target *process.TargetDetails) error {
var err error
err = errors.Join(err, m.cp.Shutdown(context.Background()))
for _, i := range m.probes {
err = errors.Join(err, i.Close())
}
Expand Down
Loading

0 comments on commit 5a3cb70

Please sign in to comment.