Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throughput calculation into flow-aggregator #2692

Merged
merged 1 commit into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions build/yamls/elk-flow-collector/logstash/ipfix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,27 @@
144:
- :string
- :destinationPodLabels
145:
- :uint64
- :throughput
146:
- :uint64
- :reverseThroughput
147:
- :uint64
- :throughputFromSourceNode
148:
- :uint64
- :throughputFromDestinationNode
149:
- :uint64
- :reverseThroughputFromSourceNode
150:
- :uint64
- :reverseThroughputFromDestinationNode
151:
- :uint32
- :flowEndSecondsFromSourceNode
152:
- :uint32
- :flowEndSecondsFromDestinationNode
Comment on lines +186 to +209
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't remember us having all these *SourceNode and *DestinationNodes IE variations for octet and packet counts. I wonder if this is really necessary. For octant and packet counts, do we actually use all these field variations?

Copy link
Contributor Author

@heanlan heanlan Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have all these IE variations. Currently the only use case is we are using octetDeltaCountFromSourceNode/DestinationNode to compute node throughput in logstash. I agree these IE variations can be removed if we decide to add throughput information into the exported records.

2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" \
"projects.registry.vmware.com/library/busybox" \
"projects.registry.vmware.com/antrea/nginx" \
"projects.registry.vmware.com/antrea/perftool" \
"projects.registry.vmware.com/antrea/ipfix-collector:v0.5.10" \
"projects.registry.vmware.com/antrea/ipfix-collector:v0.5.11" \
"projects.registry.vmware.com/antrea/wireguard-go:0.0.20210424")
for image in "${COMMON_IMAGES_LIST[@]}"; do
for i in `seq 3`; do
Expand Down
150 changes: 81 additions & 69 deletions docs/network-flow-visibility.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/ti-mo/conntrack v0.4.0
github.com/vishvananda/netlink v1.1.1-0.20210510164352-d17758a128bf
github.com/vmware/go-ipfix v0.5.10
github.com/vmware/go-ipfix v0.5.11
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/mod v0.4.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3Cym0ZtKyq7L16eZUtYKs+BaHDN6mAns=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vmware/go-ipfix v0.5.10 h1:32ITLwn/IZcagB+bG0g9f8KsSk2atWG9uHHy8InIuUc=
github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/vmware/go-ipfix v0.5.11 h1:1731EiUCTkhrK0YTxVbpT3YVyfyIj3ACua+QjL+9eq0=
github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
98 changes: 68 additions & 30 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,31 @@ var (
"sourcePodLabels",
"destinationPodLabels",
}
antreaFlowEndSecondsElementList = []string{
"flowEndSecondsFromSourceNode",
"flowEndSecondsFromDestinationNode",
}
antreaThroughputElementList = []string{
"throughput",
"reverseThroughput",
}
antreaSourceThroughputElementList = []string{
"throughputFromSourceNode",
"reverseThroughputFromSourceNode",
}
antreaDestinationThroughputElementList = []string{
"throughputFromDestinationNode",
"reverseThroughputFromDestinationNode",
}
aggregationElements = &ipfixintermediate.AggregationElements{
NonStatsElements: nonStatsElementList,
StatsElements: statsElementList,
AggregatedSourceStatsElements: antreaSourceStatsElementList,
AggregatedDestinationStatsElements: antreaDestinationStatsElementList,
AntreaFlowEndSecondsElements: antreaFlowEndSecondsElementList,
ThroughputElements: antreaThroughputElementList,
SourceThroughputElements: antreaSourceThroughputElementList,
DestinationThroughputElements: antreaDestinationThroughputElementList,
}

correlateFields = []string{
Expand Down Expand Up @@ -289,7 +309,8 @@ func (fa *flowAggregator) InitCollectingProcess() error {
IsEncrypted: false,
}
}
cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList)
cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList) +
len(antreaFlowEndSecondsElementList) + len(antreaThroughputElementList) + len(antreaSourceThroughputElementList) + len(antreaDestinationThroughputElementList)
var err error
fa.collectingProcess, err = ipfix.NewIPFIXCollectingProcess(cpInput)
return err
Expand Down Expand Up @@ -457,7 +478,6 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
if err = fa.aggregationProcess.ResetStatElementsInRecord(record.Record); err != nil {
return err
}

klog.V(4).Infof("Data set sent successfully: %d Bytes sent", sentBytes)
fa.numRecordsExported = fa.numRecordsExported + 1
return nil
Expand All @@ -474,33 +494,21 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for _, ie := range ianaReverseInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for _, ie := range antreaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
Expand All @@ -511,34 +519,52 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
for i := range statsElementList {
// Add Antrea source stats fields
ieName := antreaSourceStatsElementList[i]
element, err := fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add Antrea destination stats fields
ieName = antreaDestinationStatsElementList[i]
element, err = fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID)
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err)
return 0, err
}
ie, err = ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
for _, ie := range antreaFlowEndSecondsElementList {
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for i := range antreaThroughputElementList {
// Add common throughput fields
ieName := antreaThroughputElementList[i]
ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add source node specific throughput fields
ieName = antreaSourceThroughputElementList[i]
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add destination node specific throughput fields
ieName = antreaDestinationThroughputElementList[i]
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
if fa.includePodLabels {
for _, ie := range antreaLabelsElementList {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, fmt.Errorf("error when getting InformationElement %s from registry: %v", ie, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -668,3 +694,15 @@ func (fa *flowAggregator) GetRecordMetrics() querier.Metrics {
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
}
}

func (fa *flowAggregator) createInfoElementForTemplateSet(ieName string, enterpriseID uint32) (ipfixentities.InfoElementWithValue, error) {
element, err := fa.registry.GetInfoElement(ieName, enterpriseID)
if err != nil {
return nil, fmt.Errorf("%s not present. returned error: %v", ieName, err)
}
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
if err != nil {
return nil, err
}
return ie, nil
}
32 changes: 22 additions & 10 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,28 +225,40 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]ipfixentities.InfoElementWithValue, 0)
for i, ie := range ianaInfoElements {
for _, ie := range ianaInfoElements {
elemList = append(elemList, createElement(ie, ipfixregistry.IANAEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
for i, ie := range ianaReverseInfoElements {
for _, ie := range ianaReverseInfoElements {
elemList = append(elemList, createElement(ie, ipfixregistry.IANAReversedEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
for i, ie := range antreaInfoElements {
for _, ie := range antreaInfoElements {
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
for i := range statsElementList {
elemList = append(elemList, createElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
elemList = append(elemList, createElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+1+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
for _, ie := range antreaFlowEndSecondsElementList {
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
for i := range antreaThroughputElementList {
elemList = append(elemList, createElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
elemList = append(elemList, createElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
elemList = append(elemList, createElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
if tc.includePodLabels {
for i, ie := range antreaLabelsElementList {
for _, ie := range antreaLabelsElementList {
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(antreaSourceStatsElementList)+len(antreaDestinationStatsElementList)].GetInfoElement(), nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
}
mockTempSet.EXPECT().ResetSet()
Expand Down
2 changes: 1 addition & 1 deletion plugins/octant/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vmware-tanzu/octant v0.24.0 h1:PMLU2QG6czSdCl6/xn7lHkHGi+Dv43rxL+AZL6sPJ24=
github.com/vmware-tanzu/octant v0.24.0/go.mod h1:yK0Nu7wzzvV25/T8nf4NQFMUUA7sLcp0gk96CjOTUiQ=
github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down
Loading