mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/27580/head
parent
7e9ed91304
commit
cb71a3e235
|
@ -498,6 +498,18 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
|
|||
return c.store.GetNodesChannels()
|
||||
}
|
||||
|
||||
func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel {
|
||||
channels := make([]*channel, 0)
|
||||
for _, nodeChannels := range c.store.GetNodesChannels() {
|
||||
for _, channelInfo := range nodeChannels.Channels {
|
||||
if collectionID == channelInfo.CollectionID {
|
||||
channels = append(channels, channelInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
return channels
|
||||
}
|
||||
|
||||
// GetBufferChannels gets buffer channels.
|
||||
func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
|
||||
c.mu.RLock()
|
||||
|
|
|
@ -758,20 +758,11 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
|
|||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID)
|
||||
if err != nil {
|
||||
log.Error("get collection info from rootcoord failed",
|
||||
zap.Error(err))
|
||||
|
||||
resp.Status = merr.Status(err)
|
||||
return resp, nil
|
||||
}
|
||||
channels := dresp.GetVirtualChannelNames()
|
||||
channels := s.channelManager.GetChannelsByCollectionID(collectionID)
|
||||
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
|
||||
flushedIDs := make(typeutil.UniqueSet)
|
||||
for _, c := range channels {
|
||||
channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionIDs...)
|
||||
for _, ch := range channels {
|
||||
channelInfo := s.handler.GetQueryVChanPositions(ch, partitionIDs...)
|
||||
channelInfos = append(channelInfos, channelInfo)
|
||||
log.Info("datacoord append channelInfo in GetRecoveryInfo",
|
||||
zap.String("channel", channelInfo.GetChannelName()),
|
||||
|
|
|
@ -114,8 +114,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
assert.EqualValues(t, 0, len(resp.GetSegments()))
|
||||
assert.EqualValues(t, 1, len(resp.GetChannels()))
|
||||
assert.Nil(t, resp.GetChannels()[0].SeekPosition)
|
||||
assert.EqualValues(t, 0, len(resp.GetChannels()))
|
||||
})
|
||||
|
||||
createSegment := func(id, collectionID, partitionID, numOfRows int64, posTs uint64,
|
||||
|
@ -231,6 +230,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
ch := &channel{Name: "vchan1", CollectionID: 0}
|
||||
svr.channelManager.AddNode(0)
|
||||
svr.channelManager.Watch(ch)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequestV2{
|
||||
CollectionID: 0,
|
||||
}
|
||||
|
@ -307,6 +310,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
|
||||
assert.NoError(t, err)
|
||||
|
||||
ch := &channel{Name: "vchan1", CollectionID: 0}
|
||||
svr.channelManager.AddNode(0)
|
||||
svr.channelManager.Watch(ch)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequestV2{
|
||||
CollectionID: 0,
|
||||
}
|
||||
|
@ -447,6 +454,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
|
||||
assert.NoError(t, err)
|
||||
|
||||
ch := &channel{Name: "vchan1", CollectionID: 0}
|
||||
svr.channelManager.AddNode(0)
|
||||
svr.channelManager.Watch(ch)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequestV2{
|
||||
CollectionID: 0,
|
||||
}
|
||||
|
@ -489,6 +500,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
|
||||
assert.NoError(t, err)
|
||||
|
||||
ch := &channel{Name: "vchan1", CollectionID: 0}
|
||||
svr.channelManager.AddNode(0)
|
||||
svr.channelManager.Watch(ch)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequestV2{
|
||||
CollectionID: 0,
|
||||
}
|
||||
|
@ -569,6 +584,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||
IndexSize: 0,
|
||||
})
|
||||
|
||||
ch := &channel{Name: "vchan1", CollectionID: 0}
|
||||
svr.channelManager.AddNode(0)
|
||||
svr.channelManager.Watch(ch)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequestV2{
|
||||
CollectionID: 0,
|
||||
}
|
||||
|
|
|
@ -150,7 +150,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
|
|||
}
|
||||
recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest)
|
||||
if err != nil {
|
||||
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
|
||||
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue