diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 9e80984d64..56fc61e9aa 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -18,6 +18,7 @@ package checkers
 
 import (
 	"context"
+	"sort"
 	"time"
 
 	"github.com/samber/lo"
@@ -32,7 +33,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type SegmentChecker struct {
@@ -207,9 +207,12 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
 		return
 	}
 	dist := c.getHistoricalSegmentsDist(replica)
-	distMap := typeutil.NewUniqueSet()
+	sort.Slice(dist, func(i, j int) bool {
+		return dist[i].Version < dist[j].Version
+	})
+	distMap := make(map[int64]int64)
 	for _, s := range dist {
-		distMap.Insert(s.GetID())
+		distMap[s.GetID()] = s.Node
 	}
 
 	nextTargetMap := c.targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.NextTarget)
@@ -217,7 +220,15 @@
 
 	// Segment which exist on next target, but not on dist
 	for segmentID, segment := range nextTargetMap {
-		if !distMap.Contain(segmentID) {
+		leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica,
+			segment.GetInsertChannel(),
+		)
+		node, ok := distMap[segmentID]
+		if !ok ||
+			// the L0 segments have to be on the same node as the watched channel
+			leader != nil &&
+				segment.GetLevel() == datapb.SegmentLevel_L0 &&
+				node != leader.ID {
 			toLoad = append(toLoad, segment)
 		}
 	}
@@ -232,6 +243,16 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
 		}
 	}
 
+	level0Segments := lo.Filter(toLoad, func(segment *datapb.SegmentInfo, _ int) bool {
+		return segment.GetLevel() == datapb.SegmentLevel_L0
+	})
+	// If any L0 segment is found,
+	// QueryCoord loads the L0 segments first,
+	// so that all L0 delta logs are delivered to the other segments.
+	if len(level0Segments) > 0 {
+		toLoad = level0Segments
+	}
+
 	return
 }
 
@@ -312,25 +333,46 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
 	if len(segments) == 0 {
 		return nil
 	}
-	packedSegments := make([]*meta.Segment, 0, len(segments))
+
+	isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
+
+	shardSegments := make(map[string][]*meta.Segment)
 	for _, s := range segments {
-		if len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 {
+		if isLevel0 &&
+			len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 {
 			continue
 		}
-		packedSegments = append(packedSegments, &meta.Segment{SegmentInfo: s})
+		channel := s.GetInsertChannel()
+		packedSegments := shardSegments[channel]
+		packedSegments = append(packedSegments, &meta.Segment{
+			SegmentInfo: s,
+		})
+		shardSegments[channel] = packedSegments
 	}
-	outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
-	availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
-		stop, err := c.nodeMgr.IsStoppingNode(node)
-		if err != nil {
-			return false
+
+	plans := make([]balance.SegmentAssignPlan, 0)
+	for shard, segments := range shardSegments {
+		outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
+		availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
+			stop, err := c.nodeMgr.IsStoppingNode(node)
+			if err != nil {
+				return false
+			}
+
+			if isLevel0 {
+				leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard)
+				return !outboundNodes.Contain(node) && !stop && node == leader.ID
+			}
+			return !outboundNodes.Contain(node) && !stop
+		})
+
+		shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes)
+		for i := range shardPlans {
+			shardPlans[i].ReplicaID = replica.GetID()
 		}
-		return !outboundNodes.Contain(node) && !stop
-	})
-	plans := c.balancer.AssignSegment(replica.CollectionID, packedSegments, availableNodes)
-	for i := range plans {
-		plans[i].ReplicaID = replica.GetID()
+		plans = append(plans, shardPlans...)
 	}
+
 	return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), plans)
 }
 
diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go
index 8d5078492b..2d58cce3fe 100644
--- a/internal/querycoordv2/meta/leader_view_manager.go
+++ b/internal/querycoordv2/meta/leader_view_manager.go
@@ -229,3 +229,19 @@ func (mgr *LeaderViewManager) GetLeadersByShard(shard string) map[int64]*LeaderV
 	}
 	return ret
 }
+
+func (mgr *LeaderViewManager) GetLatestLeadersByReplicaShard(replica *Replica, shard string) *LeaderView {
+	mgr.rwmutex.RLock()
+	defer mgr.rwmutex.RUnlock()
+
+	var ret *LeaderView
+	for _, views := range mgr.views {
+		view, ok := views[shard]
+		if ok &&
+			replica.Contains(view.ID) &&
+			(ret == nil || ret.Version < view.Version) {
+			ret = view
+		}
+	}
+	return ret
+}
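
Two details in the getHistoricalSegmentDiff change are worth calling out. First, the new distMap: because dist is sorted by Version ascending before the map is built, a segment that is present on several nodes ends up mapped to the node of its newest copy. A minimal standalone sketch of that selection rule, using toy types rather than the real Milvus ones:

```go
package main

import (
	"fmt"
	"sort"
)

// distEntry is a toy stand-in for a historical-segment dist entry; the real
// entries are *meta.Segment values with GetID(), Node and Version fields.
type distEntry struct {
	SegmentID int64
	Node      int64
	Version   int64
}

// latestNodeBySegment mirrors the distMap construction: sorting by Version
// ascending means that when a segment appears on several nodes, the newest
// entry overwrites the older ones and the map records its most recent node.
func latestNodeBySegment(dist []distEntry) map[int64]int64 {
	sort.Slice(dist, func(i, j int) bool {
		return dist[i].Version < dist[j].Version
	})
	m := make(map[int64]int64)
	for _, e := range dist {
		m[e.SegmentID] = e.Node
	}
	return m
}

func main() {
	dist := []distEntry{
		{SegmentID: 100, Node: 2, Version: 5}, // newer copy on node 2
		{SegmentID: 100, Node: 1, Version: 3},
	}
	fmt.Println(latestNodeBySegment(dist)) // map[100:2]
}
```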
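
Second, the toLoad truncation at the end of the function: if any pending segment is L0, only the L0 segments are loaded in that round, so their delta logs are in place before the remaining segments come up. A hedged sketch of that rule, with a hypothetical segment type standing in for *datapb.SegmentInfo:

```go
package main

import "fmt"

// segment is a hypothetical stand-in for *datapb.SegmentInfo.
type segment struct {
	ID int64
	L0 bool
}

// l0First mirrors the new ordering rule in getHistoricalSegmentDiff: if any
// pending segment is L0, load only the L0 segments this round, so their
// delta logs are delivered before the other segments are brought up.
func l0First(toLoad []segment) []segment {
	level0 := make([]segment, 0)
	for _, s := range toLoad {
		if s.L0 {
			level0 = append(level0, s)
		}
	}
	if len(level0) > 0 {
		return level0
	}
	return toLoad
}

func main() {
	pending := []segment{{ID: 1}, {ID: 2, L0: true}, {ID: 3}}
	fmt.Println(l0First(pending)) // [{2 true}]: only the L0 segment loads this round
}
```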
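
Finally, the new GetLatestLeadersByReplicaShard helper: among a shard's leader views whose node belongs to the replica, it returns the highest-version one, and for L0 segments that node becomes the only assignable target. The same selection restated as a sketch with toy types:

```go
package main

import "fmt"

// leaderView is a toy stand-in for meta.LeaderView.
type leaderView struct {
	NodeID  int64
	Version int64
}

// latestLeader restates GetLatestLeadersByReplicaShard with toy types: among
// a shard's leader views on nodes that belong to the replica, return the one
// with the highest Version, or nil if the replica holds no view of the shard.
func latestLeader(views []leaderView, inReplica func(int64) bool) *leaderView {
	var ret *leaderView
	for i := range views {
		v := &views[i]
		if inReplica(v.NodeID) && (ret == nil || ret.Version < v.Version) {
			ret = v
		}
	}
	return ret
}

func main() {
	views := []leaderView{{NodeID: 1, Version: 4}, {NodeID: 2, Version: 7}, {NodeID: 9, Version: 9}}
	replica := map[int64]bool{1: true, 2: true} // node 9 is outside this replica
	leader := latestLeader(views, func(n int64) bool { return replica[n] })
	fmt.Println(leader.NodeID) // 2: highest-version view inside the replica
}
```

Pinning L0 segments to this node is what the availableNodes filter in createSegmentLoadTasks enforces: for L0 loads the candidate set collapses to the latest shard leader's node.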