Skip to content

Commit

Permalink
Fix rare issue concerning captures going stale on the AWS VPC CNI, as…
Browse files Browse the repository at this point in the history
… well as auto-resolve addresses with no currently seen DNS traffic in the cluster (#242)

Co-authored-by: omri.s <[email protected]>
  • Loading branch information
orishoshan and omris94 authored Sep 26, 2024
1 parent 35157de commit 88d4933
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 146 deletions.
1 change: 0 additions & 1 deletion src/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions src/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/mapper/pkg/clouduploader/cloud_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *CloudUploader) NotifyIntents(ctx context.Context, intents []intentsstor
toCloud.Intent.ServerAlias = &cloudclient.ServerAliasInput{Name: intent.Intent.Server.KubernetesService, Kind: lo.ToPtr(serviceidentity.KindService)}
}
// debug log all the fields of intent input one by one with their values
logrus.Debugf("intent CleintName: %s\t Namespace: %s\t ServerName: %s\t ServerNamespace: %s\t ClientWorkloadKind: %s\t ServerWorkloadKind: %s\t ServerAlias: %v", lo.FromPtr(toCloud.Intent.ClientName), lo.FromPtr(toCloud.Intent.Namespace), lo.FromPtr(toCloud.Intent.ServerName), lo.FromPtr(toCloud.Intent.ServerNamespace), lo.FromPtr(toCloud.Intent.ClientWorkloadKind), lo.FromPtr(toCloud.Intent.ServerWorkloadKind), lo.FromPtr(toCloud.Intent.ServerAlias))
logrus.Debugf("intent ClientName: %s\t Namespace: %s\t ServerName: %s\t ServerNamespace: %s\t ClientWorkloadKind: %s\t ServerWorkloadKind: %s\t ServerAlias: %v", lo.FromPtr(toCloud.Intent.ClientName), lo.FromPtr(toCloud.Intent.Namespace), lo.FromPtr(toCloud.Intent.ServerName), lo.FromPtr(toCloud.Intent.ServerNamespace), lo.FromPtr(toCloud.Intent.ClientWorkloadKind), lo.FromPtr(toCloud.Intent.ServerWorkloadKind), lo.FromPtr(toCloud.Intent.ServerAlias))

return toCloud
})
Expand Down
2 changes: 1 addition & 1 deletion src/mapper/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
DNSCacheItemsMaxCapacityKey = "dns-cache-items-max-capacity"
DNSCacheItemsMaxCapacityDefault = 100000
DNSClientIntentsUpdateIntervalKey = "dns-client-intents-update-interval"
DNSClientIntentsUpdateIntervalDefault = 1 * time.Second
DNSClientIntentsUpdateIntervalDefault = 100 * time.Millisecond
DNSClientIntentsUpdateEnabledKey = "dns-client-intents-update-enabled"
DNSClientIntentsUpdateEnabledDefault = true
ServiceCacheTTLDurationKey = "service-cache-ttl-duration"
Expand Down
41 changes: 20 additions & 21 deletions src/mapper/pkg/dnscache/dns_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,44 @@ package dnscache

import (
"context"
"github.com/jellydator/ttlcache/v3"
"github.com/otterize/network-mapper/src/mapper/pkg/config"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"net"
"time"
)

type DNSCache struct {
cache *ttlcache.Cache[string, string]
cache *TTLCache[string, string]
}

type Resolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
}

func NewDNSCache() *DNSCache {
capacity := viper.GetInt(config.DNSCacheItemsMaxCapacityKey)
dnsRecordCache := ttlcache.New[string, string](ttlcache.WithCapacity[string, string](uint64(capacity)))
go dnsRecordCache.Start()

lastCapacityReachedErrorPrint := time.Time{}
dnsRecordCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, string]) {
if reason == ttlcache.EvictionReasonCapacityReached && time.Since(lastCapacityReachedErrorPrint) > time.Minute {
logrus.Warningf("DNS cache capacity reached entries are being dropped, consider increasing config '%s'",
config.DNSCacheItemsMaxCapacityKey)
lastCapacityReachedErrorPrint = time.Now()
}
})
if capacity == 0 {
logrus.Panic("Capacity cannot be 0")
}
dnsRecordCache := NewTTLCache[string, string](capacity)

return &DNSCache{
cache: dnsRecordCache,
}
}

func (d *DNSCache) AddOrUpdateDNSData(dnsName string, ip string, ttlSeconds int) {
ttl := time.Duration(ttlSeconds) * time.Second
d.cache.Set(dnsName, ip, ttl)
func (d *DNSCache) AddOrUpdateDNSData(dnsName string, ip string, ttl time.Duration) {
d.cache.Insert(dnsName, ip, ttl)
}

func (d *DNSCache) GetResolvedIP(dnsName string) (string, bool) {
func (d *DNSCache) GetResolvedIPs(dnsName string) []string {
entry := d.cache.Get(dnsName)
if entry == nil {
return "", false
}
return entry.Value(), true
return entry
}

// CacheValue holds the value and its expiration time
type CacheValue[V any] struct {
Value V
Expiration time.Time
}
62 changes: 33 additions & 29 deletions src/mapper/pkg/dnscache/dns_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,29 @@ type DNSCacheTestSuite struct {
}

func (s *DNSCacheTestSuite) TearDownTest() {
viper.Reset()
viper.Set(config.DNSCacheItemsMaxCapacityKey, config.DNSCacheItemsMaxCapacityDefault)
}

func (s *DNSCacheTestSuite) TestDNSCache() {
cache := NewDNSCache()
cache.AddOrUpdateDNSData("good-news.com", IP1, 60)
ip, found := cache.GetResolvedIP("good-news.com")
s.Require().True(found)
s.Require().Equal(IP1, ip)

cache.AddOrUpdateDNSData("good-news.com", IP2, 60)
ip, found = cache.GetResolvedIP("good-news.com")
s.Require().True(found)
s.Require().Equal(IP2, ip)

_, found = cache.GetResolvedIP("bad-news.de")
s.Require().False(found)

cache.AddOrUpdateDNSData("bad-news.de", IP1, 60)
ip, found = cache.GetResolvedIP("bad-news.de")
s.Require().True(found)
s.Require().Equal(IP1, ip)
cache.AddOrUpdateDNSData("good-news.com", IP1, 60*time.Second)
ips := cache.GetResolvedIPs("good-news.com")
s.Require().Len(ips, 1)
s.Require().Equal(IP1, ips[0])

cache.AddOrUpdateDNSData("good-news.com", IP2, 60*time.Second)
ips = cache.GetResolvedIPs("good-news.com")
s.Require().Len(ips, 2)
s.Require().Contains(ips, IP1)
s.Require().Contains(ips, IP2)

ips = cache.GetResolvedIPs("bad-news.de")
s.Require().Len(ips, 0)

cache.AddOrUpdateDNSData("bad-news.de", IP1, 60*time.Second)
ips = cache.GetResolvedIPs("bad-news.de")
s.Require().Len(ips, 1)
s.Require().Equal(IP1, ips[0])
}

func (s *DNSCacheTestSuite) TestCapacityConfig() {
Expand All @@ -50,32 +51,35 @@ func (s *DNSCacheTestSuite) TestCapacityConfig() {
names := make([]string, 0)
for i := 0; i < capacityLimit+1; i++ {
dnsName := fmt.Sprintf("dns-%d.com", i)
cache.AddOrUpdateDNSData(dnsName, IP1, 60)
cache.AddOrUpdateDNSData(dnsName, IP1, 60*time.Second)
names = append(names, dnsName)
}

for i, dnsName := range names {
_, found := cache.GetResolvedIP(dnsName)
vals := cache.GetResolvedIPs(dnsName)
if i == 0 {
s.Require().False(found)
s.Require().Len(vals, 0)
} else {
s.Require().True(found)
s.Require().Len(vals, 1)
}
}
}

func (s *DNSCacheTestSuite) TestTTL() {
cache := NewDNSCache()

cache.AddOrUpdateDNSData("my-future-blog.de", IP1, 1)
ip, found := cache.GetResolvedIP("my-future-blog.de")
s.Require().True(found)
s.Require().Equal(IP1, ip)
cache.AddOrUpdateDNSData("my-future-blog.de", IP1, 1*time.Second)
ips := cache.GetResolvedIPs("my-future-blog.de")
s.Require().Len(ips, 1)
s.Require().Equal(IP1, ips[0])

// This is the only place where we sleep in the test, to make sure the TTL works as expected
time.Sleep(1100 * time.Millisecond)
_, found = cache.GetResolvedIP("my-future-blog.de")
s.Require().False(found)
time.Sleep(2 * time.Second)

cache.cache.cleanupExpired()

ips = cache.GetResolvedIPs("my-future-blog.de")
s.Require().Len(ips, 0)

}

Expand Down
174 changes: 174 additions & 0 deletions src/mapper/pkg/dnscache/ttl_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package dnscache

import (
"container/list"
"sync"
"time"
)

// CacheEntry represents an entry in the cache, linking the key with its list element for LRU
type CacheEntry[K comparable, V comparable] struct {
Key K
Value CacheValue[V]
}

// TTLCache is a generic TTL cache that stores unique items with individual TTLs and LRU eviction
type TTLCache[K comparable, V comparable] struct {
items map[K]map[V]*list.Element // Key to map of values, each value points to an LRU element
lru *list.List // List for LRU eviction, stores CacheEntry[K, V]
maxSize int // Maximum size of the cache
mu sync.Mutex
cleanupCh chan struct{}
}

// NewTTLCache creates a new TTL cache with the specified maxSize
func NewTTLCache[K comparable, V comparable](maxSize int) *TTLCache[K, V] {
cache := &TTLCache[K, V]{
items: make(map[K]map[V]*list.Element),
lru: list.New(),
maxSize: maxSize,
cleanupCh: make(chan struct{}),
}

// Start the cleanup process
go cache.startCleanup()

return cache
}

// Insert adds a unique value to the cache under the specified key with its own TTL
// and manages the LRU eviction when the cache exceeds the max size.
func (c *TTLCache[K, V]) Insert(key K, value V, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

// If the key doesn't exist, create an entry for it
if _, exists := c.items[key]; !exists {
c.items[key] = make(map[V]*list.Element)
}

// Check if the value already exists under this key and remove it from LRU if so
if elem, exists := c.items[key][value]; exists {
c.lru.Remove(elem)
}

// Insert or update the value with its expiration time and add it to the LRU list
cacheEntry := CacheEntry[K, V]{Key: key, Value: CacheValue[V]{Value: value, Expiration: time.Now().Add(ttl)}}
lruElem := c.lru.PushFront(cacheEntry)
c.items[key][value] = lruElem

// Manage the cache size, evict the least recently used item if needed
if c.lru.Len() > c.maxSize {
c.evict()
}

}

// evict removes the least recently used item from the cache
func (c *TTLCache[K, V]) evict() {
// Remove the least recently used item (which is at the back of the LRU list)
lruElem := c.lru.Back()
if lruElem == nil {
return
}

cacheEntry := lruElem.Value.(CacheEntry[K, V])
key, value := cacheEntry.Key, cacheEntry.Value

// Remove the value from the cache
if _, exists := c.items[key]; exists {
delete(c.items[key], value.Value)

// If no more values exist under this key, remove the key itself
if len(c.items[key]) == 0 {
delete(c.items, key)
}
}

// Remove from the LRU list
c.lru.Remove(lruElem)
}

// Get retrieves the values for a specific key and removes any expired values
// Returns a slice of valid values for the given key
func (c *TTLCache[K, V]) Get(key K) []V {
c.mu.Lock()
defer c.mu.Unlock()

// Check if the key exists
if _, exists := c.items[key]; !exists {
return make([]V, 0)
}

// Filter out expired values and prepare the result
var result []V
for value, lruElem := range c.items[key] {
cacheEntry := lruElem.Value.(CacheEntry[K, V])

// If the value has expired, remove it
if time.Now().After(c.lruValueExpiration(lruElem)) {
c.lru.Remove(lruElem)
delete(c.items[key], value)
continue
}

// Add valid values to the result
result = append(result, cacheEntry.Value.Value)

// Move the accessed item to the front of the LRU list (mark as recently used)
c.lru.MoveToFront(lruElem)
}

// If all values are expired, remove the key entirely
if len(c.items[key]) == 0 {
delete(c.items, key)
}

return result
}

// cleanupExpired removes expired values from the cache
func (c *TTLCache[K, V]) cleanupExpired() {
for key, values := range c.items {
for value, lruElem := range values {
// If a value has expired, remove it
if time.Now().After(c.lruValueExpiration(lruElem)) {
c.lru.Remove(lruElem)
delete(values, value)
}
}

// If all values are expired, remove the key entirely
if len(values) == 0 {
delete(c.items, key)
}
}
}

// lruValueExpiration gets the expiration time for a given LRU element
func (c *TTLCache[K, V]) lruValueExpiration(elem *list.Element) time.Time {
cacheEntry := elem.Value.(CacheEntry[K, V])
return cacheEntry.Value.Expiration
}

// startCleanup periodically cleans up expired items
func (c *TTLCache[K, V]) startCleanup() {
ticker := time.NewTicker(1 * time.Minute) // Cleanup interval
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.mu.Lock()
c.cleanupExpired()
c.mu.Unlock()
case <-c.cleanupCh:
return
}
}
}

// Stop stops the cache cleanup process
func (c *TTLCache[K, V]) Stop() {
close(c.cleanupCh)
}
Loading

0 comments on commit 88d4933

Please sign in to comment.