mirror of https://github.com/milvus-io/milvus.git
parent
949c320185
commit
5602b22531
|
@ -89,7 +89,7 @@ func (c *ChannelChecker) Check(ctx context.Context) []task.Task {
|
|||
func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
|
||||
ret := make([]task.Task, 0)
|
||||
|
||||
lacks, redundancies := c.getDmChannelDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
lacks, redundancies := c.getDmChannelDiff(replica.GetCollectionID(), replica.GetID())
|
||||
tasks := c.createChannelLoadTask(ctx, lacks, replica)
|
||||
task.SetReason("lacks of channel", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
@ -98,7 +98,7 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
task.SetReason("collection released", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
repeated := c.findRepeatedChannels(c.dist, c.meta, replica.GetID())
|
||||
repeated := c.findRepeatedChannels(replica.GetID())
|
||||
tasks = c.createChannelReduceTasks(ctx, repeated, replica.GetID())
|
||||
task.SetReason("redundancies of channel")
|
||||
ret = append(ret, tasks...)
|
||||
|
@ -109,25 +109,22 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
}
|
||||
|
||||
// GetDmChannelDiff get channel diff between target and dist
|
||||
func (c *ChannelChecker) getDmChannelDiff(targetMgr *meta.TargetManager,
|
||||
distMgr *meta.DistributionManager,
|
||||
metaInfo *meta.Meta,
|
||||
collectionID int64,
|
||||
func (c *ChannelChecker) getDmChannelDiff(collectionID int64,
|
||||
replicaID int64) (toLoad, toRelease []*meta.DmChannel) {
|
||||
replica := metaInfo.Get(replicaID)
|
||||
replica := c.meta.Get(replicaID)
|
||||
if replica == nil {
|
||||
log.Info("replica does not exist, skip it")
|
||||
return
|
||||
}
|
||||
|
||||
dist := c.getChannelDist(distMgr, replica)
|
||||
dist := c.getChannelDist(replica)
|
||||
distMap := typeutil.NewSet[string]()
|
||||
for _, ch := range dist {
|
||||
distMap.Insert(ch.GetChannelName())
|
||||
}
|
||||
|
||||
nextTargetMap := targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
|
||||
nextTargetMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
// get channels which exists on dist, but not exist on current and next
|
||||
for _, ch := range dist {
|
||||
|
@ -149,25 +146,23 @@ func (c *ChannelChecker) getDmChannelDiff(targetMgr *meta.TargetManager,
|
|||
return
|
||||
}
|
||||
|
||||
func (c *ChannelChecker) getChannelDist(distMgr *meta.DistributionManager, replica *meta.Replica) []*meta.DmChannel {
|
||||
func (c *ChannelChecker) getChannelDist(replica *meta.Replica) []*meta.DmChannel {
|
||||
dist := make([]*meta.DmChannel, 0)
|
||||
for _, nodeID := range replica.GetNodes() {
|
||||
dist = append(dist, distMgr.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)...)
|
||||
dist = append(dist, c.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID)...)
|
||||
}
|
||||
return dist
|
||||
}
|
||||
|
||||
func (c *ChannelChecker) findRepeatedChannels(distMgr *meta.DistributionManager,
|
||||
metaInfo *meta.Meta,
|
||||
replicaID int64) []*meta.DmChannel {
|
||||
replica := metaInfo.Get(replicaID)
|
||||
func (c *ChannelChecker) findRepeatedChannels(replicaID int64) []*meta.DmChannel {
|
||||
replica := c.meta.Get(replicaID)
|
||||
ret := make([]*meta.DmChannel, 0)
|
||||
|
||||
if replica == nil {
|
||||
log.Info("replica does not exist, skip it")
|
||||
return ret
|
||||
}
|
||||
dist := c.getChannelDist(distMgr, replica)
|
||||
dist := c.getChannelDist(replica)
|
||||
|
||||
versionsMap := make(map[string]*meta.DmChannel)
|
||||
for _, ch := range dist {
|
||||
|
|
|
@ -109,7 +109,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
}
|
||||
|
||||
// compare with targets to find the lack and redundancy of segments
|
||||
lacks, redundancies := c.getHistoricalSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
lacks, redundancies := c.getHistoricalSegmentDiff(replica.GetCollectionID(), replica.GetID())
|
||||
tasks := c.createSegmentLoadTasks(ctx, lacks, replica)
|
||||
task.SetReason("lacks of segment", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
@ -120,14 +120,14 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
ret = append(ret, tasks...)
|
||||
|
||||
// compare inner dists to find repeated loaded segments
|
||||
redundancies = c.findRepeatedHistoricalSegments(c.dist, c.meta, replica.GetID())
|
||||
redundancies = c.findRepeatedHistoricalSegments(replica.GetID())
|
||||
redundancies = c.filterExistedOnLeader(replica, redundancies)
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
|
||||
task.SetReason("redundancies of segment", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
||||
// compare with target to find the lack and redundancy of segments
|
||||
_, redundancies = c.getStreamingSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
|
||||
_, redundancies = c.getStreamingSegmentDiff(replica.GetCollectionID(), replica.GetID())
|
||||
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Streaming)
|
||||
task.SetReason("streaming segment not exists in target", tasks...)
|
||||
ret = append(ret, tasks...)
|
||||
|
@ -136,12 +136,9 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
|
|||
}
|
||||
|
||||
// GetStreamingSegmentDiff get streaming segment diff between leader view and target
|
||||
func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
|
||||
distMgr *meta.DistributionManager,
|
||||
metaInfo *meta.Meta,
|
||||
collectionID int64,
|
||||
func (c *SegmentChecker) getStreamingSegmentDiff(collectionID int64,
|
||||
replicaID int64) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
|
||||
replica := metaInfo.Get(replicaID)
|
||||
replica := c.meta.Get(replicaID)
|
||||
if replica == nil {
|
||||
log.Info("replica does not exist, skip it")
|
||||
return
|
||||
|
@ -151,15 +148,15 @@ func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
|
|||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("replicaID", replica.ID))
|
||||
|
||||
leaders := distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
|
||||
leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
|
||||
// distMgr.LeaderViewManager.
|
||||
for channelName, node := range leaders {
|
||||
view := distMgr.LeaderViewManager.GetLeaderShardView(node, channelName)
|
||||
view := c.dist.LeaderViewManager.GetLeaderShardView(node, channelName)
|
||||
if view == nil {
|
||||
log.Info("leaderView is not ready, skip", zap.String("channelName", channelName), zap.Int64("node", node))
|
||||
continue
|
||||
}
|
||||
targetVersion := targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)
|
||||
targetVersion := c.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)
|
||||
if view.TargetVersion != targetVersion {
|
||||
// before shard delegator update it's readable version, skip release segment
|
||||
log.RatedInfo(20, "before shard delegator update it's readable version, skip release segment",
|
||||
|
@ -171,9 +168,9 @@ func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
|
|||
continue
|
||||
}
|
||||
|
||||
nextTargetSegmentIDs := targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetSegmentIDs := targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
currentTargetChannelMap := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
|
||||
nextTargetSegmentIDs := c.targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetSegmentIDs := c.targetMgr.GetStreamingSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
currentTargetChannelMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
// get segment which exist on leader view, but not on current target and next target
|
||||
for _, segment := range view.GrowingSegments {
|
||||
|
@ -198,24 +195,21 @@ func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
|
|||
|
||||
// GetHistoricalSegmentDiff get historical segment diff between target and dist
|
||||
func (c *SegmentChecker) getHistoricalSegmentDiff(
|
||||
targetMgr *meta.TargetManager,
|
||||
distMgr *meta.DistributionManager,
|
||||
metaInfo *meta.Meta,
|
||||
collectionID int64,
|
||||
replicaID int64) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
|
||||
replica := metaInfo.Get(replicaID)
|
||||
replica := c.meta.Get(replicaID)
|
||||
if replica == nil {
|
||||
log.Info("replica does not exist, skip it")
|
||||
return
|
||||
}
|
||||
dist := c.getHistoricalSegmentsDist(distMgr, replica)
|
||||
dist := c.getHistoricalSegmentsDist(replica)
|
||||
distMap := typeutil.NewUniqueSet()
|
||||
for _, s := range dist {
|
||||
distMap.Insert(s.GetID())
|
||||
}
|
||||
|
||||
nextTargetMap := targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
nextTargetMap := c.targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := c.targetMgr.GetHistoricalSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
// Segment which exist on next target, but not on dist
|
||||
for segmentID, segment := range nextTargetMap {
|
||||
|
@ -237,24 +231,22 @@ func (c *SegmentChecker) getHistoricalSegmentDiff(
|
|||
return
|
||||
}
|
||||
|
||||
func (c *SegmentChecker) getHistoricalSegmentsDist(distMgr *meta.DistributionManager, replica *meta.Replica) []*meta.Segment {
|
||||
func (c *SegmentChecker) getHistoricalSegmentsDist(replica *meta.Replica) []*meta.Segment {
|
||||
ret := make([]*meta.Segment, 0)
|
||||
for _, node := range replica.GetNodes() {
|
||||
ret = append(ret, distMgr.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...)
|
||||
ret = append(ret, c.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (c *SegmentChecker) findRepeatedHistoricalSegments(distMgr *meta.DistributionManager,
|
||||
metaInfo *meta.Meta,
|
||||
replicaID int64) []*meta.Segment {
|
||||
func (c *SegmentChecker) findRepeatedHistoricalSegments(replicaID int64) []*meta.Segment {
|
||||
segments := make([]*meta.Segment, 0)
|
||||
replica := metaInfo.Get(replicaID)
|
||||
replica := c.meta.Get(replicaID)
|
||||
if replica == nil {
|
||||
log.Info("replica does not exist, skip it")
|
||||
return segments
|
||||
}
|
||||
dist := c.getHistoricalSegmentsDist(distMgr, replica)
|
||||
dist := c.getHistoricalSegmentsDist(replica)
|
||||
versions := make(map[int64]*meta.Segment)
|
||||
for _, s := range dist {
|
||||
maxVer, ok := versions[s.GetID()]
|
||||
|
|
|
@ -315,28 +315,6 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect
|
|||
return NewCollectionTarget(segments, channels)
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) removePartitionGrowingSegmentFromChannel(partitionIDSet typeutil.UniqueSet,
|
||||
oldChannel *DmChannel) *DmChannel {
|
||||
newChannel := oldChannel.Clone()
|
||||
|
||||
notMatchPartition := func(s *datapb.SegmentInfo, _ int) bool {
|
||||
return !partitionIDSet.Contain(s.GetPartitionID())
|
||||
}
|
||||
|
||||
getSegmentID := func(s *datapb.SegmentInfo, _ int) int64 {
|
||||
return s.GetID()
|
||||
}
|
||||
|
||||
newChannel.UnflushedSegments = lo.Filter(newChannel.GetUnflushedSegments(), notMatchPartition)
|
||||
newChannel.UnflushedSegmentIds = lo.Map(newChannel.GetUnflushedSegments(), getSegmentID)
|
||||
newChannel.FlushedSegments = lo.Filter(newChannel.GetFlushedSegments(), notMatchPartition)
|
||||
newChannel.FlushedSegmentIds = lo.Map(newChannel.GetFlushedSegments(), getSegmentID)
|
||||
newChannel.DroppedSegments = lo.Filter(newChannel.GetDroppedSegments(), notMatchPartition)
|
||||
newChannel.DroppedSegmentIds = lo.Map(newChannel.GetDroppedSegments(), getSegmentID)
|
||||
|
||||
return newChannel
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) getTarget(scope TargetScope) *target {
|
||||
if scope == CurrentTarget {
|
||||
return mgr.current
|
||||
|
|
Loading…
Reference in New Issue