mirror of https://github.com/milvus-io/milvus.git
Fix LRU Cache Concurrency (#23041)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Branch: pull/23064/head
parent 2a02803392
commit a3e4b74fe1
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"io"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"go.uber.org/zap"
@@ -41,17 +42,17 @@ var (
 type VectorChunkManager struct {
 	cacheStorage  ChunkManager
 	vectorStorage ChunkManager
+	evictLock     sync.RWMutex
 	cache         *cache.LRU[string, *mmap.ReaderAt]
 
 	insertCodec *InsertCodec
 
-	cacheEnable    bool
-	cacheLimit     int64
-	cacheSize      int64
-	cacheSizeMutex sync.Mutex
-	fixSize        bool // Prevent cache capactiy from changing too frequently
-
 	singleflight singleflight.Group
+
+	cacheEnable bool
+	cacheLimit  int64
+	cacheSize   int64
+	fixSize     bool // Prevent cache capactiy from changing too frequently
 }
 
 var _ ChunkManager = (*VectorChunkManager)(nil)
@@ -59,13 +60,14 @@ var _ ChunkManager = (*VectorChunkManager)(nil)
 // NewVectorChunkManager create a new vector manager object.
 func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vectorStorage ChunkManager, schema *etcdpb.CollectionMeta, cacheLimit int64, cacheEnable bool) (*VectorChunkManager, error) {
 	insertCodec := NewInsertCodec(schema)
+	evictLock := sync.RWMutex{}
 	vcm := &VectorChunkManager{
 		cacheStorage:  cacheStorage,
 		vectorStorage: vectorStorage,
-
-		insertCodec: insertCodec,
-		cacheEnable: cacheEnable,
-		cacheLimit:  cacheLimit,
+		evictLock:     evictLock,
+		insertCodec:   insertCodec,
+		cacheEnable:   cacheEnable,
+		cacheLimit:    cacheLimit,
 	}
 	if cacheEnable {
 		if cacheLimit <= 0 {
@@ -73,6 +75,8 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto
 		}
 		c, err := cache.NewLRU(defaultLocalCacheSize, func(k string, v *mmap.ReaderAt) {
 			size := v.Len()
+			evictLock.Lock()
+			defer evictLock.Unlock()
 			err := v.Close()
 			if err != nil {
 				log.Error("Unmmap file failed", zap.Any("file", k))
@@ -81,9 +85,7 @@ func NewVectorChunkManager(ctx context.Context, cacheStorage ChunkManager, vecto
 			if err != nil {
 				log.Error("cache storage remove file failed", zap.Any("file", k))
 			}
-			vcm.cacheSizeMutex.Lock()
-			vcm.cacheSize -= int64(size)
-			vcm.cacheSizeMutex.Unlock()
+			atomic.AddInt64(&vcm.cacheSize, -int64(size))
 		})
 		if err != nil {
 			return nil, err
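The two hunks above narrow what the eviction callback does: unmapping the file now happens under the new evict lock, while the shared cacheSize counter is updated with sync/atomic instead of the removed cacheSizeMutex. A minimal, self-contained sketch of the counter half of that pattern; the sizeTracker type below is illustrative only, not part of the Milvus code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sizeTracker keeps a running total of cached bytes. Inserts and evictions
// may run on different goroutines, so the counter is updated with
// sync/atomic rather than a dedicated mutex.
type sizeTracker struct {
	size int64
}

func (t *sizeTracker) add(n int64) { atomic.AddInt64(&t.size, n) }

func (t *sizeTracker) total() int64 { return atomic.LoadInt64(&t.size) }

func main() {
	var tracker sizeTracker
	var wg sync.WaitGroup

	// Simulate concurrent inserts and evictions touching the same counter.
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); tracker.add(10) }()  // read-through insert grows the cache
		go func() { defer wg.Done(); tracker.add(-10) }() // eviction shrinks it
	}
	wg.Wait()

	fmt.Println("cached bytes:", tracker.total()) // always 0, no extra lock required
}

Using atomic.AddInt64 keeps the counter consistent without introducing a second mutex that the eviction callback would have to acquire while it already holds the evict lock.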
@@ -171,9 +173,7 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin
 		if err != nil {
 			return nil, err
 		}
-		vcm.cacheSizeMutex.Lock()
-		vcm.cacheSize += size
-		vcm.cacheSizeMutex.Unlock()
+		atomic.AddInt64(&vcm.cacheSize, size)
 		if !vcm.fixSize {
 			if vcm.cacheSize < vcm.cacheLimit {
 				if vcm.cache.Len() == vcm.cache.Capacity() {
@@ -199,6 +199,8 @@ func (vcm *VectorChunkManager) readWithCache(ctx context.Context, filePath strin
 
 // Read reads the pure vector data. If cached, it reads from local.
 func (vcm *VectorChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
+	vcm.evictLock.RLock()
+	defer vcm.evictLock.RUnlock()
 	if vcm.cacheEnable {
 		if r, ok := vcm.cache.Get(filePath); ok {
 			p := make([]byte, r.Len())
@@ -262,6 +264,8 @@ func (vcm *VectorChunkManager) Reader(ctx context.Context, filePath string) (Fil
 
 // ReadAt reads specific position data of vector. If cached, it reads from local.
 func (vcm *VectorChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) {
+	vcm.evictLock.RLock()
+	defer vcm.evictLock.RUnlock()
 	if vcm.cacheEnable {
 		if r, ok := vcm.cache.Get(filePath); ok {
 			p := make([]byte, length)
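Read and ReadAt now hold the evict lock for reading around each call, pairing with the write lock the eviction callback takes before closing the mmapped reader, so the intent is that a reader can no longer be unmapped while a copy is in flight. A rough, self-contained sketch of that reader/evictor protocol; the resource and manager types are stand-ins, not the VectorChunkManager API:

package main

import (
	"fmt"
	"sync"
)

// resource stands in for an mmapped file: reading it is only safe before Close.
type resource struct {
	data   []byte
	closed bool
}

func (r *resource) Close() {
	r.closed = true
	r.data = nil
}

type manager struct {
	evictLock sync.RWMutex // readers copy data, the evictor closes it
	res       *resource
}

// Read takes the read lock, as VectorChunkManager.Read now does, so the
// resource cannot be closed out from under it while it copies bytes.
func (m *manager) Read() []byte {
	m.evictLock.RLock()
	defer m.evictLock.RUnlock()
	if m.res == nil || m.res.closed {
		return nil
	}
	p := make([]byte, len(m.res.data))
	copy(p, m.res.data)
	return p
}

// evict takes the write lock, as the eviction callback now does, which
// waits for in-flight readers before the resource is closed.
func (m *manager) evict() {
	m.evictLock.Lock()
	defer m.evictLock.Unlock()
	m.res.Close()
}

func main() {
	m := &manager{res: &resource{data: []byte("vector bytes")}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = m.Read() }()
	}
	m.evict() // blocks until readers holding the read lock are done
	wg.Wait()

	fmt.Printf("after evict, Read returns %v\n", m.Read()) // []byte(nil)
}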
@@ -112,8 +112,8 @@ func (c *LRU[K, V]) Add(key K, value V) {
 	defer c.m.Unlock()
 
 	if c.closed() {
-		// evict since cache closed
-		c.onEvicted(key, value)
+		// evict since cache closed asyncly
+		go c.onEvicted(key, value)
 		return
 	}
 
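On the cache side, Add now fires the eviction callback on its own goroutine when the cache is already closed, rather than calling it synchronously while c.m is still held (the method unlocks via defer). A toy sketch of what that changes for callers; toyCache is illustrative only, not the generic LRU from the hunk above:

package main

import (
	"fmt"
	"sync"
	"time"
)

// toyCache mimics the closed-cache branch of Add: entries added after Close
// are handed straight to the eviction callback.
type toyCache struct {
	m         sync.Mutex
	closed    bool
	onEvicted func(key, value string)
}

func (c *toyCache) Close() {
	c.m.Lock()
	defer c.m.Unlock()
	c.closed = true
}

func (c *toyCache) Add(key, value string) {
	c.m.Lock()
	defer c.m.Unlock()
	if c.closed {
		// Run the callback asynchronously so a slow callback, or one that
		// acquires other locks, cannot stall every caller waiting on c.m.
		go c.onEvicted(key, value)
		return
	}
	// normal insert path elided
}

func main() {
	done := make(chan struct{})
	c := &toyCache{onEvicted: func(k, v string) {
		time.Sleep(50 * time.Millisecond) // pretend to unmap a file
		fmt.Println("evicted", k)
		close(done)
	}}

	c.Close()

	start := time.Now()
	c.Add("testKey", "testValue") // returns immediately
	fmt.Println("Add returned after", time.Since(start))

	<-done // wait for the asynchronous eviction, as the updated test does
}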
@@ -409,7 +409,10 @@ func TestLRU_closed(t *testing.T) {
 	c.Close()
 
 	c.Add("testKey", "testValue")
-	assert.Equal(t, int32(1), evicted)
+
+	for atomic.LoadInt32(&evicted) != 1 {
+		time.Sleep(10 * time.Millisecond)
+	}
 
 	_, ok := c.Get("testKey")
 	assert.False(t, ok)
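With the callback now asynchronous, the test polls the atomic counter instead of asserting it immediately after Add. A bounded variant of the same wait, sketched with testify's assert.Eventually purely for illustration (the commit itself uses the bare loop shown above):

package cache_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestWaitForAsyncEviction waits for a side effect that happens on another
// goroutine, bounding the wait instead of asserting right away.
func TestWaitForAsyncEviction(t *testing.T) {
	var evicted int32

	// Stand-in for Add on a closed cache: the callback fires asynchronously.
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.AddInt32(&evicted, 1)
	}()

	assert.Eventually(t, func() bool {
		return atomic.LoadInt32(&evicted) == 1
	}, time.Second, 10*time.Millisecond)
}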