get channels by static converting (#6015)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/6036/head
bigsheeper 2021-06-23 17:26:09 +08:00 committed by GitHub
parent 17303bf4d3
commit 47d89abaa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 32 deletions

View File

@ -22,11 +22,10 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord"
)
type task interface {
@ -118,45 +117,26 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
// get all vChannels
vChannels := make([]Channel, 0)
pChannels := make([]Channel, 0)
VPChannels := make(map[string]string) // map[vChannel]pChannel
for _, info := range w.req.Infos {
vChannels = append(vChannels, info.ChannelName)
v := info.ChannelName
p := rootcoord.ToPhysicalChannel(info.ChannelName)
vChannels = append(vChannels, v)
pChannels = append(pChannels, p)
VPChannels[v] = p
}
log.Debug("starting WatchDmChannels ...",
zap.Any("collectionName", w.req.Schema.Name),
zap.Any("collectionID", collectionID),
zap.String("vChannels", fmt.Sprintln(vChannels)))
// get physical channels
desColReq := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: collectionID,
}
desColRsp, err := w.node.rootCoord.DescribeCollection(ctx, desColReq)
if err != nil {
log.Error("get channels failed, err = " + err.Error())
return err
}
log.Debug("get channels from master",
zap.Any("collectionID", collectionID),
zap.Any("vChannels", desColRsp.VirtualChannelNames),
zap.Any("pChannels", desColRsp.PhysicalChannelNames),
zap.Any("vChannels", vChannels),
zap.Any("pChannels", pChannels),
)
VPChannels := make(map[string]string) // map[vChannel]pChannel
for _, ch := range vChannels {
for i := range desColRsp.VirtualChannelNames {
if desColRsp.VirtualChannelNames[i] == ch {
VPChannels[ch] = desColRsp.PhysicalChannelNames[i]
pChannels = append(pChannels, desColRsp.PhysicalChannelNames[i])
break
}
}
}
if len(VPChannels) != len(vChannels) {
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
}
log.Debug("get physical channels done", zap.Any("collectionID", collectionID))
log.Debug("get physical channels done",
zap.Any("collectionID", collectionID),
)
// init replica
if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {