mirror of https://github.com/milvus-io/milvus.git
fix: Datanode stop progress stuck at writer buffer memory check (#38274)
issue: #38273 Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/38245/head
parent
a1e14d62c7
commit
2035575941
|
@ -77,13 +77,19 @@ func (m *bufferManager) Start() {
|
|||
}
|
||||
|
||||
func (m *bufferManager) check() {
|
||||
ticker := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
|
||||
defer timer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
m.memoryCheck()
|
||||
ticker.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
|
||||
case <-m.ch.CloseCh():
|
||||
log.Info("buffer manager memory check stopped")
|
||||
return
|
||||
|
@ -114,6 +120,13 @@ func (m *bufferManager) memoryCheck() {
|
|||
return mem / 1024 / 1024
|
||||
}
|
||||
|
||||
select {
|
||||
case <-m.ch.CloseCh():
|
||||
log.Info("stop memory check due to manager stop")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
|
||||
size := buf.MemorySize()
|
||||
total += size
|
||||
|
|
|
@ -262,6 +262,39 @@ func (s *ManagerSuite) TestMemoryCheck() {
|
|||
wb.AssertExpectations(s.T())
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestStopDuringMemoryCheck() {
|
||||
manager := s.manager
|
||||
param := paramtable.Get()
|
||||
|
||||
param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50")
|
||||
param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "true")
|
||||
param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.7")
|
||||
|
||||
defer func() {
|
||||
param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key)
|
||||
param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key)
|
||||
param.Reset(param.DataNodeCfg.MemoryForceSyncWatermark.Key)
|
||||
}()
|
||||
|
||||
wb := NewMockWriteBuffer(s.T())
|
||||
|
||||
// mock the memory size reach water mark
|
||||
memoryLimit := hardware.GetMemoryCount()
|
||||
wb.EXPECT().MemorySize().RunAndReturn(func() int64 {
|
||||
return int64(float64(memoryLimit) * 0.8)
|
||||
}).Maybe()
|
||||
//.Return(int64(float64(memoryLimit) * 0.6))
|
||||
wb.EXPECT().EvictBuffer(mock.Anything).Maybe()
|
||||
manager.buffers.Insert(s.channelName, wb)
|
||||
manager.Start()
|
||||
|
||||
// wait memory check triggered
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// expect stop operation won't stuck
|
||||
manager.Stop()
|
||||
}
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
suite.Run(t, new(ManagerSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue