From d216f9abda1fe792092a7518dd1617a9c65defdf Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 14 Jul 2023 10:28:30 +0800 Subject: [PATCH] Clear collection meta after all channels/segments released (#25486) Signed-off-by: yah01 --- .../delegator/delegator_data_test.go | 2 +- .../querynodev2/delegator/delegator_test.go | 2 +- internal/querynodev2/local_worker.go | 13 +- internal/querynodev2/local_worker_test.go | 2 +- internal/querynodev2/segments/collection.go | 64 ++++++++- internal/querynodev2/segments/manager.go | 47 +++++-- .../segments/mock_collection_manager.go | 127 +++++++++++++++--- .../segments/mock_segment_manager.go | 58 ++++++-- .../querynodev2/segments/retrieve_test.go | 2 +- internal/querynodev2/segments/search_test.go | 2 +- .../segments/segment_loader_test.go | 2 +- internal/querynodev2/segments/segment_test.go | 2 +- internal/querynodev2/services.go | 66 ++------- internal/querynodev2/services_test.go | 59 +------- 14 files changed, 288 insertions(+), 160 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index e91f27b578..bd8c9f5329 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -74,7 +74,7 @@ func (s *DelegatorDataSuite) SetupTest() { s.loader = &segments.MockLoader{} // init schema - s.manager.Collection.Put(s.collectionID, &schemapb.CollectionSchema{ + s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{ Name: "TestCollection", Fields: []*schemapb.FieldSchema{ { diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index a108b41450..106e0aa8b5 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -96,7 +96,7 @@ func (s *DelegatorSuite) SetupTest() { }, nil) // init schema - s.manager.Collection.Put(s.collectionID, 
&schemapb.CollectionSchema{ + s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{ Name: "TestCollection", Fields: []*schemapb.FieldSchema{ { diff --git a/internal/querynodev2/local_worker.go b/internal/querynodev2/local_worker.go index 746daa11be..569343912c 100644 --- a/internal/querynodev2/local_worker.go +++ b/internal/querynodev2/local_worker.go @@ -26,6 +26,8 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/log" + "github.com/samber/lo" + "go.uber.org/zap" ) var _ cluster.Worker = &LocalWorker{} @@ -43,13 +45,20 @@ func NewLocalWorker(node *QueryNode) *LocalWorker { func (w *LocalWorker) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { log := log.Ctx(ctx) log.Info("start to load segments...") - _, err := w.node.loader.Load(ctx, + loaded, err := w.node.loader.Load(ctx, req.GetCollectionID(), segments.SegmentTypeSealed, req.GetVersion(), req.GetInfos()..., ) - log.Info("load segments done") + if err != nil { + return err + } + + w.node.manager.Collection.Ref(req.GetCollectionID(), uint32(len(loaded))) + + log.Info("load segments done...", + zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() }))) return err } diff --git a/internal/querynodev2/local_worker_test.go b/internal/querynodev2/local_worker_test.go index 67370705af..3a68525cfb 100644 --- a/internal/querynodev2/local_worker_test.go +++ b/internal/querynodev2/local_worker_test.go @@ -98,7 +98,7 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) { LoadType: querypb.LoadType_LoadCollection, CollectionID: suite.collectionID, } - suite.node.manager.Collection.Put(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata) + suite.node.manager.Collection.PutOrRef(suite.collectionID, collection.Schema(), suite.indexMeta, loadMata) suite.worker = NewLocalWorker(suite.node) } diff 
--git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 078eef5c08..abb3d9f139 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -41,7 +41,12 @@ import ( type CollectionManager interface { Get(collectionID int64) *Collection - Put(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) + PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) + Ref(collectionID int64, count uint32) bool + // Unref decreases the ref count of the collection by count, + // returns true if the ref count drops to 0 or the collection does not exist, + // returns false otherwise + Unref(collectionID int64, count uint32) bool } type collectionManager struct { @@ -62,20 +67,51 @@ func (m *collectionManager) Get(collectionID int64) *Collection { return m.collections[collectionID] } -func (m *collectionManager) Put(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { +func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { m.mut.Lock() defer m.mut.Unlock() - if _, ok := m.collections[collectionID]; ok { + if collection, ok := m.collections[collectionID]; ok { + collection.Ref(1) return } collection := NewCollection(collectionID, schema, meta, loadMeta.GetLoadType()) collection.metricType.Store(loadMeta.GetMetricType()) collection.AddPartition(loadMeta.GetPartitionIDs()...)
+ collection.Ref(1) m.collections[collectionID] = collection } +func (m *collectionManager) Ref(collectionID int64, count uint32) bool { + m.mut.Lock() + defer m.mut.Unlock() + + if collection, ok := m.collections[collectionID]; ok { + collection.Ref(count) + return true + } + + return false +} + +func (m *collectionManager) Unref(collectionID int64, count uint32) bool { + m.mut.Lock() + defer m.mut.Unlock() + + if collection, ok := m.collections[collectionID]; ok { + if collection.Unref(count) == 0 { + log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID)) + delete(m.collections, collectionID) + DeleteCollection(collection) + return true + } + return false + } + + return true +} + // Collection is a wrapper of the underlying C-structure C.CCollection type Collection struct { mu sync.RWMutex // protects colllectionPtr @@ -85,6 +121,8 @@ type Collection struct { loadType querypb.LoadType metricType atomic.String schema *schemapb.CollectionSchema + + refCount *atomic.Uint32 } // ID returns collection id @@ -133,6 +171,24 @@ func (c *Collection) GetMetricType() string { return c.metricType.Load() } +func (c *Collection) Ref(count uint32) uint32 { + refCount := c.refCount.Add(count) + log.Debug("collection ref increment", + zap.Int64("collectionID", c.ID()), + zap.Uint32("refCount", refCount), + ) + return refCount +} + +func (c *Collection) Unref(count uint32) uint32 { + refCount := c.refCount.Sub(count) + log.Debug("collection ref decrement", + zap.Int64("collectionID", c.ID()), + zap.Uint32("refCount", refCount), + ) + return refCount +} + // newCollection returns a new Collection func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadType querypb.LoadType) *Collection { /* @@ -157,6 +213,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM schema: schema, partitions: typeutil.NewConcurrentSet[int64](), loadType: loadType, + 
+ refCount: atomic.NewUint32(0), } } @@ -165,6 +222,7 @@ func NewCollectionWithoutSchema(collectionID int64, loadType querypb.LoadType) * id: collectionID, partitions: typeutil.NewConcurrentSet[int64](), loadType: loadType, + refCount: atomic.NewUint32(0), } } diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index c757c2c4e2..149278fe13 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -94,8 +94,8 @@ type SegmentManager interface { // Remove removes the given segment, // and decreases the ref count of the corresponding collection, // will not decrease the ref count if the given segment not exists - Remove(segmentID UniqueID, scope querypb.DataScope) - RemoveBy(filters ...SegmentFilter) + Remove(segmentID UniqueID, scope querypb.DataScope) (int, int) + RemoveBy(filters ...SegmentFilter) (int, int) Clear() } @@ -240,40 +240,56 @@ func (mgr *segmentManager) Empty() bool { return len(mgr.growingSegments)+len(mgr.sealedSegments) == 0 } -func (mgr *segmentManager) Remove(segmentID UniqueID, scope querypb.DataScope) { +// Remove returns the number of growing and sealed segments removed, +// in that order (each is 0 or 1) +func (mgr *segmentManager) Remove(segmentID UniqueID, scope querypb.DataScope) (int, int) { mgr.mu.Lock() defer mgr.mu.Unlock() + var removeGrowing, removeSealed int switch scope { case querypb.DataScope_Streaming: - remove(segmentID, mgr.growingSegments) + if remove(segmentID, mgr.growingSegments) { + removeGrowing = 1 + } case querypb.DataScope_Historical: - remove(segmentID, mgr.sealedSegments) + if remove(segmentID, mgr.sealedSegments) { + removeSealed = 1 + } case querypb.DataScope_All: - remove(segmentID, mgr.growingSegments) - remove(segmentID, mgr.sealedSegments) + if remove(segmentID, mgr.growingSegments) { + removeGrowing = 1 + } + if remove(segmentID, mgr.sealedSegments) { + removeSealed = 1 + } } + mgr.updateMetric() + return removeGrowing, removeSealed } -func (mgr *segmentManager)
RemoveBy(filters ...SegmentFilter) { +func (mgr *segmentManager) RemoveBy(filters ...SegmentFilter) (int, int) { mgr.mu.Lock() defer mgr.mu.Unlock() + var removeGrowing, removeSealed int for id, segment := range mgr.growingSegments { - if filter(segment, filters...) { - remove(id, mgr.growingSegments) + if filter(segment, filters...) && remove(id, mgr.growingSegments) { + removeGrowing++ } } for id, segment := range mgr.sealedSegments { - if filter(segment, filters...) { - remove(id, mgr.sealedSegments) + if filter(segment, filters...) && remove(id, mgr.sealedSegments) { + removeSealed++ } } + mgr.updateMetric() + return removeGrowing, removeSealed } func (mgr *segmentManager) Clear() { @@ -305,10 +321,12 @@ func (mgr *segmentManager) updateMetric() { metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len())) } -func remove(segmentID int64, container map[int64]Segment) { +// returns true if the segment exists, +// false otherwise +func remove(segmentID int64, container map[int64]Segment) bool { segment, ok := container[segmentID] if !ok { - return + return false } delete(container, segmentID) @@ -331,4 +349,5 @@ func remove(segmentID int64, container map[int64]Segment) { fmt.Sprint(len(segment.Indexes())), ).Sub(float64(rowNum)) } + return true } diff --git a/internal/querynodev2/segments/mock_collection_manager.go b/internal/querynodev2/segments/mock_collection_manager.go index 8d54ccb509..5605c7ef8a 100644 --- a/internal/querynodev2/segments/mock_collection_manager.go +++ b/internal/querynodev2/segments/mock_collection_manager.go @@ -1,12 +1,13 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. 
package segments import ( schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" querypb "github.com/milvus-io/milvus/internal/proto/querypb" - segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb" mock "github.com/stretchr/testify/mock" + + segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb" ) // MockCollectionManager is an autogenerated mock type for the CollectionManager type @@ -61,43 +62,139 @@ func (_c *MockCollectionManager_Get_Call) Return(_a0 *Collection) *MockCollectio return _c } -// Put provides a mock function with given fields: collectionID, schema, loadMeta -func (_m *MockCollectionManager) Put(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { +func (_c *MockCollectionManager_Get_Call) RunAndReturn(run func(int64) *Collection) *MockCollectionManager_Get_Call { + _c.Call.Return(run) + return _c +} + +// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta +func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) { _m.Called(collectionID, schema, meta, loadMeta) } -// MockCollectionManager_Put_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Put' -type MockCollectionManager_Put_Call struct { +// MockCollectionManager_PutOrRef_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PutOrRef' +type MockCollectionManager_PutOrRef_Call struct { *mock.Call } -// Put is a helper method to define mock.On call +// PutOrRef is a helper method to define mock.On call // - collectionID int64 // - schema *schemapb.CollectionSchema +// - meta *segcorepb.CollectionIndexMeta // - loadMeta *querypb.LoadMetaInfo -func (_e *MockCollectionManager_Expecter) Put(collectionID interface{}, schema interface{}, meta interface{}, loadMeta interface{}) 
*MockCollectionManager_Put_Call { - return &MockCollectionManager_Put_Call{Call: _e.mock.On("Put", collectionID, schema, meta, loadMeta)} +func (_e *MockCollectionManager_Expecter) PutOrRef(collectionID interface{}, schema interface{}, meta interface{}, loadMeta interface{}) *MockCollectionManager_PutOrRef_Call { + return &MockCollectionManager_PutOrRef_Call{Call: _e.mock.On("PutOrRef", collectionID, schema, meta, loadMeta)} } -func (_c *MockCollectionManager_Put_Call) Run(run func(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)) *MockCollectionManager_Put_Call { +func (_c *MockCollectionManager_PutOrRef_Call) Run(run func(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)) *MockCollectionManager_PutOrRef_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(int64), args[1].(*schemapb.CollectionSchema), args[2].(*segcorepb.CollectionIndexMeta), args[3].(*querypb.LoadMetaInfo)) }) return _c } -func (_c *MockCollectionManager_Put_Call) Return() *MockCollectionManager_Put_Call { +func (_c *MockCollectionManager_PutOrRef_Call) Return() *MockCollectionManager_PutOrRef_Call { _c.Call.Return() return _c } -type mockConstructorTestingTNewMockCollectionManager interface { - mock.TestingT - Cleanup(func()) +func (_c *MockCollectionManager_PutOrRef_Call) RunAndReturn(run func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo)) *MockCollectionManager_PutOrRef_Call { + _c.Call.Return(run) + return _c +} + +// Ref provides a mock function with given fields: collectionID, count +func (_m *MockCollectionManager) Ref(collectionID int64, count uint32) bool { + ret := _m.Called(collectionID, count) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64, uint32) bool); ok { + r0 = rf(collectionID, count) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// 
MockCollectionManager_Ref_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ref' +type MockCollectionManager_Ref_Call struct { + *mock.Call +} + +// Ref is a helper method to define mock.On call +// - collectionID int64 +// - count uint32 +func (_e *MockCollectionManager_Expecter) Ref(collectionID interface{}, count interface{}) *MockCollectionManager_Ref_Call { + return &MockCollectionManager_Ref_Call{Call: _e.mock.On("Ref", collectionID, count)} +} + +func (_c *MockCollectionManager_Ref_Call) Run(run func(collectionID int64, count uint32)) *MockCollectionManager_Ref_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(uint32)) + }) + return _c +} + +func (_c *MockCollectionManager_Ref_Call) Return(_a0 bool) *MockCollectionManager_Ref_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCollectionManager_Ref_Call) RunAndReturn(run func(int64, uint32) bool) *MockCollectionManager_Ref_Call { + _c.Call.Return(run) + return _c +} + +// Unref provides a mock function with given fields: collectionID, count +func (_m *MockCollectionManager) Unref(collectionID int64, count uint32) bool { + ret := _m.Called(collectionID, count) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64, uint32) bool); ok { + r0 = rf(collectionID, count) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockCollectionManager_Unref_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unref' +type MockCollectionManager_Unref_Call struct { + *mock.Call +} + +// Unref is a helper method to define mock.On call +// - collectionID int64 +// - count uint32 +func (_e *MockCollectionManager_Expecter) Unref(collectionID interface{}, count interface{}) *MockCollectionManager_Unref_Call { + return &MockCollectionManager_Unref_Call{Call: _e.mock.On("Unref", collectionID, count)} +} + +func (_c *MockCollectionManager_Unref_Call) Run(run func(collectionID int64, count uint32)) 
*MockCollectionManager_Unref_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(uint32)) + }) + return _c +} + +func (_c *MockCollectionManager_Unref_Call) Return(_a0 bool) *MockCollectionManager_Unref_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCollectionManager_Unref_Call) RunAndReturn(run func(int64, uint32) bool) *MockCollectionManager_Unref_Call { + _c.Call.Return(run) + return _c } // NewMockCollectionManager creates a new instance of MockCollectionManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockCollectionManager(t mockConstructorTestingTNewMockCollectionManager) *MockCollectionManager { +// The first argument is typically a *testing.T value. +func NewMockCollectionManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockCollectionManager { mock := &MockCollectionManager{} mock.Mock.Test(t) diff --git a/internal/querynodev2/segments/mock_segment_manager.go b/internal/querynodev2/segments/mock_segment_manager.go index 3a6355a95e..127c432469 100644 --- a/internal/querynodev2/segments/mock_segment_manager.go +++ b/internal/querynodev2/segments/mock_segment_manager.go @@ -378,8 +378,27 @@ func (_c *MockSegmentManager_Put_Call) RunAndReturn(run func(commonpb.SegmentSta } // Remove provides a mock function with given fields: segmentID, scope -func (_m *MockSegmentManager) Remove(segmentID int64, scope querypb.DataScope) { - _m.Called(segmentID, scope) +func (_m *MockSegmentManager) Remove(segmentID int64, scope querypb.DataScope) (int, int) { + ret := _m.Called(segmentID, scope) + + var r0 int + var r1 int + if rf, ok := ret.Get(0).(func(int64, querypb.DataScope) (int, int)); ok { + return rf(segmentID, scope) + } + if rf, ok := ret.Get(0).(func(int64, querypb.DataScope) int); ok { + r0 = rf(segmentID, scope) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(int64, querypb.DataScope) int); ok { + r1 = 
rf(segmentID, scope) + } else { + r1 = ret.Get(1).(int) + } + + return r0, r1 } // MockSegmentManager_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove' @@ -401,25 +420,44 @@ func (_c *MockSegmentManager_Remove_Call) Run(run func(segmentID int64, scope qu return _c } -func (_c *MockSegmentManager_Remove_Call) Return() *MockSegmentManager_Remove_Call { - _c.Call.Return() +func (_c *MockSegmentManager_Remove_Call) Return(_a0 int, _a1 int) *MockSegmentManager_Remove_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockSegmentManager_Remove_Call) RunAndReturn(run func(int64, querypb.DataScope)) *MockSegmentManager_Remove_Call { +func (_c *MockSegmentManager_Remove_Call) RunAndReturn(run func(int64, querypb.DataScope) (int, int)) *MockSegmentManager_Remove_Call { _c.Call.Return(run) return _c } // RemoveBy provides a mock function with given fields: filters -func (_m *MockSegmentManager) RemoveBy(filters ...SegmentFilter) { +func (_m *MockSegmentManager) RemoveBy(filters ...SegmentFilter) (int, int) { _va := make([]interface{}, len(filters)) for _i := range filters { _va[_i] = filters[_i] } var _ca []interface{} _ca = append(_ca, _va...) - _m.Called(_ca...) + ret := _m.Called(_ca...) + + var r0 int + var r1 int + if rf, ok := ret.Get(0).(func(...SegmentFilter) (int, int)); ok { + return rf(filters...) + } + if rf, ok := ret.Get(0).(func(...SegmentFilter) int); ok { + r0 = rf(filters...) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(...SegmentFilter) int); ok { + r1 = rf(filters...) 
+ } else { + r1 = ret.Get(1).(int) + } + + return r0, r1 } // MockSegmentManager_RemoveBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveBy' @@ -447,12 +485,12 @@ func (_c *MockSegmentManager_RemoveBy_Call) Run(run func(filters ...SegmentFilte return _c } -func (_c *MockSegmentManager_RemoveBy_Call) Return() *MockSegmentManager_RemoveBy_Call { - _c.Call.Return() +func (_c *MockSegmentManager_RemoveBy_Call) Return(_a0 int, _a1 int) *MockSegmentManager_RemoveBy_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(...SegmentFilter)) *MockSegmentManager_RemoveBy_Call { +func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(...SegmentFilter) (int, int)) *MockSegmentManager_RemoveBy_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 2a6fb33cd8..4529021024 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -66,7 +66,7 @@ func (suite *RetrieveSuite) SetupTest() { suite.manager = NewManager() schema := GenTestCollectionSchema("test-reduce", schemapb.DataType_Int64) indexMeta := GenTestIndexMeta(suite.collectionID, schema) - suite.manager.Collection.Put(suite.collectionID, + suite.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, &querypb.LoadMetaInfo{ diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index 8322b0de9a..0db0bea6ad 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -62,7 +62,7 @@ func (suite *SearchSuite) SetupTest() { suite.manager = NewManager() schema := GenTestCollectionSchema("test-reduce", schemapb.DataType_Int64) indexMeta := GenTestIndexMeta(suite.collectionID, schema) - suite.manager.Collection.Put(suite.collectionID, + 
suite.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, &querypb.LoadMetaInfo{ diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index a05154a93c..6be73a2f1a 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -76,7 +76,7 @@ func (suite *SegmentLoaderSuite) SetupTest() { CollectionID: suite.collectionID, PartitionIDs: []int64{suite.partitionID}, } - suite.manager.Collection.Put(suite.collectionID, schema, indexMeta, loadMeta) + suite.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, loadMeta) } func (suite *SegmentLoaderSuite) TearDownTest() { diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 808dd05bc1..2ee3cb620c 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -49,7 +49,7 @@ func (suite *SegmentSuite) SetupTest() { suite.manager = NewManager() schema := GenTestCollectionSchema("test-reduce", schemapb.DataType_Int64) indexMeta := GenTestIndexMeta(suite.collectionID, schema) - suite.manager.Collection.Put(suite.collectionID, + suite.manager.Collection.PutOrRef(suite.collectionID, schema, indexMeta, &querypb.LoadMetaInfo{ diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index f448d23d4f..eca1967ba8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" @@ -251,7 +250,7 @@ func (node *QueryNode) 
WatchDmChannels(ctx context.Context, req *querypb.WatchDm proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat() maxIndexRecordPerSegment = int64(threshold * proportion / float64(sizePerRecord)) } - node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), &segcorepb.CollectionIndexMeta{ + node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), &segcorepb.CollectionIndexMeta{ IndexMetas: fieldIndexMetas, MaxIndexRowCount: maxIndexRecordPerSegment, }, req.GetLoadMeta()) @@ -377,6 +376,8 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) node.tSafeManager.Remove(req.GetChannelName()) + + node.manager.Collection.Unref(req.GetCollectionID(), 1) } log.Info("unsubscribed channel") @@ -402,58 +403,8 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart collection := node.manager.Collection.Get(req.GetCollectionID()) if collection != nil { collection.AddPartition(req.GetPartitionIDs()...) 
- return merr.Status(nil), nil } - if req.GetSchema() == nil { - return merr.Status(merr.WrapErrCollectionNotLoaded(req.GetCollectionID(), "failed to load partitions")), nil - } - fieldIndexMetas := make([]*segcorepb.FieldIndexMeta, 0) - for _, info := range req.GetIndexInfoList() { - fieldIndexMetas = append(fieldIndexMetas, &segcorepb.FieldIndexMeta{ - CollectionID: info.GetCollectionID(), - FieldID: info.GetFieldID(), - IndexName: info.GetIndexName(), - TypeParams: info.GetTypeParams(), - IndexParams: info.GetIndexParams(), - IsAutoIndex: info.GetIsAutoIndex(), - UserIndexParams: info.GetUserIndexParams(), - }) - } - sizePerRecord, err := typeutil.EstimateSizePerRecord(req.Schema) - maxIndexRecordPerSegment := int64(0) - if err != nil || sizePerRecord == 0 { - log.Warn("failed to transfer segment size to collection, because failed to estimate size per record", zap.Error(err)) - } else { - threshold := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 - proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat() - maxIndexRecordPerSegment = int64(threshold * proportion / float64(sizePerRecord)) - } - vecField, err := typeutil.GetVectorFieldSchema(req.GetSchema()) - if err != nil { - return merr.Status(err), nil - } - indexInfo, ok := lo.Find(req.GetIndexInfoList(), func(info *indexpb.IndexInfo) bool { - return info.GetFieldID() == vecField.GetFieldID() - }) - if !ok || indexInfo == nil { - err = fmt.Errorf("cannot find index info for %s field", vecField.GetName()) - return merr.Status(err), nil - } - metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.MetricTypeKey, indexInfo.GetIndexParams()) - if err != nil { - return merr.Status(err), nil - } - node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), &segcorepb.CollectionIndexMeta{ - IndexMetas: fieldIndexMetas, - MaxIndexRowCount: maxIndexRecordPerSegment, - }, &querypb.LoadMetaInfo{ - CollectionID: req.GetCollectionID(), - PartitionIDs: 
req.GetPartitionIDs(), - LoadType: querypb.LoadType_LoadCollection, // TODO: dyh, remove loadType in querynode - MetricType: metricType, - }) - log.Info("load partitions done") return merr.Status(nil), nil } @@ -510,7 +461,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen return node.loadDeltaLogs(ctx, req), nil } - node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), nil, req.GetLoadMeta()) + node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), nil, req.GetLoadMeta()) + defer node.manager.Collection.Unref(req.GetCollectionID(), 1) // Actual load segment log.Info("start to load segments...") @@ -527,8 +479,11 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen }, nil } + node.manager.Collection.Ref(req.GetCollectionID(), uint32(len(loaded))) + log.Info("load segments done...", zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() }))) + return util.SuccessStatus(), nil } @@ -636,9 +591,12 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release } log.Info("start to release segments") + sealedCount := 0 for _, id := range req.GetSegmentIDs() { - node.manager.Segment.Remove(id, req.GetScope()) + _, count := node.manager.Segment.Remove(id, req.GetScope()) + sealedCount += count } + node.manager.Collection.Unref(req.GetCollectionID(), uint32(sealedCount)) return util.SuccessStatus(), nil } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 21f3d6789d..083768eab5 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" 
"github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -40,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -48,7 +46,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ServiceSuite struct { @@ -536,7 +533,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() { PartitionIDs: suite.partitionIDs, } suite.node.manager.Collection = segments.NewCollectionManager() - suite.node.manager.Collection.Put(suite.collectionID, schema, nil, loadMeta) + suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta) req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), @@ -739,7 +736,8 @@ func (suite *ServiceSuite) TestReleaseCollection_Failed() { func (suite *ServiceSuite) TestReleasePartitions_Normal() { ctx := context.Background() - suite.TestLoadPartition() + + suite.TestWatchDmChannelsInt64() req := &querypb.ReleasePartitionsRequest{ CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, @@ -1018,7 +1016,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { PartitionIDs: suite.partitionIDs, MetricType: "L2", } - suite.node.manager.Collection.Put(suite.collectionID, schema, nil, LoadMeta) + suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, LoadMeta) req.GetReq().MetricType = "IP" resp, err = suite.node.Search(ctx, req) suite.NoError(err) @@ -1539,55 +1537,6 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.Equal(commonpb.ErrorCode_NotReadyServe, 
status.GetErrorCode()) suite.node.UpdateStateCode(commonpb.StateCode_Healthy) - // collection not exist and schema is nil - suite.node.manager.Collection = segments.NewCollectionManager() - status, err = suite.node.LoadPartitions(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - - // no vec field in schema - req.Schema = &schemapb.CollectionSchema{} - status, err = suite.node.LoadPartitions(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - - // no indexInfo - req.Schema = segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) - status, err = suite.node.LoadPartitions(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - - // no metric type - vecField, err := typeutil.GetVectorFieldSchema(req.GetSchema()) - suite.NoError(err) - req.IndexInfoList = []*indexpb.IndexInfo{ - { - CollectionID: suite.collectionID, - FieldID: vecField.GetFieldID(), - IndexParams: []*commonpb.KeyValuePair{}, - }, - } - status, err = suite.node.LoadPartitions(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - - // collection not exist and schema is not nil - req.IndexInfoList = []*indexpb.IndexInfo{ - { - CollectionID: suite.collectionID, - FieldID: vecField.GetFieldID(), - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.MetricTypeKey, - Value: "L2", - }, - }, - }, - } - status, err = suite.node.LoadPartitions(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) - // collection existed status, err = suite.node.LoadPartitions(ctx, req) suite.NoError(err)