diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 31bfdf5319..5174ee3998 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -195,10 +195,19 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons } } + var version int64 + for _, channel := range resp.GetChannels() { + if channel.GetChannel() == lview.GetChannel() { + version = channel.GetVersion() + break + } + } + view := &meta.LeaderView{ ID: resp.GetNodeID(), CollectionID: lview.GetCollection(), Channel: lview.GetChannel(), + Version: version, Segments: lview.GetSegmentDist(), GrowingSegments: segments, } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 8200c7bf6b..98a3e3d40c 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -374,3 +374,31 @@ func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error { } return nil } + +func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView { + type leaderID struct { + ReplicaID int64 + Shard string + } + + newLeaders := make(map[leaderID]*meta.LeaderView) + for _, view := range leaders { + replica := replicaManager.GetByCollectionAndNode(view.CollectionID, view.ID) + if replica == nil { + continue + } + + id := leaderID{replica.GetID(), view.Channel} + if old, ok := newLeaders[id]; ok && old.Version > view.Version { + continue + } + + newLeaders[id] = view + } + + result := make(map[int64]*meta.LeaderView) + for _, v := range newLeaders { + result[v.ID] = v + } + return result +} diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 19c65b633b..f0f7085ee1 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -27,6 +27,7 @@ type LeaderView struct { ID int64 CollectionID int64 Channel string + Version int64 Segments map[int64]*querypb.SegmentDist GrowingSegments map[int64]*Segment } @@ -46,6 +47,7 @@ func (view *LeaderView) Clone() *LeaderView { ID: view.ID, CollectionID: view.CollectionID, Channel: view.Channel, + Version: view.Version, Segments: segments, GrowingSegments: growings, } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 5c86548a58..313a6fdc4a 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -859,6 +859,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade log := log.With(zap.String("channel", channel.GetChannelName())) leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName()) + leaders = filterDupLeaders(s.meta.ReplicaManager, leaders) ids := make([]int64, 0, len(leaders)) addrs := make([]string, 0, len(leaders))