Skip to content

Commit

Permalink
move to using rwmutex for selector
Browse files Browse the repository at this point in the history
  • Loading branch information
asim committed Dec 18, 2018
1 parent c2cc03a commit 770c16a
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions selector/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type cacheSelector struct {
ttl time.Duration

// registry cache
sync.Mutex
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time

Expand Down Expand Up @@ -81,13 +81,12 @@ func (c *cacheSelector) del(service string) {
}

func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// read lock for the duration
c.RLock()

// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}

// get does the actual request for a service
Expand All @@ -100,15 +99,24 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
}

// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()

return services, nil
}

// check the cache first
services, ok := c.cache[service]
// make a copy
cp := c.cp(services)

// cache miss or no services
if !ok || len(services) == 0 {
// unlock the read
c.RUnlock()

// get and return services
return get(service)
}

Expand All @@ -117,15 +125,22 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {

// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
// unlock the read
c.RUnlock()

// return servics
return cp, nil
}

// unlock read
c.RUnlock()

// expired entry so get service
services, err := get(service)
rservices, err := get(service)

// no error then return error
// no error then return services
if err == nil {
return services, nil
return rservices, nil
}

// not found error then return
Expand All @@ -135,8 +150,8 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {

// other error

// return expired cache as last resort
return c.cp(services), nil
// return expired cache copy as last resort
return cp, nil
}

func (c *cacheSelector) set(service string, services []*registry.Service) {
Expand Down Expand Up @@ -257,6 +272,18 @@ func (c *cacheSelector) update(res *registry.Result) {
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()

// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()

for {
// exit early if already dead
if c.quit() {
Expand Down

0 comments on commit 770c16a

Please sign in to comment.