enhance: add the related data size for the read apis (#31816)

issue: #30436
origin pr: #30438
related pr: #31772

---------

Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG 2024-04-10 15:07:17 +08:00 committed by GitHub
parent c9faa6d936
commit 90bed1caf9
17 changed files with 220 additions and 122 deletions


@ -73,7 +73,7 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
ret.Collections[collectionID].IndexInfo = append(ret.Collections[collectionID].IndexInfo, &metricsinfo.DataCoordIndexInfo{
NumEntitiesIndexed: info.GetIndexedRows(),
IndexName: info.GetIndexName(),
- FieldID: info.GetIndexID(),
+ FieldID: info.GetFieldID(),
})
}
}


@ -161,6 +161,7 @@ message CostAggregation {
int64 responseTime = 1;
int64 serviceTime = 2;
int64 totalNQ = 3;
+ int64 totalRelatedDataSize = 4;
}
message RetrieveRequest {

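The new totalRelatedDataSize field rides along in every CostAggregation a query node returns. Consumers can fold it in without nil checks by going through the generated getters; a minimal sketch, assuming the generated internalpb Go bindings:

// Sketch: merge the new field across per-node results. The generated
// getters return zero values on nil receivers, so a result without cost
// info (e.g. from an older node) simply contributes 0.
func sumRelatedDataSize(results []*internalpb.SearchResults) int64 {
    var total int64
    for _, r := range results {
        total += r.GetCostAggregation().GetTotalRelatedDataSize()
    }
    return total
}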

@ -2436,23 +2436,31 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
+ username := GetCurUserFromContextOrDefault(ctx)
+ nodeID := paramtable.GetStringNodeID()
+ dbName := request.DbName
+ collectionName := request.CollectionName
v := Extension.Report(map[string]any{
- hookutil.OpTypeKey: hookutil.OpTypeInsert,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
- hookutil.DataSizeKey: proto.Size(request),
- hookutil.SuccessCntKey: successCnt,
- hookutil.FailCntKey: len(it.result.ErrIndex),
+ hookutil.OpTypeKey: hookutil.OpTypeInsert,
+ hookutil.DatabaseKey: dbName,
+ hookutil.UsernameKey: username,
+ hookutil.RequestDataSizeKey: proto.Size(request),
+ hookutil.SuccessCntKey: successCnt,
+ hookutil.FailCntKey: len(it.result.ErrIndex),
})
SetReportValue(it.result.GetStatus(), v)
+ if merr.Ok(it.result.GetStatus()) {
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeInsert, request.DbName, username).Add(float64(v))
+ }
metrics.ProxyInsertVectors.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, dbName, collectionName).
Add(float64(successCnt))
metrics.ProxyMutationLatency.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, metrics.InsertLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).
+ WithLabelValues(nodeID, metrics.InsertLabel, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
return it.result, nil
}
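The three-step pattern above repeats in every read and write path below: Extension.Report condenses the request's key/value map into one numeric value, SetReportValue stashes that value on the returned status, and the new ProxyReportValue counter is incremented only for successful requests. A condensed sketch of the pattern; the reportAndRecord helper is hypothetical, not part of this change:

func reportAndRecord(status *commonpb.Status, nodeID, opType, dbName, username string, fields map[string]any) {
    v := Extension.Report(fields) // hook-defined numeric value for this request
    SetReportValue(status, v)     // expose the value to the caller via the status
    if merr.Ok(status) {          // only successful requests are counted
        metrics.ProxyReportValue.WithLabelValues(nodeID, opType, dbName, username).Add(float64(v))
    }
}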
@ -2511,7 +2519,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
log.Debug("Run delete in Proxy")
if err := dr.Run(ctx); err != nil {
log.Error("Failed to enqueue delete task: " + err.Error())
log.Error("Failed to run delete task: " + err.Error())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
@ -2526,21 +2534,28 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
successCnt := dr.result.GetDeleteCnt()
metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
+ username := GetCurUserFromContextOrDefault(ctx)
+ nodeID := paramtable.GetStringNodeID()
+ dbName := request.DbName
+ collectionName := request.CollectionName
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeDelete,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
+ hookutil.DatabaseKey: dbName,
+ hookutil.UsernameKey: username,
hookutil.SuccessCntKey: successCnt,
hookutil.RelatedCntKey: dr.allQueryCnt.Load(),
})
SetReportValue(dr.result.GetStatus(), v)
- metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
- metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
+ if merr.Ok(dr.result.GetStatus()) {
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeDelete, dbName, username).Add(float64(v))
+ }
+ metrics.ProxyFunctionCall.WithLabelValues(nodeID, method,
+ metrics.SuccessLabel, dbName, collectionName).Inc()
metrics.ProxyMutationLatency.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, metrics.DeleteLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
+ metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.DeleteLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
return dr.result, nil
}
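Delete reports a related count instead of a request size: dr.allQueryCnt is accumulated concurrently while the delete's underlying query fans out, which is why it is read with Load(). A sketch of that pattern, assuming an atomic.Int64 (which the Load() call suggests):

var allQueryCnt atomic.Int64

// each concurrent callback contributes its retrieve count
allQueryCnt.Add(res.GetAllRetrieveCount())

// take one consistent snapshot at report time
total := allQueryCnt.Load()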
@ -2652,28 +2667,34 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
// UpsertCnt always equals to the number of entities in the request
it.result.UpsertCnt = int64(request.NumRows)
+ username := GetCurUserFromContextOrDefault(ctx)
+ nodeID := paramtable.GetStringNodeID()
+ dbName := request.DbName
+ collectionName := request.CollectionName
v := Extension.Report(map[string]any{
- hookutil.OpTypeKey: hookutil.OpTypeUpsert,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
- hookutil.DataSizeKey: proto.Size(it.req),
- hookutil.SuccessCntKey: it.result.UpsertCnt,
- hookutil.FailCntKey: len(it.result.ErrIndex),
+ hookutil.OpTypeKey: hookutil.OpTypeUpsert,
+ hookutil.DatabaseKey: request.DbName,
+ hookutil.UsernameKey: username,
+ hookutil.RequestDataSizeKey: proto.Size(it.req),
+ hookutil.SuccessCntKey: it.result.UpsertCnt,
+ hookutil.FailCntKey: len(it.result.ErrIndex),
})
SetReportValue(it.result.GetStatus(), v)
- rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size()))
- metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
- metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
+ rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size()))
+ if merr.Ok(it.result.GetStatus()) {
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v))
+ }
+ metrics.ProxyFunctionCall.WithLabelValues(nodeID, method,
+ metrics.SuccessLabel, dbName, collectionName).Inc()
successCnt := it.result.UpsertCnt - int64(len(it.result.ErrIndex))
metrics.ProxyUpsertVectors.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, dbName, collectionName).
Add(float64(successCnt))
metrics.ProxyMutationLatency.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, metrics.UpsertLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
- metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
+ metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.UpsertLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("Finish processing upsert request in Proxy")
return it.result, nil
@ -2686,7 +2707,8 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
- rsp, err = node.search(ctx, request)
+ rsp, err = node.
+ search(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
@ -2835,8 +2857,11 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
}
span := tr.CtxRecord(ctx, "wait search result")
+ nodeID := paramtable.GetStringNodeID()
+ dbName := request.DbName
+ collectionName := request.CollectionName
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.SearchLabel,
).Observe(float64(span.Milliseconds()))
@ -2844,42 +2869,47 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
log.Debug(rpcDone(method))
metrics.ProxyFunctionCall.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
method,
metrics.SuccessLabel,
- request.GetDbName(),
- request.GetCollectionName(),
+ dbName,
+ collectionName,
).Inc()
metrics.ProxySearchVectors.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, dbName, collectionName).
Add(float64(qt.result.GetResults().GetNumQueries()))
searchDur := tr.ElapseSpan().Milliseconds()
metrics.ProxySQLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.SearchLabel,
- request.GetDbName(),
- request.GetCollectionName(),
+ dbName,
+ collectionName,
).Observe(float64(searchDur))
metrics.ProxyCollectionSQLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.SearchLabel,
- request.CollectionName,
+ collectionName,
).Observe(float64(searchDur))
if qt.result != nil {
+ username := GetCurUserFromContextOrDefault(ctx)
sentSize := proto.Size(qt.result)
v := Extension.Report(map[string]any{
- hookutil.OpTypeKey: hookutil.OpTypeSearch,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
- hookutil.DataSizeKey: sentSize,
- hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
+ hookutil.OpTypeKey: hookutil.OpTypeSearch,
+ hookutil.DatabaseKey: dbName,
+ hookutil.UsernameKey: username,
+ hookutil.ResultDataSizeKey: sentSize,
+ hookutil.RelatedDataSizeKey: qt.relatedDataSize,
+ hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
})
SetReportValue(qt.result.GetStatus(), v)
- metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
+ if merr.Ok(qt.result.GetStatus()) {
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v))
+ }
+ metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
}
return qt.result, nil
@ -3016,8 +3046,11 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
}
span := tr.CtxRecord(ctx, "wait hybrid search result")
+ nodeID := paramtable.GetStringNodeID()
+ dbName := request.DbName
+ collectionName := request.CollectionName
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.HybridSearchLabel,
).Observe(float64(span.Milliseconds()))
@ -3025,7 +3058,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
log.Debug(rpcDone(method))
metrics.ProxyFunctionCall.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
method,
metrics.SuccessLabel,
request.GetDbName(),
@ -3033,34 +3066,39 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
).Inc()
metrics.ProxySearchVectors.
- WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
+ WithLabelValues(nodeID, dbName, collectionName).
Add(float64(len(request.GetRequests())))
searchDur := tr.ElapseSpan().Milliseconds()
metrics.ProxySQLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.HybridSearchLabel,
- request.GetDbName(),
- request.GetCollectionName(),
+ dbName,
+ collectionName,
).Observe(float64(searchDur))
metrics.ProxyCollectionSQLatency.WithLabelValues(
- strconv.FormatInt(paramtable.GetNodeID(), 10),
+ nodeID,
metrics.HybridSearchLabel,
- request.CollectionName,
+ collectionName,
).Observe(float64(searchDur))
if qt.result != nil {
sentSize := proto.Size(qt.result)
+ username := GetCurUserFromContextOrDefault(ctx)
v := Extension.Report(map[string]any{
- hookutil.OpTypeKey: hookutil.OpTypeHybridSearch,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
- hookutil.DataSizeKey: sentSize,
- hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
+ hookutil.OpTypeKey: hookutil.OpTypeHybridSearch,
+ hookutil.DatabaseKey: dbName,
+ hookutil.UsernameKey: username,
+ hookutil.ResultDataSizeKey: sentSize,
+ hookutil.RelatedDataSizeKey: qt.relatedDataSize,
+ hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
})
SetReportValue(qt.result.GetStatus(), v)
- metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
+ if merr.Ok(qt.result.GetStatus()) {
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v))
+ }
+ metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
}
return qt.result, nil
@ -3350,15 +3388,18 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
}
res, err := node.query(ctx, qt)
if merr.Ok(res.Status) && err == nil {
+ username := GetCurUserFromContextOrDefault(ctx)
+ nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
- hookutil.OpTypeKey: hookutil.OpTypeQuery,
- hookutil.DatabaseKey: request.DbName,
- hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
- hookutil.DataSizeKey: proto.Size(res),
- hookutil.RelatedCntKey: qt.allQueryCnt,
- hookutil.DimensionKey: qt.dimension,
+ hookutil.OpTypeKey: hookutil.OpTypeQuery,
+ hookutil.DatabaseKey: request.DbName,
+ hookutil.UsernameKey: username,
+ hookutil.ResultDataSizeKey: proto.Size(res),
+ hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
+ hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
+ metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
}
return res, err
}


@ -54,7 +54,6 @@ type queryTask struct {
collectionName string
queryParams *queryParams
schema *schemaInfo
- dimension int64
userOutputFields []string
@ -66,8 +65,9 @@ type queryTask struct {
channelsMvcc map[string]Timestamp
fastSkip bool
- reQuery bool
- allQueryCnt int64
+ reQuery bool
+ allQueryCnt int64
+ totalRelatedDataSize int64
}
type queryParams struct {
@ -341,11 +341,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
return err
}
t.schema = schema
- t.dimension, err = typeutil.GetCollectionDim(t.schema.CollectionSchema)
- if err != nil {
- log.Warn("get collection dimension failed", zap.Error(err))
- return err
- }
if t.ids != nil {
pkField := ""
@ -481,6 +476,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
toReduceResults := make([]*internalpb.RetrieveResults, 0)
t.allQueryCnt = 0
+ t.totalRelatedDataSize = 0
select {
case <-t.TraceCtx().Done():
log.Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID()))
@ -490,6 +486,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
t.resultBuf.Range(func(res *internalpb.RetrieveResults) bool {
toReduceResults = append(toReduceResults, res)
t.allQueryCnt += res.GetAllRetrieveCount()
+ t.totalRelatedDataSize += res.GetCostAggregation().GetTotalRelatedDataSize()
log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()))
return true
})


@ -73,6 +73,7 @@ type searchTask struct {
lb LBPolicy
queryChannelsTs map[string]Timestamp
queryInfos []*planpb.QueryInfo
+ relatedDataSize int64
reScorers []reScorer
rankParams *rankParams
@ -561,7 +562,9 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
}
t.queryChannelsTs = make(map[string]uint64)
+ t.relatedDataSize = 0
for _, r := range toReduceResults {
+ t.relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize()
for ch, ts := range r.GetChannelsMvcc() {
t.queryChannelsTs[ch] = ts
}


@ -13,8 +13,10 @@ type cntReducer struct{}
func (r *cntReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) {
cnt := int64(0)
allRetrieveCount := int64(0)
+ relatedDataSize := int64(0)
for _, res := range results {
allRetrieveCount += res.GetAllRetrieveCount()
+ relatedDataSize += res.GetCostAggregation().GetTotalRelatedDataSize()
c, err := funcutil.CntOfInternalResult(res)
if err != nil {
return nil, err
@ -23,6 +25,9 @@ func (r *cntReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveR
}
res := funcutil.WrapCntToInternalResult(cnt)
res.AllRetrieveCount = allRetrieveCount
+ res.CostAggregation = &internalpb.CostAggregation{
+ TotalRelatedDataSize: relatedDataSize,
+ }
return res, nil
}
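With this, count(*) queries, which return no field data, still account for the bytes scanned to produce the count. A rough usage sketch; buildCntResult is hypothetical and pairs funcutil.WrapCntToInternalResult with a cost field the way query nodes now do:

func buildCntResult(cnt, relatedSize int64) *internalpb.RetrieveResults {
    res := funcutil.WrapCntToInternalResult(cnt)
    res.CostAggregation = &internalpb.CostAggregation{TotalRelatedDataSize: relatedSize}
    return res
}

// Reducing two partial counts sums both the counts and the sizes:
out, err := (&cntReducer{}).Reduce(ctx, []*internalpb.RetrieveResults{
    buildCntResult(2, 100),
    buildCntResult(3, 200),
})
// expect err == nil, a total count of 5, and
// out.GetCostAggregation().GetTotalRelatedDataSize() == 300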


@ -98,6 +98,13 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
return nil, false
})
searchResults.CostAggregation = mergeRequestCost(requestCosts)
+ if searchResults.CostAggregation == nil {
+ searchResults.CostAggregation = &internalpb.CostAggregation{}
+ }
+ relatedDataSize := lo.Reduce(results, func(acc int64, result *internalpb.SearchResults, _ int) int64 {
+ return acc + result.GetCostAggregation().GetTotalRelatedDataSize()
+ }, 0)
+ searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize
searchResults.ChannelsMvcc = channelsMvcc
return searchResults, nil
}
@ -108,17 +115,16 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear
}
channelsMvcc := make(map[string]uint64)
- for _, r := range results {
- for ch, ts := range r.GetChannelsMvcc() {
- channelsMvcc[ch] = ts
- }
- }
+ relatedDataSize := int64(0)
searchResults := &internalpb.SearchResults{
- IsAdvanced: true,
- ChannelsMvcc: channelsMvcc,
+ IsAdvanced: true,
}
for _, result := range results {
+ relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize()
+ for ch, ts := range result.GetChannelsMvcc() {
+ channelsMvcc[ch] = ts
+ }
if !result.GetIsAdvanced() {
continue
}
@ -127,6 +133,7 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear
searchResults.SubResults = append(searchResults.SubResults, result.GetSubResults()...)
searchResults.NumQueries = result.GetNumQueries()
}
+ searchResults.ChannelsMvcc = channelsMvcc
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
return result.GetCostAggregation(), true
@ -139,6 +146,10 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear
return nil, false
})
searchResults.CostAggregation = mergeRequestCost(requestCosts)
+ if searchResults.CostAggregation == nil {
+ searchResults.CostAggregation = &internalpb.CostAggregation{}
+ }
+ searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize
return searchResults, nil
}
@ -148,13 +159,12 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes
}
channelsMvcc := make(map[string]uint64)
- for _, r := range results {
- for ch, ts := range r.GetChannelsMvcc() {
+ relatedDataSize := int64(0)
+ for index, result := range results {
+ relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize()
+ for ch, ts := range result.GetChannelsMvcc() {
channelsMvcc[ch] = ts
}
}
- searchResults.ChannelsMvcc = channelsMvcc
- for index, result := range results {
// we just append here, no need to split subResult and reduce
// defer this reduce to proxy
subResult := &internalpb.SubSearchResults{
@ -169,6 +179,7 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes
searchResults.NumQueries = result.GetNumQueries()
searchResults.SubResults = append(searchResults.SubResults, subResult)
}
+ searchResults.ChannelsMvcc = channelsMvcc
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
return result.GetCostAggregation(), true
@ -181,6 +192,10 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes
return nil, false
})
searchResults.CostAggregation = mergeRequestCost(requestCosts)
+ if searchResults.CostAggregation == nil {
+ searchResults.CostAggregation = &internalpb.CostAggregation{}
+ }
+ searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize
return searchResults, nil
}
@ -366,8 +381,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
)
validRetrieveResults := []*internalpb.RetrieveResults{}
+ relatedDataSize := int64(0)
for _, r := range retrieveResults {
ret.AllRetrieveCount += r.GetAllRetrieveCount()
+ relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize()
size := typeutil.GetSizeOfIDs(r.GetIds())
if r == nil || len(r.GetFieldsData()) == 0 || size == 0 {
continue
@ -437,7 +454,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
return nil, false
})
ret.CostAggregation = mergeRequestCost(requestCosts)
+ if ret.CostAggregation == nil {
+ ret.CostAggregation = &internalpb.CostAggregation{}
+ }
+ ret.CostAggregation.TotalRelatedDataSize = relatedDataSize
return ret, nil
}
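mergeRequestCost can yield nil (for example when no per-worker cost is collected), hence the recurring nil-guard before writing TotalRelatedDataSize. The three copies of that guard above could share a helper; hypothetical, not in this change:

// Hypothetical helper: ensure a CostAggregation exists, then record the
// merged related data size on it.
func withRelatedDataSize(ca *internalpb.CostAggregation, size int64) *internalpb.CostAggregation {
    if ca == nil {
        ca = &internalpb.CostAggregation{}
    }
    ca.TotalRelatedDataSize = size
    return ca
}

// e.g. ret.CostAggregation = withRelatedDataSize(ret.CostAggregation, relatedDataSize)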


@ -128,9 +128,12 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
if len(result.GetOffset()) != 0 {
if err = svr.Send(&internalpb.RetrieveResults{
- Status: merr.Success(),
- Ids: result.GetIds(),
- FieldsData: result.GetFieldsData(),
+ Status: merr.Success(),
+ Ids: result.GetIds(),
+ FieldsData: result.GetFieldsData(),
+ CostAggregation: &internalpb.CostAggregation{
+ TotalRelatedDataSize: segment.MemSize(),
+ },
AllRetrieveCount: result.GetAllRetrieveCount(),
}); err != nil {
errs[i] = err


@ -959,10 +959,15 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
collector.Rate.Add(metricsinfo.NQPerSecond, 1)
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req)))
}
+ relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 {
+ return acc + result.GetCostAggregation().GetTotalRelatedDataSize()
+ }, 0)
- if ret.GetCostAggregation() != nil {
- ret.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
+ if ret.CostAggregation == nil {
+ ret.CostAggregation = &internalpb.CostAggregation{}
}
+ ret.CostAggregation.ResponseTime = tr.ElapseSpan().Milliseconds()
+ ret.CostAggregation.TotalRelatedDataSize = relatedDataSize
return ret, nil
}
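Materializing CostAggregation unconditionally keeps the new size visible end to end alongside the existing responseTime/serviceTime/totalNQ signals. Purely illustrative, not any balancer Milvus ships: a consumer could weight per-node cost by bytes touched:

// Illustrative scoring only; the weights are made up.
func costScore(ca *internalpb.CostAggregation) float64 {
    if ca == nil {
        return 0
    }
    return float64(ca.GetServiceTime()) + 1e-6*float64(ca.GetTotalRelatedDataSize())
}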


@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
@ -129,6 +130,10 @@ func (t *QueryTask) Execute() error {
return err
}
+ relatedDataSize := lo.Reduce(querySegments, func(acc int64, seg segments.Segment, _ int) int64 {
+ return acc + seg.MemSize()
+ }, 0)
t.result = &internalpb.RetrieveResults{
Base: &commonpb.MsgBase{
SourceID: paramtable.GetNodeID(),
@ -137,7 +142,8 @@ func (t *QueryTask) Execute() error {
Ids: reducedResult.Ids,
FieldsData: reducedResult.FieldsData,
CostAggregation: &internalpb.CostAggregation{
- ServiceTime: tr.ElapseSpan().Milliseconds(),
+ ServiceTime: tr.ElapseSpan().Milliseconds(),
+ TotalRelatedDataSize: relatedDataSize,
},
AllRetrieveCount: reducedResult.GetAllRetrieveCount(),
}
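Here "related data size" becomes concrete: the sum of MemSize() over every segment the task read, regardless of how many rows matched. The lo.Reduce call is equivalent to a plain loop:

var relatedDataSize int64
for _, seg := range querySegments {
    relatedDataSize += seg.MemSize() // resident bytes for this segment
}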


@ -9,6 +9,7 @@ import (
"strconv"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@ -211,6 +212,10 @@ func (t *SearchTask) Execute() error {
return nil
}
+ relatedDataSize := lo.Reduce(searchedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
+ return acc + seg.MemSize()
+ }, 0)
tr.RecordSpan()
blobs, err := segments.ReduceSearchResultsAndFillData(
t.ctx,
@ -259,7 +264,8 @@ func (t *SearchTask) Execute() error {
SlicedOffset: 1,
SlicedNumCount: 1,
CostAggregation: &internalpb.CostAggregation{
- ServiceTime: tr.ElapseSpan().Milliseconds(),
+ ServiceTime: tr.ElapseSpan().Milliseconds(),
+ TotalRelatedDataSize: relatedDataSize,
},
}
}


@ -21,15 +21,16 @@ package hookutil
var (
// WARN: Please DO NOT modify all constants.
OpTypeKey = "op_type"
DatabaseKey = "database"
UsernameKey = "username"
DataSizeKey = "data_size"
SuccessCntKey = "success_cnt"
FailCntKey = "fail_cnt"
RelatedCntKey = "related_cnt"
NodeIDKey = "id"
DimensionKey = "dim"
OpTypeKey = "op_type"
DatabaseKey = "database"
UsernameKey = "username"
RequestDataSizeKey = "request_data_size"
ResultDataSizeKey = "result_data_size"
RelatedDataSizeKey = "related_data_size"
SuccessCntKey = "success_cnt"
FailCntKey = "fail_cnt"
RelatedCntKey = "related_cnt"
NodeIDKey = "id"
OpTypeInsert = "insert"
OpTypeDelete = "delete"

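The renamed keys split the old, ambiguous data_size into three: request_data_size (bytes received in a write request), result_data_size (bytes returned to the SDK), and related_data_size (bytes of segment data touched to serve a read). A hook's Report implementation receives them in a map[string]any; a minimal consumer sketch, where billingExtension is hypothetical:

type billingExtension struct{}

func (billingExtension) Report(info any) int {
    m, ok := info.(map[string]any)
    if !ok {
        return 0
    }
    switch m[hookutil.OpTypeKey] {
    case hookutil.OpTypeInsert, hookutil.OpTypeUpsert:
        sz, _ := m[hookutil.RequestDataSizeKey].(int) // proto.Size of the request
        return sz
    case hookutil.OpTypeSearch, hookutil.OpTypeQuery:
        related, _ := m[hookutil.RelatedDataSizeKey].(int64) // segment bytes scanned
        return int(related)
    }
    return 0
}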

@ -263,6 +263,15 @@ var (
Help: "count of bytes sent back to sdk",
}, []string{nodeIDLabelName})
+ // ProxyReportValue records value about the request
+ ProxyReportValue = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: milvusNamespace,
+ Subsystem: typeutil.ProxyRole,
+ Name: "report_value",
+ Help: "report value about the request",
+ }, []string{nodeIDLabelName, msgTypeLabelName, databaseLabelName, usernameLabelName})
// ProxyLimiterRate records rates of rateLimiter in Proxy.
ProxyLimiterRate = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -373,6 +382,7 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyRateLimitReqCount)
registry.MustRegister(ProxySlowQueryCount)
+ registry.MustRegister(ProxyReportValue)
}
func CleanupProxyDBMetrics(nodeID int64, dbName string) {


@ -12,6 +12,7 @@
package paramtable
import (
"strconv"
"sync"
"time"
)
@ -59,6 +60,10 @@ func GetNodeID() UniqueID {
return params.RuntimeConfig.NodeID.GetAsInt64()
}
+ func GetStringNodeID() string {
+ return strconv.FormatInt(GetNodeID(), 10)
+ }
func SetRole(role string) {
params.RuntimeConfig.Role.SetValue(role)
}
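GetStringNodeID centralizes the strconv.FormatInt(paramtable.GetNodeID(), 10) conversion that was previously repeated at every metric call site, which is what lets the proxy hunks above shrink to:

nodeID := paramtable.GetStringNodeID() // was: strconv.FormatInt(paramtable.GetNodeID(), 10)
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))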


@ -27,14 +27,3 @@ func GetDim(field *schemapb.FieldSchema) (int64, error) {
}
return int64(dim), nil
}
- func GetCollectionDim(collection *schemapb.CollectionSchema) (int64, error) {
- for _, fieldSchema := range collection.GetFields() {
- dim, err := GetDim(fieldSchema)
- if err != nil {
- continue
- }
- return dim, nil
- }
- return 0, fmt.Errorf("dim not found")
- }


@ -91,7 +91,7 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
reportInfo := report.(map[string]any)
log.Info("insert report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeInsert, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RequestDataSizeKey])
return
}
}
@ -177,7 +177,8 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
return
}
@ -200,7 +201,8 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
return
}


@ -198,7 +198,8 @@ func (s *PartitionKeySuite) TestPartitionKey() {
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum*3, reportInfo[hookutil.RelatedCntKey])
return
}
@ -237,7 +238,8 @@ func (s *PartitionKeySuite) TestPartitionKey() {
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
return
}
@ -267,7 +269,8 @@ func (s *PartitionKeySuite) TestPartitionKey() {
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(3*rowNum, reportInfo[hookutil.RelatedCntKey])
return
}
@ -301,7 +304,8 @@ func (s *PartitionKeySuite) TestPartitionKey() {
reportInfo := report.(map[string]any)
log.Info("query report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey])
- s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
+ s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
return
}