Refine QueryNode errors (#27013)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/27025/head
yah01 2023-09-12 16:07:18 +08:00 committed by GitHub
parent f85af0732c
commit 00c65fa0d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 177 additions and 245 deletions

View File

@ -353,7 +353,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
zap.Error(err))
return indexTaskInProgress
}
if response.Status.ErrorCode != commonpb.ErrorCode_Success {
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason))
return indexTaskInProgress

View File

@ -123,7 +123,7 @@ func (nm *IndexNodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, type
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.Status.Reason))
return
@ -179,7 +179,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.Status.Reason))
return

View File

@ -512,7 +512,7 @@ func (node *DataNode) Start() error {
),
Count: 1,
})
if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
if err != nil || rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
startErr = errors.New("DataNode fail to alloc timestamp")
return

View File

@ -466,7 +466,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
Count: 1,
})
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
if rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success || err != nil {
return returnFailFunc("DataNode alloc ts failed", err)
}
@ -539,7 +539,7 @@ func (node *DataNode) getPartitions(ctx context.Context, dbName string, collecti
log.Warn("failed to get partitions of collection", logFields...)
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("failed to get partitions of collection", logFields...)
return nil, errors.New(resp.Status.Reason)
}
@ -683,7 +683,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
if err != nil {
return 0, "", fmt.Errorf("syncSegmentID Failed:%w", err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason)
}
if len(resp.SegIDAssignments) == 0 || resp.SegIDAssignments[0] == nil {

View File

@ -46,7 +46,7 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool {
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
return false
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
return false
}
@ -73,7 +73,7 @@ func (h *Handlers) describeCollection(c *gin.Context, dbName string, collectionN
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
return nil, err
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
return nil, errors.New(response.Status.Reason)
}
@ -94,7 +94,7 @@ func (h *Handlers) hasCollection(c *gin.Context, dbName string, collectionName s
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
return false, err
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
return false, errors.New(response.Status.Reason)
} else {
@ -128,7 +128,7 @@ func (h *Handlers) listCollections(c *gin.Context) {
response, err := h.proxy.ShowCollections(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
var collections []string
@ -262,7 +262,7 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) {
collLoadState := ""
if stateErr != nil {
log.Warn("get collection load state fail", zap.String("collection", collectionName), zap.String("err", stateErr.Error()))
} else if stateResp.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if stateResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get collection load state fail", zap.String("collection", collectionName), zap.String("err", stateResp.Status.Reason))
} else {
collLoadState = stateResp.State.String()
@ -283,7 +283,7 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) {
if indexErr != nil {
indexDesc = []gin.H{}
log.Warn("get indexes description fail", zap.String("collection", collectionName), zap.String("vectorField", vectorField), zap.String("err", indexErr.Error()))
} else if indexResp.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if indexResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
indexDesc = []gin.H{}
log.Warn("get indexes description fail", zap.String("collection", collectionName), zap.String("vectorField", vectorField), zap.String("err", indexResp.Status.Reason))
} else {
@ -381,7 +381,7 @@ func (h *Handlers) query(c *gin.Context) {
response, err := h.proxy.Query(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil)
@ -435,7 +435,7 @@ func (h *Handlers) get(c *gin.Context) {
response, err := h.proxy.Query(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil)
@ -487,7 +487,7 @@ func (h *Handlers) delete(c *gin.Context) {
response, err := h.proxy.Delete(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
@ -548,7 +548,7 @@ func (h *Handlers) insert(c *gin.Context) {
response, err := h.proxy.Insert(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
switch response.IDs.GetIdField().(type) {
@ -607,7 +607,7 @@ func (h *Handlers) search(c *gin.Context) {
response, err := h.proxy.Search(c, &req)
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()})
} else if response.Status.ErrorCode != commonpb.ErrorCode_Success {
} else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason})
} else {
if response.Results.TopK == int64(0) {

View File

@ -959,7 +959,7 @@ func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckReque
if err != nil {
return ret, err
}
if state.Status.ErrorCode != commonpb.ErrorCode_Success {
if state.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return ret, nil
}
if state.State.StateCode != commonpb.StateCode_Healthy {
@ -978,7 +978,7 @@ func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt
if err != nil {
return server.Send(ret)
}
if state.Status.ErrorCode != commonpb.ErrorCode_Success {
if state.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return server.Send(ret)
}
if state.State.StateCode != commonpb.StateCode_Healthy {

View File

@ -2184,7 +2184,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
return constructFailedResponse(err), nil
}
if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
setErrorIndex := func() {
numRows := request.NumRows
errIndex := make([]uint32, numRows)
@ -2379,13 +2379,13 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
metrics.FailLabel).Inc()
// Not every error case changes the status internally
// change status there to handle it
if it.result.Status.ErrorCode == commonpb.ErrorCode_Success {
if it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
}
return constructFailedResponse(err, it.result.Status.ErrorCode), nil
}
if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
setErrorIndex := func() {
numRows := request.NumRows
errIndex := make([]uint32, numRows)
@ -3110,7 +3110,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
log.Debug("GetPersistentSegmentInfo",
zap.Int("len(infos)", len(infoResp.Infos)),
zap.Any("status", infoResp.Status))
if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
if infoResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
resp.Status.Reason = infoResp.Status.Reason
@ -3185,7 +3185,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
log.Debug("GetQuerySegmentInfo",
zap.Any("infos", infoResp.Infos),
zap.Any("status", infoResp.Status))
if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
if infoResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
log.Error("Failed to get segment info from QueryCoord",
zap.String("errMsg", infoResp.Status.Reason))

View File

@ -621,7 +621,7 @@ func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectio
if err != nil {
return nil, err
}
if partitions.Status.ErrorCode != commonpb.ErrorCode_Success {
if partitions.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("%s", partitions.Status.Reason)
}
@ -808,11 +808,11 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
if err != nil {
return retry.Unrecoverable(err)
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success {
return nil
}
// do not retry unless got NoReplicaAvailable from querycoord
if resp.Status.ErrorCode != commonpb.ErrorCode_NoReplicaAvailable {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_NoReplicaAvailable {
return retry.Unrecoverable(fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason))
}
return fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
@ -820,7 +820,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
if err != nil {
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
}

View File

@ -324,7 +324,7 @@ func (sa *segIDAssigner) syncSegments() (bool, error) {
return false, fmt.Errorf("syncSegmentID Failed:%w", err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false, fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason)
}

View File

@ -434,7 +434,7 @@ func (hct *hasCollectionTask) Execute(ctx context.Context) error {
if hct.result == nil {
return errors.New("has collection resp is nil")
}
if hct.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if hct.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(hct.result.Status.Reason)
}
return nil
@ -522,7 +522,7 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error {
return err
}
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
dct.result.Status = result.Status
// compatibility with PyMilvus existing implementation
@ -645,7 +645,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error {
return errors.New("failed to show collections")
}
if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success {
if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(respFromRootCoord.Status.Reason)
}
@ -683,7 +683,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error {
return errors.New("failed to show collections")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
// update collectionID to collection name, and return new error info to sdk
newErrorReason := resp.Status.Reason
for _, collectionID := range collectionIDs {
@ -1059,7 +1059,7 @@ func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
if hpt.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if hpt.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(hpt.result.Status.Reason)
}
return err
@ -1144,7 +1144,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error {
return errors.New("failed to show partitions")
}
if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success {
if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(respFromRootCoord.Status.Reason)
}
@ -1188,7 +1188,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error {
return errors.New("failed to show partitions")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(resp.Status.Reason)
}
@ -1303,7 +1303,7 @@ func (ft *flushTask) Execute(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error())
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(resp.Status.Reason)
}
coll2Segments[collName] = &schemapb.LongArray{Data: resp.GetSegmentIDs()}
@ -1417,7 +1417,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(indexResponse.Status.Reason)
}
@ -1645,7 +1645,7 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(indexResponse.Status.Reason)
}
@ -2230,7 +2230,7 @@ func (t *DescribeResourceGroupTask) Execute(ctx context.Context) error {
return ret, nil
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success {
rgInfo := resp.GetResourceGroup()
numLoadedReplica, err := getCollectionName(rgInfo.NumLoadedReplica)

View File

@ -495,7 +495,7 @@ func (dit *describeIndexTask) Execute(ctx context.Context) error {
}
dit.result = &milvuspb.DescribeIndexResponse{}
dit.result.Status = resp.GetStatus()
if dit.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(dit.result.Status.Reason)
}
for _, indexInfo := range resp.IndexInfos {
@ -614,7 +614,7 @@ func (dit *getIndexStatisticsTask) Execute(ctx context.Context) error {
}
dit.result = &milvuspb.GetIndexStatisticsResponse{}
dit.result.Status = resp.GetStatus()
if dit.result.Status.ErrorCode != commonpb.ErrorCode_Success {
if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(dit.result.Status.Reason)
}
for _, indexInfo := range resp.IndexInfos {

View File

@ -241,7 +241,7 @@ func (g *getStatisticsTask) getStatisticsFromDataCoord(ctx context.Context) erro
if err != nil {
return err
}
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(result.Status.Reason)
}
if g.resultBuf == nil {
@ -335,7 +335,7 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, dbName string, co
if err != nil {
return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason())
}
@ -360,7 +360,7 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, dbName string, co
if err != nil {
return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason())
}
@ -462,7 +462,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
@ -534,7 +534,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
@ -557,7 +557,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
@ -656,7 +656,7 @@ func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(result.Status.Reason)
}
g.result = &milvuspb.GetCollectionStatisticsResponse{
@ -746,7 +746,7 @@ func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error {
if result == nil {
return errors.New("get partition statistics resp is nil")
}
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(result.Status.Reason)
}
g.result = &milvuspb.GetPartitionStatisticsResponse{

View File

@ -66,7 +66,7 @@ func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timest
if err != nil {
return nil, fmt.Errorf("syncTimestamp Failed:%w", err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("syncTimeStamp Failed:%s", resp.Status.Reason)
}
start, cnt := resp.Timestamp, resp.Count

View File

@ -1011,7 +1011,7 @@ func isCollectionLoaded(ctx context.Context, qc types.QueryCoord, collID int64)
if err != nil {
return false, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false, errors.New(resp.Status.Reason)
}
@ -1032,7 +1032,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoord, collID int64, p
if err != nil {
return false, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false, errors.New(resp.Status.Reason)
}
@ -1203,13 +1203,13 @@ func getCollectionProgress(
return
}
if resp.Status.ErrorCode == commonpb.ErrorCode_InsufficientMemoryToLoad {
if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad {
err = ErrInsufficientMemory
log.Warn("detected insufficientMemoryError when getCollectionProgress", zap.Int64("collection_id", collectionID), zap.String("reason", resp.GetStatus().GetReason()))
return
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = merr.Error(resp.GetStatus())
log.Warn("fail to show collections", zap.Int64("collection_id", collectionID),
zap.String("reason", resp.Status.Reason))

View File

@ -149,10 +149,10 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
zap.String("scope", req.GetScope().String()),
)
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
var err error
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
}
}()
@ -170,18 +170,16 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
// get delegator
sd, ok := node.delegators.Get(channel)
if !ok {
err := merr.WrapErrServiceUnavailable("failed to get shard delegator for query")
err := merr.WrapErrChannelNotFound(channel)
log.Warn("Query failed, failed to get shard delegator for query", zap.Error(err))
failRet.Status = merr.Status(err)
return failRet, nil
return nil, err
}
// do query
results, err := sd.Query(queryCtx, req)
if err != nil {
log.Warn("failed to query on delegator", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, nil
return nil, err
}
// reduce result
@ -196,16 +194,14 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
if collection == nil {
err := merr.WrapErrCollectionNotFound(req.Req.GetCollectionID())
log.Warn("Query failed, failed to get collection", zap.Error(err))
failRet.Status = merr.Status(err)
return failRet, nil
return nil, err
}
reducer := segments.CreateInternalReducer(req, collection.Schema())
ret, err := reducer.Reduce(ctx, results)
resp, err := reducer.Reduce(ctx, results)
if err != nil {
failRet.Status.Reason = err.Error()
return failRet, nil
return nil, err
}
tr.CtxElapse(ctx, fmt.Sprintf("do query with channel done , vChannel = %s, segmentIDs = %v",
@ -213,17 +209,14 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
req.GetSegmentIDs(),
))
//
ret.Status = merr.Status(nil)
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc()
return ret, nil
return resp, nil
}
func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.QueryRequest, channel string, srv streamrpc.QueryStreamServer) error {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
msgID := req.Req.Base.GetMsgID()
log := log.Ctx(ctx).With(
zap.Int64("msgID", msgID),
@ -232,8 +225,9 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer
zap.String("scope", req.GetScope().String()),
)
var err error
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
}
}()
@ -252,13 +246,13 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer
// get delegator
sd, ok := node.delegators.Get(channel)
if !ok {
err := merr.WrapErrServiceUnavailable("failed to get query shard delegator")
err := merr.WrapErrChannelNotFound(channel)
log.Warn("Query failed, failed to get query shard delegator", zap.Error(err))
return err
}
// do query
err := sd.QueryStream(queryCtx, req, srv)
err = sd.QueryStream(queryCtx, req, srv)
if err != nil {
return err
}
@ -373,10 +367,10 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
}
defer node.lifetime.Done()
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
var err error
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader).Inc()
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc()
}
}()
@ -393,23 +387,20 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
// get delegator
sd, ok := node.delegators.Get(channel)
if !ok {
err := merr.WrapErrServiceUnavailable("failed to get shard delegator for search")
err := merr.WrapErrChannelNotFound(channel)
log.Warn("Query failed, failed to get shard delegator for search", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, err
return nil, err
}
req, err := node.optimizeSearchParams(ctx, req, sd)
req, err = node.optimizeSearchParams(ctx, req, sd)
if err != nil {
log.Warn("failed to optimize search params", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, err
return nil, err
}
// do search
results, err := sd.Search(searchCtx, req)
if err != nil {
log.Warn("failed to search on delegator", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, err
return nil, err
}
// reduce result
@ -420,10 +411,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
req.GetSegmentIDs(),
))
ret, err := segments.ReduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
resp, err := segments.ReduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
if err != nil {
failRet.Status.Reason = err.Error()
return failRet, err
return nil, err
}
tr.CtxElapse(ctx, fmt.Sprintf("do search with channel done , vChannel = %s, segmentIDs = %v",
@ -432,14 +422,13 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
))
// update metric to prometheus
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader).Inc()
metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetNq()))
metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetTopk()))
return ret, nil
return resp, nil
}
func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.GetStatisticsRequest, channel string) (*internalpb.GetStatisticsResponse, error) {
@ -448,11 +437,8 @@ func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.Ge
zap.String("channel", channel),
zap.String("scope", req.GetScope().String()),
)
failRet := &internalpb.GetStatisticsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
resp := &internalpb.GetStatisticsResponse{}
if req.GetFromShardLeader() {
var (
@ -478,23 +464,26 @@ func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.Ge
sd, ok := node.delegators.Get(channel)
if !ok {
log.Warn("GetStatistics failed, failed to get query shard delegator")
return failRet, nil
err := merr.WrapErrChannelNotFound(channel, "failed to get channel statistics")
log.Warn("GetStatistics failed, failed to get query shard delegator", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
results, err := sd.GetStatistics(ctx, req)
if err != nil {
log.Warn("failed to get statistics from delegator", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, nil
resp.Status = merr.Status(err)
return resp, nil
}
ret, err := reduceStatisticResponse(results)
resp, err = reduceStatisticResponse(results)
if err != nil {
failRet.Status.Reason = err.Error()
return failRet, nil
log.Warn("failed to reduce channel statistics", zap.Error(err))
resp.Status = merr.Status(err)
return resp, nil
}
return ret, nil
return resp, nil
}
func segmentStatsResponse(segStats []segments.SegmentStats) *internalpb.GetStatisticsResponse {

View File

@ -23,7 +23,6 @@ import (
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
@ -163,10 +162,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
quotaMetrics, err := getQuotaMetrics(node)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()),
}, nil
}
@ -201,10 +197,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, paramtable.GetNodeID()),
}, nil

View File

@ -112,16 +112,11 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis
}
defer node.lifetime.Done()
if !CheckTargetID(req.GetReq()) {
targetID := req.GetReq().GetBase().GetTargetID()
log.Warn("target ID not match",
zap.Int64("targetID", targetID),
zap.Int64("nodeID", paramtable.GetNodeID()),
)
err := merr.CheckTargetID(req.GetReq().GetBase())
if err != nil {
log.Warn("target ID check failed", zap.Error(err))
return &internalpb.GetStatisticsResponse{
Status: util.WrapStatus(commonpb.ErrorCode_NodeIDNotMatch,
common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID()),
),
Status: merr.Status(err),
}, nil
}
failRet := &internalpb.GetStatisticsResponse{
@ -145,7 +140,7 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis
mu.Lock()
defer mu.Unlock()
if err != nil {
failRet.Status.Reason = err.Error()
failRet.Status = merr.Status(err)
failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
return err
}
@ -164,7 +159,7 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis
ret, err := reduceStatisticResponse(toReduceResults)
if err != nil {
failRet.Status.Reason = err.Error()
failRet.Status = merr.Status(err)
return failRet, nil
}
log.Debug("reduce statistic result done")
@ -486,10 +481,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
req.GetInfos()...,
)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
return merr.Status(err), nil
}
node.manager.Collection.Ref(req.GetCollectionID(), uint32(len(loaded)))
@ -686,16 +678,16 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
zap.String("scope", req.GetScope().String()),
)
failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "")
resp := &internalpb.SearchResults{}
if !node.lifetime.Add(commonpbutil.IsHealthy) {
failRet.Status = merr.Status(merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())))
return failRet, nil
resp.Status = merr.Status(merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())))
return resp, nil
}
defer node.lifetime.Done()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc()
}
}()
@ -713,22 +705,22 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
if collection == nil {
err := merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID())
log.Warn("failed to search segments", zap.Error(err))
failRet.Status = merr.Status(err)
return failRet, nil
resp.Status = merr.Status(err)
return resp, nil
}
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
if err := node.scheduler.Add(task); err != nil {
log.Warn("failed to search channel", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, nil
resp.Status = merr.Status(err)
return resp, nil
}
err := task.Wait()
if err != nil {
log.Warn("failed to search segments", zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, nil
resp.Status = merr.Status(err)
return resp, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
@ -736,16 +728,14 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
req.GetSegmentIDs(),
))
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
result := task.Result()
result.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
result.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
return result, nil
resp = task.Result()
resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
resp.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
return resp, nil
}
// Search performs replica search tasks.
@ -776,14 +766,12 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
}
defer node.lifetime.Done()
if !CheckTargetID(req.GetReq()) {
targetID := req.GetReq().GetBase().GetTargetID()
log.Warn("target ID not match",
zap.Int64("targetID", targetID),
zap.Int64("nodeID", paramtable.GetNodeID()),
)
return WrapSearchResult(commonpb.ErrorCode_NodeIDNotMatch,
common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())), nil
err := merr.CheckTargetID(req.GetReq().GetBase())
if err != nil {
log.Warn("target ID check failed", zap.Error(err))
return &internalpb.SearchResults{
Status: merr.Status(err),
}, nil
}
failRet := &internalpb.SearchResults{
@ -828,8 +816,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
mu.Lock()
defer mu.Unlock()
if err != nil {
failRet.Status.Reason = err.Error()
failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
failRet.Status = merr.Status(err)
return err
}
if ret.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
@ -848,7 +835,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
if err != nil {
log.Warn("failed to reduce search results", zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
failRet.Status.Reason = err.Error()
failRet.Status = merr.Status(err)
return failRet, nil
}
reduceLatency := tr.RecordSpan()
@ -868,7 +855,11 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
// only used for delegator query segments from worker
func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
failRet := &internalpb.RetrieveResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
msgID := req.Req.Base.GetMsgID()
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
channel := req.GetDmlChannels()[0]
@ -888,7 +879,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if failRet.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
}
}()
@ -941,7 +932,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
}
func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, streamer streamrpc.QueryStreamer) error {
failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "")
msgID := req.Req.Base.GetMsgID()
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
channel := req.GetDmlChannels()[0]
@ -954,16 +944,17 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que
zap.String("scope", req.GetScope().String()),
)
resp := &internalpb.RetrieveResults{}
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
defer func() {
if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
}
}()
if !node.lifetime.Add(commonpbutil.IsHealthy) {
failRet.Status = merr.Status(merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())))
srv.Send(failRet)
resp.Status = merr.Status(merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())))
srv.Send(resp)
return nil
}
defer node.lifetime.Done()
@ -977,8 +968,8 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que
err := node.queryStreamSegments(ctx, req, srv)
if err != nil {
failRet.Status = merr.Status(err)
srv.Send(failRet)
resp.Status = merr.Status(err)
srv.Send(resp)
return nil
}
@ -989,7 +980,6 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que
req.GetSegmentIDs(),
))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
@ -1027,14 +1017,12 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
}
defer node.lifetime.Done()
if !CheckTargetID(req.GetReq()) {
targetID := req.GetReq().GetBase().GetTargetID()
log.Warn("target ID not match",
zap.Int64("targetID", targetID),
zap.Int64("nodeID", paramtable.GetNodeID()),
)
return WrapRetrieveResult(commonpb.ErrorCode_NodeIDNotMatch,
common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())), nil
err := merr.CheckTargetID(req.GetReq().GetBase())
if err != nil {
log.Warn("target ID check failed", zap.Error(err))
return &internalpb.RetrieveResults{
Status: merr.Status(err),
}, nil
}
toMergeResults := make([]*internalpb.RetrieveResults, len(req.GetDmlChannels()))
@ -1064,14 +1052,18 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
})
}
if err := runningGp.Wait(); err != nil {
return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err), nil
return &internalpb.RetrieveResults{
Status: merr.Status(err),
}, nil
}
tr.RecordSpan()
reducer := segments.CreateInternalReducer(req, node.manager.Collection.Get(req.GetReq().GetCollectionID()).Schema())
ret, err := reducer.Reduce(ctx, toMergeResults)
if err != nil {
return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err), nil
return &internalpb.RetrieveResults{
Status: merr.Status(err),
}, nil
}
reduceLatency := tr.RecordSpan()
metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.ReduceShards).
@ -1112,15 +1104,10 @@ func (node *QueryNode) QueryStream(ctx context.Context, req *querypb.QueryReques
}
defer node.lifetime.Done()
if !CheckTargetID(req.GetReq()) {
targetID := req.GetReq().GetBase().GetTargetID()
log.Warn("target ID not match",
zap.Int64("targetID", targetID),
zap.Int64("nodeID", paramtable.GetNodeID()),
)
srv.Send(WrapRetrieveResult(commonpb.ErrorCode_NodeIDNotMatch,
common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())))
return nil
err := merr.CheckTargetID(req.GetReq().GetBase())
if err != nil {
log.Warn("target ID check failed", zap.Error(err))
return err
}
runningGp, runningCtx := errgroup.WithContext(ctx)
@ -1145,7 +1132,9 @@ func (node *QueryNode) QueryStream(ctx context.Context, req *querypb.QueryReques
}
if err := runningGp.Wait(); err != nil {
srv.Send(WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err))
srv.Send(&internalpb.RetrieveResults{
Status: merr.Status(err),
})
return nil
}
@ -1214,10 +1203,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
@ -1230,10 +1216,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
zap.String("metricType", metricType),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
log.RatedDebug(50, "QueryNode.GetMetrics",
@ -1251,11 +1234,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
zap.String("metricType", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
}, nil
}
@ -1378,11 +1357,9 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
// get shard delegator
shardDelegator, ok := node.delegators.Get(req.GetChannel())
if !ok {
err := merr.WrapErrChannelNotFound(req.GetChannel())
log.Warn("failed to find shard cluster when sync")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "shard not exist",
}, nil
return merr.Status(err), nil
}
// translate segment action
@ -1416,10 +1393,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
action.GetSealedInTarget(), action.GetDroppedInTarget())
default:
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unexpected action type",
}, nil
return merr.Status(merr.WrapErrServiceInternal("unknown action type", action.GetType().String())), nil
}
}
@ -1503,10 +1477,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
err := segment.Delete(pks, req.GetTimestamps())
if err != nil {
log.Warn("segment delete failed", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("delete on segment %d failed, %s", req.GetSegmentId(), err.Error()),
}, nil
return merr.Status(err), nil
}
}

