Update ListCollectionPhysicalChannels and ListCollectionVirtualChannels (#11007)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/11083/head
Cai Yudong 2021-11-02 15:50:30 +08:00 committed by GitHub
parent c29dc0be6a
commit 66b9684fe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 19 deletions

View File

@ -535,27 +535,27 @@ func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string {
}
// ListCollectionVirtualChannels list virtual channels of all collections
func (mt *MetaTable) ListCollectionVirtualChannels() []string {
func (mt *MetaTable) ListCollectionVirtualChannels() map[typeutil.UniqueID][]string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
vlist := []string{}
chanMap := make(map[typeutil.UniqueID][]string)
for _, c := range mt.collID2Meta {
vlist = append(vlist, c.VirtualChannelNames...)
for id, collInfo := range mt.collID2Meta {
chanMap[id] = collInfo.VirtualChannelNames
}
return vlist
return chanMap
}
// ListCollectionPhysicalChannels list physical channels of all collections
func (mt *MetaTable) ListCollectionPhysicalChannels() []string {
func (mt *MetaTable) ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
plist := []string{}
chanMap := make(map[typeutil.UniqueID][]string)
for _, c := range mt.collID2Meta {
plist = append(plist, c.PhysicalChannelNames...)
for id, collInfo := range mt.collID2Meta {
chanMap[id] = collInfo.PhysicalChannelNames
}
return plist
return chanMap
}
// AddPartition add partition

View File

@ -928,9 +928,11 @@ func (c *Core) Init() error {
c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum)
// recover physical channels for all collections
pc := c.MetaTable.ListCollectionPhysicalChannels()
c.dmlChannels.AddProducerChannels(pc...)
log.Debug("recover all physical channels", zap.Any("chanNames", pc))
chanMap := c.MetaTable.ListCollectionPhysicalChannels()
for collID, chanNames := range chanMap {
c.dmlChannels.AddProducerChannels(chanNames...)
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("chanNames", chanNames))
}
c.chanTimeTick = newTimeTickSync(c)
c.chanTimeTick.AddProxy(c.session)

View File

@ -674,17 +674,21 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, shardsNum, int32(core.dmlChannels.GetNumChannels()))
pChan := core.MetaTable.ListCollectionPhysicalChannels()
dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName)
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelSubName)
dmlStream.Start()
pChanMap := core.MetaTable.ListCollectionPhysicalChannels()
assert.Greater(t, len(pChanMap[createMeta.ID]), 0)
vChanMap := core.MetaTable.ListCollectionVirtualChannels()
assert.Greater(t, len(vChanMap[createMeta.ID]), 0)
// get CreateCollectionMsg
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())
assert.Equal(t, 1, len(msgs))
createMsg, ok := (msgs[0]).(*msgstream.CreateCollectionMsg)
assert.True(t, ok)
createMeta, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, createMsg.CollectionID)
assert.Equal(t, 1, len(createMeta.PartitionIDs))
assert.Equal(t, createMeta.PartitionIDs[0], createMsg.PartitionID)
@ -2272,9 +2276,10 @@ func TestRootCoord2(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
pChan := core.MetaTable.ListCollectionPhysicalChannels()
collInfo, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
dmlStream, _ := msFactory.NewMsgStream(ctx)
dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName)
dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelSubName)
dmlStream.Start()
msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())