Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the cache #817

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 54 additions & 113 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,17 @@ const (
defaultInterval = time.Minute
)

// Cache[T] is a thread-safe in-memory key/object store.
// It can be used to store objects with optional expiration.
// A function to extract the key from the object must be provided.
// Use the New function to create a new cache that is ready to use.
// Cache[T] is a thread-safe in-memory key/value store.
// It can be used to store items with optional expiration.
type Cache[T any] struct {
*cache[T]
// keyFunc is used to make the key for objects stored in and retrieved from index, and
// should be deterministic.
keyFunc KeyFunc[T]
}

// item is an item stored in the cache.
type item[T any] struct {
key string
// object is the item's object.
object T
// value is the item's value.
value T
// expiresAt is the item's expiration time.
expiresAt time.Time
}
Expand All @@ -61,41 +56,39 @@ type cache[T any] struct {
// It is initially true, and set to false when the items are not sorted.
sorted bool
// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
capacity int
metrics *cacheMetrics
janitor *janitor[T]
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
func New[T any](capacity int, opts ...Options) (*Cache[T], error) {
opt, err := makeOptions(opts...)
if err != nil {
return nil, fmt.Errorf("failed to apply options: %w", err)
}

c := &cache[T]{
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
labelsFunc: opt.labelsFunc,
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
janitor: &janitor[T]{
interval: opt.interval,
stop: make(chan bool),
},
}

if opt.registerer != nil {
c.metrics = newCacheMetrics(opt.registerer, opt.extraLabels...)
c.metrics = newCacheMetrics(opt.registerer)
}

C := &Cache[T]{cache: c, keyFunc: keyFunc}
C := &Cache[T]{cache: c}

if opt.interval > 0 {
go c.janitor.run(c)
Expand All @@ -104,8 +97,8 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
return C, nil
}

func makeOptions[T any](opts ...Options[T]) (*storeOptions[T], error) {
opt := storeOptions[T]{}
func makeOptions(opts ...Options) (*storeOptions, error) {
opt := storeOptions{}
for _, o := range opts {
err := o(&opt)
if err != nil {
Expand All @@ -131,14 +124,8 @@ func (c *Cache[T]) Close() error {
}

// Set an item in the cache, existing index will be overwritten.
// If the cache is full, Add will return an error.
func (c *Cache[T]) Set(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

// If the cache is full, an error is returned.
func (c *Cache[T]) Set(key string, value T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -147,14 +134,14 @@ func (c *Cache[T]) Set(object T) error {
}
_, found := c.index[key]
if found {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
return nil
}

if c.capacity > 0 && len(c.index) < c.capacity {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
recordItemIncrement(c.metrics)
Expand All @@ -165,10 +152,10 @@ func (c *Cache[T]) Set(object T) error {
return ErrCacheFull
}

func (c *cache[T]) set(key string, object T) {
func (c *cache[T]) set(key string, value T) {
item := item[T]{
key: key,
object: object,
value: value,
expiresAt: time.Now().Add(noExpiration),
}

Expand All @@ -181,86 +168,41 @@ func (c *cache[T]) set(key string, object T) {
c.items = append(c.items, &item)
}

// Get an item from the cache. Returns the item or nil, and a bool indicating
// whether the key was found.
func (c *Cache[T]) Get(object T) (item T, exists bool, err error) {
var res T
lvs := []string{}
if c.labelsFunc != nil {
lvs, err = c.labelsFunc(object, len(c.metrics.getExtraLabels()))
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidLabels, Err: err}
}
}
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidKey, Err: err}
}
item, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss, lvs...)
return res, false, nil
}
recordEvent(c.metrics, CacheEventTypeHit, lvs...)
return item, true, nil
}

// GetByKey returns the object for the given key.
func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
var res T
index, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss)
return res, false, nil
}

recordEvent(c.metrics, CacheEventTypeHit)
return index, true, nil
}

func (c *cache[T]) get(key string) (T, bool, error) {
var res T
// Get returns a pointer to an item in the cache for the given key. If no item
// is found, it's a nil pointer.
// The caller can record cache hit or miss based on the result with
// Cache.RecordCacheEvent().
func (c *Cache[T]) Get(key string) (*T, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
recordRequest(c.metrics, StatusFailure)
return res, false, ErrCacheClosed
return nil, ErrCacheClosed
}
item, found := c.index[key]
if !found {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return nil, nil
}
if !item.expiresAt.IsZero() {
if item.expiresAt.Compare(time.Now()) < 0 {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return nil, nil
}
}
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return item.object, true, nil
// Copy the value to prevent writes to the cached item.
r := item.value
return &r, nil
}

// Delete an item from the cache. Does nothing if the key is not in the cache.
// It actually sets the item expiration to `now“, so that it will be deleted at
// the cleanup.
func (c *Cache[T]) Delete(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) Delete(key string) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand Down Expand Up @@ -355,13 +297,7 @@ func (c *cache[T]) Resize(size int) (int, error) {
}

// HasExpired returns true if the item has expired.
func (c *Cache[T]) HasExpired(object T) (bool, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return false, &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) HasExpired(key string) (bool, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand All @@ -387,13 +323,7 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
}

// SetExpiration sets the expiration for the given key.
func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) SetExpiration(key string, expiration time.Time) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -417,12 +347,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
// GetExpiration returns the expiration for the given key.
// Returns zero if the key is not in the cache or the item
// has already expired.
func (c *Cache[T]) GetExpiration(object T) (time.Time, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return time.Time{}, &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) GetExpiration(key string) (time.Time, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand Down Expand Up @@ -481,6 +406,22 @@ func (c *cache[T]) deleteExpired() {
c.mu.Unlock()
}

// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind,
// name and namespace of the associated object being reconciled.
func (c *Cache[T]) RecordCacheEvent(event, kind, name, namespace string) {
if c.metrics != nil {
c.metrics.incCacheEvents(event, kind, name, namespace)
}
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *Cache[T]) DeleteCacheEvent(event, kind, name, namespace string) {
if c.metrics != nil {
c.metrics.deleteCacheEvent(event, kind, name, namespace)
}
}

type janitor[T any] struct {
interval time.Duration
stop chan bool
Expand Down
Loading
Loading