diff --git a/handlers/libhoney_event_handler.go b/handlers/libhoney_event_handler.go index 6660263c..995e9c2c 100644 --- a/handlers/libhoney_event_handler.go +++ b/handlers/libhoney_event_handler.go @@ -17,6 +17,7 @@ import ( // libhoneyEventHandler is an event handler that sends events using libhoney type libhoneyEventHandler struct { + config config.Config k8sClient *utils.CachedK8sClient eventsChan chan assemblers.HttpEvent } @@ -25,6 +26,7 @@ type libhoneyEventHandler struct { func NewLibhoneyEventHandler(config config.Config, k8sClient *utils.CachedK8sClient, eventsChan chan assemblers.HttpEvent, version string) EventHandler { initLibhoney(config, version) return &libhoneyEventHandler{ + config: config, k8sClient: k8sClient, eventsChan: eventsChan, } @@ -173,8 +175,8 @@ func (handler *libhoneyEventHandler) handleEvent(event assemblers.HttpEvent) { ev.AddField("http.response.missing", "no response on this event") } - ev.Add(utils.GetK8sAttrsForIp(handler.k8sClient, event.SrcIp, "source")) - ev.Add(utils.GetK8sAttrsForIp(handler.k8sClient, event.DstIp, "destination")) + ev.Add(handler.k8sClient.GetK8sAttrsForSourceIP(handler.config.AgentPodIP, event.SrcIp)) + ev.Add(handler.k8sClient.GetK8sAttrsForDestinationIP(handler.config.AgentPodIP, event.DstIp)) log.Debug(). Str("stream_ident", event.StreamIdent). diff --git a/utils/cached_k8s_client.go b/utils/cached_k8s_client.go index e0432ebe..23b867e9 100644 --- a/utils/cached_k8s_client.go +++ b/utils/cached_k8s_client.go @@ -2,9 +2,12 @@ package utils import ( "context" + "fmt" + "strings" "time" "github.com/rs/zerolog/log" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" @@ -16,6 +19,12 @@ const ( ResyncTime = time.Minute * 5 byIPIndex = "ipAddr" nodeByNameIndex = "nodeName" + + k8sResourceType = "k8s.resource.type" + k8sResourceTypePod = "pod" + k8sResourceTypeService = "service" + k8sServiceName = "k8s.service.name" + k8sServiceUID = "k8s.service.uid" ) type CachedK8sClient struct { @@ -118,3 +127,72 @@ func (c *CachedK8sClient) GetNodeForPod(pod *v1.Pod) *v1.Node { } return val[0].(*v1.Node) } + +// GetK8sAttrsForSourceIP returns a map of kubernetes metadata attributes for +// a given IP address. Attribute names will be prefixed with "source.". +func (c *CachedK8sClient) GetK8sAttrsForSourceIP(agentIP string, ip string) map[string]any { + return c.getK8sAttrsForIp(agentIP, ip, "source") +} + +// GetK8sAttrsForDestinationIP returns a map of kubernetes metadata attributes for +// a given IP address. Attribute names will be prefixed with "destination.". +func (c *CachedK8sClient) GetK8sAttrsForDestinationIP(agentIP string, ip string) map[string]any { + return c.getK8sAttrsForIp(agentIP, ip, "destination") +} + +// getK8sAttrsForIp returns a map of kubernetes metadata attributes for a given IP address. +// +// Provide a prefix to prepend to the attribute names, example: "source" or "destination". +// +// If the IP address is not found in the kubernetes cache, an empty map is returned. +func (client *CachedK8sClient) getK8sAttrsForIp(agentIP string, ip string, prefix string) map[string]any { + k8sAttrs := map[string]any{} + + if ip == "" { + return k8sAttrs + } + + // Try add k8s attributes for source and destination when they are not the agent pod IP. + // Because we use hostnetwork in deployments, the agent pod IP and node IP are the same and we + // can't distinguish between the two, or any other pods that is also running with hostnetwork. + if ip == agentIP { + return k8sAttrs + } + + if prefix != "" { + prefix = fmt.Sprintf("%s.", prefix) + } + + if pod := client.GetPodByIPAddr(ip); pod != nil { + k8sAttrs[prefix+k8sResourceType] = k8sResourceTypePod + k8sAttrs[prefix+string(semconv.K8SPodNameKey)] = pod.Name + k8sAttrs[prefix+string(semconv.K8SPodUIDKey)] = pod.UID + k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = pod.Namespace + + if len(pod.Spec.Containers) > 0 { + var containerNames []string + for _, container := range pod.Spec.Containers { + containerNames = append(containerNames, container.Name) + } + k8sAttrs[prefix+string(semconv.K8SContainerNameKey)] = strings.Join(containerNames, ",") + } + + if node := client.GetNodeForPod(pod); node != nil { + k8sAttrs[prefix+string(semconv.K8SNodeNameKey)] = node.Name + k8sAttrs[prefix+string(semconv.K8SNodeUIDKey)] = node.UID + } + + if service := client.GetServiceForPod(pod); service != nil { + // no semconv for service yet + k8sAttrs[prefix+k8sServiceName] = service.Name + k8sAttrs[prefix+k8sServiceUID] = service.UID + } + } else if service := client.GetServiceByIPAddr(ip); service != nil { + k8sAttrs[prefix+k8sResourceType] = k8sResourceTypeService + k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = service.Namespace + // no semconv for service yet + k8sAttrs[prefix+k8sServiceName] = service.Name + k8sAttrs[prefix+k8sServiceUID] = service.UID + } + return k8sAttrs +} diff --git a/utils/cached_k8s_client_test.go b/utils/cached_k8s_client_test.go new file mode 100644 index 00000000..3c650e62 --- /dev/null +++ b/utils/cached_k8s_client_test.go @@ -0,0 +1,162 @@ +package utils + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func Test_GetAttrs(t *testing.T) { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + UID: "node-1-uid", + }, + } + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-1", + Namespace: "unit-tests", + UID: "service-1-uid", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": "test", + }, + }, + } + srcPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "src-pod", + Namespace: "unit-tests", + UID: "src-pod-uid", + Labels: service.Spec.Selector, + }, + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + Spec: v1.PodSpec{ + NodeName: node.Name, + Containers: []v1.Container{ + { + Name: "src-pod-container-1", + }, + { + Name: "src-pod-container-2", + }, + }, + }, + } + destPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dest-pod", + Namespace: "unit-tests", + UID: "dest-pod-uid", + Labels: service.Spec.Selector, + }, + Status: v1.PodStatus{ + PodIP: "4.3.2.1", + }, + Spec: v1.PodSpec{ + NodeName: node.Name, + Containers: []v1.Container{ + { + Name: "dest-pod-container-1", + }, + { + Name: "dest-pod-container-2", + }, + }, + }, + } + client := NewCachedK8sClient(fake.NewSimpleClientset(node, service, srcPod, destPod)) + client.Start(context.Background()) + + testCases := []struct { + name string + agentIP string + srcIP string + expectedSrcAttrs map[string]interface{} + destIP string + expectedDestAttrs map[string]interface{} + }{ + { + name: "src & dest pods", + agentIP: "1.1.1.1", + srcIP: srcPod.Status.PodIP, + expectedSrcAttrs: map[string]interface{}{ + "source.k8s.resource.type": "pod", + "source.k8s.namespace.name": srcPod.Namespace, + "source.k8s.pod.name": srcPod.Name, + "source.k8s.pod.uid": srcPod.UID, + "source.k8s.container.name": "src-pod-container-1,src-pod-container-2", + "source.k8s.node.name": node.Name, + "source.k8s.node.uid": node.UID, + "source.k8s.service.name": service.Name, + "source.k8s.service.uid": service.UID, + }, + destIP: destPod.Status.PodIP, + expectedDestAttrs: map[string]interface{}{ + "destination.k8s.resource.type": "pod", + "destination.k8s.namespace.name": destPod.Namespace, + "destination.k8s.pod.name": destPod.Name, + "destination.k8s.pod.uid": destPod.UID, + "destination.k8s.container.name": "dest-pod-container-1,dest-pod-container-2", + "destination.k8s.node.name": node.Name, + "destination.k8s.node.uid": node.UID, + "destination.k8s.service.name": service.Name, + "destination.k8s.service.uid": service.UID, + }, + }, + { + name: "src IP matches agent IP - no src pod attrs", + agentIP: srcPod.Status.PodIP, + srcIP: srcPod.Status.PodIP, + expectedSrcAttrs: map[string]interface{}{}, + destIP: destPod.Status.PodIP, + expectedDestAttrs: map[string]interface{}{ + "destination.k8s.resource.type": "pod", + "destination.k8s.namespace.name": destPod.Namespace, + "destination.k8s.pod.name": destPod.Name, + "destination.k8s.pod.uid": destPod.UID, + "destination.k8s.container.name": "dest-pod-container-1,dest-pod-container-2", + "destination.k8s.node.name": node.Name, + "destination.k8s.node.uid": node.UID, + "destination.k8s.service.name": service.Name, + "destination.k8s.service.uid": service.UID, + }, + }, + { + name: "dest IP matches agent IP - no dest pod attrs", + agentIP: destPod.Status.PodIP, + srcIP: srcPod.Status.PodIP, + expectedSrcAttrs: map[string]interface{}{ + "source.k8s.resource.type": "pod", + "source.k8s.namespace.name": srcPod.Namespace, + "source.k8s.pod.name": srcPod.Name, + "source.k8s.pod.uid": srcPod.UID, + "source.k8s.container.name": "src-pod-container-1,src-pod-container-2", + "source.k8s.node.name": node.Name, + "source.k8s.node.uid": node.UID, + "source.k8s.service.name": service.Name, + "source.k8s.service.uid": service.UID, + }, + destIP: destPod.Status.PodIP, + expectedDestAttrs: map[string]interface{}{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + srcAttrs := client.GetK8sAttrsForSourceIP(tc.agentIP, tc.srcIP) + assert.Equal(t, tc.expectedSrcAttrs, srcAttrs) + + destAttrs := client.GetK8sAttrsForDestinationIP(tc.agentIP, tc.destIP) + assert.Equal(t, tc.expectedDestAttrs, destAttrs) + }) + } +} diff --git a/utils/k8sutils.go b/utils/k8sutils.go deleted file mode 100644 index fc4655d2..00000000 --- a/utils/k8sutils.go +++ /dev/null @@ -1,79 +0,0 @@ -package utils - -import ( - "fmt" - "strings" - - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" -) - -const ( - k8sResourceType = "k8s.resource.type" - k8sResourceTypePod = "pod" - k8sResourceTypeService = "service" - k8sServiceName = "k8s.service.name" - k8sServiceUID = "k8s.service.uid" -) - -// GetK8sAttrsForSourceIp returns a map of kubernetes metadata attributes for -// a given IP address. Attribute names will be prefixed with "source.". -func GetK8sAttrsForSourceIp(client *CachedK8sClient, ip string) map[string]any { - return GetK8sAttrsForIp(client, ip, "source") -} - -// GetK8sAttrsForDestinationIp returns a map of kubernetes metadata attributes for -// a given IP address. Attribute names will be prefixed with "destination.". -func GetK8sAttrsForDestinationIp(client *CachedK8sClient, ip string) map[string]any { - return GetK8sAttrsForIp(client, ip, "destination") -} - -// GetK8sAttrsForIp returns a map of kubernetes metadata attributes for a given IP address. -// -// Provide a prefix to prepend to the attribute names, example: "source" or "destination". -// -// If the IP address is not found in the kubernetes cache, an empty map is returned. -func GetK8sAttrsForIp(client *CachedK8sClient, ip string, prefix string) map[string]any { - k8sAttrs := map[string]any{} - - if ip == "" { - return k8sAttrs - } - - if prefix != "" { - prefix = fmt.Sprintf("%s.", prefix) - } - - if pod := client.GetPodByIPAddr(ip); pod != nil { - k8sAttrs[prefix+k8sResourceType] = k8sResourceTypePod - k8sAttrs[prefix+string(semconv.K8SPodNameKey)] = pod.Name - k8sAttrs[prefix+string(semconv.K8SPodUIDKey)] = pod.UID - k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = pod.Namespace - - if len(pod.Spec.Containers) > 0 { - var containerNames []string - for _, container := range pod.Spec.Containers { - containerNames = append(containerNames, container.Name) - } - k8sAttrs[prefix+string(semconv.K8SContainerNameKey)] = strings.Join(containerNames, ",") - } - - if node := client.GetNodeForPod(pod); node != nil { - k8sAttrs[prefix+string(semconv.K8SNodeNameKey)] = node.Name - k8sAttrs[prefix+string(semconv.K8SNodeUIDKey)] = node.UID - } - - if service := client.GetServiceForPod(pod); service != nil { - // no semconv for service yet - k8sAttrs[prefix+k8sServiceName] = service.Name - k8sAttrs[prefix+k8sServiceUID] = service.UID - } - } else if service := client.GetServiceByIPAddr(ip); service != nil { - k8sAttrs[prefix+k8sResourceType] = k8sResourceTypeService - k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = service.Namespace - // no semconv for service yet - k8sAttrs[prefix+k8sServiceName] = service.Name - k8sAttrs[prefix+k8sServiceUID] = service.UID - } - - return k8sAttrs -}