View File

@ -1189,8 +1189,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
// Delegator not found
resp, err = suite.node.Search(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.Contains(resp.GetStatus().GetReason(), merr.ErrServiceUnavailable.Error())
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrChannelNotFound)
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
@ -1335,8 +1334,7 @@ func (suite *ServiceSuite) TestQuery_Failed() {
// Delegator not found
resp, err := suite.node.Query(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
suite.Contains(resp.GetStatus().GetReason(), merr.ErrServiceUnavailable.Error())
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrChannelNotFound)
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
@ -1467,8 +1465,7 @@ func (suite *ServiceSuite) TestQueryStream_Failed() {
err = merr.Error(result.GetStatus())
// Check result
if err != nil {
suite.Equal(commonpb.ErrorCode_UnexpectedError, result.GetStatus().GetErrorCode())
suite.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
suite.ErrorIs(err, merr.ErrChannelNotFound)
}
}
@ -1674,8 +1671,8 @@ func (suite *ServiceSuite) TestGetMetric_Failed() {
resp, err := suite.node.GetMetrics(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
suite.Equal(metricsinfo.MsgUnimplementedMetric, resp.Status.Reason)
err = merr.Error(resp.GetStatus())
suite.ErrorIs(err, merr.ErrMetricNotFound)
// metric parse failed
req.Request = "---"

View File

@ -1,27 +0,0 @@
package querynodev2
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// WrapRetrieveResult builds a RetrieveResults carrying only a status that
// encodes the given error code, message, and any wrapped errors.
func WrapRetrieveResult(code commonpb.ErrorCode, msg string, errs ...error) *internalpb.RetrieveResults {
	status := util.WrapStatus(code, msg, errs...)
	return &internalpb.RetrieveResults{Status: status}
}
// WrapSearchResult builds a SearchResults carrying only a status that
// encodes the given error code, message, and any wrapped errors.
func WrapSearchResult(code commonpb.ErrorCode, msg string, errs ...error) *internalpb.SearchResults {
	status := util.WrapStatus(code, msg, errs...)
	return &internalpb.SearchResults{Status: status}
}
// CheckTargetID reports whether the request's target ID is this server's
// own node ID — true when the request was addressed to this node, false
// otherwise.
func CheckTargetID[R interface{ GetBase() *commonpb.MsgBase }](req R) bool {
	selfID := paramtable.GetNodeID()
	return selfID == req.GetBase().GetTargetID()
}

View File

@ -250,7 +250,7 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID
if err != nil {
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, errors.New(resp.Status.Reason)
}

View File

@ -36,7 +36,7 @@ func WaitForComponentStates(ctx context.Context, service types.Component, servic
return err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return errors.New(resp.Status.Reason)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var (
@ -148,6 +149,14 @@ func CheckHealthy(state commonpb.StateCode) error {
return nil
}
// CheckTargetID returns nil when the message's target ID matches this
// node's ID, and a node-not-match error (carrying both IDs) otherwise.
func CheckTargetID(msg *commonpb.MsgBase) error {
	if msg.GetTargetID() == paramtable.GetNodeID() {
		return nil
	}
	return WrapErrNodeNotMatch(paramtable.GetNodeID(), msg.GetTargetID())
}
// Service related
func WrapErrServiceNotReady(stage string, msg ...string) error {
err := errors.Wrapf(ErrServiceNotReady, "stage=%s", stage)