diff --git a/cmd/milvus/mck.go b/cmd/milvus/mck.go index 4270f614ce..49a692f34b 100644 --- a/cmd/milvus/mck.go +++ b/cmd/milvus/mck.go @@ -47,7 +47,7 @@ const ( ) type mck struct { - params paramtable.GrpcServerConfig + params *paramtable.ComponentParam taskKeyMap map[int64]string taskNameMap map[int64]string allTaskInfo map[string]string @@ -232,27 +232,12 @@ func (c *mck) connectEctd() { } func (c *mck) connectMinio() { - useSSL := c.params.MinioCfg.UseSSL - if c.minioUseSSL == "true" || c.minioUseSSL == "false" { - minioUseSSL, err := strconv.ParseBool(c.minioUseSSL) - if err != nil { - log.Panic("fail to parse the 'minioUseSSL' string to the bool value", zap.String("minioUseSSL", c.minioUseSSL), zap.Error(err)) - } - useSSL = minioUseSSL - } - chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio", - storage.RootPath(c.params.LocalStorageCfg.Path), - storage.Address(getConfigValue(c.minioAddress, c.params.MinioCfg.Address, "minio_address")), - storage.AccessKeyID(getConfigValue(c.minioUsername, c.params.MinioCfg.AccessKeyID, "minio_username")), - storage.SecretAccessKeyID(getConfigValue(c.minioPassword, c.params.MinioCfg.SecretAccessKey, "minio_password")), - storage.UseSSL(useSSL), - storage.BucketName(getConfigValue(c.minioBucketName, c.params.MinioCfg.BucketName, "minio_bucket_name")), - storage.CreateBucket(true)) + chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(c.params) var err error - c.minioChunkManager, err = chunkManagerFactory.NewVectorStorageChunkManager(context.Background()) + c.minioChunkManager, err = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) if err != nil { - log.Fatal("failed to connect to etcd", zap.Error(err)) + log.Fatal("failed to connect to minio", zap.Error(err)) } } diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index eafdf905ce..a29b6db4a3 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -48,7 +48,6 @@ type GcOption struct { checkInterval time.Duration // each interval missingTolerance time.Duration // key missing in meta tolerance time dropTolerance time.Duration // dropped segment related key tolerance time - rootPath string } // garbageCollector handles garbage files in object storage @@ -130,9 +129,9 @@ func (gc *garbageCollector) scan() { // walk only data cluster related prefixes prefixes := make([]string, 0, 3) - prefixes = append(prefixes, path.Join(gc.option.rootPath, insertLogPrefix)) - prefixes = append(prefixes, path.Join(gc.option.rootPath, statsLogPrefix)) - prefixes = append(prefixes, path.Join(gc.option.rootPath, deltaLogPrefix)) + prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), insertLogPrefix)) + prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), statsLogPrefix)) + prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), deltaLogPrefix)) var removedKeys []string for _, prefix := range prefixes { @@ -148,7 +147,7 @@ func (gc *garbageCollector) scan() { continue } - segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.rootPath, infoKey) + segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey) if err != nil && !common.IsIgnorableError(err) { log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err)) continue diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 9936ec1029..8d4ebba5c8 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -65,7 +65,6 @@ func Test_garbageCollector_basic(t *testing.T) { checkInterval: time.Millisecond * 10, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - rootPath: rootPath, }) gc.start() @@ -82,7 +81,6 @@ func Test_garbageCollector_basic(t *testing.T) { checkInterval: time.Millisecond * 10, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - rootPath: rootPath, }) assert.NotPanics(t, func() { gc.start() @@ -145,7 +143,6 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - rootPath: rootPath, }) gc.segRefer = segReferManager gc.scan() @@ -167,7 +164,6 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - rootPath: rootPath, }) gc.scan() @@ -193,7 +189,6 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - rootPath: rootPath, }) gc.start() gc.scan() @@ -222,7 +217,6 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: 0, - rootPath: rootPath, }) gc.clearEtcd() validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:]) @@ -239,7 +233,6 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: 0, dropTolerance: 0, - rootPath: rootPath, }) gc.start() gc.scan() @@ -324,7 +317,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i mcm = &storage.MinioChunkManager{ Client: cli, } - mcm.SetVar(context.TODO(), bucket) + mcm.SetVar(context.TODO(), bucket, root) return mcm, inserts, stats, delta, other, nil } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c444abd602..15d54ff934 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -361,39 +361,17 @@ func (s *Server) stopCompactionTrigger() { func (s *Server) initGarbageCollection() error { var cli storage.ChunkManager var err error - if Params.CommonCfg.StorageType == "minio" { - chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio", - storage.RootPath(Params.LocalStorageCfg.Path), - storage.Address(Params.MinioCfg.Address), - storage.AccessKeyID(Params.MinioCfg.AccessKeyID), - storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey), - storage.UseSSL(Params.MinioCfg.UseSSL), - storage.BucketName(Params.MinioCfg.BucketName), - storage.UseIAM(Params.MinioCfg.UseIAM), - storage.IAMEndpoint(Params.MinioCfg.IAMEndpoint), - storage.CreateBucket(true)) - cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx) - if err != nil { - log.Error("minio chunk manager init failed", zap.String("error", err.Error())) - return err - } - log.Info("minio chunk manager init success", zap.String("bucketname", Params.MinioCfg.BucketName)) - } else if Params.CommonCfg.StorageType == "local" { - chunkManagerFactory := storage.NewChunkManagerFactory("local", "local", - storage.RootPath(Params.LocalStorageCfg.Path)) - cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx) - if err != nil { - log.Error("local chunk manager init failed", zap.String("error", err.Error())) - return err - } - log.Info("local chunk manager init success") + + chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(&Params) + cli, err = chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx) + if err != nil { + log.Error("chunk manager init failed", zap.Error(err)) + return err } - + log.Info("Datacoord garbage collector chunk manager init success") s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, s.indexCoord, GcOption{ - cli: cli, - enabled: Params.DataCoordCfg.EnableGarbageCollection, - rootPath: Params.MinioCfg.RootPath, - + cli: cli, + enabled: Params.DataCoordCfg.EnableGarbageCollection, checkInterval: Params.DataCoordCfg.GCInterval, missingTolerance: Params.DataCoordCfg.GCMissingTolerance, dropTolerance: Params.DataCoordCfg.GCDropTolerance, diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b9af916962..e4dd48c2c0 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -494,7 +494,7 @@ func (node *DataNode) Start() error { return errors.New("DataNode fail to connect etcd") } - chunkManager, err := node.factory.NewVectorStorageChunkManager(node.ctx) + chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx) if err != nil { return err diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index d6e7dd0412..90fb7c5a63 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -209,7 +209,7 @@ func (i *IndexCoord) Init() error { // TODO silverxia add Rewatch logic i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil) - chunkManager, err := i.factory.NewVectorStorageChunkManager(i.loopCtx) + chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx) if err != nil { log.Error("IndexCoord new minio chunkManager failed", zap.Error(err)) initErr = err diff --git a/internal/indexnode/chunk_mgr_factory.go b/internal/indexnode/chunk_mgr_factory.go index 5734d7e297..d07c3b73c8 100644 --- a/internal/indexnode/chunk_mgr_factory.go +++ b/internal/indexnode/chunk_mgr_factory.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" ) @@ -23,24 +24,13 @@ func (m *chunkMgr) NewChunkManager(ctx context.Context, config *indexpb.StorageC return v.(storage.ChunkManager), nil } - opts := make([]storage.Option, 0) - if config.StorageType == "local" { - opts = append(opts, storage.RootPath(config.RootPath)) - } else { - opts = append(opts, storage.Address(config.Address), - storage.AccessKeyID(config.AccessKeyID), - storage.SecretAccessKeyID(config.SecretAccessKey), - storage.UseSSL(config.UseSSL), - storage.BucketName(config.BucketName), - storage.UseIAM(config.UseIAM), - storage.IAMEndpoint(config.IAMEndpoint)) - } - factory := storage.NewChunkManagerFactory("local", config.StorageType, opts...) - mgr, err := factory.NewVectorStorageChunkManager(ctx) + chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(&Params) + mgr, err := chunkManagerFactory.NewPersistentStorageChunkManager(ctx) if err != nil { return nil, err } v, _ := m.cached.LoadOrStore(key, mgr) + log.Ctx(ctx).Info("index node successfully init chunk manager") return v.(storage.ChunkManager), nil } diff --git a/internal/indexnode/chunkmgr_mock.go b/internal/indexnode/chunkmgr_mock.go index 23ba6cd4b7..02db28e10d 100644 --- a/internal/indexnode/chunkmgr_mock.go +++ b/internal/indexnode/chunkmgr_mock.go @@ -227,7 +227,7 @@ func (f *mockFactory) NewCacheStorageChunkManager(context.Context) (storage.Chun return nil, errNotImplErr } -func (f *mockFactory) NewVectorStorageChunkManager(context.Context) (storage.ChunkManager, error) { +func (f *mockFactory) NewPersistentStorageChunkManager(context.Context) (storage.ChunkManager, error) { if f.chunkMgr != nil { return f.chunkMgr, nil } diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 53eda24a8e..3522099927 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -196,7 +196,7 @@ func (qc *QueryCoord) Init() error { // we only try best to reload the leader addresses reloadShardLeaderAddress(qc.meta, qc.cluster) - qc.chunkManager, initError = qc.factory.NewVectorStorageChunkManager(qc.loopCtx) + qc.chunkManager, initError = qc.factory.NewPersistentStorageChunkManager(qc.loopCtx) if initError != nil { log.Error("query coordinator init cluster failed", zap.Error(initError)) diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index cb81743fc5..1db9eef3c8 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -13,7 +13,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "go.uber.org/zap" @@ -37,20 +36,16 @@ type CoordinatorBroker struct { dataCoord types.DataCoord rootCoord types.RootCoord indexCoord types.IndexCoord - - cm storage.ChunkManager } func NewCoordinatorBroker( dataCoord types.DataCoord, rootCoord types.RootCoord, - indexCoord types.IndexCoord, - cm storage.ChunkManager) *CoordinatorBroker { + indexCoord types.IndexCoord) *CoordinatorBroker { return &CoordinatorBroker{ dataCoord, rootCoord, indexCoord, - cm, } } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d22ded2272..f902e367d5 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -35,7 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/metricsinfo" @@ -60,7 +59,6 @@ type Server struct { idAllocator func() (int64, error) factory dependency.Factory metricsCacheManager *metricsinfo.MetricsCacheManager - chunkManager storage.ChunkManager // Coordinators dataCoord types.DataCoord @@ -159,13 +157,6 @@ func (s *Server) Init() error { // Init metrics cache manager s.metricsCacheManager = metricsinfo.NewMetricsCacheManager() - // Init chunk manager - s.chunkManager, err = s.factory.NewVectorStorageChunkManager(s.ctx) - if err != nil { - log.Error("failed to init chunk manager", zap.Error(err)) - return err - } - // Init meta err = s.initMeta() if err != nil { @@ -252,7 +243,6 @@ func (s *Server) initMeta() error { s.dataCoord, s.rootCoord, s.indexCoord, - s.chunkManager, ) return nil } diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 8a4ce4baef..6376563f88 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -1741,9 +1741,12 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory // init shard cluster service node.ShardClusterService = newShardClusterService(node.etcdCli, node.session, node) - node.queryShardService = newQueryShardService(node.queryNodeLoopCtx, + node.queryShardService, err = newQueryShardService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica, node.ShardClusterService, node.factory, node.scheduler) + if err != nil { + return nil, err + } node.UpdateStateCode(internalpb.StateCode_Healthy) @@ -1908,7 +1911,7 @@ func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstrea func (mm *mockMsgStreamFactory) NewCacheStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { return nil, nil } -func (mm *mockMsgStreamFactory) NewVectorStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { +func (mm *mockMsgStreamFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { return nil, nil } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 796dabdeff..b40cba7120 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -115,7 +115,6 @@ type QueryNode struct { eventCh <-chan *sessionutil.SessionEvent vectorStorage storage.ChunkManager - cacheStorage storage.ChunkManager etcdKV *etcdkv.EtcdKV // shard cluster service, handle shard leader functions @@ -245,20 +244,13 @@ func (node *QueryNode) Init() error { } log.Info("QueryNode init rateCollector done", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID())) - node.vectorStorage, err = node.factory.NewVectorStorageChunkManager(node.queryNodeLoopCtx) + node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.queryNodeLoopCtx) if err != nil { log.Error("QueryNode init vector storage failed", zap.Error(err)) initError = err return } - node.cacheStorage, err = node.factory.NewCacheStorageChunkManager(node.queryNodeLoopCtx) - if err != nil { - log.Error("QueryNode init cache storage failed", zap.Error(err)) - initError = err - return - } - node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath) log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) @@ -320,8 +312,12 @@ func (node *QueryNode) Start() error { // create shardClusterService for shardLeader functions. node.ShardClusterService = newShardClusterService(node.etcdCli, node.session, node) // create shard-level query service - node.queryShardService = newQueryShardService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica, + queryShardService, err := newQueryShardService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica, node.ShardClusterService, node.factory, node.scheduler) + if err != nil { + return err + } + node.queryShardService = queryShardService Params.QueryNodeCfg.CreatedTime = time.Now() Params.QueryNodeCfg.UpdatedTime = time.Now() diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index a6e40677db..e7df42b203 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -103,11 +103,7 @@ func newQueryNodeMock() *QueryNode { replica := newCollectionReplica(pool) svr.metaReplica = replica svr.dataSyncService = newDataSyncService(ctx, svr.metaReplica, tsReplica, factory) - svr.vectorStorage, err = factory.NewVectorStorageChunkManager(ctx) - if err != nil { - panic(err) - } - svr.cacheStorage, err = factory.NewCacheStorageChunkManager(ctx) + svr.vectorStorage, err = factory.NewPersistentStorageChunkManager(ctx) if err != nil { panic(err) } diff --git a/internal/querynode/query_shard_service.go b/internal/querynode/query_shard_service.go index 8a2bc010bf..e65fd79497 100644 --- a/internal/querynode/query_shard_service.go +++ b/internal/querynode/query_shard_service.go @@ -47,14 +47,15 @@ type queryShardService struct { scheduler *taskScheduler } -func newQueryShardService(ctx context.Context, metaReplica ReplicaInterface, tSafeReplica TSafeReplicaInterface, clusterService *ShardClusterService, factory dependency.Factory, scheduler *taskScheduler) *queryShardService { +func newQueryShardService(ctx context.Context, metaReplica ReplicaInterface, tSafeReplica TSafeReplicaInterface, clusterService *ShardClusterService, factory dependency.Factory, scheduler *taskScheduler) (*queryShardService, error) { + // TODO we don't need the local chunk manager any more + localChunkManager := storage.NewLocalChunkManager(storage.RootPath(Params.LocalStorageCfg.Path)) + remoteChunkManager, err := factory.NewPersistentStorageChunkManager(ctx) + if err != nil { + log.Ctx(ctx).Warn("failed to init remote chunk manager", zap.Error(err)) + return nil, err + } queryShardServiceCtx, queryShardServiceCancel := context.WithCancel(ctx) - - path := Params.LoadWithDefault("localStorage.Path", "/tmp/milvus/data") - - localChunkManager := storage.NewLocalChunkManager(storage.RootPath(path)) - remoteChunkManager, _ := factory.NewVectorStorageChunkManager(ctx) - qss := &queryShardService{ ctx: queryShardServiceCtx, cancel: queryShardServiceCancel, @@ -68,7 +69,7 @@ func newQueryShardService(ctx context.Context, metaReplica ReplicaInterface, tSa factory: factory, scheduler: scheduler, } - return qss + return qss, nil } func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel, replicaID int64) error { diff --git a/internal/querynode/query_shard_service_test.go b/internal/querynode/query_shard_service_test.go index fd9ab913ef..b0c9ec7f12 100644 --- a/internal/querynode/query_shard_service_test.go +++ b/internal/querynode/query_shard_service_test.go @@ -28,7 +28,8 @@ func TestQueryShardService(t *testing.T) { qn, err := genSimpleQueryNode(context.Background()) require.NoError(t, err) - qss := newQueryShardService(context.Background(), qn.metaReplica, qn.tSafeReplica, qn.ShardClusterService, qn.factory, qn.scheduler) + qss, err := newQueryShardService(context.Background(), qn.metaReplica, qn.tSafeReplica, qn.ShardClusterService, qn.factory, qn.scheduler) + assert.NoError(t, err) err = qss.addQueryShard(0, "vchan1", 0) assert.NoError(t, err) found1 := qss.hasQueryShard("vchan1") @@ -50,7 +51,8 @@ func TestQueryShardService_InvalidChunkManager(t *testing.T) { qn, err := genSimpleQueryNode(context.Background()) require.NoError(t, err) - qss := newQueryShardService(context.Background(), qn.metaReplica, qn.tSafeReplica, qn.ShardClusterService, qn.factory, qn.scheduler) + qss, err := newQueryShardService(context.Background(), qn.metaReplica, qn.tSafeReplica, qn.ShardClusterService, qn.factory, qn.scheduler) + assert.NoError(t, err) lcm := qss.localChunkManager qss.localChunkManager = nil diff --git a/internal/storage/factory.go b/internal/storage/factory.go index ff65943762..81369118d3 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -3,23 +3,39 @@ package storage import ( "context" "errors" + + "github.com/milvus-io/milvus/internal/util/paramtable" ) type ChunkManagerFactory struct { - cacheStorage string - vectorStorage string - config *config + persistentStorage string + config *config } -func NewChunkManagerFactory(cacheStorage, vectorStorage string, opts ...Option) *ChunkManagerFactory { +func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkManagerFactory { + if params.CommonCfg.StorageType == "local" { + return NewChunkManagerFactory("local", RootPath(params.LocalStorageCfg.Path)) + } + return NewChunkManagerFactory("minio", + RootPath(params.MinioCfg.RootPath), + Address(params.MinioCfg.Address), + AccessKeyID(params.MinioCfg.AccessKeyID), + SecretAccessKeyID(params.MinioCfg.SecretAccessKey), + UseSSL(params.MinioCfg.UseSSL), + BucketName(params.MinioCfg.BucketName), + UseIAM(params.MinioCfg.UseIAM), + IAMEndpoint(params.MinioCfg.IAMEndpoint), + CreateBucket(true)) +} + +func NewChunkManagerFactory(persistentStorage string, opts ...Option) *ChunkManagerFactory { c := newDefaultConfig() for _, opt := range opts { opt(c) } return &ChunkManagerFactory{ - cacheStorage: cacheStorage, - vectorStorage: vectorStorage, - config: c, + persistentStorage: persistentStorage, + config: c, } } @@ -34,15 +50,10 @@ func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string } } -func (f *ChunkManagerFactory) NewCacheStorageChunkManager(ctx context.Context) (ChunkManager, error) { - return f.newChunkManager(ctx, f.cacheStorage) -} - -func (f *ChunkManagerFactory) NewVectorStorageChunkManager(ctx context.Context) (ChunkManager, error) { - return f.newChunkManager(ctx, f.vectorStorage) +func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) { + return f.newChunkManager(ctx, f.persistentStorage) } type Factory interface { - NewCacheStorageChunkManager(ctx context.Context) (ChunkManager, error) - NewVectorStorageChunkManager(ctx context.Context) (ChunkManager, error) + NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) } diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 74f3e3a605..7dd93467bd 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -48,7 +48,7 @@ type MinioChunkManager struct { var _ ChunkManager = (*MinioChunkManager)(nil) // NewMinioChunkManager create a new local manager object. -// Deprecated: Do not call this directly! Use factory.NewVectorStorageChunkManager instead. +// Deprecated: Do not call this directly! Use factory.NewPersistentStorageChunkManager instead. func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManager, error) { c := newDefaultConfig() for _, opt := range opts { @@ -117,9 +117,10 @@ func (mcm *MinioChunkManager) normalizeRootPath(rootPath string) string { } // SetVar set the variable value of mcm -func (mcm *MinioChunkManager) SetVar(ctx context.Context, bucketName string) { +func (mcm *MinioChunkManager) SetVar(ctx context.Context, bucketName string, rootPath string) { mcm.ctx = ctx mcm.bucketName = bucketName + mcm.rootPath = rootPath } // RootPath returns minio root path. diff --git a/internal/util/dependency/factory.go b/internal/util/dependency/factory.go index 9af8eda974..eee61702b2 100644 --- a/internal/util/dependency/factory.go +++ b/internal/util/dependency/factory.go @@ -14,11 +14,12 @@ type DefaultFactory struct { msgStreamFactory msgstream.Factory } +// Only for test func NewDefaultFactory(standAlone bool) *DefaultFactory { return &DefaultFactory{ standAlone: standAlone, msgStreamFactory: msgstream.NewRmsFactory("/tmp/milvus/rocksmq/"), - chunkManagerFactory: storage.NewChunkManagerFactory("local", "local", + chunkManagerFactory: storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus")), } } @@ -37,33 +38,14 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { return } - // init storage - if params.CommonCfg.StorageType == "local" { - f.chunkManagerFactory = storage.NewChunkManagerFactory("local", "local", - storage.RootPath(params.LocalStorageCfg.Path)) - } else { - f.chunkManagerFactory = storage.NewChunkManagerFactory("local", "minio", - storage.RootPath(params.MinioCfg.RootPath), - storage.Address(params.MinioCfg.Address), - storage.AccessKeyID(params.MinioCfg.AccessKeyID), - storage.SecretAccessKeyID(params.MinioCfg.SecretAccessKey), - storage.UseSSL(params.MinioCfg.UseSSL), - storage.BucketName(params.MinioCfg.BucketName), - storage.UseIAM(params.MinioCfg.UseIAM), - storage.IAMEndpoint(params.MinioCfg.IAMEndpoint), - storage.CreateBucket(true)) - } + f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params) // init mq storage if f.standAlone { f.msgStreamFactory = f.initMQLocalService(params) - if f.msgStreamFactory == nil { - f.msgStreamFactory = f.initMQRemoteService(params) - if f.msgStreamFactory == nil { - panic("no available mq configuration, must config rocksmq, Pulsar or Kafka at least one of these!") - } + if f.msgStreamFactory != nil { + return } - return } f.msgStreamFactory = f.initMQRemoteService(params) @@ -112,17 +94,12 @@ func (f *DefaultFactory) NewMsgStreamDisposer(ctx context.Context) func([]string return f.msgStreamFactory.NewMsgStreamDisposer(ctx) } -func (f *DefaultFactory) NewCacheStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { - return f.chunkManagerFactory.NewCacheStorageChunkManager(ctx) -} - -func (f *DefaultFactory) NewVectorStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { - return f.chunkManagerFactory.NewVectorStorageChunkManager(ctx) +func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) { + return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx) } type Factory interface { msgstream.Factory Init(p *paramtable.ComponentParam) - NewCacheStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) - NewVectorStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) + NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) } diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index a85659bdef..31ec2999c4 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -98,7 +98,7 @@ func (mc *MockChunkManager) RemoveWithPrefix(prefix string) error { func Test_NewImportWrapper(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) wrapper := NewImportWrapper(ctx, nil, 2, 1, nil, cm, nil, nil, nil) @@ -129,7 +129,7 @@ func Test_NewImportWrapper(t *testing.T) { func Test_ImportRowBased(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) idAllocator := newIDAllocator(ctx, t) @@ -216,7 +216,7 @@ func Test_ImportRowBased(t *testing.T) { func Test_ImportColumnBased_json(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) defer cm.RemoveWithPrefix("") @@ -316,7 +316,7 @@ func Test_ImportColumnBased_json(t *testing.T) { func Test_ImportColumnBased_StringKey(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) defer cm.RemoveWithPrefix("") @@ -383,7 +383,7 @@ func Test_ImportColumnBased_StringKey(t *testing.T) { func Test_ImportColumnBased_numpy(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) defer cm.RemoveWithPrefix("") @@ -520,7 +520,7 @@ func perfSchema(dim int) *schemapb.CollectionSchema { func Test_ImportRowBased_perf(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) defer cm.RemoveWithPrefix("") @@ -621,7 +621,7 @@ func Test_ImportRowBased_perf(t *testing.T) { func Test_ImportColumnBased_perf(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() - cm, err := f.NewVectorStorageChunkManager(ctx) + cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(t, err) defer cm.RemoveWithPrefix("")