feat: adding the Dowait primitive in cache (#31578)

See: #31577

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/31610/head
Ted Xu 2024-03-27 16:17:15 +08:00 committed by GitHub
parent 92971707de
commit efa645db6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 189 additions and 28 deletions

View File

@ -4,15 +4,18 @@ import (
"container/list"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"golang.org/x/sync/singleflight"
"github.com/milvus-io/milvus/pkg/util/merr"
)
var (
ErrNoSuchItem = errors.New("no such item")
ErrNotEnoughSpace = errors.New("not enough space")
ErrNoSuchItem = merr.WrapErrServiceInternal("no such item")
ErrNotEnoughSpace = merr.WrapErrServiceInternal("not enough space")
ErrTimeOut = merr.WrapErrServiceInternal("time out")
)
type cacheItem[K comparable, V any] struct {
@ -21,21 +24,6 @@ type cacheItem[K comparable, V any] struct {
pinCount atomic.Int32
}
func newCacheItem[K comparable, V any](key K, value V) *cacheItem[K, V] {
return &cacheItem[K, V]{
key: key,
value: value,
}
}
func (item *cacheItem[K, V]) Unpin() {
item.pinCount.Dec()
}
func (i *cacheItem[K, V]) Value() V {
return i.value
}
type (
Loader[K comparable, V any] func(key K) (V, bool)
Finalizer[K comparable, V any] func(key K, value V) error
@ -48,10 +36,16 @@ type (
type Scavenger[K comparable] interface {
// Collect records entry additions, if there is room, return true, or else return false and a collector.
// The collector is a function which can be invoked repetedly, each invocation will test if there is enough
// room provided that all entries in the collector is evicted.
// room provided that all entries in the collector is evicted. Typically, the collector will get multiple false
// before it gets a true.
Collect(key K) (bool, func(K) bool)
// Throw records entry removals.
Throw(key K)
// Spare returns a collector function based on given key.
// The collector is a function which can be invoked repetedly, each invocation will test if there is enough
// room for all the pending entries if the thrown entry is evicted. Typically, the collector will get multiple true
// before it gets a false.
Spare(key K) func(K) bool
}
type LazyScavenger[K comparable] struct {
@ -84,8 +78,30 @@ func (s *LazyScavenger[K]) Throw(key K) {
s.size -= s.weight(key)
}
func (s *LazyScavenger[K]) Spare(key K) func(K) bool {
w := s.weight(key)
available := s.capacity - s.size + w
return func(k K) bool {
available -= s.weight(k)
return available >= 0
}
}
type Cache[K comparable, V any] interface {
Do(key K, doer func(V) error) error
DoWait(key K, timeout time.Duration, doer func(V) error) error
}
type Waiter[K comparable] struct {
key K
c *sync.Cond
}
func newWaiter[K comparable](key K) Waiter[K] {
return Waiter[K]{
key: key,
c: sync.NewCond(&sync.Mutex{}),
}
}
// lruCache extends the ccache library to provide pinning and unpinning of items.
@ -96,6 +112,8 @@ type lruCache[K comparable, V any] struct {
accessList *list.List
loaderSingleFlight singleflight.Group
waitQueue *list.List
loader Loader[K, V]
finalizer Finalizer[K, V]
scavenger Scavenger[K]
@ -157,6 +175,7 @@ func newLRUCache[K comparable, V any](
return &lruCache[K, V]{
items: make(map[K]*list.Element),
accessList: list.New(),
waitQueue: list.New(),
loaderSingleFlight: singleflight.Group{},
loader: loader,
finalizer: finalizer,
@ -170,17 +189,88 @@ func (c *lruCache[K, V]) Do(key K, doer func(V) error) error {
if err != nil {
return err
}
defer item.Unpin()
return doer(item.Value())
defer c.Unpin(key)
return doer(item.value)
}
func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] {
func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error) error {
timedWait := func(cond *sync.Cond, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
cond.L.Lock()
defer cond.L.Unlock()
defer close(c)
cond.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
var ele *list.Element
start := time.Now()
for {
item, err := c.getAndPin(key)
if err == nil {
if ele != nil {
c.rwlock.Lock()
c.waitQueue.Remove(ele)
c.rwlock.Unlock()
}
defer c.Unpin(key)
return doer(item.value)
} else if err != ErrNotEnoughSpace {
return err
}
if ele == nil {
// If no enough space, enqueue the key
c.rwlock.Lock()
waiter := newWaiter(key)
ele = c.waitQueue.PushBack(&waiter)
c.rwlock.Unlock()
}
// Wait for the key to be available
timeLeft := time.Until(start.Add(timeout))
if timeLeft <= 0 || timedWait(ele.Value.(*Waiter[K]).c, timeLeft) {
return ErrTimeOut
}
}
}
func (c *lruCache[K, V]) Unpin(key K) {
c.rwlock.Lock()
defer c.rwlock.Unlock()
e, ok := c.items[key]
if !ok {
return
}
item := e.Value.(*cacheItem[K, V])
item.pinCount.Dec()
if c.waitQueue.Len() > 0 {
// Notify waiters
collector := c.scavenger.Spare(key)
for e := c.waitQueue.Front(); e != nil; e = e.Next() {
w := e.Value.(*Waiter[K])
if ok := collector(w.key); ok {
w.c.Broadcast()
} else {
break
}
}
}
}
func (c *lruCache[K, V]) peekAndPin(key K) *cacheItem[K, V] {
c.rwlock.Lock()
defer c.rwlock.Unlock()
e, ok := c.items[key]
if ok {
item := e.Value.(*cacheItem[K, V])
c.accessList.MoveToFront(e)
item.pinCount.Inc()
return item
}
return nil
@ -188,8 +278,7 @@ func (c *lruCache[K, V]) peek(key K) *cacheItem[K, V] {
// GetAndPin gets and pins the given key if it exists
func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
if item := c.peek(key); item != nil {
item.pinCount.Inc()
if item := c.peekAndPin(key); item != nil {
return item, nil
}
@ -202,8 +291,7 @@ func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
strKey := fmt.Sprint(key)
item, err, _ := c.loaderSingleFlight.Do(strKey, func() (interface{}, error) {
if item := c.peek(key); item != nil {
item.pinCount.Inc()
if item := c.peekAndPin(key); item != nil {
return item, nil
}
@ -222,6 +310,7 @@ func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
if err == nil {
return item.(*cacheItem[K, V]), nil
}
return nil, err
}
return nil, ErrNoSuchItem
@ -261,7 +350,7 @@ func (c *lruCache[K, V]) setAndPin(key K, value V) (*cacheItem[K, V], error) {
c.rwlock.Lock()
defer c.rwlock.Unlock()
item := newCacheItem[K, V](key, value)
item := &cacheItem[K, V]{key: key, value: value}
item.pinCount.Inc()
// tryScavenge is done again since the load call is lock free.

View File

@ -2,11 +2,12 @@ package cache
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
func TestLRUCache(t *testing.T) {
@ -175,4 +176,75 @@ func TestLRUCacheConcurrency(t *testing.T) {
wg.Done()
assert.Equal(t, ErrNotEnoughSpace, err)
})
t.Run("test time out", func(t *testing.T) {
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}).WithCapacity(1).WithFinalizer(func(key, value int) error {
return nil
}).Build()
var wg sync.WaitGroup // Let key 1000 be blocked
var wg1 sync.WaitGroup // Make sure goroutine is started
wg.Add(1)
wg1.Add(1)
go cache.Do(1000, func(v int) error {
wg1.Done()
wg.Wait()
return nil
})
wg1.Wait()
err := cache.DoWait(1001, time.Nanosecond, func(v int) error {
return nil
})
wg.Done()
assert.Equal(t, ErrTimeOut, err)
})
t.Run("test wait", func(t *testing.T) {
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}).WithCapacity(1).WithFinalizer(func(key, value int) error {
return nil
}).Build()
var wg1 sync.WaitGroup // Make sure goroutine is started
wg1.Add(1)
go cache.Do(1000, func(v int) error {
wg1.Done()
time.Sleep(time.Second)
return nil
})
wg1.Wait()
err := cache.DoWait(1001, time.Second*2, func(v int) error {
return nil
})
assert.NoError(t, err)
})
t.Run("test wait race condition", func(t *testing.T) {
numEvict := new(atomic.Int32)
cache := NewCacheBuilder[int, int]().WithLoader(func(key int) (int, bool) {
return key, true
}).WithCapacity(5).WithFinalizer(func(key, value int) error {
numEvict.Add(1)
return nil
}).Build()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 100; j++ {
err := cache.DoWait(j, 2*time.Second, func(v int) error {
return nil
})
assert.NoError(t, err)
}
}(i)
}
wg.Wait()
})
}