mirror of https://github.com/milvus-io/milvus.git
				
				
				
			Add metrics on QueryNode and Proxy (#17328)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/17349/head
							parent
							
								
									adf3b14027
								
							
						
					
					
						commit
						5f44e45480
					
				| 
						 | 
				
			
			@ -223,6 +223,24 @@ var (
 | 
			
		|||
			Help:      "latency of each DQL request excluding search and query",
 | 
			
		||||
			Buckets:   buckets, // unit: ms
 | 
			
		||||
		}, []string{nodeIDLabelName, functionLabelName})
 | 
			
		||||
 | 
			
		||||
	// ProxyMutationReceiveBytes record the received bytes of Insert/Delete in Proxy
 | 
			
		||||
	ProxyMutationReceiveBytes = prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.ProxyRole,
 | 
			
		||||
			Name:      "receive_bytes_count",
 | 
			
		||||
			Help:      "count of bytes received  from sdk",
 | 
			
		||||
		}, []string{nodeIDLabelName})
 | 
			
		||||
 | 
			
		||||
	// ProxyReadReqSendBytes record the bytes sent back to client by Proxy
 | 
			
		||||
	ProxyReadReqSendBytes = prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.ProxyRole,
 | 
			
		||||
			Name:      "send_bytes_count",
 | 
			
		||||
			Help:      "count of bytes sent back to sdk",
 | 
			
		||||
		}, []string{nodeIDLabelName})
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
//RegisterProxy registers Proxy metrics
 | 
			
		||||
| 
						 | 
				
			
			@ -254,5 +272,6 @@ func RegisterProxy(registry *prometheus.Registry) {
 | 
			
		|||
	registry.MustRegister(ProxyDDLReqLatency)
 | 
			
		||||
	registry.MustRegister(ProxyDMLReqLatency)
 | 
			
		||||
	registry.MustRegister(ProxyDQLReqLatency)
 | 
			
		||||
 | 
			
		||||
	registry.MustRegister(ProxyMutationReceiveBytes)
 | 
			
		||||
	registry.MustRegister(ProxyReadReqSendBytes)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,9 +17,8 @@
 | 
			
		|||
package metrics
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/prometheus/client_golang/prometheus"
 | 
			
		||||
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/util/typeutil"
 | 
			
		||||
	"github.com/prometheus/client_golang/prometheus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
| 
						 | 
				
			
			@ -167,6 +166,111 @@ var (
 | 
			
		|||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeReadTaskUnsolveLen = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "read_task_unsolved_len",
 | 
			
		||||
			Help:      "number of unsolved read tasks in unsolvedQueue",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeReadTaskReadyLen = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "read_task_ready_len",
 | 
			
		||||
			Help:      "number of ready read tasks in readyQueue",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeReadTaskConcurrency = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "read_task_concurrency",
 | 
			
		||||
			Help:      "number of concurrent executing read tasks in QueryNode",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeEstimateCPUUsage = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "estimate_cpu_usage",
 | 
			
		||||
			Help:      "estimated cpu usage by the scheduler in QueryNode",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeSearchGroupNQ = prometheus.NewHistogramVec(
 | 
			
		||||
		prometheus.HistogramOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "search_group_nq",
 | 
			
		||||
			Help:      "the number of queries of each grouped search task",
 | 
			
		||||
			Buckets:   buckets,
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeSearchNQ = prometheus.NewHistogramVec(
 | 
			
		||||
		prometheus.HistogramOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "search_nq",
 | 
			
		||||
			Help:      "the number of queries of each search task",
 | 
			
		||||
			Buckets:   buckets,
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeSearchGroupTopK = prometheus.NewHistogramVec(
 | 
			
		||||
		prometheus.HistogramOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "search_group_topk",
 | 
			
		||||
			Help:      "the topK of each grouped search task",
 | 
			
		||||
			Buckets:   buckets,
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeSearchTopK = prometheus.NewHistogramVec(
 | 
			
		||||
		prometheus.HistogramOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "search_topk",
 | 
			
		||||
			Help:      "the top of each search task",
 | 
			
		||||
			Buckets:   buckets,
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeSearchGroupSize = prometheus.NewHistogramVec(
 | 
			
		||||
		prometheus.HistogramOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "search_group_size",
 | 
			
		||||
			Help:      "the number of tasks of each grouped search task",
 | 
			
		||||
			Buckets:   buckets,
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeEvictedReadReqCount = prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "read_evicted_count",
 | 
			
		||||
			Help:      "count of evicted search / query request",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeNumFlowGraphs = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
| 
						 | 
				
			
			@ -176,6 +280,16 @@ var (
 | 
			
		|||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	QueryNodeNumEntities = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Namespace: milvusNamespace,
 | 
			
		||||
			Subsystem: typeutil.QueryNodeRole,
 | 
			
		||||
			Name:      "entities_num",
 | 
			
		||||
			Help:      "number of entities which can be searched/queried",
 | 
			
		||||
		}, []string{
 | 
			
		||||
			nodeIDLabelName,
 | 
			
		||||
		})
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
//RegisterQueryNode registers QueryNode metrics
 | 
			
		||||
| 
						 | 
				
			
			@ -193,5 +307,16 @@ func RegisterQueryNode(registry *prometheus.Registry) {
 | 
			
		|||
	registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
 | 
			
		||||
	registry.MustRegister(QueryNodeReduceLatency)
 | 
			
		||||
	registry.MustRegister(QueryNodeLoadSegmentLatency)
 | 
			
		||||
	registry.MustRegister(QueryNodeReadTaskUnsolveLen)
 | 
			
		||||
	registry.MustRegister(QueryNodeReadTaskReadyLen)
 | 
			
		||||
	registry.MustRegister(QueryNodeReadTaskConcurrency)
 | 
			
		||||
	registry.MustRegister(QueryNodeEstimateCPUUsage)
 | 
			
		||||
	registry.MustRegister(QueryNodeSearchGroupNQ)
 | 
			
		||||
	registry.MustRegister(QueryNodeSearchNQ)
 | 
			
		||||
	registry.MustRegister(QueryNodeSearchGroupSize)
 | 
			
		||||
	registry.MustRegister(QueryNodeEvictedReadReqCount)
 | 
			
		||||
	registry.MustRegister(QueryNodeSearchGroupTopK)
 | 
			
		||||
	registry.MustRegister(QueryNodeSearchTopK)
 | 
			
		||||
	registry.MustRegister(QueryNodeNumFlowGraphs)
 | 
			
		||||
	registry.MustRegister(QueryNodeNumEntities)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -33,6 +33,7 @@ import (
 | 
			
		|||
	"github.com/milvus-io/milvus/internal/metrics"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/mq/msgstream"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/commonpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/datapb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/internalpb"
 | 
			
		||||
| 
						 | 
				
			
			@ -2179,6 +2180,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 | 
			
		|||
	}
 | 
			
		||||
	method := "Insert"
 | 
			
		||||
	tr := timerecord.NewTimeRecorder(method)
 | 
			
		||||
	receiveSize := proto.Size(request)
 | 
			
		||||
	metrics.ProxyMutationReceiveBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(receiveSize))
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
 | 
			
		||||
| 
						 | 
				
			
			@ -2286,7 +2289,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 | 
			
		|||
 | 
			
		||||
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
 | 
			
		||||
		metrics.SuccessLabel).Inc()
 | 
			
		||||
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(it.result.InsertCnt))
 | 
			
		||||
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
 | 
			
		||||
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(successCnt))
 | 
			
		||||
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
 | 
			
		||||
	return it.result, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -2299,6 +2303,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 | 
			
		|||
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
 | 
			
		||||
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
 | 
			
		||||
 | 
			
		||||
	receiveSize := proto.Size(request)
 | 
			
		||||
	metrics.ProxyMutationReceiveBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(receiveSize))
 | 
			
		||||
 | 
			
		||||
	if !node.checkHealthy() {
 | 
			
		||||
		return &milvuspb.MutationResult{
 | 
			
		||||
			Status: unhealthyStatus(),
 | 
			
		||||
| 
						 | 
				
			
			@ -2529,6 +2536,11 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
 | 
			
		|||
	searchDur := tr.ElapseSpan().Milliseconds()
 | 
			
		||||
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
 | 
			
		||||
		metrics.SearchLabel).Observe(float64(searchDur))
 | 
			
		||||
 | 
			
		||||
	if qt.result != nil {
 | 
			
		||||
		sentSize := proto.Size(qt.result)
 | 
			
		||||
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
 | 
			
		||||
	}
 | 
			
		||||
	return qt.result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -2743,10 +2755,14 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
 | 
			
		|||
 | 
			
		||||
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
 | 
			
		||||
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
 | 
			
		||||
	return &milvuspb.QueryResults{
 | 
			
		||||
 | 
			
		||||
	ret := &milvuspb.QueryResults{
 | 
			
		||||
		Status:     qt.result.Status,
 | 
			
		||||
		FieldsData: qt.result.FieldsData,
 | 
			
		||||
	}, nil
 | 
			
		||||
	}
 | 
			
		||||
	sentSize := proto.Size(qt.result)
 | 
			
		||||
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize))
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CreateAlias create alias for collection, then you can search the collection with alias.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -674,7 +674,6 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m
 | 
			
		|||
func (m *MetaCache) ClearShards(collectionName string) {
 | 
			
		||||
	log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
 | 
			
		||||
	m.mu.Lock()
 | 
			
		||||
	//var ret map[string][]nodeInfo
 | 
			
		||||
	info, ok := m.collInfo[collectionName]
 | 
			
		||||
	if ok {
 | 
			
		||||
		m.collInfo[collectionName].shardLeaders = nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -579,6 +579,8 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
 | 
			
		|||
	latency := tr.ElapseSpan()
 | 
			
		||||
	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
 | 
			
		||||
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
 | 
			
		||||
	metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetNq()))
 | 
			
		||||
	metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetTopk()))
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -553,6 +553,10 @@ func (replica *metaReplica) addSegmentPrivate(segmentID UniqueID, partitionID Un
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
 | 
			
		||||
	rowCount := segment.getRowCount()
 | 
			
		||||
	if rowCount > 0 {
 | 
			
		||||
		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(rowCount))
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -582,13 +586,14 @@ func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentTyp
 | 
			
		|||
 | 
			
		||||
// removeSegmentPrivate is private function in collectionReplica, to remove a segment from collectionReplica
 | 
			
		||||
func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) {
 | 
			
		||||
	var rowCount int64
 | 
			
		||||
	switch segType {
 | 
			
		||||
	case segmentTypeGrowing:
 | 
			
		||||
		if segment, ok := replica.growingSegments[segmentID]; ok {
 | 
			
		||||
			if partition, ok := replica.partitions[segment.partitionID]; ok {
 | 
			
		||||
				partition.removeSegmentID(segmentID, segType)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			rowCount = segment.getRowCount()
 | 
			
		||||
			delete(replica.growingSegments, segmentID)
 | 
			
		||||
			deleteSegment(segment)
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -598,6 +603,7 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
 | 
			
		|||
				partition.removeSegmentID(segmentID, segType)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			rowCount = segment.getRowCount()
 | 
			
		||||
			delete(replica.sealedSegments, segmentID)
 | 
			
		||||
			deleteSegment(segment)
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -606,6 +612,9 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec()
 | 
			
		||||
	if rowCount > 0 {
 | 
			
		||||
		metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(rowCount))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getSegmentByID returns the segment which id is segmentID
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -603,7 +603,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
 | 
			
		|||
	if err := HandleCStatus(&status, "Insert failed"); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(numOfRow))
 | 
			
		||||
	s.setRecentlyModified(true)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,6 +27,7 @@ import (
 | 
			
		|||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/log"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/metrics"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
| 
						 | 
				
			
			@ -52,7 +53,8 @@ type taskScheduler struct {
 | 
			
		|||
	schedule scheduleReadTaskPolicy
 | 
			
		||||
	// for search and query end
 | 
			
		||||
 | 
			
		||||
	cpuUsage int32 // 1200 means 1200% 12 cores
 | 
			
		||||
	cpuUsage        int32 // 1200 means 1200% 12 cores
 | 
			
		||||
	readConcurrency int32 // 1200 means 1200% 12 cores
 | 
			
		||||
 | 
			
		||||
	// for other tasks
 | 
			
		||||
	queue       taskQueue
 | 
			
		||||
| 
						 | 
				
			
			@ -153,6 +155,7 @@ func (s *taskScheduler) tryEvictUnsolvedReadTask(headCount int) {
 | 
			
		|||
	if diff <= 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	metrics.QueryNodeEvictedReadReqCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Add(float64(diff))
 | 
			
		||||
	busyErr := fmt.Errorf("server is busy")
 | 
			
		||||
	for e := s.unsolvedReadTasks.Front(); e != nil && diff > 0; e = e.Next() {
 | 
			
		||||
		diff--
 | 
			
		||||
| 
						 | 
				
			
			@ -224,6 +227,7 @@ func (s *taskScheduler) popAndAddToExecute() {
 | 
			
		|||
	if curUsage < 0 {
 | 
			
		||||
		curUsage = 0
 | 
			
		||||
	}
 | 
			
		||||
	metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(curUsage))
 | 
			
		||||
	targetUsage := s.maxCPUUsage - curUsage
 | 
			
		||||
	if targetUsage <= 0 {
 | 
			
		||||
		return
 | 
			
		||||
| 
						 | 
				
			
			@ -246,11 +250,14 @@ func (s *taskScheduler) executeReadTasks() {
 | 
			
		|||
			return
 | 
			
		||||
		case t, ok := <-s.executeReadTaskChan:
 | 
			
		||||
			if ok {
 | 
			
		||||
				pendingTaskLen := len(s.executeReadTaskChan)
 | 
			
		||||
				taskWg.Add(1)
 | 
			
		||||
				atomic.AddInt32(&s.readConcurrency, int32(pendingTaskLen+1))
 | 
			
		||||
				go func(t readTask) {
 | 
			
		||||
					defer taskWg.Done()
 | 
			
		||||
					s.processReadTask(t)
 | 
			
		||||
					cpu := t.CPUUsage()
 | 
			
		||||
					atomic.AddInt32(&s.readConcurrency, -1)
 | 
			
		||||
					atomic.AddInt32(&s.cpuUsage, -cpu)
 | 
			
		||||
					select {
 | 
			
		||||
					case s.notifyChan <- struct{}{}:
 | 
			
		||||
| 
						 | 
				
			
			@ -258,7 +265,6 @@ func (s *taskScheduler) executeReadTasks() {
 | 
			
		|||
					}
 | 
			
		||||
				}(t)
 | 
			
		||||
 | 
			
		||||
				pendingTaskLen := len(s.executeReadTaskChan)
 | 
			
		||||
				for i := 0; i < pendingTaskLen; i++ {
 | 
			
		||||
					taskWg.Add(1)
 | 
			
		||||
					t := <-s.executeReadTaskChan
 | 
			
		||||
| 
						 | 
				
			
			@ -266,6 +272,7 @@ func (s *taskScheduler) executeReadTasks() {
 | 
			
		|||
						defer taskWg.Done()
 | 
			
		||||
						s.processReadTask(t)
 | 
			
		||||
						cpu := t.CPUUsage()
 | 
			
		||||
						atomic.AddInt32(&s.readConcurrency, -1)
 | 
			
		||||
						atomic.AddInt32(&s.cpuUsage, -cpu)
 | 
			
		||||
						select {
 | 
			
		||||
						case s.notifyChan <- struct{}{}:
 | 
			
		||||
| 
						 | 
				
			
			@ -344,4 +351,8 @@ func (s *taskScheduler) tryMergeReadTasks() {
 | 
			
		|||
			s.unsolvedReadTasks.Remove(e)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.unsolvedReadTasks.Len()))
 | 
			
		||||
	metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.readyReadTasks.Len()))
 | 
			
		||||
	readConcurrency := atomic.LoadInt32(&s.readConcurrency)
 | 
			
		||||
	metrics.QueryNodeEstimateCPUUsage.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,6 +30,7 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/log"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/metrics"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/commonpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/internalpb"
 | 
			
		||||
	"github.com/milvus-io/milvus/internal/proto/querypb"
 | 
			
		||||
| 
						 | 
				
			
			@ -161,6 +162,11 @@ func (s *searchTask) Execute(ctx context.Context) error {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (s *searchTask) Notify(err error) {
 | 
			
		||||
	if len(s.otherTasks) > 0 {
 | 
			
		||||
		metrics.QueryNodeSearchGroupSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(len(s.otherTasks) + 1))
 | 
			
		||||
		metrics.QueryNodeSearchGroupNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(s.NQ))
 | 
			
		||||
		metrics.QueryNodeSearchGroupTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(s.TopK))
 | 
			
		||||
	}
 | 
			
		||||
	s.done <- err
 | 
			
		||||
	for i := 0; i < len(s.otherTasks); i++ {
 | 
			
		||||
		s.otherTasks[i].Notify(err)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -260,7 +260,6 @@ fi
 | 
			
		|||
if command -v ccache &> /dev/null
 | 
			
		||||
then
 | 
			
		||||
	ccache -s
 | 
			
		||||
    exit
 | 
			
		||||
fi
 | 
			
		||||
 | 
			
		||||
popd
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue