diff --git a/internal/querynodev2/delegator/segment_pruner.go b/internal/querynodev2/delegator/segment_pruner.go index e2ced8ed9d..d5b1116d39 100644 --- a/internal/querynodev2/delegator/segment_pruner.go +++ b/internal/querynodev2/delegator/segment_pruner.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -63,6 +64,7 @@ func PruneSegments(ctx context.Context, } filteredSegments := make(map[UniqueID]struct{}, 0) + pruneType := "scalar" // currently we only prune based on one column if typeutil.IsVectorType(clusteringKeyField.GetDataType()) { // parse searched vectors @@ -84,6 +86,7 @@ func PruneSegments(ctx context.Context, for _, partStats := range partitionStats { FilterSegmentsByVector(partStats, searchReq, vectorsBytes, dimValue, clusteringKeyField, filteredSegments, info.filterRatio) } + pruneType = "vector" } else { // 0. parse expr from plan plan := planpb.PlanNode{} @@ -104,13 +107,23 @@ func PruneSegments(ctx context.Context, // 2. prune segments by scalar field targetSegmentStats := make([]storage.SegmentStats, 0, 32) targetSegmentIDs := make([]int64, 0, 32) - for _, partID := range partitionIDs { - partStats := partitionStats[partID] - for segID, segStat := range partStats.SegmentStats { - targetSegmentIDs = append(targetSegmentIDs, segID) - targetSegmentStats = append(targetSegmentStats, segStat) + if len(partitionIDs) > 0 { + for _, partID := range partitionIDs { + partStats := partitionStats[partID] + for segID, segStat := range partStats.SegmentStats { + targetSegmentIDs = append(targetSegmentIDs, segID) + targetSegmentStats = append(targetSegmentStats, segStat) + } + } + } else { + for _, partStats := range partitionStats { + for segID, segStat := range partStats.SegmentStats { + targetSegmentIDs = append(targetSegmentIDs, segID) + targetSegmentStats = append(targetSegmentStats, segStat) + } } } + PruneByScalarField(expr, targetSegmentStats, targetSegmentIDs, filteredSegments) } @@ -118,6 +131,8 @@ func PruneSegments(ctx context.Context, if len(filteredSegments) > 0 { realFilteredSegments := 0 totalSegNum := 0 + minSegmentCount := math.MaxInt + maxSegmentCount := 0 for idx, item := range sealedSegments { newSegments := make([]SegmentEntry, 0) totalSegNum += len(item.Segments) @@ -131,11 +146,30 @@ func PruneSegments(ctx context.Context, } item.Segments = newSegments sealedSegments[idx] = item + segmentCount := len(item.Segments) + if segmentCount > maxSegmentCount { + maxSegmentCount = segmentCount + } + if segmentCount < minSegmentCount { + minSegmentCount = segmentCount + } } + bias := 1.0 + if maxSegmentCount != 0 && minSegmentCount != math.MaxInt { + bias = float64(maxSegmentCount) / float64(minSegmentCount) + } + metrics.QueryNodeSegmentPruneBias. + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collectionID), + pruneType, + ).Set(bias) + filterRatio := float32(realFilteredSegments) / float32(totalSegNum) metrics.QueryNodeSegmentPruneRatio. - WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))). - Observe(float64(filterRatio)) + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collectionID), + pruneType, + ).Set(float64(filterRatio)) log.Ctx(ctx).Debug("Pruned segment for search/query", zap.Int("filtered_segment_num[stats]", len(filteredSegments)), zap.Int("filtered_segment_num[excluded]", realFilteredSegments), @@ -144,8 +178,10 @@ func PruneSegments(ctx context.Context, ) } - metrics.QueryNodeSegmentPruneLatency.WithLabelValues(fmt.Sprint(collectionID), - fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))). + metrics.QueryNodeSegmentPruneLatency.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(collectionID), + pruneType). Observe(float64(tr.ElapseSpan().Milliseconds())) log.Ctx(ctx).Debug("Pruned segment for search/query", zap.Duration("duration", tr.ElapseSpan())) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 9f862a460f..91304fbb1e 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -76,6 +76,7 @@ const ( compactionTypeLabelName = "compaction_type" isVectorFieldLabelName = "is_vector_field" + segmentPruneLabelName = "segment_prune_label" stageLabelName = "compaction_stage" nodeIDLabelName = "node_id" statusLabelName = "status" diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index e46da21fb6..94d08f9911 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -362,16 +362,28 @@ var ( nodeIDLabelName, }) - QueryNodeSegmentPruneRatio = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ + QueryNodeSegmentPruneRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: milvusNamespace, Subsystem: typeutil.QueryNodeRole, Name: "segment_prune_ratio", - Help: "latency of compaction operation", - Buckets: buckets, + Help: "ratio of segments pruned by segment_pruner", }, []string{ + nodeIDLabelName, collectionIDLabelName, - isVectorFieldLabelName, + segmentPruneLabelName, + }) + + QueryNodeSegmentPruneBias = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "segment_prune_bias", + Help: "bias of workload when enabling segment prune", + }, []string{ + nodeIDLabelName, + collectionIDLabelName, + segmentPruneLabelName, }) QueryNodeSegmentPruneLatency = prometheus.NewHistogramVec( @@ -382,8 +394,9 @@ var ( Help: "latency of segment prune", Buckets: buckets, }, []string{ + nodeIDLabelName, collectionIDLabelName, - isVectorFieldLabelName, + segmentPruneLabelName, }) QueryNodeEvictedReadReqCount = prometheus.NewCounterVec( @@ -803,6 +816,7 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeDiskCacheEvictGlobalDuration) registry.MustRegister(QueryNodeSegmentPruneRatio) registry.MustRegister(QueryNodeSegmentPruneLatency) + registry.MustRegister(QueryNodeSegmentPruneBias) registry.MustRegister(QueryNodeApplyBFCost) registry.MustRegister(QueryNodeForwardDeleteCost) // Add cgo metrics