diff --git a/build/yamls/elk-flow-collector/logstash/ipfix.yml b/build/yamls/elk-flow-collector/logstash/ipfix.yml index 8b866abd4e2..fb35cf35ec2 100644 --- a/build/yamls/elk-flow-collector/logstash/ipfix.yml +++ b/build/yamls/elk-flow-collector/logstash/ipfix.yml @@ -183,3 +183,27 @@ 144: - :string - :destinationPodLabels + 145: + - :uint64 + - :throughput + 146: + - :uint64 + - :reverseThroughput + 147: + - :uint64 + - :throughputFromSourceNode + 148: + - :uint64 + - :throughputFromDestinationNode + 149: + - :uint64 + - :reverseThroughputFromSourceNode + 150: + - :uint64 + - :reverseThroughputFromDestinationNode + 151: + - :uint32 + - :flowEndSecondsFromSourceNode + 152: + - :uint32 + - :flowEndSecondsFromDestinationNode diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index d5529bd59b8..ee9befefb11 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -129,7 +129,7 @@ COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" \ "projects.registry.vmware.com/library/busybox" \ "projects.registry.vmware.com/antrea/nginx" \ "projects.registry.vmware.com/antrea/perftool" \ - "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.10" \ + "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.11" \ "projects.registry.vmware.com/antrea/wireguard-go:0.0.20210424") for image in "${COMMON_IMAGES_LIST[@]}"; do for i in `seq 3`; do diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index a5a37df64fc..9b69b8d6ec6 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -7,7 +7,7 @@ - [Flow Exporter](#flow-exporter) - [Configuration](#configuration) - [IPFIX Information Elements (IEs) in a Flow Record](#ipfix-information-elements-ies-in-a-flow-record) - - [IEs from IANA-assigned IE registry](#ies-from-iana-assigned-ie-registry) + - [IEs from IANA-assigned IE Registry](#ies-from-iana-assigned-ie-registry) - [IEs from Reverse IANA-assigned IE Registry](#ies-from-reverse-iana-assigned-ie-registry) - [IEs from Antrea IE Registry](#ies-from-antrea-ie-registry) - [Supported capabilities](#supported-capabilities) @@ -131,62 +131,64 @@ Please modify them as per your requirements. There are 34 IPFIX IEs in each exported flow record, which are defined in the IANA-assigned IE registry, the Reverse IANA-assigned IE registry and the Antrea IE registry. The reverse IEs are used to provide bi-directional information about -the flow. All the IEs used by the Antrea Flow Exporter are listed below: - -#### IEs from IANA-assigned IE registry - -| IPFIX Information Element| Enterprise ID | Field ID | Type | -|--------------------------|---------------|----------|----------------| -| flowStartSeconds | 0 | 150 | dateTimeSeconds| -| flowEndSeconds | 0 | 151 | dateTimeSeconds| -| flowEndReason | 0 | 136 | unsigned8 | -| sourceIPv4Address | 0 | 8 | ipv4Address | -| destinationIPv4Address | 0 | 12 | ipv4Address | -| sourceIPv6Address | 0 | 27 | ipv6Address | -| destinationIPv6Address | 0 | 28 | ipv6Address | -| sourceTransportPort | 0 | 7 | unsigned16 | -| destinationTransportPort | 0 | 11 | unsigned16 | -| protocolIdentifier | 0 | 4 | unsigned8 | -| packetTotalCount | 0 | 86 | unsigned64 | -| octetTotalCount | 0 | 85 | unsigned64 | -| packetDeltaCount | 0 | 2 | unsigned64 | -| octetDeltaCount | 0 | 1 | unsigned64 | +the flow. The Enterprise ID is 0 for IANA-assigned IE registry, 29305 for reverse +IANA IE registry, 56505 for Antrea IE registry. All the IEs used by the Antrea +Flow Exporter are listed below: + +#### IEs from IANA-assigned IE Registry + +| IPFIX Information Element| Field ID | Type | +|--------------------------|----------|----------------| +| flowStartSeconds | 150 | dateTimeSeconds| +| flowEndSeconds | 151 | dateTimeSeconds| +| flowEndReason | 136 | unsigned8 | +| sourceIPv4Address | 8 | ipv4Address | +| destinationIPv4Address | 12 | ipv4Address | +| sourceIPv6Address | 27 | ipv6Address | +| destinationIPv6Address | 28 | ipv6Address | +| sourceTransportPort | 7 | unsigned16 | +| destinationTransportPort | 11 | unsigned16 | +| protocolIdentifier | 4 | unsigned8 | +| packetTotalCount | 86 | unsigned64 | +| octetTotalCount | 85 | unsigned64 | +| packetDeltaCount | 2 | unsigned64 | +| octetDeltaCount | 1 | unsigned64 | #### IEs from Reverse IANA-assigned IE Registry -| IPFIX Information Element| Enterprise ID | Field ID | Type | -|--------------------------|---------------|----------|----------------| -| reversePacketTotalCount | 29305 | 86 | unsigned64 | -| reverseOctetTotalCount | 29305 | 85 | unsigned64 | -| reversePacketDeltaCount | 29305 | 2 | unsigned64 | -| reverseOctetDeltaCount | 29305 | 1 | unsigned64 | +| IPFIX Information Element| Field ID | Type | +|--------------------------|----------|----------------| +| reversePacketTotalCount | 86 | unsigned64 | +| reverseOctetTotalCount | 85 | unsigned64 | +| reversePacketDeltaCount | 2 | unsigned64 | +| reverseOctetDeltaCount | 1 | unsigned64 | #### IEs from Antrea IE Registry -| IPFIX Information Element | Enterprise ID | Field ID | Type | -|----------------------------------|---------------|----------|-------------| -| sourcePodNamespace | 56506 | 100 | string | -| sourcePodName | 56506 | 101 | string | -| destinationPodNamespace | 56506 | 102 | string | -| destinationPodName | 56506 | 103 | string | -| sourceNodeName | 56506 | 104 | string | -| destinationNodeName | 56506 | 105 | string | -| destinationClusterIPv4 | 56506 | 106 | ipv4Address | -| destinationClusterIPv6 | 56506 | 107 | ipv6Address | -| destinationServicePort | 56506 | 108 | unsigned16 | -| destinationServicePortName | 56506 | 109 | string | -| ingressNetworkPolicyName | 56506 | 110 | string | -| ingressNetworkPolicyNamespace | 56506 | 111 | string | -| ingressNetworkPolicyType | 56506 | 115 | unsigned8 | -| ingressNetworkPolicyRuleName | 56506 | 141 | string | -| egressNetworkPolicyName | 56506 | 112 | string | -| egressNetworkPolicyNamespace | 56506 | 113 | string | -| egressNetworkPolicyType | 56506 | 118 | unsigned8 | -| egressNetworkPolicyRuleName | 56506 | 142 | string | -| ingressNetworkPolicyRuleAction | 56506 | 139 | unsigned8 | -| egressNetworkPolicyRuleAction | 56506 | 140 | unsigned8 | -| tcpState | 56506 | 136 | string | -| flowType | 56506 | 137 | unsigned8 | +| IPFIX Information Element | Field ID | Type | Description | +|----------------------------------|----------|-------------|-------------| +| sourcePodNamespace | 100 | string | | +| sourcePodName | 101 | string | | +| destinationPodNamespace | 102 | string | | +| destinationPodName | 103 | string | | +| sourceNodeName | 104 | string | | +| destinationNodeName | 105 | string | | +| destinationClusterIPv4 | 106 | ipv4Address | | +| destinationClusterIPv6 | 107 | ipv6Address | | +| destinationServicePort | 108 | unsigned16 | | +| destinationServicePortName | 109 | string | | +| ingressNetworkPolicyName | 110 | string | Name of the ingress network policy applied to the destination Pod for this flow. | +| ingressNetworkPolicyNamespace | 111 | string | Namespace of the ingress network policy applied to the destination Pod for this flow. | +| ingressNetworkPolicyType | 115 | unsigned8 | 1 stands for Kubernetes Network Policy. 2 stands for Antrea Network Policy. 3 stands for Antrea Cluster Network Policy. | +| ingressNetworkPolicyRuleName | 141 | string | Name of the ingress network policy Rule applied to the destination Pod for this flow. | +| egressNetworkPolicyName | 112 | string | Name of the egress network policy applied to the source Pod for this flow. | +| egressNetworkPolicyNamespace | 113 | string | Namespace of the egress network policy applied to the source Pod for this flow. | +| egressNetworkPolicyType | 118 | unsigned8 | | +| egressNetworkPolicyRuleName | 142 | string | Name of the egress network policy rule applied to the source Pod for this flow. | +| ingressNetworkPolicyRuleAction | 139 | unsigned8 | 1 stands for Allow. 2 stands for Drop. 3 stands for Reject. | +| egressNetworkPolicyRuleAction | 140 | unsigned8 | | +| tcpState | 136 | string | The state of the TCP connection. The states are: LISTEN, SYN-SENT, SYN-RECEIVED, ESTABLISHED, FIN-WAIT-1, FIN-WAIT-2, CLOSE-WAIT, CLOSING, LAST-ACK, TIME-WAIT, and CLOSED. | +| flowType | 137 | unsigned8 | 1 stands for Intra-Node. 2 stands for Inter-Node. 3 stands for To External. 4 stands for From External. | ### Supported capabilities @@ -336,24 +338,34 @@ the Flow Aggregator adds the following fields to the flow records. #### IEs from Antrea IE Registry -| IPFIX Information Element | Enterprise ID | Field ID | Type | -|-------------------------------------------|---------------|----------|-------------| -| packetTotalCountFromSourceNode | 56506 | 120 | unsigned64 | -| octetTotalCountFromSourceNode | 56506 | 121 | unsigned64 | -| packetDeltaCountFromSourceNode | 56506 | 122 | unsigned64 | -| octetDeltaCountFromSourceNode | 56506 | 123 | unsigned64 | -| reversePacketTotalCountFromSourceNode | 56506 | 124 | unsigned64 | -| reverseOctetTotalCountFromSourceNode | 56506 | 125 | unsigned64 | -| reversePacketDeltaCountFromSourceNode | 56506 | 126 | unsigned64 | -| reverseOctetDeltaCountFromSourceNode | 56506 | 127 | unsigned64 | -| packetTotalCountFromDestinationNode | 56506 | 128 | unsigned64 | -| octetTotalCountFromDestinationNode | 56506 | 129 | unsigned64 | -| packetDeltaCountFromDestinationNode | 56506 | 130 | unsigned64 | -| octetDeltaCountFromDestinationNode | 56506 | 131 | unsigned64 | -| reversePacketTotalCountFromDestinationNode| 56506 | 132 | unsigned64 | -| reverseOctetTotalCountFromDestinationNode | 56506 | 133 | unsigned64 | -| reversePacketDeltaCountFromDestinationNode| 56506 | 134 | unsigned64 | -| reverseOctetDeltaCountFromDestinationNode | 56506 | 135 | unsigned64 | +| IPFIX Information Element | Field ID | Type | Description | +|-------------------------------------------|----------|-------------|-------------| +| packetTotalCountFromSourceNode | 120 | unsigned64 | The cumulative number of packets for this flow as reported by the source Node, since the flow started. | +| octetTotalCountFromSourceNode | 121 | unsigned64 | The cumulative number of octets for this flow as reported by the source Node, since the flow started. | +| packetDeltaCountFromSourceNode | 122 | unsigned64 | The number of packets for this flow as reported by the source Node, since the previous report for this flow at the observation point. | +| octetDeltaCountFromSourceNode | 123 | unsigned64 | The number of octets for this flow as reported by the source Node, since the previous report for this flow at the observation point. | +| reversePacketTotalCountFromSourceNode | 124 | unsigned64 | The cumulative number of reverse packets for this flow as reported by the source Node, since the flow started. | +| reverseOctetTotalCountFromSourceNode | 125 | unsigned64 | The cumulative number of reverse octets for this flow as reported by the source Node, since the flow started. | +| reversePacketDeltaCountFromSourceNode | 126 | unsigned64 | The number of reverse packets for this flow as reported by the source Node, since the previous report for this flow at the observation point. | +| reverseOctetDeltaCountFromSourceNode | 127 | unsigned64 | The number of reverse octets for this flow as reported by the source Node, since the previous report for this flow at the observation point. | +| packetTotalCountFromDestinationNode | 128 | unsigned64 | The cumulative number of packets for this flow as reported by the destination Node, since the flow started. | +| octetTotalCountFromDestinationNode | 129 | unsigned64 | The cumulative number of octets for this flow as reported by the destination Node, since the flow started. | +| packetDeltaCountFromDestinationNode | 130 | unsigned64 | The number of packets for this flow as reported by the destination Node, since the previous report for this flow at the observation point. | +| octetDeltaCountFromDestinationNode | 131 | unsigned64 | The number of octets for this flow as reported by the destination Node, since the previous report for this flow at the observation point. | +| reversePacketTotalCountFromDestinationNode| 132 | unsigned64 | The cumulative number of reverse packets for this flow as reported by the destination Node, since the flow started. | +| reverseOctetTotalCountFromDestinationNode | 133 | unsigned64 | The cumulative number of reverse octets for this flow as reported by the destination Node, since the flow started. | +| reversePacketDeltaCountFromDestinationNode| 134 | unsigned64 | The number of reverse packets for this flow as reported by the destination Node, since the previous report for this flow at the observation point. | +| reverseOctetDeltaCountFromDestinationNode | 135 | unsigned64 | The number of reverse octets for this flow as reported by the destination Node, since the previous report for this flow at the observation point. | +| sourcePodLabels | 143 | string | | +| destinationPodLabels | 144 | string | | +| throughput | 145 | unsigned64 | The average amount of traffic flowing from source to destination, since the previous report for this flow at the observation point. The unit is bits per second. | +| reverseThroughput | 146 | unsigned64 | The average amount of reverse traffic flowing from destination to source, since the previous report for this flow at the observation point. The unit is bits per second. | +| throughputFromSourceNode | 147 | unsigned64 | The average amount of traffic flowing from source to destination, since the previous report for this flow at the observation point, based on the records sent from the source Node. The unit is bits per second. | +| throughputFromDestinationNode | 148 | unsigned64 | The average amount of traffic flowing from source to destination, since the previous report for this flow at the observation point, based on the records sent from the destination Node. The unit is bits per second. | +| reverseThroughputFromSourceNode | 149 | unsigned64 | The average amount of reverse traffic flowing from destination to source, since the previous report for this flow at the observation point, based on the records sent from the source Node. The unit is bits per second. | +| reverseThroughputFromDestinationNode | 150 | unsigned64 | The average amount of reverse traffic flowing from destination to source, since the previous report for this flow at the observation point, based on the records sent from the destination Node. The unit is bits per second. | +| flowEndSecondsFromSourceNode | 151 | unsigned32 | The absolute timestamp of the last packet of this flow, based on the records sent from the source Node. The unit is seconds. | +| flowEndSecondsFromDestinationNode | 152 | unsigned32 | The absolute timestamp of the last packet of this flow, based on the records sent from the destination Node. The unit is seconds. | ### Supported capabilities diff --git a/go.mod b/go.mod index dc63347a450..3ad1fbea8dd 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/ti-mo/conntrack v0.4.0 github.com/vishvananda/netlink v1.1.1-0.20210510164352-d17758a128bf - github.com/vmware/go-ipfix v0.5.10 + github.com/vmware/go-ipfix v0.5.11 golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/mod v0.4.2 diff --git a/go.sum b/go.sum index c3b4d02105d..d071986b48f 100644 --- a/go.sum +++ b/go.sum @@ -658,8 +658,8 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3Cym0ZtKyq7L16eZUtYKs+BaHDN6mAns= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vmware/go-ipfix v0.5.10 h1:32ITLwn/IZcagB+bG0g9f8KsSk2atWG9uHHy8InIuUc= -github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY= +github.com/vmware/go-ipfix v0.5.11 h1:1731EiUCTkhrK0YTxVbpT3YVyfyIj3ACua+QjL+9eq0= +github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 553a6430688..95f4b4d9afc 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -121,11 +121,31 @@ var ( "sourcePodLabels", "destinationPodLabels", } + antreaFlowEndSecondsElementList = []string{ + "flowEndSecondsFromSourceNode", + "flowEndSecondsFromDestinationNode", + } + antreaThroughputElementList = []string{ + "throughput", + "reverseThroughput", + } + antreaSourceThroughputElementList = []string{ + "throughputFromSourceNode", + "reverseThroughputFromSourceNode", + } + antreaDestinationThroughputElementList = []string{ + "throughputFromDestinationNode", + "reverseThroughputFromDestinationNode", + } aggregationElements = &ipfixintermediate.AggregationElements{ NonStatsElements: nonStatsElementList, StatsElements: statsElementList, AggregatedSourceStatsElements: antreaSourceStatsElementList, AggregatedDestinationStatsElements: antreaDestinationStatsElementList, + AntreaFlowEndSecondsElements: antreaFlowEndSecondsElementList, + ThroughputElements: antreaThroughputElementList, + SourceThroughputElements: antreaSourceThroughputElementList, + DestinationThroughputElements: antreaDestinationThroughputElementList, } correlateFields = []string{ @@ -289,7 +309,8 @@ func (fa *flowAggregator) InitCollectingProcess() error { IsEncrypted: false, } } - cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList) + cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList) + + len(antreaFlowEndSecondsElementList) + len(antreaThroughputElementList) + len(antreaSourceThroughputElementList) + len(antreaDestinationThroughputElementList) var err error fa.collectingProcess, err = ipfix.NewIPFIXCollectingProcess(cpInput) return err @@ -457,7 +478,6 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor if err = fa.aggregationProcess.ResetStatElementsInRecord(record.Record); err != nil { return err } - klog.V(4).Infof("Data set sent successfully: %d Bytes sent", sentBytes) fa.numRecordsExported = fa.numRecordsExported + 1 return nil @@ -474,33 +494,21 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) { templateID = fa.templateIDv6 } for _, ie := range ianaInfoElements { - element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID) - if err != nil { - return 0, fmt.Errorf("%s not present. returned error: %v", ie, err) - } - ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) } for _, ie := range ianaReverseInfoElements { - element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID) - if err != nil { - return 0, fmt.Errorf("%s not present. returned error: %v", ie, err) - } - ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAReversedEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) } for _, ie := range antreaInfoElements { - element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, fmt.Errorf("%s not present. returned error: %v", ie, err) - } - ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) if err != nil { return 0, err } @@ -511,22 +519,44 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) { for i := range statsElementList { // Add Antrea source stats fields ieName := antreaSourceStatsElementList[i] - element, err := fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err) - } - ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { return 0, err } elements = append(elements, ie) // Add Antrea destination stats fields ieName = antreaDestinationStatsElementList[i] - element, err = fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID) + ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err) + return 0, err } - ie, err = ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + elements = append(elements, ie) + } + for _, ie := range antreaFlowEndSecondsElementList { + ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + } + for i := range antreaThroughputElementList { + // Add common throughput fields + ieName := antreaThroughputElementList[i] + ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + // Add source node specific throughput fields + ieName = antreaSourceThroughputElementList[i] + ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) + // Add destination node specific throughput fields + ieName = antreaDestinationThroughputElementList[i] + ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { return 0, err } @@ -534,11 +564,7 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) { } if fa.includePodLabels { for _, ie := range antreaLabelsElementList { - element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID) - if err != nil { - return 0, fmt.Errorf("error when getting InformationElement %s from registry: %v", ie, err) - } - ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID) if err != nil { return 0, err } @@ -668,3 +694,15 @@ func (fa *flowAggregator) GetRecordMetrics() querier.Metrics { NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), } } + +func (fa *flowAggregator) createInfoElementForTemplateSet(ieName string, enterpriseID uint32) (ipfixentities.InfoElementWithValue, error) { + element, err := fa.registry.GetInfoElement(ieName, enterpriseID) + if err != nil { + return nil, fmt.Errorf("%s not present. returned error: %v", ieName, err) + } + ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) + if err != nil { + return nil, err + } + return ie, nil +} diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index e6485c94b5f..877bb959e34 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -225,28 +225,40 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) { // Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals) // Only the element name is needed, other arguments have dummy values. elemList := make([]ipfixentities.InfoElementWithValue, 0) - for i, ie := range ianaInfoElements { + for _, ie := range ianaInfoElements { elemList = append(elemList, createElement(ie, ipfixregistry.IANAEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } - for i, ie := range ianaReverseInfoElements { + for _, ie := range ianaReverseInfoElements { elemList = append(elemList, createElement(ie, ipfixregistry.IANAReversedEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } - for i, ie := range antreaInfoElements { + for _, ie := range antreaInfoElements { elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } for i := range statsElementList { elemList = append(elemList, createElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) elemList = append(elemList, createElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+1+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + } + for _, ie := range antreaFlowEndSecondsElementList { + elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + } + for i := range antreaThroughputElementList { + elemList = append(elemList, createElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + elemList = append(elemList, createElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + elemList = append(elemList, createElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } if tc.includePodLabels { - for i, ie := range antreaLabelsElementList { + for _, ie := range antreaLabelsElementList { elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(antreaSourceStatsElementList)+len(antreaDestinationStatsElementList)].GetInfoElement(), nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } } mockTempSet.EXPECT().ResetSet() diff --git a/plugins/octant/go.sum b/plugins/octant/go.sum index 8fddce477ad..aa452d6df85 100644 --- a/plugins/octant/go.sum +++ b/plugins/octant/go.sum @@ -972,7 +972,7 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vmware-tanzu/octant v0.24.0 h1:PMLU2QG6czSdCl6/xn7lHkHGi+Dv43rxL+AZL6sPJ24= github.com/vmware-tanzu/octant v0.24.0/go.mod h1:yK0Nu7wzzvV25/T8nf4NQFMUUA7sLcp0gk96CjOTUiQ= -github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY= +github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index d409b3d44eb..9f0bd06aa4e 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -39,67 +39,74 @@ import ( /* Sample output from the collector: IPFIX-HDR: - version: 10, Message Length: 435 - Exported Time: 1608338076 (2020-12-19 00:34:36 +0000 UTC) - Sequence No.: 3, Observation Domain ID: 1350683189 + version: 10, Message Length: 617 + Exported Time: 1637706974 (2021-11-23 22:36:14 +0000 UTC) + Sequence No.: 27, Observation Domain ID: 2569248951 DATA SET: DATA RECORD-0: - flowStartSeconds: 1608338066 - flowEndSeconds: 1608338072 - flowEndReason: 2 - sourceTransportPort: 43600 + flowStartSeconds: 1637706961 + flowEndSeconds: 1637706973 + flowEndReason: 3 + sourceTransportPort: 44752 destinationTransportPort: 5201 protocolIdentifier: 6 - packetTotalCount: 537924 - octetTotalCount: 23459802093 - packetDeltaCount: 0 - octetDeltaCount: 0 - sourceIPv4Address: 10.10.0.22 - destinationIPv4Address: 10.10.0.23 - reversePacketTotalCount: 444320 - reverseOctetTotalCount: 23108308 - reversePacketDeltaCount: 0 - reverseOctetDeltaCount: 0 + packetTotalCount: 823188 + octetTotalCount: 30472817041 + packetDeltaCount: 241333 + octetDeltaCount: 8982624938 + sourceIPv4Address: 10.10.0.79 + destinationIPv4Address: 10.10.0.80 + reversePacketTotalCount: 471111 + reverseOctetTotalCount: 24500996 + reversePacketDeltaCount: 136211 + reverseOctetDeltaCount: 7083284 sourcePodName: perftest-a sourcePodNamespace: antrea-test sourceNodeName: k8s-node-control-plane destinationPodName: perftest-b destinationPodNamespace: antrea-test destinationNodeName: k8s-node-control-plane - destinationServicePort: 5201 + destinationServicePort: 0 destinationServicePortName: - ingressNetworkPolicyName: test-flow-aggregator-networkpolicy-ingress + ingressNetworkPolicyName: test-flow-aggregator-networkpolicy-ingress-allow ingressNetworkPolicyNamespace: antrea-test - ingressNetworkPolicyType: 2 - ingressNetworkPolicyRuleName: test-ingress-rule-name + ingressNetworkPolicyType: 1 + ingressNetworkPolicyRuleName: ingressNetworkPolicyRuleAction: 1 - egressNetworkPolicyName: test-flow-aggregator-networkpolicy-egress + egressNetworkPolicyName: test-flow-aggregator-networkpolicy-egress-allow egressNetworkPolicyNamespace: antrea-test - egressNetworkPolicyType: 2 - egressNetworkPolicyRuleName: test-egress-rule-name + egressNetworkPolicyType: 1 + egressNetworkPolicyRuleName: egressNetworkPolicyRuleAction: 1 + tcpState: TIME_WAIT flowType: 1 destinationClusterIPv4: 0.0.0.0 - originalExporterIPv4Address: 10.10.0.1 - originalObservationDomainId: 2134708971 - octetDeltaCountFromSourceNode: 0 - octetTotalCountFromSourceNode: 23459802093 - packetDeltaCountFromSourceNode: 0 - packetTotalCountFromSourceNode: 537924 - reverseOctetDeltaCountFromSourceNode: 0 - reverseOctetTotalCountFromSourceNode: 23108308 - reversePacketDeltaCountFromSourceNode: 0 - reversePacketTotalCountFromSourceNode: 444320 - octetDeltaCountFromDestinationNode: 0 - octetTotalCountFromDestinationNode: 23459802093 - packetDeltaCountFromDestinationNode: 0 - packetTotalCountFromDestinationNode: 537924 - reverseOctetDeltaCountFromDestinationNode: 0 - reverseOctetTotalCountFromDestinationNode: 23108308 - reversePacketDeltaCountFromDestinationNode: 0 - reversePacketTotalCountFromDestinationNode: 444320 - sourcePodLabels: {"antrea-e2e":"perftest-a","app":"perftool"} - destinationPodLabels: {"antrea-e2e":"perftest-b","app":"perftool"} + octetDeltaCountFromSourceNode: 8982624938 + octetDeltaCountFromDestinationNode: 8982624938 + octetTotalCountFromSourceNode: 30472817041 + octetTotalCountFromDestinationNode: 30472817041 + packetDeltaCountFromSourceNode: 241333 + packetDeltaCountFromDestinationNode: 241333 + packetTotalCountFromSourceNode: 823188 + packetTotalCountFromDestinationNode: 823188 + reverseOctetDeltaCountFromSourceNode: 7083284 + reverseOctetDeltaCountFromDestinationNode: 7083284 + reverseOctetTotalCountFromSourceNode: 24500996 + reverseOctetTotalCountFromDestinationNode: 24500996 + reversePacketDeltaCountFromSourceNode: 136211 + reversePacketDeltaCountFromDestinationNode: 136211 + reversePacketTotalCountFromSourceNode: 471111 + reversePacketTotalCountFromDestinationNode: 471111 + flowEndSecondsFromSourceNode: 1637706973 + flowEndSecondsFromDestinationNode: 1637706973 + throughput: 15902813472 + throughputFromSourceNode: 15902813472 + throughputFromDestinationNode: 15902813472 + reverseThroughput: 12381344 + reverseThroughputFromSourceNode: 12381344 + reverseThroughputFromDestinationNode: 12381344 + sourcePodLabels: {"antrea-e2e":"perftest-a","app":"perftool"} + destinationPodLabels: {"antrea-e2e":"perftest-b","app":"perftool"} Intra-Node: Flow record information is complete for source and destination e.g. sourcePodName, destinationPodName Inter-Node: Flow record from destination Node is ignored, so only flow record from the source Node has its K8s info e.g., sourcePodName, sourcePodNamespace, sourceNodeName etc. AntreaProxy enabled (Intra-Node): Flow record information is complete for source and destination along with K8s service info such as destinationClusterIP, destinationServicePort, destinationServicePortName etc. @@ -562,7 +569,6 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6) // Iterate over recordSlices and build some results to test with expected results dataRecordsCount := 0 - var octetTotalCount uint64 src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { // Check the source port along with source and destination IPs as there @@ -609,30 +615,26 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") } - // Skip the bandwidth check for the iperf control flow records which have 0 delta count. - if checkBandwidth && !strings.Contains(record, "octetDeltaCount: 0") { - exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds")) - curOctetTotalCount := getUnit64FieldFromRecord(t, record, "octetTotalCountFromSourceNode") - flowStartTime := int64(getUnit64FieldFromRecord(t, record, "flowStartSeconds")) - if curOctetTotalCount > octetTotalCount { - octetTotalCount = curOctetTotalCount - } - curOctetDeltaCount := getUnit64FieldFromRecord(t, record, "octetDeltaCountFromSourceNode") - // Check the bandwidth using octetDeltaCountFromSourceNode, if this record - // is neither the first record nor the last in the stream of records. - if curOctetDeltaCount != curOctetTotalCount && exportTime < flowStartTime+iperfTimeSec { - t.Logf("Check the bandwidth using octetDeltaCountFromSourceNode %d in data record.", curOctetDeltaCount) - // This middle record should aggregate two records from Flow Exporter - checkBandwidthByInterval(t, bandwidthInMbps, curOctetDeltaCount, float64(2*exporterActiveFlowExportTimeout/time.Second), "octetDeltaCountFromSourceNode") + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if checkBandwidth && !strings.Contains(record, "throughput: 0") { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) + var recBandwidth float64 + // Check bandwidth with the field "throughput" except for the last record, + // as its throughput is significantly lower than the average Iperf throughput. + if exportTime < flowStartTime+iperfTimeSec { + throughput := getUint64FieldFromRecord(t, record, "throughput") + recBandwidth = float64(throughput) / 1000000 + } else { + // Check average bandwidth after receiving all the records. + octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") + recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000 } + t.Logf("Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%") } } } - // Average bandwidth check is done after iterating through all records using the largest octetTotalCountFromSourceNode. - if checkBandwidth && octetTotalCount > 0 { - t.Logf("Check the average bandwidth using octetTotalCountFromSourceNode %d in data record.", octetTotalCount) - checkBandwidthByInterval(t, bandwidthInMbps, octetTotalCount, float64(iperfTimeSec), "octetTotalCountFromSourceNode") - } // Checking only data records as data records cannot be decoded without template // record. assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) @@ -739,12 +741,6 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 } } -func checkBandwidthByInterval(t *testing.T, bandwidthInMbps float64, octetCount uint64, interval float64, field string) { - recBandwidth := float64(octetCount) * 8 / 1000000 / interval - t.Logf("Iperf throughput: %.2f Mbits/s, IPFIX record throughput calculated through %s: %.2f Mbits/s", bandwidthInMbps, field, recBandwidth) - assert.InDeltaf(t, recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth calculated through %s should be lower than 15%%", field) -} - func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode string) { assert := assert.New(t) assert.Contains(record, srcPod, "Record with srcIP does not have Pod name: %s", srcPod) @@ -768,7 +764,7 @@ func checkFlowType(t *testing.T, record string, flowType uint8) { assert.Containsf(t, record, fmt.Sprintf("flowType: %d", flowType), "Record does not have correct flowType") } -func getUnit64FieldFromRecord(t *testing.T, record string, field string) uint64 { +func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 { if strings.Contains(record, "TEMPLATE SET") { return 0 } @@ -804,8 +800,8 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) if checkAllRecords { for _, record := range recordSlices { - flowStartTime := int64(getUnit64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds")) + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { if exportTime >= flowStartTime+iperfTimeSec { return true, nil diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 35f4982569c..6556940dd1d 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -112,7 +112,7 @@ const ( busyboxImage = "projects.registry.vmware.com/library/busybox" nginxImage = "projects.registry.vmware.com/antrea/nginx" perftoolImage = "projects.registry.vmware.com/antrea/perftool" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.10" + ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.11" ipfixCollectorPort = "4739" nginxLBService = "nginx-loadbalancer"