mirror of https://github.com/milvus-io/milvus.git
enhance: the proxy metric in the query request (#33307)
/kind improvement issue: #33306 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/33357/head^2
parent
e895cfed84
commit
2964f60edc
|
@ -3415,21 +3415,8 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
|
|||
// Query get the records by primary keys.
|
||||
func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryResults, error) {
|
||||
request := qt.request
|
||||
receiveSize := proto.Size(request)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(receiveSize))
|
||||
|
||||
metrics.ProxyReceivedNQ.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.SearchLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(1))
|
||||
|
||||
subLabel := GetCollectionRateSubLabel(request)
|
||||
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)
|
||||
method := "Query"
|
||||
isProxyRequest := GetRequestLabelFromContext(ctx)
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
return &milvuspb.QueryResults{
|
||||
|
@ -3437,20 +3424,6 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
|
|||
}, nil
|
||||
}
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
|
||||
defer sp.End()
|
||||
tr := timerecord.NewTimeRecorder("Query")
|
||||
|
||||
method := "Query"
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.TotalLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.String("db", request.DbName),
|
||||
|
@ -3458,6 +3431,16 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
|
|||
zap.Strings("partitions", request.PartitionNames),
|
||||
)
|
||||
|
||||
log.Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("expr", request.Expr),
|
||||
zap.Strings("OutputFields", request.OutputFields),
|
||||
zap.Uint64("travel_timestamp", request.TravelTimestamp),
|
||||
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
|
||||
)
|
||||
|
||||
tr := timerecord.NewTimeRecorder(method)
|
||||
|
||||
defer func() {
|
||||
span := tr.ElapseSpan()
|
||||
if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
|
||||
|
@ -3475,27 +3458,21 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
|
|||
}
|
||||
}()
|
||||
|
||||
log.Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("expr", request.Expr),
|
||||
zap.Strings("OutputFields", request.OutputFields),
|
||||
zap.Uint64("travel_timestamp", request.TravelTimestamp),
|
||||
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
|
||||
)
|
||||
|
||||
if err := node.sched.dqQueue.Enqueue(qt); err != nil {
|
||||
log.Warn(
|
||||
rpcFailedToEnqueue(method),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.AbandonLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
if isProxyRequest {
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.AbandonLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
}
|
||||
|
||||
return &milvuspb.QueryResults{
|
||||
Status: merr.Status(err),
|
||||
|
@ -3510,45 +3487,36 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
|
|||
rpcFailedToWaitToFinish(method),
|
||||
zap.Error(err))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
|
||||
if isProxyRequest {
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
|
||||
}
|
||||
|
||||
return &milvuspb.QueryResults{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
span := tr.CtxRecord(ctx, "wait query result")
|
||||
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
).Observe(float64(span.Milliseconds()))
|
||||
|
||||
log.Debug(rpcDone(method))
|
||||
if isProxyRequest {
|
||||
span := tr.CtxRecord(ctx, "wait query result")
|
||||
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
).Observe(float64(span.Milliseconds()))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.SuccessLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
metrics.ProxySQLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
metrics.ProxySQLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
metrics.ProxyCollectionSQLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.CollectionName,
|
||||
).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
sentSize := proto.Size(qt.result)
|
||||
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
|
||||
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
|
||||
metrics.ProxyCollectionSQLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.CollectionName,
|
||||
).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
}
|
||||
|
||||
return qt.result, nil
|
||||
}
|
||||
|
@ -3570,22 +3538,73 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
|||
lb: node.lbPolicy,
|
||||
mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(),
|
||||
}
|
||||
res, err := node.query(ctx, qt)
|
||||
if merr.Ok(res.Status) && err == nil {
|
||||
username := GetCurUserFromContextOrDefault(ctx)
|
||||
nodeID := paramtable.GetStringNodeID()
|
||||
v := Extension.Report(map[string]any{
|
||||
hookutil.OpTypeKey: hookutil.OpTypeQuery,
|
||||
hookutil.DatabaseKey: request.DbName,
|
||||
hookutil.UsernameKey: username,
|
||||
hookutil.ResultDataSizeKey: proto.Size(res),
|
||||
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
|
||||
hookutil.RelatedCntKey: qt.allQueryCnt,
|
||||
})
|
||||
SetReportValue(res.Status, v)
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
|
||||
|
||||
subLabel := GetCollectionRateSubLabel(request)
|
||||
receiveSize := proto.Size(request)
|
||||
metrics.ProxyReceiveBytes.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.QueryLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(receiveSize))
|
||||
metrics.ProxyReceivedNQ.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
metrics.SearchLabel,
|
||||
request.GetCollectionName(),
|
||||
).Add(float64(1))
|
||||
|
||||
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
return &milvuspb.QueryResults{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
return res, err
|
||||
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
|
||||
defer sp.End()
|
||||
method := "Query"
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.TotalLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
|
||||
ctx = SetRequestLabelForContext(ctx)
|
||||
res, err := node.query(ctx, qt)
|
||||
if err != nil || !merr.Ok(res.Status) {
|
||||
return res, err
|
||||
}
|
||||
|
||||
log.Debug(rpcDone(method))
|
||||
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
method,
|
||||
metrics.SuccessLabel,
|
||||
request.GetDbName(),
|
||||
request.GetCollectionName(),
|
||||
).Inc()
|
||||
|
||||
sentSize := proto.Size(qt.result)
|
||||
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
|
||||
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
|
||||
|
||||
username := GetCurUserFromContextOrDefault(ctx)
|
||||
nodeID := paramtable.GetStringNodeID()
|
||||
v := Extension.Report(map[string]any{
|
||||
hookutil.OpTypeKey: hookutil.OpTypeQuery,
|
||||
hookutil.DatabaseKey: request.DbName,
|
||||
hookutil.UsernameKey: username,
|
||||
hookutil.ResultDataSizeKey: proto.Size(res),
|
||||
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
|
||||
hookutil.RelatedCntKey: qt.allQueryCnt,
|
||||
})
|
||||
SetReportValue(res.Status, v)
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// CreateAlias create alias for collection, then you can search the collection with alias.
|
||||
|
|
|
@ -1620,3 +1620,22 @@ func GetCostValue(status *commonpb.Status) int {
|
|||
}
|
||||
return value
|
||||
}
|
||||
|
||||
type isProxyRequestKeyType struct{}
|
||||
|
||||
var ctxProxyRequestKey = isProxyRequestKeyType{}
|
||||
|
||||
func SetRequestLabelForContext(ctx context.Context) context.Context {
|
||||
return context.WithValue(ctx, ctxProxyRequestKey, true)
|
||||
}
|
||||
|
||||
func GetRequestLabelFromContext(ctx context.Context) bool {
|
||||
if ctx == nil {
|
||||
return false
|
||||
}
|
||||
v := ctx.Value(ctxProxyRequestKey)
|
||||
if v == nil {
|
||||
return false
|
||||
}
|
||||
return v.(bool)
|
||||
}
|
||||
|
|
|
@ -2294,3 +2294,24 @@ func TestGetCostValue(t *testing.T) {
|
|||
assert.Equal(t, 100, cost)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRequestLabelWithContext(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
{
|
||||
label := GetRequestLabelFromContext(ctx)
|
||||
assert.False(t, label)
|
||||
}
|
||||
|
||||
ctx = SetRequestLabelForContext(ctx)
|
||||
{
|
||||
label := GetRequestLabelFromContext(ctx)
|
||||
assert.True(t, label)
|
||||
}
|
||||
|
||||
{
|
||||
// nolint
|
||||
label := GetRequestLabelFromContext(nil)
|
||||
assert.False(t, label)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue