diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index c32d604c95..d3cf3ed6e0 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -22,6 +22,7 @@ import ( "os" "os/signal" "path/filepath" + "runtime/debug" "strings" "sync" "syscall" @@ -48,6 +49,7 @@ import ( "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/expr" + "github.com/milvus-io/milvus/pkg/util/gc" "github.com/milvus-io/milvus/pkg/util/generic" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -369,6 +371,18 @@ func (mr *MilvusRoles) Run() { http.ServeHTTP() setupPrometheusHTTPServer(Registry) + if paramtable.Get().CommonCfg.GCEnabled.GetAsBool() { + if paramtable.Get().CommonCfg.GCHelperEnabled.GetAsBool() { + action := func(GOGC uint32) { + debug.SetGCPercent(int(GOGC)) + } + gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) + } else { + action := func(uint32) {} + gc.NewTuner(paramtable.Get().CommonCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) + } + } + var wg sync.WaitGroup local := mr.Local diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 164bb847e4..a1f95ff5aa 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -22,6 +22,8 @@ import ( sio "io" "math" "path" + "runtime" + "runtime/debug" "sort" "strconv" "strings" @@ -447,7 +449,9 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 { var totalBufferSize int64 = 0 for _, buffer := range t.clusterBuffers { + t.clusterBufferLocks.RLock(buffer.id) totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load() + t.clusterBufferLocks.RUnlock(buffer.id) } return totalBufferSize } @@ -546,6 +550,7 @@ func (t *clusteringCompactionTask) mappingSegment( err := pkIter.Next() if err != nil { if err == sio.EOF { + pkIter.Close() break } else { log.Warn("compact wrong, failed to iter through data", zap.Error(err)) @@ -923,6 +928,10 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus return err } } + + writer = nil + runtime.GC() + debug.FreeOSMemory() log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()), zap.Int64("cost", time.Since(start).Milliseconds())) return nil diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index e74dd5bdc1..9fe009f8c3 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -34,7 +34,6 @@ import ( "path" "path/filepath" "plugin" - "runtime/debug" "strings" "sync" "time" @@ -64,7 +63,6 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/expr" - "github.com/milvus-io/milvus/pkg/util/gc" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -365,17 +363,6 @@ func (node *QueryNode) Init() error { initError = err return } - if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() { - if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() { - action := func(GOGC uint32) { - debug.SetGCPercent(int(GOGC)) - } - gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) - } else { - action := func(uint32) {} - gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) - } - } log.Info("query node init successfully", zap.Int64("queryNodeID", node.GetNodeID()), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d7eb8bdba6..78f95da38f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -252,6 +252,12 @@ type commonConfig struct { UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` EnableVectorClusteringKey ParamItem `refreshable:"true"` + + GCEnabled ParamItem `refreshable:"false"` + GCHelperEnabled ParamItem `refreshable:"false"` + OverloadedMemoryThresholdPercentage ParamItem `refreshable:"false"` + MaximumGOGCConfig ParamItem `refreshable:"false"` + MinimumGOGCConfig ParamItem `refreshable:"false"` } func (p *commonConfig) init(base *BaseTable) { @@ -799,6 +805,45 @@ like the old password verification when updating the credential`, DefaultValue: "false", } p.EnableVectorClusteringKey.Init(base.mgr) + + p.GCEnabled = ParamItem{ + Key: "common.gcenabled", + Version: "2.4.7", + DefaultValue: "true", + } + p.GCEnabled.Init(base.mgr) + + p.GCHelperEnabled = ParamItem{ + Key: "common.gchelper.enabled", + Version: "2.4.7", + DefaultValue: "true", + } + p.GCHelperEnabled.Init(base.mgr) + + p.OverloadedMemoryThresholdPercentage = ParamItem{ + Key: "common.overloadedMemoryThresholdPercentage", + Version: "2.4.7", + DefaultValue: "90", + PanicIfEmpty: true, + Formatter: func(v string) string { + return fmt.Sprintf("%f", getAsFloat(v)/100) + }, + } + p.OverloadedMemoryThresholdPercentage.Init(base.mgr) + + p.MaximumGOGCConfig = ParamItem{ + Key: "common.gchelper.maximumGoGC", + Version: "2.4.7", + DefaultValue: "200", + } + p.MaximumGOGCConfig.Init(base.mgr) + + p.MinimumGOGCConfig = ParamItem{ + Key: "common.gchelper.minimumGoGC", + Version: "2.4.7", + DefaultValue: "30", + } + p.MinimumGOGCConfig.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index c61f68245f..15bf2fdddb 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -114,6 +114,17 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings()) assert.Equal(t, 1000, params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt()) + + params.Save("common.gcenabled", "false") + assert.False(t, Params.GCEnabled.GetAsBool()) + params.Save("common.gchelper.enabled", "false") + assert.False(t, Params.GCHelperEnabled.GetAsBool()) + params.Save("common.overloadedMemoryThresholdPercentage", "40") + assert.Equal(t, 0.4, Params.OverloadedMemoryThresholdPercentage.GetAsFloat()) + params.Save("common.gchelper.maximumGoGC", "100") + assert.Equal(t, 100, Params.MaximumGOGCConfig.GetAsInt()) + params.Save("common.gchelper.minimumGoGC", "80") + assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt()) }) t.Run("test rootCoordConfig", func(t *testing.T) { diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go index 7991a71e9f..17c5e73965 100644 --- a/tests/integration/compaction/clustering_compaction_test.go +++ b/tests/integration/compaction/clustering_compaction_test.go @@ -200,7 +200,8 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { s.Equal(flushedSegmentsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // 30000*(128*4+8+8) = 15.1MB/1MB = 15+1 - // s.Equal(len(flushedSegmentsResp.GetSegments()), 16) + // The check is done every 100 lines written, so the size of each segment may be up to 99 lines larger. + s.Contains([]int{15, 16}, len(flushedSegmentsResp.GetSegments())) log.Info("get flushed segments done", zap.Int64s("segments", flushedSegmentsResp.GetSegments())) totalRows := int64(0) segsInfoResp, err := c.DataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ @@ -209,6 +210,7 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { s.NoError(err) s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) for _, segInfo := range segsInfoResp.GetInfos() { + s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128)) totalRows += segInfo.GetNumOfRows() }