Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft PR for #6722 -- cache TTLs for individual IP addresses in DNS responses. #6732

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
164 changes: 113 additions & 51 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,16 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool {
// expirationTime of the records, which is the DNS response
// receiving time plus lowest applicable TTL.
type dnsMeta struct {
expirationTime time.Time
//expirationTime time.Time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove if no longer needed

// Key for responseIPs is the string representation of the IP.
// It helps to quickly identify IP address updates when a
// new DNS response is received.
responseIPs map[string]net.IP
responseIPs map[string]ipWithTTL
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
}

// ipWithTTL pairs a resolved IP address with the time at which its DNS
// record expires. It is the value type of dnsMeta.responseIPs, so every
// cached address carries its own expiry instead of sharing one per FQDN.
type ipWithTTL struct {
// ip is the resolved address.
ip net.IP
// expirationTime is the instant after which this address is treated as stale.
expirationTime time.Time
}

// subscriber is an entity that subscribes for datapath rule realization
Expand Down Expand Up @@ -253,8 +258,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP {
}
for fqdn := range fqdnsMatched {
if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok {
for _, ip := range dnsMeta.responseIPs {
matchedIPs = append(matchedIPs, ip)
for _, ipWithMetaData := range dnsMeta.responseIPs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/ipWithMetaData/ipData

matchedIPs = append(matchedIPs, ipWithMetaData.ip)
}
}
}
Expand Down Expand Up @@ -405,80 +410,118 @@ func (f *fqdnController) deleteRuleSelectedPods(ruleID string) error {

func (f *fqdnController) onDNSResponse(
fqdn string,
responseIPs map[string]net.IP,
lowestTTL uint32,
responseIPs map[string]ipWithTTL,
waitCh chan error,
) {
currentTime := time.Now()
if len(responseIPs) == 0 {
klog.V(4).InfoS("FQDN was not resolved to any addresses, skip updating DNS cache", "fqdn", fqdn)
if waitCh != nil {
waitCh <- nil
}
return
}
// mustCacheResponse is only true if the FQDN is already tracked by this
// controller, or it matches at least one fqdnSelectorItem from the policy rules.
// addressUpdate is only true if there has been an update in IP addresses
// corresponded with the FQDN.
mustCacheResponse, addressUpdate := false, false
recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second)

addressUpdate := false
f.fqdnSelectorMutex.Lock()
defer f.fqdnSelectorMutex.Unlock()
oldDNSMeta, exist := f.dnsEntryCache[fqdn]
ipMetaDataHolder := make(map[string]ipWithTTL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: moving the variable definitions before the lock, and keeping `if exist` right beneath the existence check, would make it more readable

Copy link
Member Author

@hkiiita hkiiita Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dyanngg Thank you, that was helpful: I realised the code is much easier to read after this change.

minTimeToReQuery := time.Unix(1<<63-62135596801, 999999999)

minTime := func(t1, t2 time.Time) time.Time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: func earlierOf(t1, t2) / laterOf(t1, t2)

Copy link
Member Author

@hkiiita hkiiita Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dyanngg Yes, these names are more meaningful.

if t1.Before(t2) {
return t1
}
return t2
}

maxTime := func(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}

if exist {
mustCacheResponse = true
for ipStr := range responseIPs {
if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok {
addressUpdate = true
break
// Data related to this domain exists in the cache.
// Besides presence of existing IPs, two scenarios : 1) May get New IPs 2) Some old IPs may be absent.

// check for new IPs and these new IPs need to be added with its new TTL value as received in response.
for ipStr, ipMeta := range responseIPs {
if _, exist := oldDNSMeta.responseIPs[ipStr]; !exist {
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ipMeta.ip,
expirationTime: ipMeta.expirationTime,
}
minTimeToReQuery = minTime(ipMeta.expirationTime, minTimeToReQuery)
}
}
for oldIPStr, oldIP := range oldDNSMeta.responseIPs {
if _, ok := responseIPs[oldIPStr]; !ok {
if oldDNSMeta.expirationTime.Before(time.Now()) {
// This IP entry has already expired and not seen in the latest DNS response.
// It should be removed from the cache.

for ipStr, ipMeta := range oldDNSMeta.responseIPs {
if _, exist := responseIPs[ipStr]; !exist {
if ipMeta.expirationTime.Before(currentTime) {
// this ip is expired and stale, remove it by not including it.
addressUpdate = true
} else {
// Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed.
responseIPs[oldIPStr] = oldIP
if oldDNSMeta.expirationTime.Before(recordTTL) {
recordTTL = oldDNSMeta.expirationTime
}
// It hasn't expired yet, so just retain it with its existing expirationTime.
ipMetaDataHolder[ipStr] = ipMeta
minTimeToReQuery = minTime(ipMeta.expirationTime, minTimeToReQuery)
}
} else {
// This old IP also exists in current response, so update it with max time between received time and its old cached time.
expTime := maxTime(responseIPs[ipStr].expirationTime, oldDNSMeta.responseIPs[ipStr].expirationTime)
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ipMeta.ip,
expirationTime: expTime,
}
minTimeToReQuery = minTime(expTime, minTimeToReQuery)
}
}

} else {
// First time seeing this domain.
// check if this needs to be tracked, by checking its presence in the rules.
// If a FQDN policy had been applied then there must be rule records but because it's
// not in cache hence its FQDN:SelectorItem mapping may not be present.
for selectorItem := range f.selectorItemToRuleIDs {
// Only track the FQDN if there is at least one fqdnSelectorItem matching it.
if selectorItem.matches(fqdn) {
mustCacheResponse, addressUpdate = true, true
f.setFQDNMatchSelector(fqdn, selectorItem)
for ipStr, ipMeta := range responseIPs {
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ipMeta.ip,
expirationTime: ipMeta.expirationTime,
}
minTimeToReQuery = minTime(ipMeta.expirationTime, minTimeToReQuery)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these lines appear at least four times, it might make sense to make them a helper.

Copy link
Member Author

@hkiiita hkiiita Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dyanngg Yes, that was a nice improvement.

}
}
}
}
if mustCacheResponse {

if len(ipMetaDataHolder) != 0 {
addressUpdate = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct. Isn't addressUpdate supposed to be determined by whether FQDN <-> IP mappings have changed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dyanngg updated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no IP has changed, I think the map is still not empty, so why is this set to true?

f.dnsEntryCache[fqdn] = dnsMeta{
expirationTime: recordTTL,
responseIPs: responseIPs,
responseIPs: ipMetaDataHolder,
}
f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now()))
// The FQDN will be added to the queue only after `lowestTTL` value which
// would already have been derived using the minTTL logic.
f.dnsQueryQueue.AddAfter(fqdn, minTimeToReQuery.Sub(currentTime))
}
f.syncDirtyRules(fqdn, waitCh, addressUpdate)
}

// onDNSResponseMsg handles a DNS response message intercepted.
func (f *fqdnController) onDNSResponseMsg(dnsMsg *dns.Msg, waitCh chan error) {
fqdn, responseIPs, lowestTTL, err := f.parseDNSResponse(dnsMsg)
fqdn, responseIPs, err := f.parseDNSResponse(dnsMsg)
if err != nil {
klog.V(2).InfoS("Failed to parse DNS response")
if waitCh != nil {
waitCh <- fmt.Errorf("failed to parse DNS response: %v", err)
}
return
}
f.onDNSResponse(fqdn, responseIPs, lowestTTL, waitCh)
f.onDNSResponse(fqdn, responseIPs, waitCh)
}

// syncDirtyRules triggers rule syncs for rules that are affected by the FQDN of DNS response
Expand Down Expand Up @@ -594,38 +637,54 @@ func (f *fqdnController) runRuleSyncTracker(stopCh <-chan struct{}) {
}

// parseDNSResponse returns the FQDN and the resolved IPs, each with its own expiration time, of a DNS response.
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]net.IP, uint32, error) {
func (f *fqdnController) parseDNSResponse(msg *dns.Msg) (string, map[string]ipWithTTL, error) {
if len(msg.Question) == 0 {
return "", nil, 0, fmt.Errorf("invalid DNS message")
return "", nil, fmt.Errorf("invalid DNS message")
}
fqdn := strings.ToLower(msg.Question[0].Name)
lowestTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs
responseIPs := map[string]net.IP{}
// Case 1: In the upcoming patch an admin would set this to a maximum value, which will be read from the configuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that with the minTTL feature, expirationTime will be maxTime(minTTL, r.Header().Ttl) instead. However, without a minTTL set, it should still be minTime(maxConfiguredTTL, r.Header().Ttl).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dyanngg Got it now. I was indeed getting a little confused, as I was trying to put the modifications for the upcoming patch here; I now understand that those will happen in the coming days, when I start working on the patch itself.

maxConfiguredTTL := uint32(math.MaxUint32) // a TTL must exist in the RRs
hkiiita marked this conversation as resolved.
Show resolved Hide resolved
responseIPs := map[string]ipWithTTL{}
currentTime := time.Now()
for _, ans := range msg.Answer {
switch r := ans.(type) {
case *dns.A:
if f.ipv4Enabled {
responseIPs[r.A.String()] = r.A
if r.Header().Ttl < lowestTTL {
lowestTTL = r.Header().Ttl
// So, using case 1 above , we may not make below comparison and just assign the max value as expirationTime ; ignoring the incoming dns ttl.
var expirationTime uint32
if r.Header().Ttl < maxConfiguredTTL {
Dyanngg marked this conversation as resolved.
Show resolved Hide resolved
expirationTime = maxConfiguredTTL
} else {
expirationTime = r.Header().Ttl
}
responseIPs[r.A.String()] = ipWithTTL{
ip: r.A,
expirationTime: currentTime.Add(time.Duration(expirationTime) * time.Second),
}

}
case *dns.AAAA:
if f.ipv6Enabled {
responseIPs[r.AAAA.String()] = r.AAAA
if r.Header().Ttl < lowestTTL {
lowestTTL = r.Header().Ttl
var expirationTime uint32
if r.Header().Ttl < maxConfiguredTTL {
expirationTime = maxConfiguredTTL
} else {
expirationTime = r.Header().Ttl
}
responseIPs[r.AAAA.String()] = ipWithTTL{
ip: r.AAAA,
expirationTime: currentTime.Add(time.Duration(expirationTime) * time.Second),
}
}
}
}
if len(responseIPs) > 0 {
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", lowestTTL)
klog.V(4).InfoS("Received DNS Packet with valid Answer", "IPs", responseIPs, "TTL", maxConfiguredTTL)
}
if strings.HasSuffix(fqdn, ".") {
fqdn = fqdn[:len(fqdn)-1]
}
return fqdn, responseIPs, lowestTTL, nil
return fqdn, responseIPs, nil
}

func (f *fqdnController) worker() {
Expand Down Expand Up @@ -662,24 +721,27 @@ func (f *fqdnController) lookupIP(ctx context.Context, fqdn string) error {

var errs []error

makeResponseIPs := func(ips []net.IP) map[string]net.IP {
responseIPs := make(map[string]net.IP)
makeResponseIPs := func(ips []net.IP) map[string]ipWithTTL {
responseIPs := make(map[string]ipWithTTL)
for _, ip := range ips {
responseIPs[ip.String()] = ip
responseIPs[ip.String()] = ipWithTTL{
ip: ip,
expirationTime: time.Now().Add(time.Duration(defaultTTL) * time.Second),
}
}
return responseIPs
}

if f.ipv4Enabled {
if ips, err := resolver.LookupIP(ctx, "ip4", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil)
} else {
errs = append(errs, fmt.Errorf("DNS request failed for IPv4: %w", err))
}
}
if f.ipv6Enabled {
if ips, err := resolver.LookupIP(ctx, "ip6", fqdn); err == nil {
f.onDNSResponse(fqdn, makeResponseIPs(ips), defaultTTL, nil)
f.onDNSResponse(fqdn, makeResponseIPs(ips), nil)
} else {
errs = append(errs, fmt.Errorf("DNS request failed for IPv6: %w", err))
}
Expand Down
90 changes: 86 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ func TestGetIPsForFQDNSelectors(t *testing.T) {
},
existingDNSCache: map[string]dnsMeta{
"test.antrea.io": {
responseIPs: map[string]net.IP{
"127.0.0.1": net.ParseIP("127.0.0.1"),
"192.155.12.1": net.ParseIP("192.155.12.1"),
"192.158.1.38": net.ParseIP("192.158.1.38"),
responseIPs: map[string]ipWithTTL{
"127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()},
"192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()},
"192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()},
},
},
},
Expand Down Expand Up @@ -584,3 +584,85 @@ func TestSyncDirtyRules(t *testing.T) {
})
}
}

func TestOnDNSResponse(t *testing.T) {
currentTime := time.Now()
tests := []struct {
name string
existingDNSCache map[string]dnsMeta
responseIPs map[string]ipWithTTL
lowestTTL uint32
expectedIPs map[string]time.Time
}{
{
name: "new IP added and old IP retained",
existingDNSCache: map[string]dnsMeta{
"fqdn-test-pod.lfx.test": {
responseIPs: map[string]ipWithTTL{
// sample IP with some TTL
"192.0.2.1": {ip: net.ParseIP("192.0.2.1"), expirationTime: currentTime.Add(10 * time.Second)},
// sample IP with time simulating expired time, i thought that using negative time will
// simulate expired time as it will always equate to before when compared to any time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be more concise in your comments:

// expired IP that should be removed from the cache when onDNSResponse is called

"192.0.2.2": {ip: net.ParseIP("192.0.2.2"), expirationTime: currentTime.Add(-1 * time.Second)},
},
},
},
responseIPs: map[string]ipWithTTL{
// we get new IP
"192.0.2.3": {ip: net.ParseIP("192.0.2.3"), expirationTime: currentTime.Add(10 * time.Second)},
// and an exisiting IP
"192.0.2.1": {ip: net.ParseIP("192.0.2.1"), expirationTime: currentTime.Add(10 * time.Second)},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the test more useful if the new TTL for this IP is different from the current one? If the new TTL in the response is greater than the existing one, the new one should be used. If it is less than the existing one, the current one should remain in use.

},
lowestTTL: 30,
expectedIPs: map[string]time.Time{
// old unexpired IP should continue to have its actual TTL.
"192.0.2.1": currentTime.Add(10 * time.Second),
// new ip should have new expirationTime that was passed (minTTL)
"192.0.2.3": currentTime.Add(10 * time.Second),
},
},
{
name: "old IP expired",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a strict subset of the previous test, and doesn't need to be included

there are 2 possible approaches here:

  1. have a single unit test case with multiple different IPs, ensuring that all the branches / cases of the onDNSResponse are tested
  2. have multiple test cases, each testing a different branch / scenario

Usually, we prefer the second approach because it makes test failures easier to identify / troubleshoot.

existingDNSCache: map[string]dnsMeta{
"fqdn-test-pod.lfx.test": {
responseIPs: map[string]ipWithTTL{
// an ip which is expired.
"192.0.2.1": {ip: net.ParseIP("192.0.2.1"), expirationTime: currentTime.Add(-1 * time.Second)},
},
},
},
responseIPs: map[string]ipWithTTL{
"192.0.2.3": {ip: net.ParseIP("192.0.2.3"), expirationTime: currentTime.Add(10 * time.Second)},
},
lowestTTL: 30,
// so we should expect the removal of expired ip from our cache and presence of only this ip
expectedIPs: map[string]time.Time{
"192.0.2.3": currentTime.Add(10 * time.Second), // new IP
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)
f, _ := newMockFQDNController(t, controller, nil) // server set as nil for testing purpose here .
f.dnsEntryCache = tc.existingDNSCache

f.onDNSResponse("fqdn-test-pod.lfx.test", tc.responseIPs, nil) //waitChan nil, though as per original function its nil only when queries are sent by pod and not fqdnController.

dnsMetaData := f.dnsEntryCache["fqdn-test-pod.lfx.test"]
if len(dnsMetaData.responseIPs) != len(tc.expectedIPs) {
t.Errorf("Expected %d IPs in cache, got %d", len(tc.expectedIPs), len(dnsMetaData.responseIPs))
}

for ipStr, expectedTTL := range tc.expectedIPs {
if ipMeta, exists := dnsMetaData.responseIPs[ipStr]; !exists || ipMeta.expirationTime.Before(time.Now()) {
t.Errorf("Expected %s to be found with a valid TTL", ipStr)
} else if !ipMeta.expirationTime.Equal(expectedTTL) {
t.Errorf("Expected TTL for %s to be %v, got %v", ipStr, expectedTTL, ipMeta.expirationTime)
}
}
println("\n\n")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use a print statement in a test like this

})
}
}
7 changes: 4 additions & 3 deletions test/e2e/framework.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes are unrelated to the rest of the PR, and should be removed

Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ const (
defaultCHDatabaseURL = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"

statefulSetRestartAnnotationKey = "antrea-e2e/restartedAt"

iperfPort = 5201
iperfSvcPort = 9999
randomPatchAnnotationKey = "test.antrea.io/random-value"
annotationValueLen = 8
iperfPort = 5201
iperfSvcPort = 9999
)

type ClusterNode struct {
Expand Down
Loading