enhance: refine sync memory watermark configuration (#32138)

pr: #32140

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/32191/head
jaime 2024-04-11 20:07:24 +08:00 committed by GitHub
parent 7f683000e9
commit 5b45debb28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 12 additions and 20 deletions

View File

@ -489,8 +489,6 @@ dataNode:
memory:
forceSyncEnable: true # `true` to force sync if memory usage is too high
forceSyncSegmentNum: 1 # number of segments to sync, segments with top largest buffer will be synced.
watermarkStandalone: 0.2 # memory watermark for standalone, upon reaching this watermark, segments will be synced.
watermarkCluster: 0.5 # memory watermark for cluster, upon reaching this watermark, segments will be synced.
timetick:
byRPC: true
channel:

View File

@ -114,7 +114,7 @@ func (m *bufferManager) memoryCheck() {
}
totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryWatermark.GetAsFloat()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(20, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", toMB(float64(total))),

View File

@ -199,12 +199,12 @@ func (s *ManagerSuite) TestMemoryCheck() {
param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50")
param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "false")
param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.7")
param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.7")
defer func() {
param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key)
param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key)
param.Reset(param.DataNodeCfg.MemoryWatermark.Key)
param.Reset(param.DataNodeCfg.MemoryForceSyncWatermark.Key)
}()
wb := NewMockWriteBuffer(s.T())
@ -232,7 +232,7 @@ func (s *ManagerSuite) TestMemoryCheck() {
<-time.After(time.Millisecond * 100)
wb.AssertNotCalled(s.T(), "SetMemoryHighFlag")
param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.5")
param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.5")
<-signal
wb.AssertExpectations(s.T())

View File

@ -3148,7 +3148,7 @@ type dataNodeConfig struct {
MemoryForceSyncEnable ParamItem `refreshable:"true"`
MemoryForceSyncSegmentNum ParamItem `refreshable:"true"`
MemoryCheckInterval ParamItem `refreshable:"true"`
MemoryWatermark ParamItem `refreshable:"true"`
MemoryForceSyncWatermark ParamItem `refreshable:"true"`
DataNodeTimeTickByRPC ParamItem `refreshable:"false"`
// DataNode send timetick interval per collection
@ -3279,26 +3279,20 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.MemoryCheckInterval.Init(base.mgr)
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkStandalone",
Version: "2.2.4",
p.MemoryForceSyncWatermark = ParamItem{
Key: "datanode.memory.forceSyncWatermark",
Version: "2.4.0",
DefaultValue: "0.2",
}
} else if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.ClusterDeployMode {
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkCluster",
Version: "2.2.4",
DefaultValue: "0.5",
}
} else {
log.Info("DeployModeEnv is not set, use default", zap.Float64("default", 0.5))
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkCluster",
Version: "2.2.4",
p.MemoryForceSyncWatermark = ParamItem{
Key: "datanode.memory.forceSyncWatermark",
Version: "2.4.0",
DefaultValue: "0.5",
}
}
p.MemoryWatermark.Init(base.mgr)
p.MemoryForceSyncWatermark.Init(base.mgr)
p.FlushDeleteBufferBytes = ParamItem{
Key: "dataNode.segment.deleteBufBytes",