mirror of https://github.com/milvus-io/milvus.git
Fix service time check of search in streaming (#6068)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/6130/head
parent
39614aa8eb
commit
0d0c1901dd
|
@ -384,9 +384,9 @@ func (q *queryCollection) receiveSearch(msg *msgstream.SearchMsg) {
|
|||
}
|
||||
|
||||
serviceTime := q.getServiceableTime()
|
||||
bt, _ := tsoutil.ParseTS(msg.BeginTs())
|
||||
st, _ := tsoutil.ParseTS(serviceTime)
|
||||
if msg.BeginTs() > serviceTime {
|
||||
bt, _ := tsoutil.ParseTS(msg.BeginTs())
|
||||
st, _ := tsoutil.ParseTS(serviceTime)
|
||||
log.Debug("query node::receiveSearchMsg: add to unsolvedMsg",
|
||||
zap.Any("collectionID", q.collectionID),
|
||||
zap.Any("sm.BeginTs", bt),
|
||||
|
@ -407,6 +407,10 @@ func (q *queryCollection) receiveSearch(msg *msgstream.SearchMsg) {
|
|||
log.Debug("doing search in receiveSearchMsg...",
|
||||
zap.Int64("collectionID", msg.CollectionID),
|
||||
zap.Int64("msgID", msg.ID()),
|
||||
zap.Any("serviceTime_l", serviceTime),
|
||||
zap.Any("searchTime_l", msg.BeginTs()),
|
||||
zap.Any("serviceTime_p", st),
|
||||
zap.Any("searchTime_p", bt),
|
||||
)
|
||||
err = q.search(msg)
|
||||
if err != nil {
|
||||
|
@ -730,7 +734,7 @@ func (q *queryCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
sp, ctx := trace.StartSpanFromContext(searchMsg.TraceCtx())
|
||||
defer sp.Finish()
|
||||
searchMsg.SetTraceCtx(ctx)
|
||||
searchTimestamp := searchMsg.SearchRequest.TravelTimestamp
|
||||
searchTimestamp := searchMsg.BeginTs()
|
||||
|
||||
collectionID := searchMsg.CollectionID
|
||||
collection, err := q.streaming.replica.getCollectionByID(collectionID)
|
||||
|
|
|
@ -16,11 +16,12 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"go.uber.org/zap"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
)
|
||||
|
||||
type streaming struct {
|
||||
|
@ -128,19 +129,46 @@ func (s *streaming) search(searchReqs []*searchRequest,
|
|||
zap.Any("searchPartitionIDs", searchPartIDs),
|
||||
)
|
||||
|
||||
log.Debug("print streaming replica when searching...",
|
||||
zap.Any("collectionID", collID),
|
||||
)
|
||||
s.replica.printReplica()
|
||||
|
||||
for _, partID := range searchPartIDs {
|
||||
segIDs, err := s.replica.getSegmentIDsByVChannel(partID, vChannel)
|
||||
log.Debug("get segmentIDs by vChannel",
|
||||
zap.Any("collectionID", collID),
|
||||
zap.Any("vChannel", vChannel),
|
||||
zap.Any("partitionID", partID),
|
||||
zap.Any("segmentIDs", segIDs),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return searchResults, segmentResults, err
|
||||
}
|
||||
for _, segID := range segIDs {
|
||||
seg, err := s.replica.getSegmentByID(segID)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return searchResults, segmentResults, err
|
||||
}
|
||||
|
||||
// TSafe less than searchTs means this vChannel is not available
|
||||
ts := s.tSafeReplica.getTSafe(seg.vChannelID)
|
||||
gracefulTimeInMilliSecond := Params.GracefulTime
|
||||
if gracefulTimeInMilliSecond > 0 {
|
||||
gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
|
||||
ts += gracefulTime
|
||||
}
|
||||
tsp, _ := tsoutil.ParseTS(ts)
|
||||
stp, _ := tsoutil.ParseTS(searchTs)
|
||||
log.Debug("timestamp check in streaming search",
|
||||
zap.Any("collectionID", collID),
|
||||
zap.Any("serviceTime_l", ts),
|
||||
zap.Any("searchTime_l", searchTs),
|
||||
zap.Any("serviceTime_p", tsp),
|
||||
zap.Any("searchTime_p", stp),
|
||||
)
|
||||
if ts < searchTs {
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue