Skip to content

Commit

Permalink
Merge pull request #4742 from twz123/telemetry-cleanup
Browse files Browse the repository at this point in the history
Telemetry component cleanup
  • Loading branch information
twz123 authored Jul 23, 2024
2 parents 65ec138 + 2def786 commit dcb64c5
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 107 deletions.
18 changes: 12 additions & 6 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ func (c *command) start(ctx context.Context) error {
return err
}
logrus.Infof("DNS address: %s", dnsAddress)

var storageBackend manager.Component
storageType := nodeConfig.Spec.Storage.Type

switch nodeConfig.Spec.Storage.Type {
switch storageType {
case v1beta1.KineStorageType:
storageBackend = &controller.Kine{
Config: nodeConfig.Spec.Storage.Kine,
Expand Down Expand Up @@ -545,11 +547,15 @@ func (c *command) start(ctx context.Context) error {
})
}

clusterComponents.Add(ctx, &telemetry.Component{
Version: build.Version,
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
})
if telemetry.IsEnabled() {
clusterComponents.Add(ctx, &telemetry.Component{
K0sVars: c.K0sVars,
StorageType: storageType,
KubeClientFactory: adminClientFactory,
})
} else {
logrus.Info("Telemetry is disabled")
}

clusterComponents.Add(ctx, &controller.Autopilot{
K0sVars: c.K0sVars,
Expand Down
129 changes: 64 additions & 65 deletions pkg/telemetry/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@ package telemetry

import (
"context"
"sync"
"time"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"

kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/segmentio/analytics-go"
"github.com/sirupsen/logrus"
)

// Component is a telemetry component for k0s component manager
type Component struct {
clusterConfig *v1beta1.ClusterConfig
K0sVars *config.CfgVars
Version string
StorageType string
KubeClientFactory kubeutil.ClientFactoryInterface

kubernetesClient kubernetes.Interface
analyticsClient analyticsClient
log *logrus.Entry

log *logrus.Entry
stopCh chan struct{}
mu sync.Mutex
stop func()
}

var _ manager.Component = (*Component)(nil)
Expand All @@ -50,82 +51,80 @@ var _ manager.Reconciler = (*Component)(nil)
var interval = time.Minute * 10

// Init set up for external service clients (segment, k8s api)
func (c *Component) Init(_ context.Context) error {
func (c *Component) Init(context.Context) error {
c.log = logrus.WithField("component", "telemetry")

if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil
}

c.analyticsClient = newSegmentClient(segmentToken)
c.log.Info("segment client has been init")
return nil
}

func (c *Component) retrieveKubeClient(ch chan struct{}) {
client, err := c.KubeClientFactory.GetClient()
if err != nil {
c.log.WithError(err).Warning("can't init kube client")
return
}
c.kubernetesClient = client
close(ch)
}

// Run runs work cycle
func (c *Component) Start(_ context.Context) error {
func (c *Component) Start(context.Context) error {
return nil
}

// Run does nothing
func (c *Component) Stop() error {
if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil
}
if c.stopCh != nil {
close(c.stopCh)
}
if c.analyticsClient != nil {
_ = c.analyticsClient.Close()
c.mu.Lock()
defer c.mu.Unlock()

if c.stop != nil {
c.stop()
c.stop = nil
}

return nil
}

// Reconcile detects changes in configuration and applies them to the component
func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error {
logrus.Debug("reconcile method called for: Telemetry")
func (c *Component) Reconcile(_ context.Context, clusterCfg *v1beta1.ClusterConfig) error {
c.mu.Lock()
defer c.mu.Unlock()

if !clusterCfg.Spec.Telemetry.IsEnabled() {
return c.Stop()
}
if c.stopCh != nil {
// We must have the worker stuff already running, do nothing
if c.stop == nil {
c.log.Debug("Telemetry remains disabled")
} else {
c.stop()
c.stop = nil
}

return nil
}
if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil

if c.stop != nil {
return nil // already running
}

clients, err := c.KubeClientFactory.GetClient()
if err != nil {
return err
}
c.clusterConfig = clusterCfg
initedCh := make(chan struct{})
wait.Until(func() {
c.retrieveKubeClient(initedCh)
}, time.Second, initedCh)
go c.run(ctx)

c.stop = c.start(clients)

return nil
}

func (c Component) run(ctx context.Context) {
c.stopCh = make(chan struct{})
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.sendTelemetry(ctx)
case <-c.stopCh:
return
func (c *Component) start(clients kubernetes.Interface) (stop func()) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})

go func() {
defer close(done)
c.log.Info("Starting to collect telemetry")
c.run(ctx, clients)
c.log.Info("Stopped to collect telemetry")
}()

return func() { cancel(); <-done }
}

func (c *Component) run(ctx context.Context, clients kubernetes.Interface) {
analyticsClient := analytics.New(segmentToken)
defer func() {
if err := analyticsClient.Close(); err != nil {
c.log.WithError(err).Debug("Failed to close analytics client")
}
}
}()

wait.UntilWithContext(ctx, func(ctx context.Context) {
c.sendTelemetry(ctx, analyticsClient, clients)
}, interval)
}
13 changes: 4 additions & 9 deletions pkg/telemetry/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,14 @@ var segmentToken = ""

const heartbeatEvent = "cluster-heartbeat"

// Analytics is the interface used for our analytics client.
type analyticsClient interface {
Enqueue(msg analytics.Message) error
Close() error
func IsEnabled() bool {
return segmentToken != ""
}

func NewDefaultSegmentClient() analyticsClient {
if segmentToken == "" {
func NewDefaultSegmentClient() analytics.Client {
if !IsEnabled() {
return nil
}
return newSegmentClient(segmentToken)
}

func newSegmentClient(segmentToken string) analyticsClient {
return analytics.New(segmentToken)
}
48 changes: 21 additions & 27 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"runtime"

"github.com/segmentio/analytics-go"
"github.com/k0sproject/k0s/pkg/build"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/segmentio/analytics-go"
)

type telemetryData struct {
Expand Down Expand Up @@ -59,17 +61,17 @@ func (td telemetryData) asProperties() analytics.Properties {
}
}

func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) {
func (c *Component) collectTelemetry(ctx context.Context, clients kubernetes.Interface) (telemetryData, error) {
var err error
data := telemetryData{}

data.StorageType = c.getStorageType()
data.ClusterID, err = c.getClusterID(ctx)
data.StorageType = c.StorageType
data.ClusterID, err = getClusterID(ctx, clients)

if err != nil {
return data, fmt.Errorf("can't collect cluster ID: %w", err)
}
wds, sums, err := c.getWorkerData(ctx)
wds, sums, err := getWorkerData(ctx, clients)
if err != nil {
return data, fmt.Errorf("can't collect workers count: %w", err)
}
Expand All @@ -78,23 +80,15 @@ func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error)
data.WorkerData = wds
data.MEMTotal = sums.memTotal
data.CPUTotal = sums.cpuTotal
data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, c.kubernetesClient)
data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, clients)
if err != nil {
return data, fmt.Errorf("can't collect control plane nodes count: %w", err)
}
return data, nil
}

func (c Component) getStorageType() string {
switch c.clusterConfig.Spec.Storage.Type {
case v1beta1.EtcdStorageType, v1beta1.KineStorageType:
return c.clusterConfig.Spec.Storage.Type
}
return "unknown"
}

func (c Component) getClusterID(ctx context.Context) (string, error) {
ns, err := c.kubernetesClient.CoreV1().Namespaces().Get(ctx,
func getClusterID(ctx context.Context, clients kubernetes.Interface) (string, error) {
ns, err := clients.CoreV1().Namespaces().Get(ctx,
"kube-system",
metav1.GetOptions{})
if err != nil {
Expand All @@ -104,8 +98,8 @@ func (c Component) getClusterID(ctx context.Context) (string, error) {
return fmt.Sprintf("kube-system:%s", ns.UID), nil
}

func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, error) {
nodes, err := c.kubernetesClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
func getWorkerData(ctx context.Context, clients kubernetes.Interface) ([]workerData, workerSums, error) {
nodes, err := clients.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, workerSums{}, err
}
Expand All @@ -129,8 +123,8 @@ func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums,
return wds, workerSums{cpuTotal: cpuTotal, memTotal: memTotal}, nil
}

func (c Component) sendTelemetry(ctx context.Context) {
data, err := c.collectTelemetry(ctx)
func (c *Component) sendTelemetry(ctx context.Context, analyticsClient analytics.Client, clients kubernetes.Interface) {
data, err := c.collectTelemetry(ctx, clients)
if err != nil {
c.log.WithError(err).Warning("can't prepare telemetry data")
return
Expand All @@ -140,16 +134,16 @@ func (c Component) sendTelemetry(ctx context.Context) {
Extra: map[string]interface{}{"direct": true},
}

hostData.App.Version = c.Version
hostData.App.Version = build.Version
hostData.App.Name = "k0s"
hostData.App.Namespace = "k0s"
hostData.Extra["cpuArch"] = runtime.GOARCH

addSysInfo(&hostData)
c.addCustomData(ctx, &hostData)
addCustomData(ctx, &hostData, clients)

c.log.WithField("data", data).WithField("hostdata", hostData).Info("sending telemetry")
if err := c.analyticsClient.Enqueue(analytics.Track{
if err := analyticsClient.Enqueue(analytics.Track{
AnonymousId: "(removed)",
Event: heartbeatEvent,
Properties: data.asProperties(),
Expand All @@ -159,8 +153,8 @@ func (c Component) sendTelemetry(ctx context.Context) {
}
}

func (c Component) addCustomData(ctx context.Context, analyticCtx *analytics.Context) {
cm, err := c.kubernetesClient.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{})
func addCustomData(ctx context.Context, analyticCtx *analytics.Context, clients kubernetes.Interface) {
cm, err := clients.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{})
if err != nil {
return
}
Expand Down

0 comments on commit dcb64c5

Please sign in to comment.