mirror of https://github.com/milvus-io/milvus.git
Fix channels in query node and fix search (#5795)
* fix add channel error Signed-off-by: bigsheeper <yihao.dai@zilliz.com> * fix tSafe vChannel key Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/5796/head
parent
1c6786f85c
commit
bf39644457
|
@ -512,9 +512,10 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
GlobalSealedSegmentIDs: sealedSegmentSearched,
|
||||
},
|
||||
}
|
||||
log.Debug("QueryNode SearchResultMsg",
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
log.Debug("QueryNode Empty SearchResultMsg",
|
||||
zap.Any("collectionID", collection.ID()),
|
||||
zap.Any("msgID", searchMsg.ID()),
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("sealedSegmentSearched", sealedSegmentSearched),
|
||||
)
|
||||
err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID)
|
||||
|
@ -603,8 +604,9 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
},
|
||||
}
|
||||
log.Debug("QueryNode SearchResultMsg",
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("collectionID", collection.ID()),
|
||||
zap.Any("msgID", searchMsg.ID()),
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("sealedSegmentSearched", sealedSegmentSearched),
|
||||
)
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
log.Debug("starting WatchDmChannels ...",
|
||||
zap.Any("collectionName", w.req.Schema.Name),
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.String("ChannelIDs", fmt.Sprintln(vChannels)))
|
||||
zap.String("vChannels", fmt.Sprintln(vChannels)))
|
||||
|
||||
// get physical channels
|
||||
desColReq := &milvuspb.DescribeCollectionRequest{
|
||||
|
@ -135,10 +135,10 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
desColRsp, err := w.node.masterService.DescribeCollection(ctx, desColReq)
|
||||
if err != nil {
|
||||
log.Error("get physical channels failed, err = " + err.Error())
|
||||
log.Error("get channels failed, err = " + err.Error())
|
||||
return err
|
||||
}
|
||||
log.Debug("get physical channels from master",
|
||||
log.Debug("get channels from master",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("vChannels", desColRsp.VirtualChannelNames),
|
||||
zap.Any("pChannels", desColRsp.PhysicalChannelNames),
|
||||
|
@ -165,13 +165,13 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
w.node.streaming.replica.initExcludedSegments(collectionID)
|
||||
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
collection.addVChannels(vChannels)
|
||||
collection.addPChannels(pChannels)
|
||||
}
|
||||
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
collection.addVChannels(vChannels)
|
||||
collection.addPChannels(pChannels)
|
||||
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
|
||||
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
|
||||
if err != nil {
|
||||
|
@ -227,7 +227,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID))
|
||||
|
||||
// create tSafe
|
||||
for _, channel := range VPChannels {
|
||||
for _, channel := range vChannels {
|
||||
w.node.streaming.tSafeReplica.addTSafe(channel)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue