From cb71a3e2359215ada0372439d3f91547f88873b3 Mon Sep 17 00:00:00 2001 From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Mon, 9 Oct 2023 18:51:32 +0800 Subject: [PATCH] rm dependency to rc when getting recovery info(#25363) (#27405) Signed-off-by: MrPresent-Han --- internal/datacoord/channel_manager.go | 12 ++++++++++ internal/datacoord/services.go | 15 +++--------- internal/datacoord/services_test.go | 23 +++++++++++++++++-- .../querycoordv2/meta/coordinator_broker.go | 3 ++- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 14c4ed3007..2d8ce46c62 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -498,6 +498,18 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo { return c.store.GetNodesChannels() } +func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel { + channels := make([]*channel, 0) + for _, nodeChannels := range c.store.GetNodesChannels() { + for _, channelInfo := range nodeChannels.Channels { + if collectionID == channelInfo.CollectionID { + channels = append(channels, channelInfo) + } + } + } + return channels +} + // GetBufferChannels gets buffer channels. func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo { c.mu.RLock() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8e3292bc52..70080a5297 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -758,20 +758,11 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI Status: merr.Status(err), }, nil } - - dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) - if err != nil { - log.Error("get collection info from rootcoord failed", - zap.Error(err)) - - resp.Status = merr.Status(err) - return resp, nil - } - channels := dresp.GetVirtualChannelNames() + channels := s.channelManager.GetChannelsByCollectionID(collectionID) channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) flushedIDs := make(typeutil.UniqueSet) - for _, c := range channels { - channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionIDs...) + for _, ch := range channels { + channelInfo := s.handler.GetQueryVChanPositions(ch, partitionIDs...) channelInfos = append(channelInfos, channelInfo) log.Info("datacoord append channelInfo in GetRecoveryInfo", zap.String("channel", channelInfo.GetChannelName()), diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 8d509a9358..84b2d9cd98 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -114,8 +114,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.EqualValues(t, 0, len(resp.GetSegments())) - assert.EqualValues(t, 1, len(resp.GetChannels())) - assert.Nil(t, resp.GetChannels()[0].SeekPosition) + assert.EqualValues(t, 0, len(resp.GetChannels())) }) createSegment := func(id, collectionID, partitionID, numOfRows int64, posTs uint64, @@ -231,6 +230,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { }) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -307,6 +310,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -447,6 +454,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -489,6 +500,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -569,6 +584,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { IndexSize: 0, }) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 5153f46bd9..2b986757b3 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -150,7 +150,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti } recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest) if err != nil { - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) + log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), + zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) return nil, nil, err }