From 4c934955878bc90bb900789521788a277cc7ebc1 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 12 Jul 2023 14:26:28 +0800 Subject: [PATCH] Add segment size metric in querynode (#25406) Signed-off-by: bigsheeper --- internal/querynodev2/metrics_info.go | 29 +++++++++++++++++++++++++--- pkg/metrics/querynode_metrics.go | 13 +++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index aae78ba68b..7f7d192a81 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -18,6 +18,7 @@ package querynodev2 import ( "context" + "fmt" "time" "github.com/samber/lo" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -102,10 +104,31 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error minTsafeChannel, minTsafe := node.tSafeManager.Min() + var totalGrowingSize int64 growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) - growingSegmentsSize := lo.SumBy(growingSegments, func(seg segments.Segment) int64 { - return seg.MemSize() + growingGroupByCollection := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 { + return seg.Collection() }) + for collection, segs := range growingGroupByCollection { + size := lo.SumBy(segs, func(seg segments.Segment) int64 { + return seg.MemSize() + }) + totalGrowingSize += size + metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collection), segments.SegmentTypeGrowing.String()).Set(float64(size)) + } + + sealedSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) + sealedGroupByCollection := lo.GroupBy(sealedSegments, func(seg segments.Segment) int64 { + return seg.Collection() + }) + for collection, segs := range sealedGroupByCollection { + size := lo.SumBy(segs, func(seg segments.Segment) int64 { + return seg.MemSize() + }) + metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collection), segments.SegmentTypeSealed.String()).Set(float64(size)) + } allSegments := node.manager.Segment.GetBy() collections := typeutil.NewUniqueSet() @@ -123,7 +146,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error }, SearchQueue: sqms, QueryQueue: qqms, - GrowingSegmentsSize: growingSegmentsSize, + GrowingSegmentsSize: totalGrowingSize, Effect: metricsinfo.NodeEffect{ NodeID: paramtable.GetNodeID(), CollectionIDs: collections.Collect(), diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 30cc95ae2f..30b1a920d1 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -345,6 +345,18 @@ var ( indexCountLabelName, }) + QueryNodeEntitiesSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "entity_size", + Help: "entities' memory size, clustered by collection and state", + }, []string{ + nodeIDLabelName, + collectionIDLabelName, + segmentStateLabelName, + }) + // QueryNodeConsumeCounter counts the bytes QueryNode consumed from message storage. QueryNodeConsumeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -427,6 +439,7 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeSearchTopK) registry.MustRegister(QueryNodeNumFlowGraphs) registry.MustRegister(QueryNodeNumEntities) + registry.MustRegister(QueryNodeEntitiesSize) registry.MustRegister(QueryNodeConsumeCounter) registry.MustRegister(QueryNodeExecuteCounter) registry.MustRegister(QueryNodeConsumerMsgCount)