mirror of https://github.com/milvus-io/milvus.git
Fix query coord set seek position error (#10651)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/10689/head
parent
a36c35d40e
commit
a06dab29d7
|
@ -861,18 +861,21 @@ func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.Q
|
|||
m.channelMu.Lock()
|
||||
defer m.channelMu.Unlock()
|
||||
|
||||
//TODO::to remove
|
||||
collectionID = 0
|
||||
if info, ok := m.queryChannelInfos[collectionID]; ok {
|
||||
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
|
||||
}
|
||||
|
||||
info := createQueryChannel(collectionID)
|
||||
// TODO::to remove
|
||||
// all collection use the same query channel
|
||||
colIDForAssignChannel := UniqueID(0)
|
||||
info := createQueryChannel(colIDForAssignChannel)
|
||||
err := saveQueryChannelInfo(collectionID, info, m.client)
|
||||
if err != nil {
|
||||
log.Error("getQueryChannel: save channel to etcd error", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
// set info.collectionID from 0 to realID
|
||||
info.CollectionID = collectionID
|
||||
m.queryChannelInfos[collectionID] = info
|
||||
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
|
||||
}
|
||||
|
|
|
@ -126,14 +126,13 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
return status, err
|
||||
}
|
||||
consumeChannels := []string{in.RequestChannelID}
|
||||
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
if in.SeekPosition == nil || len(in.SeekPosition.MsgID) == 0 {
|
||||
// as consumer
|
||||
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
|
||||
} else {
|
||||
// seek query channel
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, in.SeekPosition.MsgGroup)
|
||||
err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{in.SeekPosition})
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
|
|
Loading…
Reference in New Issue