mirror of https://github.com/milvus-io/milvus.git
parent
7e60460aee
commit
8a4c6a039f
|
@ -132,23 +132,34 @@ func (s *searchCollection) receiveSearchMsg() {
|
|||
for {
|
||||
select {
|
||||
case <-s.releaseCtx.Done():
|
||||
log.Debug("stop receiveSearchMsg", zap.Int64("collectionID", s.collectionID))
|
||||
log.Debug("stop searchCollection's receiveSearchMsg", zap.Int64("collectionID", s.collectionID))
|
||||
return
|
||||
case sm := <-s.msgBuffer:
|
||||
log.Debug("get search message from msgBuffer",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
serviceTime := s.getServiceableTime()
|
||||
if sm.BeginTs() > serviceTime {
|
||||
s.addToUnsolvedMsg(sm)
|
||||
continue
|
||||
}
|
||||
log.Debug("doing search in receiveSearchMsg...",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
err := s.search(sm)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Debug("do search failed in receiveSearchMsg, prepare to publish failed search result",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
err2 := s.publishFailedSearchResult(sm, err.Error())
|
||||
if err2 != nil {
|
||||
log.Error("publish FailedSearchResult failed", zap.Error(err2))
|
||||
}
|
||||
}
|
||||
log.Debug("ReceiveSearchMsg, do search done, num of searchMsg = 1")
|
||||
log.Debug("do search done in receiveSearchMsg",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -157,7 +168,7 @@ func (s *searchCollection) doUnsolvedMsgSearch() {
|
|||
for {
|
||||
select {
|
||||
case <-s.releaseCtx.Done():
|
||||
log.Debug("stop doUnsolvedMsgSearch", zap.Int64("collectionID", s.collectionID))
|
||||
log.Debug("stop searchCollection's doUnsolvedMsgSearch", zap.Int64("collectionID", s.collectionID))
|
||||
return
|
||||
default:
|
||||
serviceTime, err := s.waitNewTSafe()
|
||||
|
@ -166,11 +177,17 @@ func (s *searchCollection) doUnsolvedMsgSearch() {
|
|||
// TODO: emptySearch or continue, note: collection has been released
|
||||
continue
|
||||
}
|
||||
log.Debug("get tSafe from flow graph",
|
||||
zap.Int64("collectionID", s.collectionID),
|
||||
zap.Uint64("tSafe", serviceTime))
|
||||
|
||||
searchMsg := make([]*msgstream.SearchMsg, 0)
|
||||
tempMsg := s.popAllUnsolvedMsg()
|
||||
|
||||
for _, sm := range tempMsg {
|
||||
log.Debug("get search message from unsolvedMsg",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
if sm.EndTs() <= serviceTime {
|
||||
searchMsg = append(searchMsg, sm)
|
||||
continue
|
||||
|
@ -184,15 +201,24 @@ func (s *searchCollection) doUnsolvedMsgSearch() {
|
|||
for _, sm := range searchMsg {
|
||||
sp, ctx := trace.StartSpanFromContext(sm.TraceCtx())
|
||||
sm.SetTraceCtx(ctx)
|
||||
log.Debug("doing search in doUnsolvedMsgSearch...",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
err := s.search(sm)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
log.Debug("do search failed in doUnsolvedMsgSearch, prepare to publish failed search result",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
err2 := s.publishFailedSearchResult(sm, err.Error())
|
||||
if err2 != nil {
|
||||
log.Error("publish FailedSearchResult failed", zap.Error(err2))
|
||||
}
|
||||
}
|
||||
sp.Finish()
|
||||
log.Debug("do search done in doUnsolvedMsgSearch",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
}
|
||||
log.Debug("doUnsolvedMsgSearch, do search done", zap.Int("num of searchMsg", len(searchMsg)))
|
||||
}
|
||||
|
@ -304,7 +330,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
MetricType: plan.getMetricType(),
|
||||
},
|
||||
}
|
||||
err = s.publishSearchResult(searchResultMsg)
|
||||
err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -378,7 +404,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
// fmt.Println(testHits.IDs)
|
||||
// fmt.Println(testHits.Scores)
|
||||
//}
|
||||
err = s.publishSearchResult(searchResultMsg)
|
||||
err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -391,13 +417,23 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *searchCollection) publishSearchResult(msg msgstream.TsMsg) error {
|
||||
func (s *searchCollection) publishSearchResult(msg msgstream.TsMsg, collectionID UniqueID) error {
|
||||
log.Debug("publishing search result...",
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
zap.Int64("collectionID", collectionID))
|
||||
span, ctx := trace.StartSpanFromContext(msg.TraceCtx())
|
||||
defer span.Finish()
|
||||
msg.SetTraceCtx(ctx)
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, msg)
|
||||
err := s.searchResultMsgStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
} else {
|
||||
log.Debug("publish search result done",
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
zap.Int64("collectionID", collectionID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -82,8 +82,8 @@ func (s *searchService) consumeSearch() {
|
|||
if msgPack == nil || len(msgPack.Msgs) <= 0 {
|
||||
continue
|
||||
}
|
||||
emptySearchNum := 0
|
||||
for _, msg := range msgPack.Msgs {
|
||||
log.Debug("consume search message", zap.Int64("msgID", msg.ID()))
|
||||
sm, ok := msg.(*msgstream.SearchMsg)
|
||||
if !ok {
|
||||
continue
|
||||
|
@ -93,17 +93,20 @@ func (s *searchService) consumeSearch() {
|
|||
err := s.collectionCheck(sm.CollectionID)
|
||||
if err != nil {
|
||||
s.emptySearchCollection.emptySearch(sm)
|
||||
emptySearchNum++
|
||||
log.Debug("cannot found collection, do empty search done",
|
||||
zap.Int64("msgID", sm.ID()),
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
continue
|
||||
}
|
||||
sc, ok := s.searchCollections[sm.CollectionID]
|
||||
if !ok {
|
||||
s.startSearchCollection(sm.CollectionID)
|
||||
log.Debug("new search collection, start search collection service",
|
||||
zap.Int64("collectionID", sm.CollectionID))
|
||||
}
|
||||
sc.msgBuffer <- sm
|
||||
sp.Finish()
|
||||
}
|
||||
log.Debug("do empty search done", zap.Int("num of searchMsg", emptySearchNum))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue