Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Don't add k8s attributes when source/destination IP matches agent IP #260

Merged
merged 8 commits into from
Oct 3, 2023
6 changes: 4 additions & 2 deletions handlers/libhoney_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// libhoneyEventHandler is an event handler that sends events using libhoney
type libhoneyEventHandler struct {
config config.Config
k8sClient *utils.CachedK8sClient
eventsChan chan assemblers.HttpEvent
}
Expand All @@ -25,6 +26,7 @@ type libhoneyEventHandler struct {
func NewLibhoneyEventHandler(config config.Config, k8sClient *utils.CachedK8sClient, eventsChan chan assemblers.HttpEvent, version string) EventHandler {
initLibhoney(config, version)
return &libhoneyEventHandler{
config: config,
k8sClient: k8sClient,
eventsChan: eventsChan,
}
Expand Down Expand Up @@ -157,8 +159,8 @@ func (handler *libhoneyEventHandler) handleEvent(event assemblers.HttpEvent) {
ev.AddField("http.response.missing", "no response on this event")
}

ev.Add(utils.GetK8sAttrsForIp(handler.k8sClient, event.SrcIp, "source"))
ev.Add(utils.GetK8sAttrsForIp(handler.k8sClient, event.DstIp, "destination"))
ev.Add(handler.k8sClient.GetK8sAttrsForSourceIP(handler.config.AgentPodIP, event.SrcIp))
ev.Add(handler.k8sClient.GetK8sAttrsForDestinationIP(handler.config.AgentPodIP, event.DstIp))

log.Debug().
Str("stream_ident", event.StreamIdent).
Expand Down
66 changes: 66 additions & 0 deletions utils/cached_k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package utils

import (
"context"
"fmt"
"strings"
"time"

"github.com/rs/zerolog/log"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -99,3 +102,66 @@ func (c *CachedK8sClient) GetNodeForPod(pod *v1.Pod) *v1.Node {
}
return val[0].(*v1.Node)
}

// GetK8sAttrsForSourceIP returns a map of kubernetes metadata attributes for
// a given IP address. Attribute names will be prefixed with "source.".
func (c *CachedK8sClient) GetK8sAttrsForSourceIP(agentIP string, ip string) map[string]any {
return c.getK8sAttrsForIp(agentIP, ip, "source")
}

// GetK8sAttrsForDestinationIP returns a map of kubernetes metadata attributes for
// a given IP address. Attribute names will be prefixed with "destination.".
func (c *CachedK8sClient) GetK8sAttrsForDestinationIP(agentIP string, ip string) map[string]any {
return c.getK8sAttrsForIp(agentIP, ip, "destination")
}

// getK8sAttrsForIp returns a map of kubernetes metadata attributes for a given IP address.
//
// Provide a prefix to prepend to the attribute names, example: "source" or "destination".
//
// If the IP address is not found in the kubernetes cache, an empty map is returned.
func (client *CachedK8sClient) getK8sAttrsForIp(agentIP string, ip string, prefix string) map[string]any {
k8sAttrs := map[string]any{}

if ip == "" {
return k8sAttrs
}

// Try add k8s attributes for source and destination when they are not the agent pod IP.
// Because we use hostnetwork in deployments, the agent pod IP and node IP are the same and we
// can't distinguish between the two, or any other pods that is also running with hostnetwork.
if ip == agentIP {
return k8sAttrs
}

if prefix != "" {
prefix = fmt.Sprintf("%s.", prefix)
}

if pod := client.GetPodByIPAddr(ip); pod != nil {
k8sAttrs[prefix+string(semconv.K8SPodNameKey)] = pod.Name
k8sAttrs[prefix+string(semconv.K8SPodUIDKey)] = pod.UID
k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = pod.Namespace

if len(pod.Spec.Containers) > 0 {
var containerNames []string
for _, container := range pod.Spec.Containers {
containerNames = append(containerNames, container.Name)
}
k8sAttrs[prefix+string(semconv.K8SContainerNameKey)] = strings.Join(containerNames, ",")
}

if node := client.GetNodeForPod(pod); node != nil {
k8sAttrs[prefix+string(semconv.K8SNodeNameKey)] = node.Name
k8sAttrs[prefix+string(semconv.K8SNodeUIDKey)] = node.UID
}

if service := client.GetServiceForPod(pod); service != nil {
// no semconv for service yet
k8sAttrs[prefix+"k8s.service.name"] = service.Name
k8sAttrs[prefix+"k8s.service.uid"] = service.UID
}
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
}

return k8sAttrs
}
96 changes: 96 additions & 0 deletions utils/cached_k8s_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package utils

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

func Test_GetAttrs(t *testing.T) {
srcPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "src-pod",
Namespace: "unit-tests",
UID: "src-pod-uid",
},
Status: v1.PodStatus{
PodIP: "1.2.3.4",
},
}
destPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "dest-pod",
Namespace: "unit-tests",
UID: "dest-pod-uid",
},
Status: v1.PodStatus{
PodIP: "4.3.2.1",
},
}
client := NewCachedK8sClient(fake.NewSimpleClientset(srcPod, destPod))
client.Start(context.Background())

testCases := []struct {
name string
agentIP string
srcIP string
expectedSrcAttrs map[string]interface{}
destIP string
expectedDestAttrs map[string]interface{}
}{
{
name: "src & dest pods",
agentIP: "1.1.1.1",
srcIP: srcPod.Status.PodIP,
expectedSrcAttrs: map[string]interface{}{
"source.k8s.namespace.name": srcPod.Namespace,
"source.k8s.pod.name": srcPod.Name,
"source.k8s.pod.uid": srcPod.UID,
},
destIP: destPod.Status.PodIP,
expectedDestAttrs: map[string]interface{}{
"destination.k8s.namespace.name": destPod.Namespace,
"destination.k8s.pod.name": destPod.Name,
"destination.k8s.pod.uid": destPod.UID,
},
},
{
name: "src IP matches agent IP - no src pod attrs",
agentIP: srcPod.Status.PodIP,
srcIP: srcPod.Status.PodIP,
expectedSrcAttrs: map[string]interface{}{},
destIP: destPod.Status.PodIP,
expectedDestAttrs: map[string]interface{}{
"destination.k8s.namespace.name": destPod.Namespace,
"destination.k8s.pod.name": destPod.Name,
"destination.k8s.pod.uid": destPod.UID,
},
},
{
name: "dest IP matches agent IP - no dest pod attrs",
agentIP: destPod.Status.PodIP,
srcIP: srcPod.Status.PodIP,
expectedSrcAttrs: map[string]interface{}{
"source.k8s.namespace.name": srcPod.Namespace,
"source.k8s.pod.name": srcPod.Name,
"source.k8s.pod.uid": srcPod.UID,
},
destIP: destPod.Status.PodIP,
expectedDestAttrs: map[string]interface{}{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
srcAttrs := client.GetK8sAttrsForSourceIP(tc.agentIP, tc.srcIP)
assert.Equal(t, tc.expectedSrcAttrs, srcAttrs)

destAttrs := client.GetK8sAttrsForDestinationIP(tc.agentIP, tc.destIP)
assert.Equal(t, tc.expectedDestAttrs, destAttrs)
})
}
}
64 changes: 0 additions & 64 deletions utils/k8sutils.go

This file was deleted.