mirror of https://github.com/milvus-io/milvus.git
enhance: Optimized the GC logic to ensure that memory is released in time (#34950)
issue: #34703 master pr: #34949 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/34920/head
parent
ae1636c2be
commit
74adedf750
|
@ -22,6 +22,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
@ -48,6 +49,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/tracer"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/expr"
|
||||
"github.com/milvus-io/milvus/pkg/util/gc"
|
||||
"github.com/milvus-io/milvus/pkg/util/generic"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
|
@ -369,6 +371,18 @@ func (mr *MilvusRoles) Run() {
|
|||
http.ServeHTTP()
|
||||
setupPrometheusHTTPServer(Registry)
|
||||
|
||||
if paramtable.Get().CommonCfg.GCEnabled.GetAsBool() {
|
||||
if paramtable.Get().CommonCfg.GCHelperEnabled.GetAsBool() {
|
||||
action := func(GOGC uint32) {
|
||||
debug.SetGCPercent(int(GOGC))
|
||||
}
|
||||
gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
|
||||
} else {
|
||||
action := func(uint32) {}
|
||||
gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
local := mr.Local
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
sio "io"
|
||||
"math"
|
||||
"path"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -447,7 +449,9 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
|||
func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
|
||||
var totalBufferSize int64 = 0
|
||||
for _, buffer := range t.clusterBuffers {
|
||||
t.clusterBufferLocks.RLock(buffer.id)
|
||||
totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load()
|
||||
t.clusterBufferLocks.RUnlock(buffer.id)
|
||||
}
|
||||
return totalBufferSize
|
||||
}
|
||||
|
@ -546,6 +550,7 @@ func (t *clusteringCompactionTask) mappingSegment(
|
|||
err := pkIter.Next()
|
||||
if err != nil {
|
||||
if err == sio.EOF {
|
||||
pkIter.Close()
|
||||
break
|
||||
} else {
|
||||
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
|
||||
|
@ -923,6 +928,10 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
writer = nil
|
||||
runtime.GC()
|
||||
debug.FreeOSMemory()
|
||||
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()),
|
||||
zap.Int64("cost", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
|
|
|
@ -34,7 +34,6 @@ import (
|
|||
"path"
|
||||
"path/filepath"
|
||||
"plugin"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -64,7 +63,6 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
|
||||
"github.com/milvus-io/milvus/pkg/util/expr"
|
||||
"github.com/milvus-io/milvus/pkg/util/gc"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/lifetime"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
|
@ -365,17 +363,6 @@ func (node *QueryNode) Init() error {
|
|||
initError = err
|
||||
return
|
||||
}
|
||||
if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {
|
||||
if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {
|
||||
action := func(GOGC uint32) {
|
||||
debug.SetGCPercent(int(GOGC))
|
||||
}
|
||||
gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
|
||||
} else {
|
||||
action := func(uint32) {}
|
||||
gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("query node init successfully",
|
||||
zap.Int64("queryNodeID", node.GetNodeID()),
|
||||
|
|
|
@ -252,6 +252,12 @@ type commonConfig struct {
|
|||
UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
|
||||
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
|
||||
EnableVectorClusteringKey ParamItem `refreshable:"true"`
|
||||
|
||||
GCEnabled ParamItem `refreshable:"false"`
|
||||
GCHelperEnabled ParamItem `refreshable:"false"`
|
||||
OverloadedMemoryThresholdPercentage ParamItem `refreshable:"false"`
|
||||
MaximumGOGCConfig ParamItem `refreshable:"false"`
|
||||
MinimumGOGCConfig ParamItem `refreshable:"false"`
|
||||
}
|
||||
|
||||
func (p *commonConfig) init(base *BaseTable) {
|
||||
|
@ -799,6 +805,45 @@ like the old password verification when updating the credential`,
|
|||
DefaultValue: "false",
|
||||
}
|
||||
p.EnableVectorClusteringKey.Init(base.mgr)
|
||||
|
||||
p.GCEnabled = ParamItem{
|
||||
Key: "common.gcenabled",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "true",
|
||||
}
|
||||
p.GCEnabled.Init(base.mgr)
|
||||
|
||||
p.GCHelperEnabled = ParamItem{
|
||||
Key: "common.gchelper.enabled",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "true",
|
||||
}
|
||||
p.GCHelperEnabled.Init(base.mgr)
|
||||
|
||||
p.OverloadedMemoryThresholdPercentage = ParamItem{
|
||||
Key: "common.overloadedMemoryThresholdPercentage",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "90",
|
||||
PanicIfEmpty: true,
|
||||
Formatter: func(v string) string {
|
||||
return fmt.Sprintf("%f", getAsFloat(v)/100)
|
||||
},
|
||||
}
|
||||
p.OverloadedMemoryThresholdPercentage.Init(base.mgr)
|
||||
|
||||
p.MaximumGOGCConfig = ParamItem{
|
||||
Key: "common.gchelper.maximumGoGC",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "200",
|
||||
}
|
||||
p.MaximumGOGCConfig.Init(base.mgr)
|
||||
|
||||
p.MinimumGOGCConfig = ParamItem{
|
||||
Key: "common.gchelper.minimumGoGC",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "30",
|
||||
}
|
||||
p.MinimumGOGCConfig.Init(base.mgr)
|
||||
}
|
||||
|
||||
type gpuConfig struct {
|
||||
|
|
|
@ -114,6 +114,17 @@ func TestComponentParam(t *testing.T) {
|
|||
assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
|
||||
|
||||
assert.Equal(t, 1000, params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt())
|
||||
|
||||
params.Save("common.gcenabled", "false")
|
||||
assert.False(t, Params.GCEnabled.GetAsBool())
|
||||
params.Save("common.gchelper.enabled", "false")
|
||||
assert.False(t, Params.GCHelperEnabled.GetAsBool())
|
||||
params.Save("common.overloadedMemoryThresholdPercentage", "40")
|
||||
assert.Equal(t, 0.4, Params.OverloadedMemoryThresholdPercentage.GetAsFloat())
|
||||
params.Save("common.gchelper.maximumGoGC", "100")
|
||||
assert.Equal(t, 100, Params.MaximumGOGCConfig.GetAsInt())
|
||||
params.Save("common.gchelper.minimumGoGC", "80")
|
||||
assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())
|
||||
})
|
||||
|
||||
t.Run("test rootCoordConfig", func(t *testing.T) {
|
||||
|
|
|
@ -200,7 +200,8 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
|
|||
s.Equal(flushedSegmentsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
|
||||
|
||||
// 30000*(128*4+8+8) = 15.1MB/1MB = 15+1
|
||||
// s.Equal(len(flushedSegmentsResp.GetSegments()), 16)
|
||||
// The check is done every 100 lines written, so the size of each segment may be up to 99 lines larger.
|
||||
s.Contains([]int{15, 16}, len(flushedSegmentsResp.GetSegments()))
|
||||
log.Info("get flushed segments done", zap.Int64s("segments", flushedSegmentsResp.GetSegments()))
|
||||
totalRows := int64(0)
|
||||
segsInfoResp, err := c.DataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
||||
|
@ -209,6 +210,7 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
|
|||
s.NoError(err)
|
||||
s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
|
||||
for _, segInfo := range segsInfoResp.GetInfos() {
|
||||
s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128))
|
||||
totalRows += segInfo.GetNumOfRows()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue