mirror of https://github.com/milvus-io/milvus.git
Refine test chunkmanager of querynode v2 (#25888)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/26022/head
parent
2180ef180c
commit
7fec0d61cc
|
@ -594,6 +594,20 @@ func GenTestVectorFiledData(dType schemapb.DataType, fieldName string, fieldID i
|
|||
return ret
|
||||
}
|
||||
|
||||
func NewTestChunkManagerFactory(params *paramtable.ComponentParam, rootPath string) *storage.ChunkManagerFactory {
|
||||
return storage.NewChunkManagerFactory("minio",
|
||||
storage.RootPath(rootPath),
|
||||
storage.Address(params.MinioCfg.Address.GetValue()),
|
||||
storage.AccessKeyID(params.MinioCfg.AccessKeyID.GetValue()),
|
||||
storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey.GetValue()),
|
||||
storage.UseSSL(params.MinioCfg.UseSSL.GetAsBool()),
|
||||
storage.BucketName(params.MinioCfg.BucketName.GetValue()),
|
||||
storage.UseIAM(params.MinioCfg.UseIAM.GetAsBool()),
|
||||
storage.CloudProvider(params.MinioCfg.CloudProvider.GetValue()),
|
||||
storage.IAMEndpoint(params.MinioCfg.IAMEndpoint.GetValue()),
|
||||
storage.CreateBucket(true))
|
||||
}
|
||||
|
||||
func SaveBinLog(ctx context.Context,
|
||||
collectionID int64,
|
||||
partitionID int64,
|
||||
|
@ -625,7 +639,7 @@ func SaveBinLog(ctx context.Context,
|
|||
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
|
||||
//key := path.Join(defaultLocalStorage, "insert-log", k)
|
||||
key := path.Join(paramtable.Get().MinioCfg.RootPath.GetValue(), "insert-log", k)
|
||||
key := path.Join(chunkManager.RootPath(), "insert-log", k)
|
||||
kvs[key] = blob.Value
|
||||
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
|
||||
FieldID: fieldID,
|
||||
|
@ -648,7 +662,7 @@ func SaveBinLog(ctx context.Context,
|
|||
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
|
||||
//key := path.Join(defaultLocalStorage, "stats-log", k)
|
||||
key := path.Join(paramtable.Get().MinioCfg.RootPath.GetValue(), "stats-log", k)
|
||||
key := path.Join(chunkManager.RootPath(), "stats-log", k)
|
||||
kvs[key] = blob.Value[:]
|
||||
statsBinlog = append(statsBinlog, &datapb.FieldBinlog{
|
||||
FieldID: fieldID,
|
||||
|
@ -826,7 +840,7 @@ func SaveDeltaLog(collectionID int64,
|
|||
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID))
|
||||
key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID)
|
||||
//keyPath := path.Join(defaultLocalStorage, "delta-log", key)
|
||||
keyPath := path.Join(paramtable.Get().MinioCfg.RootPath.GetValue(), "delta-log", key)
|
||||
keyPath := path.Join(cm.RootPath(), "delta-log", key)
|
||||
kvs[keyPath] = blob.Value[:]
|
||||
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
|
||||
FieldID: pkFieldID,
|
||||
|
@ -878,7 +892,7 @@ func GenAndSaveIndex(collectionID, partitionID, segmentID, fieldID int64, msgLen
|
|||
indexPaths := make([]string, 0)
|
||||
for _, index := range serializedIndexBlobs {
|
||||
//indexPath := filepath.Join(defaultLocalStorage, strconv.Itoa(int(segmentID)), index.Key)
|
||||
indexPath := filepath.Join(paramtable.Get().MinioCfg.RootPath.GetValue(), "index_files",
|
||||
indexPath := filepath.Join(cm.RootPath(), "index_files",
|
||||
strconv.Itoa(int(segmentID)), index.Key)
|
||||
indexPaths = append(indexPaths, indexPath)
|
||||
err := cm.Write(context.Background(), indexPath, index.Value)
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
type ReduceSuite struct {
|
||||
suite.Suite
|
||||
chunkManager storage.ChunkManager
|
||||
rootPath string
|
||||
|
||||
collectionID int64
|
||||
partitionID int64
|
||||
|
@ -56,7 +57,8 @@ func (suite *ReduceSuite) SetupTest() {
|
|||
ctx := context.Background()
|
||||
msgLength := 100
|
||||
|
||||
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(paramtable.Get())
|
||||
suite.rootPath = suite.T().Name()
|
||||
chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
|
||||
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
|
||||
initcore.InitRemoteChunkManager(paramtable.Get())
|
||||
|
||||
|
@ -100,7 +102,7 @@ func (suite *ReduceSuite) TearDownTest() {
|
|||
DeleteSegment(suite.segment)
|
||||
DeleteCollection(suite.collection)
|
||||
ctx := context.Background()
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, paramtable.Get().MinioCfg.RootPath.GetValue())
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
|
||||
}
|
||||
|
||||
func (suite *ReduceSuite) TestReduceParseSliceInfo() {
|
||||
|
|
|
@ -34,6 +34,7 @@ type RetrieveSuite struct {
|
|||
suite.Suite
|
||||
|
||||
// Dependencies
|
||||
rootPath string
|
||||
chunkManager storage.ChunkManager
|
||||
|
||||
// Data
|
||||
|
@ -55,7 +56,8 @@ func (suite *RetrieveSuite) SetupTest() {
|
|||
ctx := context.Background()
|
||||
msgLength := 100
|
||||
|
||||
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(paramtable.Get())
|
||||
suite.rootPath = suite.T().Name()
|
||||
chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
|
||||
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
|
||||
initcore.InitRemoteChunkManager(paramtable.Get())
|
||||
|
||||
|
@ -131,7 +133,7 @@ func (suite *RetrieveSuite) TearDownTest() {
|
|||
DeleteSegment(suite.growing)
|
||||
DeleteCollection(suite.collection)
|
||||
ctx := context.Background()
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, paramtable.Get().MinioCfg.RootPath.GetValue())
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
|
||||
}
|
||||
|
||||
func (suite *RetrieveSuite) TestRetrieveSealed() {
|
||||
|
|
|
@ -38,6 +38,7 @@ type SegmentLoaderSuite struct {
|
|||
|
||||
// Dependencies
|
||||
manager *Manager
|
||||
rootPath string
|
||||
chunkManager storage.ChunkManager
|
||||
|
||||
// Data
|
||||
|
@ -50,6 +51,7 @@ type SegmentLoaderSuite struct {
|
|||
|
||||
func (suite *SegmentLoaderSuite) SetupSuite() {
|
||||
paramtable.Init()
|
||||
suite.rootPath = suite.T().Name()
|
||||
suite.collectionID = rand.Int63()
|
||||
suite.partitionID = rand.Int63()
|
||||
suite.segmentID = rand.Int63()
|
||||
|
@ -64,7 +66,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
|
|||
// TODO:: cpp chunk manager not support local chunk manager
|
||||
//suite.chunkManager = storage.NewLocalChunkManager(storage.RootPath(
|
||||
// fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
|
||||
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(paramtable.Get())
|
||||
chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
|
||||
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
|
||||
suite.loader = NewLoader(suite.manager, suite.chunkManager)
|
||||
initcore.InitRemoteChunkManager(paramtable.Get())
|
||||
|
@ -85,7 +87,7 @@ func (suite *SegmentLoaderSuite) TearDownTest() {
|
|||
for i := 0; i < suite.segmentNum; i++ {
|
||||
suite.manager.Segment.Remove(suite.segmentID+int64(i), querypb.DataScope_All)
|
||||
}
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, paramtable.Get().MinioCfg.RootPath.GetValue())
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
|
||||
}
|
||||
|
||||
func (suite *SegmentLoaderSuite) TestLoad() {
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
|
||||
type SegmentSuite struct {
|
||||
suite.Suite
|
||||
rootPath string
|
||||
chunkManager storage.ChunkManager
|
||||
|
||||
// Data
|
||||
|
@ -39,7 +40,8 @@ func (suite *SegmentSuite) SetupTest() {
|
|||
ctx := context.Background()
|
||||
msgLength := 100
|
||||
|
||||
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(paramtable.Get())
|
||||
suite.rootPath = suite.T().Name()
|
||||
chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
|
||||
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
|
||||
initcore.InitRemoteChunkManager(paramtable.Get())
|
||||
|
||||
|
@ -115,7 +117,7 @@ func (suite *SegmentSuite) TearDownTest() {
|
|||
DeleteSegment(suite.sealed)
|
||||
DeleteSegment(suite.growing)
|
||||
DeleteCollection(suite.collection)
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, paramtable.Get().MinioCfg.RootPath.GetValue())
|
||||
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
|
||||
}
|
||||
|
||||
func (suite *SegmentSuite) TestDelete() {
|
||||
|
|
|
@ -72,6 +72,7 @@ type ServiceSuite struct {
|
|||
// Dependency
|
||||
node *QueryNode
|
||||
etcdClient *clientv3.Client
|
||||
rootPath string
|
||||
chunkManagerFactory *storage.ChunkManagerFactory
|
||||
|
||||
// Mock
|
||||
|
@ -86,6 +87,7 @@ func (suite *ServiceSuite) SetupSuite() {
|
|||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GCEnabled.Key, "false")
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CacheEnabled.Key, "false")
|
||||
|
||||
suite.rootPath = suite.T().Name()
|
||||
suite.collectionID = 111
|
||||
suite.collectionName = "test-collection"
|
||||
suite.partitionIDs = []int64{222}
|
||||
|
@ -109,7 +111,7 @@ func (suite *ServiceSuite) SetupTest() {
|
|||
suite.msgStream = msgstream.NewMockMsgStream(suite.T())
|
||||
// TODO:: cpp chunk manager not support local chunk manager
|
||||
//suite.chunkManagerFactory = storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus-test"))
|
||||
suite.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(paramtable.Get())
|
||||
suite.chunkManagerFactory = segments.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
|
||||
suite.factory.EXPECT().Init(mock.Anything).Return()
|
||||
suite.factory.EXPECT().NewPersistentStorageChunkManager(mock.Anything).Return(suite.chunkManagerFactory.NewPersistentStorageChunkManager(ctx))
|
||||
|
||||
|
@ -152,7 +154,7 @@ func (suite *ServiceSuite) TearDownTest() {
|
|||
})
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
suite.node.vectorStorage.RemoveWithPrefix(ctx, paramtable.Get().MinioCfg.RootPath.GetValue())
|
||||
suite.node.vectorStorage.RemoveWithPrefix(ctx, suite.rootPath)
|
||||
|
||||
suite.node.Stop()
|
||||
suite.etcdClient.Close()
|
||||
|
|
Loading…
Reference in New Issue