diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index ec2529685b..3f9d1aa18a 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -861,18 +861,21 @@ func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.Q m.channelMu.Lock() defer m.channelMu.Unlock() - //TODO::to remove - collectionID = 0 if info, ok := m.queryChannelInfos[collectionID]; ok { return proto.Clone(info).(*querypb.QueryChannelInfo), nil } - info := createQueryChannel(collectionID) + // TODO::to remove + // all collection use the same query channel + colIDForAssignChannel := UniqueID(0) + info := createQueryChannel(colIDForAssignChannel) err := saveQueryChannelInfo(collectionID, info, m.client) if err != nil { log.Error("getQueryChannel: save channel to etcd error", zap.Error(err)) return nil, err } + // set info.collectionID from 0 to realID + info.CollectionID = collectionID m.queryChannelInfos[collectionID] = info return proto.Clone(info).(*querypb.QueryChannelInfo), nil } diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index d694da4c33..991eb3fdea 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -126,14 +126,13 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery return status, err } consumeChannels := []string{in.RequestChannelID} + consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) + sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName) if in.SeekPosition == nil || len(in.SeekPosition.MsgID) == 0 { // as consumer - consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int()) - sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName) log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName) } else { // seek query channel - sc.queryMsgStream.AsConsumer(consumeChannels, in.SeekPosition.MsgGroup) err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{in.SeekPosition}) if err != nil { status := &commonpb.Status{