Skip to content

Commit

Permalink
spy: properly handle connreset errors while spying
Browse files Browse the repository at this point in the history
  • Loading branch information
streambinder committed Aug 31, 2021
1 parent 434932a commit ba99821
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions spy/spy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package spy

import (
"errors"
"net"
"syscall"

"github.com/sirupsen/logrus"
_mold "github.com/streambinder/peephole/mold"
_salt "github.com/streambinder/peephole/salt"
Expand Down Expand Up @@ -47,18 +50,14 @@ func (s *Spy) Endpoints() (e []string) {
func (s *Spy) Watch() error {
peephole := make(chan *_salt.EventsResponse, len(s.endpoints))
for k, v := range s.endpoints {
go func(endpoint, token string, peephole chan *_salt.EventsResponse) {
if err := _salt.Events(endpoint, token, peephole); err != nil {
logrus.WithError(err).Fatalln("Unable to watch for events")
}
}(k, v, peephole)
go s.spy(k, v, peephole)
}

for {
e := <-peephole
logrus.WithFields(logrus.Fields{
"Endpoint": e.Endpoint,
"Tag": e.Tag,
"endpoint": e.Endpoint,
"tag": e.Tag,
}).Debugln("Event received")

o, err := _mold.Parse(e.Endpoint, e.Tag, e.Data)
Expand All @@ -73,11 +72,27 @@ func (s *Spy) Watch() error {
}

logrus.WithFields(logrus.Fields{
"Master": o.Master,
"Minion": o.Minion,
"Jid": o.Jid,
"Function": o.Function,
"master": o.Master,
"minion": o.Minion,
"jid": o.Jid,
"function": o.Function,
}).Println("Event persisted")
s.EventChan <- o
}
}

// spy keeps an event subscription to a single endpoint alive, forwarding
// whatever _salt.Events produces into the peephole channel.
//
// It loops forever around _salt.Events:
//   - when the call returns without error, the subscription is started again;
//   - when the call fails because the connection was reset by the peer
//     (ECONNRESET), the condition is logged and the subscription is
//     reattached;
//   - any other error is logged at fatal level, which (per logrus semantics)
//     terminates the process.
//
// The function never returns normally; the error result exists only to keep
// the method signature stable.
func (s *Spy) spy(endpoint, token string, peephole chan *_salt.EventsResponse) error {
	for {
		err := _salt.Events(endpoint, token, peephole)
		if err == nil {
			continue
		}

		// Look for a network operation error anywhere in the chain and check
		// whether its root cause is ECONNRESET. errors.As/errors.Is follow
		// wrapped errors (e.g. *os.SyscallError), which a direct type
		// assertion plus message comparison would miss.
		var opErr *net.OpError
		if errors.As(err, &opErr) && errors.Is(opErr, syscall.ECONNRESET) {
			logrus.WithField("endpoint", endpoint).Println("Connection reset: reattaching...")
			// Retry instead of falling through to the fatal log below.
			continue
		}

		logrus.WithError(err).Fatalln("Unable to watch for events")
	}
}

0 comments on commit ba99821

Please sign in to comment.