mirror of https://github.com/milvus-io/milvus.git
remove redundant log of query node (#6768)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/6783/head
parent
2f44fc40a3
commit
21bc5810c0
|
@ -43,7 +43,7 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
msMsg, ok := in[0].(*MsgStreamMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for MsgStreamMsg")
|
||||
log.Warn("type assertion failed for MsgStreamMsg")
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for MsgStreamMsg")
|
||||
log.Warn("type assertion failed for MsgStreamMsg")
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ func (gcNode *gcNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
_, ok := in[0].(*gcMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for gcMsg")
|
||||
log.Warn("type assertion failed for gcMsg")
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
iMsg, ok := in[0].(*insertMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for insertMsg")
|
||||
log.Warn("type assertion failed for insertMsg")
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for serviceTimeMsg")
|
||||
log.Warn("type assertion failed for serviceTimeMsg")
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
|
|
|
@ -156,7 +156,7 @@ func (q *queryCollection) waitNewTSafe() Timestamp {
|
|||
// block until any vChannel updating tSafe
|
||||
_, _, recvOK := reflect.Select(q.watcherSelectCase)
|
||||
if !recvOK {
|
||||
log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID))
|
||||
//log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID))
|
||||
return Timestamp(math.MaxInt64)
|
||||
}
|
||||
//log.Debug("wait new tSafe", zap.Any("collectionID", s.collectionID))
|
||||
|
@ -202,13 +202,13 @@ func (q *queryCollection) consumeQuery() {
|
|||
default:
|
||||
msgPack := q.queryMsgStream.Consume()
|
||||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
msgPackNil := msgPack == nil
|
||||
msgPackEmpty := true
|
||||
if msgPack != nil {
|
||||
msgPackEmpty = len(msgPack.Msgs) <= 0
|
||||
}
|
||||
log.Debug("consume query message failed", zap.Any("msgPack is Nil", msgPackNil),
|
||||
zap.Any("msgPackEmpty", msgPackEmpty))
|
||||
//msgPackNil := msgPack == nil
|
||||
//msgPackEmpty := true
|
||||
//if msgPack != nil {
|
||||
// msgPackEmpty = len(msgPack.Msgs) <= 0
|
||||
//}
|
||||
//log.Debug("consume query message failed", zap.Any("msgPack is Nil", msgPackNil),
|
||||
// zap.Any("msgPackEmpty", msgPackEmpty))
|
||||
continue
|
||||
}
|
||||
for _, msg := range msgPack.Msgs {
|
||||
|
@ -267,28 +267,28 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) {
|
|||
case commonpb.MsgType_Retrieve:
|
||||
collectionID = msg.(*msgstream.RetrieveMsg).CollectionID
|
||||
msgTypeStr = "retrieve"
|
||||
log.Debug("consume retrieve message",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
)
|
||||
//log.Debug("consume retrieve message",
|
||||
// zap.Any("collectionID", collectionID),
|
||||
// zap.Int64("msgID", msg.ID()),
|
||||
//)
|
||||
case commonpb.MsgType_Search:
|
||||
collectionID = msg.(*msgstream.SearchMsg).CollectionID
|
||||
msgTypeStr = "search"
|
||||
log.Debug("consume search message",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
)
|
||||
//log.Debug("consume search message",
|
||||
// zap.Any("collectionID", collectionID),
|
||||
// zap.Int64("msgID", msg.ID()),
|
||||
//)
|
||||
default:
|
||||
err := fmt.Errorf("receive invalid msgType = %d", msgType)
|
||||
log.Error(err.Error())
|
||||
return
|
||||
}
|
||||
if collectionID != q.collectionID {
|
||||
log.Error("not target collection query request",
|
||||
zap.Any("collectionID", q.collectionID),
|
||||
zap.Int64("target collectionID", collectionID),
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
)
|
||||
//log.Error("not target collection query request",
|
||||
// zap.Any("collectionID", q.collectionID),
|
||||
// zap.Int64("target collectionID", collectionID),
|
||||
// zap.Int64("msgID", msg.ID()),
|
||||
//)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ package querynode
|
|||
import "C"
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
@ -61,7 +60,7 @@ func reduceSearchResults(searchResults []*SearchResult, numSegments int64, inRed
|
|||
|
||||
func fillTargetEntry(plan *SearchPlan, searchResults []*SearchResult, matchedSegments []*Segment, inReduced []bool) error {
|
||||
wg := &sync.WaitGroup{}
|
||||
fmt.Println(inReduced)
|
||||
//fmt.Println(inReduced)
|
||||
for i := range inReduced {
|
||||
if inReduced[i] {
|
||||
wg.Add(1)
|
||||
|
|
|
@ -58,7 +58,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
wg.Done()
|
||||
fmt.Println(nodeCtx.node.Name(), "closed")
|
||||
//fmt.Println(nodeCtx.node.Name(), "closed")
|
||||
return
|
||||
default:
|
||||
// inputs from inputsMessages for Operate
|
||||
|
|
Loading…
Reference in New Issue