mirror of https://github.com/milvus-io/milvus.git
Change LRU to a generic type (#20744)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/20689/head
parent
e35b523a99
commit
f76ea292d2
|
@ -40,7 +40,7 @@ var (
|
|||
type VectorChunkManager struct {
|
||||
cacheStorage ChunkManager
|
||||
vectorStorage ChunkManager
|
||||
cache *cache.LRU
|
||||
cache *cache.LRU[string, *mmap.ReaderAt]
|
||||
|
||||
insertCodec *InsertCodec
|
||||
|
||||
|
@ -68,14 +68,13 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto
|
|||
if cacheLimit <= 0 {
|
||||
return nil, errors.New("cache limit must be positive if cacheEnable")
|
||||
}
|
||||
c, err := cache.NewLRU(defaultLocalCacheSize, func(k cache.Key, v cache.Value) {
|
||||
r := v.(*mmap.ReaderAt)
|
||||
size := r.Len()
|
||||
err := r.Close()
|
||||
c, err := cache.NewLRU(defaultLocalCacheSize, func(k string, v *mmap.ReaderAt) {
|
||||
size := v.Len()
|
||||
err := v.Close()
|
||||
if err != nil {
|
||||
log.Error("Unmmap file failed", zap.Any("file", k))
|
||||
}
|
||||
err = cacheStorage.Remove(ctx, k.(string))
|
||||
err = cacheStorage.Remove(ctx, k)
|
||||
if err != nil {
|
||||
log.Error("cache storage remove file failed", zap.Any("file", k))
|
||||
}
|
||||
|
@ -191,9 +190,8 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin
|
|||
func (vcm *VectorChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
|
||||
if vcm.cacheEnable {
|
||||
if r, ok := vcm.cache.Get(filePath); ok {
|
||||
at := r.(*mmap.ReaderAt)
|
||||
p := make([]byte, at.Len())
|
||||
_, err := at.ReadAt(p, 0)
|
||||
p := make([]byte, r.Len())
|
||||
_, err := r.ReadAt(p, 0)
|
||||
if err != nil {
|
||||
return p, err
|
||||
}
|
||||
|
@ -241,7 +239,7 @@ func (vcm *VectorChunkManager) ListWithPrefix(ctx context.Context, prefix string
|
|||
func (vcm *VectorChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
|
||||
if vcm.cacheEnable && vcm.cache != nil {
|
||||
if r, ok := vcm.cache.Get(filePath); ok {
|
||||
return r.(*mmap.ReaderAt), nil
|
||||
return r, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("the file mmap has not been cached")
|
||||
|
@ -255,9 +253,8 @@ func (vcm *VectorChunkManager) Reader(ctx context.Context, filePath string) (Fil
|
|||
func (vcm *VectorChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) {
|
||||
if vcm.cacheEnable {
|
||||
if r, ok := vcm.cache.Get(filePath); ok {
|
||||
at := r.(*mmap.ReaderAt)
|
||||
p := make([]byte, length)
|
||||
_, err := at.ReadAt(p, off)
|
||||
_, err := r.ReadAt(p, off)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -24,15 +24,15 @@ import (
|
|||
"sync"
|
||||
)
|
||||
|
||||
type LRU struct {
|
||||
type LRU[K comparable, V any] struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
evictList *list.List
|
||||
items map[interface{}]*list.Element
|
||||
items map[K]*list.Element
|
||||
capacity int
|
||||
onEvicted func(k Key, v Value)
|
||||
onEvicted func(k K, v V)
|
||||
m sync.RWMutex
|
||||
evictedCh chan *entry
|
||||
evictedCh chan *entry[K, V]
|
||||
stats *Stats
|
||||
}
|
||||
|
||||
|
@ -54,36 +54,31 @@ func (s *Stats) String() string {
|
|||
return fmt.Sprintf("lru cache hit ratio = %f, evictedRatio = %f", hitRatio, evictedRatio)
|
||||
}
|
||||
|
||||
type Key interface {
|
||||
}
|
||||
type Value interface {
|
||||
type entry[K comparable, V any] struct {
|
||||
key K
|
||||
value V
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
key Key
|
||||
value Value
|
||||
}
|
||||
|
||||
func NewLRU(capacity int, onEvicted func(k Key, v Value)) (*LRU, error) {
|
||||
func NewLRU[K comparable, V any](capacity int, onEvicted func(k K, v V)) (*LRU[K, V], error) {
|
||||
if capacity <= 0 {
|
||||
return nil, errors.New("cache size must be positive")
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &LRU{
|
||||
c := &LRU[K, V]{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
capacity: capacity,
|
||||
evictList: list.New(),
|
||||
items: make(map[interface{}]*list.Element),
|
||||
items: make(map[K]*list.Element),
|
||||
onEvicted: onEvicted,
|
||||
evictedCh: make(chan *entry, 16),
|
||||
evictedCh: make(chan *entry[K, V], 16),
|
||||
stats: &Stats{},
|
||||
}
|
||||
go c.evictedWorker()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *LRU) evictedWorker() {
|
||||
func (c *LRU[K, V]) evictedWorker() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
|
@ -98,16 +93,16 @@ func (c *LRU) evictedWorker() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *LRU) Add(key, value Value) {
|
||||
func (c *LRU[K, V]) Add(key K, value V) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
c.stats.writeCount++
|
||||
if e, ok := c.items[key]; ok {
|
||||
c.evictList.MoveToFront(e)
|
||||
e.Value.(*entry).value = value
|
||||
e.Value.(*entry[K, V]).value = value
|
||||
return
|
||||
}
|
||||
e := &entry{key: key, value: value}
|
||||
e := &entry[K, V]{key: key, value: value}
|
||||
listE := c.evictList.PushFront(e)
|
||||
c.items[key] = listE
|
||||
|
||||
|
@ -116,7 +111,7 @@ func (c *LRU) Add(key, value Value) {
|
|||
oldestE := c.evictList.Back()
|
||||
if oldestE != nil {
|
||||
c.evictList.Remove(oldestE)
|
||||
kv := oldestE.Value.(*entry)
|
||||
kv := oldestE.Value.(*entry[K, V])
|
||||
delete(c.items, kv.key)
|
||||
if c.onEvicted != nil {
|
||||
c.evictedCh <- kv
|
||||
|
@ -125,25 +120,27 @@ func (c *LRU) Add(key, value Value) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *LRU) Get(key Key) (value Value, ok bool) {
|
||||
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
c.stats.readCount++
|
||||
if e, ok := c.items[key]; ok {
|
||||
c.stats.hitCount++
|
||||
c.evictList.MoveToFront(e)
|
||||
kv := e.Value.(*entry)
|
||||
kv := e.Value.(*entry[K, V])
|
||||
return kv.value, true
|
||||
}
|
||||
return nil, false
|
||||
|
||||
var zeroV V
|
||||
return zeroV, false
|
||||
}
|
||||
|
||||
func (c *LRU) Remove(key Key) {
|
||||
func (c *LRU[K, V]) Remove(key K) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
if e, ok := c.items[key]; ok {
|
||||
c.evictList.Remove(e)
|
||||
kv := e.Value.(*entry)
|
||||
kv := e.Value.(*entry[K, V])
|
||||
delete(c.items, kv.key)
|
||||
if c.onEvicted != nil {
|
||||
c.evictedCh <- kv
|
||||
|
@ -151,48 +148,48 @@ func (c *LRU) Remove(key Key) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *LRU) Contains(key Key) bool {
|
||||
func (c *LRU[K, V]) Contains(key K) bool {
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
_, ok := c.items[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (c *LRU) Keys() []Key {
|
||||
func (c *LRU[K, V]) Keys() []K {
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
keys := make([]Key, len(c.items))
|
||||
keys := make([]K, len(c.items))
|
||||
i := 0
|
||||
for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() {
|
||||
keys[i] = ent.Value.(*entry).key
|
||||
keys[i] = ent.Value.(*entry[K, V]).key
|
||||
i++
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (c *LRU) Len() int {
|
||||
func (c *LRU[K, V]) Len() int {
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
return c.evictList.Len()
|
||||
}
|
||||
|
||||
func (c *LRU) Capacity() int {
|
||||
func (c *LRU[K, V]) Capacity() int {
|
||||
return c.capacity
|
||||
}
|
||||
|
||||
func (c *LRU) Purge() {
|
||||
func (c *LRU[K, V]) Purge() {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
for k, v := range c.items {
|
||||
if c.onEvicted != nil {
|
||||
c.evictedCh <- v.Value.(*entry)
|
||||
c.evictedCh <- v.Value.(*entry[K, V])
|
||||
}
|
||||
delete(c.items, k)
|
||||
}
|
||||
c.evictList.Init()
|
||||
}
|
||||
|
||||
func (c *LRU) Resize(capacity int) int {
|
||||
func (c *LRU[K, V]) Resize(capacity int) int {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
c.capacity = capacity
|
||||
|
@ -204,7 +201,7 @@ func (c *LRU) Resize(capacity int) int {
|
|||
oldestE := c.evictList.Back()
|
||||
if oldestE != nil {
|
||||
c.evictList.Remove(oldestE)
|
||||
kv := oldestE.Value.(*entry)
|
||||
kv := oldestE.Value.(*entry[K, V])
|
||||
delete(c.items, kv.key)
|
||||
if c.onEvicted != nil {
|
||||
c.evictedCh <- kv
|
||||
|
@ -214,18 +211,23 @@ func (c *LRU) Resize(capacity int) int {
|
|||
return diff
|
||||
}
|
||||
|
||||
func (c *LRU) GetOldest() (Key, Value, bool) {
|
||||
func (c *LRU[K, V]) GetOldest() (K, V, bool) {
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
ent := c.evictList.Back()
|
||||
if ent != nil {
|
||||
kv := ent.Value.(*entry)
|
||||
kv := ent.Value.(*entry[K, V])
|
||||
return kv.key, kv.value, true
|
||||
}
|
||||
return nil, nil, false
|
||||
|
||||
var (
|
||||
zeroK K
|
||||
zeroV V
|
||||
)
|
||||
return zeroK, zeroV, false
|
||||
}
|
||||
|
||||
func (c *LRU) Close() {
|
||||
func (c *LRU[K, V]) Close() {
|
||||
c.Purge()
|
||||
c.cancel()
|
||||
remain := len(c.evictedCh)
|
||||
|
@ -238,6 +240,6 @@ func (c *LRU) Close() {
|
|||
close(c.evictedCh)
|
||||
}
|
||||
|
||||
func (c *LRU) Stats() *Stats {
|
||||
func (c *LRU[K, V]) Stats() *Stats {
|
||||
return c.stats
|
||||
}
|
||||
|
|
|
@ -25,22 +25,22 @@ import (
|
|||
)
|
||||
|
||||
func TestNewLRU(t *testing.T) {
|
||||
c, err := NewLRU(1, nil)
|
||||
c, err := NewLRU[int, int](1, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, c)
|
||||
|
||||
c, err = NewLRU(0, nil)
|
||||
c, err = NewLRU[int, int](0, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, c)
|
||||
|
||||
c, err = NewLRU(-1, nil)
|
||||
c, err = NewLRU[int, int](-1, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, c)
|
||||
}
|
||||
|
||||
func TestLRU_Add(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
|
||||
testKey1 := "test_key_1"
|
||||
|
@ -77,7 +77,7 @@ func TestLRU_Add(t *testing.T) {
|
|||
|
||||
v, ok = c.Get(testKey2)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, v)
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
return atomic.LoadInt32(&evicted) == 1
|
||||
|
@ -86,7 +86,7 @@ func TestLRU_Add(t *testing.T) {
|
|||
|
||||
func TestLRU_Contains(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(1, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
|
||||
testKey1 := "test_key_1"
|
||||
|
@ -112,7 +112,7 @@ func TestLRU_Contains(t *testing.T) {
|
|||
|
||||
func TestLRU_Get(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(1, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(1, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
|
||||
testKey1 := "test_key_1"
|
||||
|
@ -132,7 +132,7 @@ func TestLRU_Get(t *testing.T) {
|
|||
|
||||
v, ok = c.Get(testKey1)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, v)
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
return atomic.LoadInt32(&evicted) == 1
|
||||
|
@ -141,7 +141,7 @@ func TestLRU_Get(t *testing.T) {
|
|||
|
||||
func TestLRU_GetOldest(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
|
||||
testKey1 := "test_key_1"
|
||||
|
@ -153,8 +153,8 @@ func TestLRU_GetOldest(t *testing.T) {
|
|||
|
||||
k, v, ok := c.GetOldest()
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, k)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, k)
|
||||
assert.Empty(t, v)
|
||||
|
||||
c.Add(testKey1, testValue1)
|
||||
k, v, ok = c.GetOldest()
|
||||
|
@ -190,7 +190,7 @@ func TestLRU_GetOldest(t *testing.T) {
|
|||
|
||||
func TestLRU_Keys(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
|
||||
testKey1 := "test_key_1"
|
||||
|
@ -222,7 +222,7 @@ func TestLRU_Keys(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLRU_Len(t *testing.T) {
|
||||
c, err := NewLRU(2, nil)
|
||||
c, err := NewLRU[string, string](2, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -242,7 +242,7 @@ func TestLRU_Len(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLRU_Capacity(t *testing.T) {
|
||||
c, err := NewLRU(5, nil)
|
||||
c, err := NewLRU[string, string](5, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -264,7 +264,7 @@ func TestLRU_Capacity(t *testing.T) {
|
|||
|
||||
func TestLRU_Purge(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -291,7 +291,7 @@ func TestLRU_Purge(t *testing.T) {
|
|||
|
||||
func TestLRU_Remove(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -315,7 +315,7 @@ func TestLRU_Remove(t *testing.T) {
|
|||
|
||||
func TestLRU_RemoveOldest(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -341,11 +341,11 @@ func TestLRU_RemoveOldest(t *testing.T) {
|
|||
|
||||
v, ok = c.Get(testKey1)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, v)
|
||||
|
||||
v, ok = c.Get(testKey2)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, v)
|
||||
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
assert.Eventually(t, func() bool {
|
||||
|
@ -356,7 +356,7 @@ func TestLRU_RemoveOldest(t *testing.T) {
|
|||
|
||||
func TestLRU_Resize(t *testing.T) {
|
||||
evicted := int32(0)
|
||||
c, err := NewLRU(2, func(Key, Value) { atomic.AddInt32(&evicted, 1) })
|
||||
c, err := NewLRU(2, func(string, string) { atomic.AddInt32(&evicted, 1) })
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, c.Len(), 0)
|
||||
|
||||
|
@ -381,7 +381,7 @@ func TestLRU_Resize(t *testing.T) {
|
|||
|
||||
v, ok = c.Get(testKey1)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, v)
|
||||
assert.Empty(t, v)
|
||||
|
||||
v, ok = c.Get(testKey2)
|
||||
assert.True(t, ok)
|
||||
|
|
Loading…
Reference in New Issue