Skip to content

Commit

Permalink
fix: tidy up nodeinformer.
Browse files Browse the repository at this point in the history
Signed-off-by: IRONICBo <[email protected]>
  • Loading branch information
IRONICBo committed Apr 22, 2024
1 parent 3b66f2f commit f6c7714
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 44 deletions.
40 changes: 2 additions & 38 deletions pkg/agent/monitortool/latency_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,18 @@ import (

"github.com/containernetworking/plugins/pkg/ip"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/k8s"

coreinformers "k8s.io/client-go/informers/core/v1"
)

// LatencyStore is a store for latency information of connections between nodes.
type LatencyStore struct {
// Maybe we need to use small lock for the map
// Lock for the latency store
mutex sync.RWMutex

// isNetworkPolicyOnly is the flag to indicate if the Antrea Agent is running in network policy only mode.
isNetworkPolicyOnly bool
// The map of node name to node info, it will changed by node watcher
nodeInformer coreinformers.NodeInformer
// The map of node ip to latency entry, it will be changed by latency monitor
nodeIPLatencyMap map[string]*NodeIPLatencyEntry
// The map of node ip to node name, it will be changed by node watcher
Expand All @@ -57,47 +52,16 @@ type NodeIPLatencyEntry struct {
LastMeasuredRTT time.Duration
}

func NewLatencyStore(nodeInformer coreinformers.NodeInformer, isNetworkPolicyOnly bool) *LatencyStore {
func NewLatencyStore(isNetworkPolicyOnly bool) *LatencyStore {
store := &LatencyStore{
nodeIPLatencyMap: make(map[string]*NodeIPLatencyEntry),
nodeGatewayMap: make(map[string][]net.IP),
nodeInformer: nodeInformer,
isNetworkPolicyOnly: isNetworkPolicyOnly,
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: store.onNodeAdd,
UpdateFunc: store.onNodeUpdate,
DeleteFunc: store.onNodeDelete,
})

return store
}

func (l *LatencyStore) Run(stopCh <-chan struct{}) {
l.nodeInformer.Informer().Run(stopCh)
}

func (l *LatencyStore) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
l.addNode(node)
}

func (l *LatencyStore) onNodeUpdate(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
node := newObj.(*corev1.Node)
l.updateNode(oldNode, node)
}

func (l *LatencyStore) onNodeDelete(obj interface{}) {
// Check if the object is a not a node
node, ok := obj.(*corev1.Node)
if !ok {
return
}

l.deleteNode(node)
}

func (l *LatencyStore) GetNodeIPLatencyEntryByKey(key string) (*NodeIPLatencyEntry, bool) {
l.mutex.RLock()
defer l.mutex.RUnlock()
Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/monitortool/latency_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ var (
func TestNewLatencyStore(t *testing.T) {
k8sClient := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
nodeInformer := informerFactory.Core().V1().Nodes()
latencyStore := NewLatencyStore(nodeInformer, false)
latencyStore := NewLatencyStore(false)

stopCh := make(chan struct{})
defer close(stopCh)
Expand Down
41 changes: 37 additions & 4 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

config "antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -76,6 +76,8 @@ type MonitorTool struct {
// latencyConfigChanged is the channel to notify the latency config changed.
latencyConfigChanged chan struct{}

// The map of node name to node info, it will changed by node watcher
nodeInformer coreinformers.NodeInformer
// nodeLatencyMonitorInformer is the informer for the NodeLatencyMonitor CRD.
nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer
}
Expand All @@ -94,12 +96,19 @@ func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer,
isNetworkPolicyOnly bool) *MonitorTool {
m := &MonitorTool{
gatewayConfig: gatewayConfig,
latencyStore: NewLatencyStore(nodeInformer, isNetworkPolicyOnly),
latencyStore: NewLatencyStore(isNetworkPolicyOnly),
latencyConfig: &LatencyConfig{Enable: false},
latencyConfigChanged: make(chan struct{}, 1),
nodeInformer: nodeInformer,
nodeLatencyMonitorInformer: nlmInformer,
}

nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.onNodeAdd,
UpdateFunc: m.onNodeUpdate,
DeleteFunc: m.onNodeDelete,
})

// Add crd informer event handler for NodeLatencyMonitor
nlmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.onNodeLatencyMonitorAdd,
Expand All @@ -110,6 +119,30 @@ func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer,
return m
}

// onNodeAdd is the event handler for adding Node.
func (m *MonitorTool) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
m.latencyStore.addNode(node)
}

// onNodeUpdate is the event handler for updating Node.
func (m *MonitorTool) onNodeUpdate(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
node := newObj.(*corev1.Node)
m.latencyStore.updateNode(oldNode, node)
}

// onNodeDelete is the event handler for deleting Node.
func (m *MonitorTool) onNodeDelete(obj interface{}) {
// Check if the object is a not a node
node, ok := obj.(*corev1.Node)
if !ok {
return
}

m.latencyStore.deleteNode(node)
}

// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor.
func (m *MonitorTool) onNodeLatencyMonitorAdd(obj interface{}) {
nlm := obj.(*v1alpha2.NodeLatencyMonitor)
Expand Down Expand Up @@ -347,6 +380,7 @@ func (m *MonitorTool) Run(stopCh <-chan struct{}) {

// Start the monitor loop
go m.nodeLatencyMonitorInformer.Informer().Run(stopCh)
go m.nodeInformer.Informer().Run(stopCh)
go m.monitorLoop(ctx)
}

Expand Down Expand Up @@ -381,7 +415,6 @@ func (m *MonitorTool) monitorLoop(ctx context.Context) {
}

// Start new pingAll goroutine
go m.latencyStore.Run(innerStopCh)
go m.PingAll(innerStopCh)
go wait.Until(m.testPrint, m.latencyConfig.Interval, innerStopCh)
} else {
Expand Down

0 comments on commit f6c7714

Please sign in to comment.