mirror of https://github.com/milvus-io/milvus.git
Add segment size metric in querynode (#25406)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/25485/head
parent
dbf0130803
commit
4c93495587
|
@ -18,6 +18,7 @@ package querynodev2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/collector"
|
"github.com/milvus-io/milvus/internal/querynodev2/collector"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||||
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
@ -102,10 +104,31 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
||||||
|
|
||||||
minTsafeChannel, minTsafe := node.tSafeManager.Min()
|
minTsafeChannel, minTsafe := node.tSafeManager.Min()
|
||||||
|
|
||||||
|
var totalGrowingSize int64
|
||||||
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
|
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
|
||||||
growingSegmentsSize := lo.SumBy(growingSegments, func(seg segments.Segment) int64 {
|
growingGroupByCollection := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 {
|
||||||
return seg.MemSize()
|
return seg.Collection()
|
||||||
})
|
})
|
||||||
|
for collection, segs := range growingGroupByCollection {
|
||||||
|
size := lo.SumBy(segs, func(seg segments.Segment) int64 {
|
||||||
|
return seg.MemSize()
|
||||||
|
})
|
||||||
|
totalGrowingSize += size
|
||||||
|
metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
|
||||||
|
fmt.Sprint(collection), segments.SegmentTypeGrowing.String()).Set(float64(size))
|
||||||
|
}
|
||||||
|
|
||||||
|
sealedSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
|
||||||
|
sealedGroupByCollection := lo.GroupBy(sealedSegments, func(seg segments.Segment) int64 {
|
||||||
|
return seg.Collection()
|
||||||
|
})
|
||||||
|
for collection, segs := range sealedGroupByCollection {
|
||||||
|
size := lo.SumBy(segs, func(seg segments.Segment) int64 {
|
||||||
|
return seg.MemSize()
|
||||||
|
})
|
||||||
|
metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
|
||||||
|
fmt.Sprint(collection), segments.SegmentTypeSealed.String()).Set(float64(size))
|
||||||
|
}
|
||||||
|
|
||||||
allSegments := node.manager.Segment.GetBy()
|
allSegments := node.manager.Segment.GetBy()
|
||||||
collections := typeutil.NewUniqueSet()
|
collections := typeutil.NewUniqueSet()
|
||||||
|
@ -123,7 +146,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
||||||
},
|
},
|
||||||
SearchQueue: sqms,
|
SearchQueue: sqms,
|
||||||
QueryQueue: qqms,
|
QueryQueue: qqms,
|
||||||
GrowingSegmentsSize: growingSegmentsSize,
|
GrowingSegmentsSize: totalGrowingSize,
|
||||||
Effect: metricsinfo.NodeEffect{
|
Effect: metricsinfo.NodeEffect{
|
||||||
NodeID: paramtable.GetNodeID(),
|
NodeID: paramtable.GetNodeID(),
|
||||||
CollectionIDs: collections.Collect(),
|
CollectionIDs: collections.Collect(),
|
||||||
|
|
|
@ -345,6 +345,18 @@ var (
|
||||||
indexCountLabelName,
|
indexCountLabelName,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
QueryNodeEntitiesSize = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: milvusNamespace,
|
||||||
|
Subsystem: typeutil.QueryNodeRole,
|
||||||
|
Name: "entity_size",
|
||||||
|
Help: "entities' memory size, clustered by collection and state",
|
||||||
|
}, []string{
|
||||||
|
nodeIDLabelName,
|
||||||
|
collectionIDLabelName,
|
||||||
|
segmentStateLabelName,
|
||||||
|
})
|
||||||
|
|
||||||
// QueryNodeConsumeCounter counts the bytes QueryNode consumed from message storage.
|
// QueryNodeConsumeCounter counts the bytes QueryNode consumed from message storage.
|
||||||
QueryNodeConsumeCounter = prometheus.NewCounterVec(
|
QueryNodeConsumeCounter = prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
|
@ -427,6 +439,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
|
||||||
registry.MustRegister(QueryNodeSearchTopK)
|
registry.MustRegister(QueryNodeSearchTopK)
|
||||||
registry.MustRegister(QueryNodeNumFlowGraphs)
|
registry.MustRegister(QueryNodeNumFlowGraphs)
|
||||||
registry.MustRegister(QueryNodeNumEntities)
|
registry.MustRegister(QueryNodeNumEntities)
|
||||||
|
registry.MustRegister(QueryNodeEntitiesSize)
|
||||||
registry.MustRegister(QueryNodeConsumeCounter)
|
registry.MustRegister(QueryNodeConsumeCounter)
|
||||||
registry.MustRegister(QueryNodeExecuteCounter)
|
registry.MustRegister(QueryNodeExecuteCounter)
|
||||||
registry.MustRegister(QueryNodeConsumerMsgCount)
|
registry.MustRegister(QueryNodeConsumerMsgCount)
|
||||||
|
|
Loading…
Reference in New Issue