From 0d849a6c0a0dd12f8759034ea207390d2d97463c Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 15 Apr 2024 16:33:19 +0800 Subject: [PATCH] fix: fix collectionInfo leak in datacoord (#32175) issue: #32029 lack of logic to clean collection info in datacoord's meta, This PR clean collection info after drop channel, to avoid collection info leak in datacoord --------- Signed-off-by: Wei Liu --- internal/core/conanfile.py | 2 +- internal/datacoord/channel_manager.go | 6 +++--- internal/datacoord/channel_manager_test.go | 4 ++-- internal/datacoord/handler.go | 8 ++++++-- internal/datacoord/meta.go | 12 +++++++++++- internal/datacoord/mock_handler.go | 21 +++++++++++---------- internal/datacoord/mock_test.go | 2 +- internal/rootcoord/quota_center.go | 2 +- 8 files changed, 36 insertions(+), 21 deletions(-) diff --git a/internal/core/conanfile.py b/internal/core/conanfile.py index bc98df4711..f4c3aa32a1 100644 --- a/internal/core/conanfile.py +++ b/internal/core/conanfile.py @@ -91,7 +91,7 @@ class MilvusConan(ConanFile): def requirements(self): if self.settings.os != "Macos": # MacOS does not need openblas - self.requires("openblas/0.3.23@milvus/dev") + # self.requires("openblas/0.3.23@milvus/dev") self.requires("libunwind/1.7.2") def imports(self): diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 4dadfc213d..047f593907 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -258,7 +258,7 @@ func (c *ChannelManagerImpl) unwatchDroppedChannels() { log.Warn("unable to remove channel", zap.String("channel", ch.GetName()), zap.Error(err)) continue } - err = c.h.FinishDropChannel(ch.GetName()) + err = c.h.FinishDropChannel(ch.GetName(), ch.GetCollectionID()) if err != nil { log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.GetName()), zap.Error(err)) } @@ -821,7 +821,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string) if err := c.remove(originNodeID, ch); err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) } - if err := c.h.FinishDropChannel(channelName); err != nil { + if err := c.h.FinishDropChannel(channelName, ch.GetCollectionID()); err != nil { return fmt.Errorf("FinishDropChannel failed, err=%w", err) } log.Info("removed channel assignment", zap.String("channelName", channelName)) @@ -878,7 +878,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str } log.Info("try to cleanup removal flag ", zap.String("channelName", channelName)) - if err := c.h.FinishDropChannel(channelName); err != nil { + if err := c.h.FinishDropChannel(channelName, chToCleanUp.GetCollectionID()); err != nil { return fmt.Errorf("FinishDropChannel failed, err=%w", err) } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 981e67bf76..536f65b1a1 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -580,7 +580,7 @@ func TestChannelManager(t *testing.T) { handler.EXPECT(). CheckShouldDropChannel(mock.Anything). Return(true) - handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil) + handler.EXPECT().FinishDropChannel(mock.Anything, mock.Anything).Return(nil) chManager, err := NewChannelManager(watchkv, handler) require.NoError(t, err) @@ -658,7 +658,7 @@ func TestChannelManager(t *testing.T) { handler.EXPECT(). CheckShouldDropChannel(mock.Anything). Return(true) - handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil) + handler.EXPECT().FinishDropChannel(mock.Anything, mock.Anything).Return(nil) chManager, err := NewChannelManager(watchkv, handler) require.NoError(t, err) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index e897e9fb2b..aebebfab4c 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -41,7 +41,7 @@ type Handler interface { // GetDataVChanPositions gets the information recovery needed of a channel for DataNode GetDataVChanPositions(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo CheckShouldDropChannel(ch string) bool - FinishDropChannel(ch string) error + FinishDropChannel(ch string, collectionID int64) error GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) } @@ -410,7 +410,7 @@ func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { // FinishDropChannel cleans up the remove flag for channels // this function is a wrapper of server.meta.FinishDropChannel -func (h *ServerHandler) FinishDropChannel(channel string) error { +func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) error { err := h.s.meta.catalog.DropChannel(h.s.ctx, channel) if err != nil { log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err)) @@ -418,5 +418,9 @@ func (h *ServerHandler) FinishDropChannel(channel string) error { } log.Info("DropChannel succeeded", zap.String("vChannel", channel)) // Channel checkpoints are cleaned up during garbage collection. + + // clean collection info cache when meet drop collection info + h.s.meta.DropCollection(collectionID) + return nil } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 4f7eb0c176..5f781df27e 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -164,7 +164,7 @@ func (m *meta) reloadFromKV() error { // AddCollection adds a collection into meta // Note that collection info is just for caching and will not be set into etcd from datacoord func (m *meta) AddCollection(collection *collectionInfo) { - log.Debug("meta update: add collection", zap.Int64("collectionID", collection.ID)) + log.Info("meta update: add collection", zap.Int64("collectionID", collection.ID)) m.Lock() defer m.Unlock() m.collections[collection.ID] = collection @@ -172,6 +172,16 @@ func (m *meta) AddCollection(collection *collectionInfo) { log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID)) } +// DropCollection drop a collection from meta +func (m *meta) DropCollection(collectionID int64) { + log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID)) + m.Lock() + defer m.Unlock() + delete(m.collections, collectionID) + metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections))) + log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID)) +} + // GetCollection returns collection info with provided collection id from local cache func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo { m.RLock() diff --git a/internal/datacoord/mock_handler.go b/internal/datacoord/mock_handler.go index b4c5fcd5ea..3e02c25c8d 100644 --- a/internal/datacoord/mock_handler.go +++ b/internal/datacoord/mock_handler.go @@ -64,13 +64,13 @@ func (_c *NMockHandler_CheckShouldDropChannel_Call) RunAndReturn(run func(string return _c } -// FinishDropChannel provides a mock function with given fields: ch -func (_m *NMockHandler) FinishDropChannel(ch string) error { - ret := _m.Called(ch) +// FinishDropChannel provides a mock function with given fields: ch, collectionID +func (_m *NMockHandler) FinishDropChannel(ch string, collectionID int64) error { + ret := _m.Called(ch, collectionID) var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(ch) + if rf, ok := ret.Get(0).(func(string, int64) error); ok { + r0 = rf(ch, collectionID) } else { r0 = ret.Error(0) } @@ -85,13 +85,14 @@ type NMockHandler_FinishDropChannel_Call struct { // FinishDropChannel is a helper method to define mock.On call // - ch string -func (_e *NMockHandler_Expecter) FinishDropChannel(ch interface{}) *NMockHandler_FinishDropChannel_Call { - return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", ch)} +// - collectionID int64 +func (_e *NMockHandler_Expecter) FinishDropChannel(ch interface{}, collectionID interface{}) *NMockHandler_FinishDropChannel_Call { + return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", ch, collectionID)} } -func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(ch string)) *NMockHandler_FinishDropChannel_Call { +func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(ch string, collectionID int64)) *NMockHandler_FinishDropChannel_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) + run(args[0].(string), args[1].(int64)) }) return _c } @@ -101,7 +102,7 @@ func (_c *NMockHandler_FinishDropChannel_Call) Return(_a0 error) *NMockHandler_F return _c } -func (_c *NMockHandler_FinishDropChannel_Call) RunAndReturn(run func(string) error) *NMockHandler_FinishDropChannel_Call { +func (_c *NMockHandler_FinishDropChannel_Call) RunAndReturn(run func(string, int64) error) *NMockHandler_FinishDropChannel_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index a5e77c6e30..42fd1b30ca 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -727,7 +727,7 @@ func (h *mockHandler) CheckShouldDropChannel(channel string) bool { return false } -func (h *mockHandler) FinishDropChannel(channel string) error { +func (h *mockHandler) FinishDropChannel(channel string, collectionID int64) error { return nil } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index a0a1bbc24d..516b2def3e 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -277,7 +277,7 @@ func (q *QuotaCenter) Start() { func (q *QuotaCenter) run() { defer q.wg.Done() - interval := time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second)) + interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second) log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval)) ticker := time.NewTicker(interval) defer ticker.Stop()