mirror of https://github.com/milvus-io/milvus.git
fix: fix collectionInfo leak in datacoord (#32175)
issue: #32029 lack of logic to clean collection info in datacoord's meta, This PR clean collection info after drop channel, to avoid collection info leak in datacoord --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/32306/head
parent
b87f41128b
commit
0d849a6c0a
|
@ -91,7 +91,7 @@ class MilvusConan(ConanFile):
|
||||||
def requirements(self):
|
def requirements(self):
|
||||||
if self.settings.os != "Macos":
|
if self.settings.os != "Macos":
|
||||||
# MacOS does not need openblas
|
# MacOS does not need openblas
|
||||||
self.requires("openblas/0.3.23@milvus/dev")
|
# self.requires("openblas/0.3.23@milvus/dev")
|
||||||
self.requires("libunwind/1.7.2")
|
self.requires("libunwind/1.7.2")
|
||||||
|
|
||||||
def imports(self):
|
def imports(self):
|
||||||
|
|
|
@ -258,7 +258,7 @@ func (c *ChannelManagerImpl) unwatchDroppedChannels() {
|
||||||
log.Warn("unable to remove channel", zap.String("channel", ch.GetName()), zap.Error(err))
|
log.Warn("unable to remove channel", zap.String("channel", ch.GetName()), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = c.h.FinishDropChannel(ch.GetName())
|
err = c.h.FinishDropChannel(ch.GetName(), ch.GetCollectionID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.GetName()), zap.Error(err))
|
log.Warn("FinishDropChannel failed when unwatchDroppedChannels", zap.String("channel", ch.GetName()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -821,7 +821,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string)
|
||||||
if err := c.remove(originNodeID, ch); err != nil {
|
if err := c.remove(originNodeID, ch); err != nil {
|
||||||
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
|
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
|
||||||
}
|
}
|
||||||
if err := c.h.FinishDropChannel(channelName); err != nil {
|
if err := c.h.FinishDropChannel(channelName, ch.GetCollectionID()); err != nil {
|
||||||
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
|
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
|
||||||
}
|
}
|
||||||
log.Info("removed channel assignment", zap.String("channelName", channelName))
|
log.Info("removed channel assignment", zap.String("channelName", channelName))
|
||||||
|
@ -878,7 +878,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("try to cleanup removal flag ", zap.String("channelName", channelName))
|
log.Info("try to cleanup removal flag ", zap.String("channelName", channelName))
|
||||||
if err := c.h.FinishDropChannel(channelName); err != nil {
|
if err := c.h.FinishDropChannel(channelName, chToCleanUp.GetCollectionID()); err != nil {
|
||||||
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
|
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -580,7 +580,7 @@ func TestChannelManager(t *testing.T) {
|
||||||
handler.EXPECT().
|
handler.EXPECT().
|
||||||
CheckShouldDropChannel(mock.Anything).
|
CheckShouldDropChannel(mock.Anything).
|
||||||
Return(true)
|
Return(true)
|
||||||
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
|
handler.EXPECT().FinishDropChannel(mock.Anything, mock.Anything).Return(nil)
|
||||||
chManager, err := NewChannelManager(watchkv, handler)
|
chManager, err := NewChannelManager(watchkv, handler)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -658,7 +658,7 @@ func TestChannelManager(t *testing.T) {
|
||||||
handler.EXPECT().
|
handler.EXPECT().
|
||||||
CheckShouldDropChannel(mock.Anything).
|
CheckShouldDropChannel(mock.Anything).
|
||||||
Return(true)
|
Return(true)
|
||||||
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
|
handler.EXPECT().FinishDropChannel(mock.Anything, mock.Anything).Return(nil)
|
||||||
chManager, err := NewChannelManager(watchkv, handler)
|
chManager, err := NewChannelManager(watchkv, handler)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ type Handler interface {
|
||||||
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
|
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
|
||||||
GetDataVChanPositions(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo
|
GetDataVChanPositions(ch RWChannel, partitionID UniqueID) *datapb.VchannelInfo
|
||||||
CheckShouldDropChannel(ch string) bool
|
CheckShouldDropChannel(ch string) bool
|
||||||
FinishDropChannel(ch string) error
|
FinishDropChannel(ch string, collectionID int64) error
|
||||||
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
|
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,7 +410,7 @@ func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
|
||||||
|
|
||||||
// FinishDropChannel cleans up the remove flag for channels
|
// FinishDropChannel cleans up the remove flag for channels
|
||||||
// this function is a wrapper of server.meta.FinishDropChannel
|
// this function is a wrapper of server.meta.FinishDropChannel
|
||||||
func (h *ServerHandler) FinishDropChannel(channel string) error {
|
func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) error {
|
||||||
err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
|
err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
|
log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
|
||||||
|
@ -418,5 +418,9 @@ func (h *ServerHandler) FinishDropChannel(channel string) error {
|
||||||
}
|
}
|
||||||
log.Info("DropChannel succeeded", zap.String("vChannel", channel))
|
log.Info("DropChannel succeeded", zap.String("vChannel", channel))
|
||||||
// Channel checkpoints are cleaned up during garbage collection.
|
// Channel checkpoints are cleaned up during garbage collection.
|
||||||
|
|
||||||
|
// clean collection info cache when meet drop collection info
|
||||||
|
h.s.meta.DropCollection(collectionID)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,7 +164,7 @@ func (m *meta) reloadFromKV() error {
|
||||||
// AddCollection adds a collection into meta
|
// AddCollection adds a collection into meta
|
||||||
// Note that collection info is just for caching and will not be set into etcd from datacoord
|
// Note that collection info is just for caching and will not be set into etcd from datacoord
|
||||||
func (m *meta) AddCollection(collection *collectionInfo) {
|
func (m *meta) AddCollection(collection *collectionInfo) {
|
||||||
log.Debug("meta update: add collection", zap.Int64("collectionID", collection.ID))
|
log.Info("meta update: add collection", zap.Int64("collectionID", collection.ID))
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
m.collections[collection.ID] = collection
|
m.collections[collection.ID] = collection
|
||||||
|
@ -172,6 +172,16 @@ func (m *meta) AddCollection(collection *collectionInfo) {
|
||||||
log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID))
|
log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DropCollection drop a collection from meta
|
||||||
|
func (m *meta) DropCollection(collectionID int64) {
|
||||||
|
log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID))
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
delete(m.collections, collectionID)
|
||||||
|
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
|
||||||
|
log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID))
|
||||||
|
}
|
||||||
|
|
||||||
// GetCollection returns collection info with provided collection id from local cache
|
// GetCollection returns collection info with provided collection id from local cache
|
||||||
func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo {
|
func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo {
|
||||||
m.RLock()
|
m.RLock()
|
||||||
|
|
|
@ -64,13 +64,13 @@ func (_c *NMockHandler_CheckShouldDropChannel_Call) RunAndReturn(run func(string
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
// FinishDropChannel provides a mock function with given fields: ch
|
// FinishDropChannel provides a mock function with given fields: ch, collectionID
|
||||||
func (_m *NMockHandler) FinishDropChannel(ch string) error {
|
func (_m *NMockHandler) FinishDropChannel(ch string, collectionID int64) error {
|
||||||
ret := _m.Called(ch)
|
ret := _m.Called(ch, collectionID)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
if rf, ok := ret.Get(0).(func(string) error); ok {
|
if rf, ok := ret.Get(0).(func(string, int64) error); ok {
|
||||||
r0 = rf(ch)
|
r0 = rf(ch, collectionID)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Error(0)
|
r0 = ret.Error(0)
|
||||||
}
|
}
|
||||||
|
@ -85,13 +85,14 @@ type NMockHandler_FinishDropChannel_Call struct {
|
||||||
|
|
||||||
// FinishDropChannel is a helper method to define mock.On call
|
// FinishDropChannel is a helper method to define mock.On call
|
||||||
// - ch string
|
// - ch string
|
||||||
func (_e *NMockHandler_Expecter) FinishDropChannel(ch interface{}) *NMockHandler_FinishDropChannel_Call {
|
// - collectionID int64
|
||||||
return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", ch)}
|
func (_e *NMockHandler_Expecter) FinishDropChannel(ch interface{}, collectionID interface{}) *NMockHandler_FinishDropChannel_Call {
|
||||||
|
return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", ch, collectionID)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(ch string)) *NMockHandler_FinishDropChannel_Call {
|
func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(ch string, collectionID int64)) *NMockHandler_FinishDropChannel_Call {
|
||||||
_c.Call.Run(func(args mock.Arguments) {
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
run(args[0].(string))
|
run(args[0].(string), args[1].(int64))
|
||||||
})
|
})
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
@ -101,7 +102,7 @@ func (_c *NMockHandler_FinishDropChannel_Call) Return(_a0 error) *NMockHandler_F
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *NMockHandler_FinishDropChannel_Call) RunAndReturn(run func(string) error) *NMockHandler_FinishDropChannel_Call {
|
func (_c *NMockHandler_FinishDropChannel_Call) RunAndReturn(run func(string, int64) error) *NMockHandler_FinishDropChannel_Call {
|
||||||
_c.Call.Return(run)
|
_c.Call.Return(run)
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
|
@ -727,7 +727,7 @@ func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *mockHandler) FinishDropChannel(channel string) error {
|
func (h *mockHandler) FinishDropChannel(channel string, collectionID int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -277,7 +277,7 @@ func (q *QuotaCenter) Start() {
|
||||||
func (q *QuotaCenter) run() {
|
func (q *QuotaCenter) run() {
|
||||||
defer q.wg.Done()
|
defer q.wg.Done()
|
||||||
|
|
||||||
interval := time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second))
|
interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second)
|
||||||
log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
|
log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
Loading…
Reference in New Issue