mirror of https://github.com/milvus-io/milvus.git
Write the cache file to the cacheStorage.rootpath dir (#25715)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/25982/head
parent
d7cd1c849d
commit
f33451b3d8
|
@ -19,6 +19,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -91,9 +92,10 @@ func (vcm *VectorChunkManager) initCache(ctx context.Context) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("close mmap file failed", zap.Any("file", filePath))
|
log.Error("close mmap file failed", zap.Any("file", filePath))
|
||||||
}
|
}
|
||||||
err = vcm.cacheStorage.Remove(ctx, filePath)
|
localPath := path.Join(vcm.cacheStorage.RootPath(), filePath)
|
||||||
|
err = vcm.cacheStorage.Remove(ctx, localPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("cache storage remove file failed", zap.Any("file", filePath))
|
log.Error("cache storage remove file failed", zap.Any("file", localPath))
|
||||||
}
|
}
|
||||||
|
|
||||||
vcm.cacheSizeMutex.Lock()
|
vcm.cacheSizeMutex.Lock()
|
||||||
|
@ -173,12 +175,13 @@ func (vcm *VectorChunkManager) readFile(ctx context.Context, filePath string) (*
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = vcm.cacheStorage.Write(ctx, filePath, results)
|
localPath := path.Join(vcm.cacheStorage.RootPath(), filePath)
|
||||||
|
err = vcm.cacheStorage.Write(ctx, localPath, results)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := vcm.cacheStorage.Mmap(ctx, filePath)
|
r, err := vcm.cacheStorage.Mmap(ctx, localPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,23 +115,21 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {
|
||||||
return blobs
|
return blobs
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildVectorChunkManager(localPath string, localCacheEnable bool) (*VectorChunkManager, context.CancelFunc, error) {
|
func buildVectorChunkManager(ctx context.Context, localPath string, localCacheEnable bool) (*VectorChunkManager, error) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
bucketName := "vector-chunk-manager"
|
bucketName := "vector-chunk-manager"
|
||||||
|
|
||||||
rcm, err := newMinIOChunkManager(ctx, bucketName, "")
|
rcm, err := newMinIOChunkManager(ctx, bucketName, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, cancel, err
|
return nil, err
|
||||||
}
|
}
|
||||||
lcm := NewLocalChunkManager(RootPath(localPath))
|
lcm := NewLocalChunkManager(RootPath(localPath))
|
||||||
|
|
||||||
vcm, err := NewVectorChunkManager(ctx, lcm, rcm, 16, localCacheEnable)
|
vcm, err := NewVectorChunkManager(ctx, lcm, rcm, 16, localCacheEnable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, cancel, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return vcm, cancel, nil
|
return vcm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var Params = paramtable.Get()
|
var Params = paramtable.Get()
|
||||||
|
@ -171,7 +169,7 @@ func TestVectorChunkManager_GetPath(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
localCaches := []bool{true, false}
|
localCaches := []bool{true, false}
|
||||||
for _, localCache := range localCaches {
|
for _, localCache := range localCaches {
|
||||||
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
|
vcm, err := buildVectorChunkManager(ctx, localPath, localCache)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, vcm)
|
assert.NotNil(t, vcm)
|
||||||
|
|
||||||
|
@ -190,7 +188,6 @@ func TestVectorChunkManager_GetPath(t *testing.T) {
|
||||||
|
|
||||||
err = vcm.RemoveWithPrefix(ctx, localPath)
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
cancel()
|
|
||||||
vcm.Close()
|
vcm.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -200,7 +197,7 @@ func TestVectorChunkManager_GetSize(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
localCaches := []bool{true, false}
|
localCaches := []bool{true, false}
|
||||||
for _, localCache := range localCaches {
|
for _, localCache := range localCaches {
|
||||||
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
|
vcm, err := buildVectorChunkManager(ctx, localPath, localCache)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, vcm)
|
assert.NotNil(t, vcm)
|
||||||
|
|
||||||
|
@ -219,7 +216,6 @@ func TestVectorChunkManager_GetSize(t *testing.T) {
|
||||||
|
|
||||||
err = vcm.RemoveWithPrefix(ctx, localPath)
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
cancel()
|
|
||||||
vcm.Close()
|
vcm.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,7 +226,7 @@ func TestVectorChunkManager_Write(t *testing.T) {
|
||||||
|
|
||||||
localCaches := []bool{true, false}
|
localCaches := []bool{true, false}
|
||||||
for _, localCache := range localCaches {
|
for _, localCache := range localCaches {
|
||||||
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
|
vcm, err := buildVectorChunkManager(ctx, localPath, localCache)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, vcm)
|
assert.NotNil(t, vcm)
|
||||||
|
|
||||||
|
@ -259,7 +255,6 @@ func TestVectorChunkManager_Write(t *testing.T) {
|
||||||
|
|
||||||
err = vcm.RemoveWithPrefix(ctx, localPath)
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
cancel()
|
|
||||||
vcm.Close()
|
vcm.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -270,7 +265,7 @@ func TestVectorChunkManager_Remove(t *testing.T) {
|
||||||
|
|
||||||
localCaches := []bool{true, false}
|
localCaches := []bool{true, false}
|
||||||
for _, localCache := range localCaches {
|
for _, localCache := range localCaches {
|
||||||
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
|
vcm, err := buildVectorChunkManager(ctx, localPath, localCache)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, vcm)
|
assert.NotNil(t, vcm)
|
||||||
|
|
||||||
|
@ -305,7 +300,6 @@ func TestVectorChunkManager_Remove(t *testing.T) {
|
||||||
|
|
||||||
err = vcm.RemoveWithPrefix(ctx, localPath)
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
cancel()
|
|
||||||
vcm.Close()
|
vcm.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -348,13 +342,45 @@ func TestVectorChunkManager_Remove_Fail(t *testing.T) {
|
||||||
assert.Error(t, vcm.RemoveWithPrefix(ctx, "test"))
|
assert.Error(t, vcm.RemoveWithPrefix(ctx, "test"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestVectorChunkManager_LocalPath(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
vcm, err := buildVectorChunkManager(ctx, localPath, true)
|
||||||
|
assert.NotNil(t, vcm)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
meta := initMeta()
|
||||||
|
binlogs := initBinlogFile(meta)
|
||||||
|
keys := make([]string, len(binlogs))
|
||||||
|
for i, binlog := range binlogs {
|
||||||
|
err = vcm.vectorStorage.Write(ctx, binlog.Key, binlog.Value)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
keys[i] = binlog.Key
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache file to cacheStorage
|
||||||
|
_, err = vcm.Read(ctx, keys[0])
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// check file under cacheStorage.rootPath
|
||||||
|
absLocalPath := path.Join(vcm.cacheStorage.RootPath(), keys[0])
|
||||||
|
exit, err := vcm.cacheStorage.Exist(ctx, absLocalPath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, exit)
|
||||||
|
|
||||||
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
vcm.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func TestVectorChunkManager_Read(t *testing.T) {
|
func TestVectorChunkManager_Read(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
localCaches := []bool{true, false}
|
localCaches := []bool{true, false}
|
||||||
for _, localCache := range localCaches {
|
for _, localCache := range localCaches {
|
||||||
vcm, cancel, err := buildVectorChunkManager(localPath, localCache)
|
vcm, err := buildVectorChunkManager(ctx, localPath, localCache)
|
||||||
assert.NotNil(t, vcm)
|
assert.NotNil(t, vcm)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
@ -453,7 +479,6 @@ func TestVectorChunkManager_Read(t *testing.T) {
|
||||||
err = vcm.RemoveWithPrefix(ctx, localPath)
|
err = vcm.RemoveWithPrefix(ctx, localPath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
cancel()
|
|
||||||
vcm.Close()
|
vcm.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue