mirror of https://github.com/milvus-io/milvus.git
Add metric of waiting for tSafe latency (#24765)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/24775/head
parent
41af0a98fa
commit
e81eb56b92
|
@ -38,10 +38,12 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
)
|
||||
|
||||
|
@ -215,11 +217,15 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
|
|||
}
|
||||
|
||||
// wait tsafe
|
||||
waitTr := timerecord.NewTimeRecorder("wait tSafe")
|
||||
err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
|
||||
if err != nil {
|
||||
log.Warn("delegator search failed to wait tsafe", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
metrics.QueryNodeSQLatencyWaitTSafe.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).
|
||||
Observe(float64(waitTr.ElapseSpan().Milliseconds()))
|
||||
|
||||
sealed, growing, version := sd.distribution.GetCurrent(req.GetReq().GetPartitionIDs()...)
|
||||
defer sd.distribution.FinishUsage(version)
|
||||
|
@ -270,11 +276,15 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
|
|||
}
|
||||
|
||||
// wait tsafe
|
||||
waitTr := timerecord.NewTimeRecorder("wait tSafe")
|
||||
err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
|
||||
if err != nil {
|
||||
log.Warn("delegator query failed to wait tsafe", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
metrics.QueryNodeSQLatencyWaitTSafe.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).
|
||||
Observe(float64(waitTr.ElapseSpan().Milliseconds()))
|
||||
|
||||
sealed, growing, version := sd.distribution.GetCurrent(req.GetReq().GetPartitionIDs()...)
|
||||
defer sd.distribution.FinishUsage(version)
|
||||
|
|
|
@ -128,6 +128,18 @@ var (
|
|||
requestScope,
|
||||
})
|
||||
|
||||
QueryNodeSQLatencyWaitTSafe = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.QueryNodeRole,
|
||||
Name: "sq_wait_tsafe_latency",
|
||||
Help: "latency of search or query to wait for tsafe",
|
||||
Buckets: buckets,
|
||||
}, []string{
|
||||
nodeIDLabelName,
|
||||
queryTypeLabelName,
|
||||
})
|
||||
|
||||
QueryNodeSQLatencyInQueue = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
|
@ -356,6 +368,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
|
|||
registry.MustRegister(QueryNodeNumDeltaChannels)
|
||||
registry.MustRegister(QueryNodeSQCount)
|
||||
registry.MustRegister(QueryNodeSQReqLatency)
|
||||
registry.MustRegister(QueryNodeSQLatencyWaitTSafe)
|
||||
registry.MustRegister(QueryNodeSQLatencyInQueue)
|
||||
registry.MustRegister(QueryNodeSQSegmentLatency)
|
||||
registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
|
||||
|
|
Loading…
Reference in New Issue