mirror of https://github.com/milvus-io/milvus.git
skip load/release segment when more than one delegator exist (#25718)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/25784/head
parent 168f346620
commit 1748c54fd7
@@ -83,8 +83,22 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
}

func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
	log := log.Ctx(ctx).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
		zap.Int64("collectionID", replica.CollectionID),
		zap.Int64("replicaID", replica.ID))
	ret := make([]task.Task, 0)

	// get channel dist by replica (ch -> node list), because more than one delegator may exist during channel balance.
	// if more than one delegator exists, loading/releasing segments may cause chaos, so skip the check until channel balance has finished.
	dist := c.dist.ChannelDistManager.GetChannelDistByReplica(replica)
	for ch, nodes := range dist {
		if len(nodes) > 1 {
			log.Info("skip check segment due to two shard leader exists",
				zap.String("channelName", ch))
			return ret
		}
	}

	// compare with targets to find the lack and redundancy of segments
	lacks, redundancies := c.getHistoricalSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
	tasks := c.createSegmentLoadTasks(ctx, lacks, replica)

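As a side note on the guard above: the skip decision depends only on the per-channel node fan-out returned by GetChannelDistByReplica. Below is a minimal, self-contained sketch of that guard in isolation; the function name and the sample data are illustrative only and not part of the commit.

```go
package main

import "fmt"

// channelHasMultipleDelegators reports whether any channel in the replica's
// distribution (channel name -> serving node IDs) is currently served by more
// than one node, which is the state checkReplica treats as "channel balance in
// progress" and uses as the reason to skip segment load/release.
func channelHasMultipleDelegators(dist map[string][]int64) (string, bool) {
	for ch, nodes := range dist {
		if len(nodes) > 1 {
			return ch, true
		}
	}
	return "", false
}

func main() {
	// dmc0 still has two delegators (old and new shard leader); dmc1 has settled.
	dist := map[string][]int64{
		"dmc0": {1, 2},
		"dmc1": {3},
	}
	if ch, busy := channelHasMultipleDelegators(dist); busy {
		fmt.Printf("skip segment check, channel %s has more than one delegator\n", ch)
	}
}
```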
@@ -124,7 +138,7 @@ func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
		return
	}

-	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 60, 1).With(
+	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64("replicaID", replica.ID))

@@ -136,7 +136,38 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
	suite.Equal(task.ActionTypeGrow, action.Type())
	suite.EqualValues(1, action.SegmentID())
	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}

func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
	checker := suite.checker
	// set meta
	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
	suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
	suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

	// set target
	segments := []*datapb.SegmentInfo{
		{
			ID:            1,
			PartitionID:   1,
			InsertChannel: "test-insert-channel",
		},
	}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
		nil, segments, nil)
	checker.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))

	// set dist
	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 11, 1, 1, "test-insert-channel"))
	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))

	tasks := checker.Check(context.TODO())
	suite.Len(tasks, 0)
}

func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {

@@ -119,6 +119,27 @@ func (m *ChannelDistManager) GetShardLeadersByReplica(replica *Replica) map[stri
	return ret
}

func (m *ChannelDistManager) GetChannelDistByReplica(replica *Replica) map[string][]int64 {
	m.rwmutex.RLock()
	defer m.rwmutex.RUnlock()

	ret := make(map[string][]int64)
	for _, node := range replica.GetNodes() {
		channels := m.channels[node]
		for _, dmc := range channels {
			if dmc.GetCollectionID() == replica.GetCollectionID() {
				channelName := dmc.GetChannelName()
				_, ok := ret[channelName]
				if !ok {
					ret[channelName] = make([]int64, 0)
				}
				ret[channelName] = append(ret[channelName], node)
			}
		}
	}
	return ret
}

func (m *ChannelDistManager) GetByCollection(collectionID UniqueID) []*DmChannel {
	m.rwmutex.RLock()
	defer m.rwmutex.RUnlock()

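For illustration only, here is a self-contained sketch of the same channel-name-to-nodes grouping that GetChannelDistByReplica performs, using simplified stand-in types instead of *Replica and *DmChannel (all names below are hypothetical). It also shows that the explicit empty-slice initialization in the real method is optional in Go, since append handles a nil slice.

```go
package main

import "fmt"

// channelInfo is a simplified, hypothetical stand-in for the two DmChannel
// fields the grouping actually reads.
type channelInfo struct {
	collectionID int64
	channelName  string
}

// groupChannelsByName builds the same channel name -> node IDs mapping,
// filtered to a single collection. Appending to a missing map key works
// directly on the nil slice, so no explicit make([]int64, 0) is needed.
func groupChannelsByName(collectionID int64, byNode map[int64][]channelInfo, nodes []int64) map[string][]int64 {
	ret := make(map[string][]int64)
	for _, node := range nodes {
		for _, ch := range byNode[node] {
			if ch.collectionID == collectionID {
				ret[ch.channelName] = append(ret[ch.channelName], node)
			}
		}
	}
	return ret
}

func main() {
	byNode := map[int64][]channelInfo{
		11: {{collectionID: 1, channelName: "test-channel1"}},
		22: {{collectionID: 1, channelName: "test-channel1"}},
		33: {{collectionID: 1, channelName: "test-channel2"}},
	}
	// Expect test-channel1 -> [11 22] and test-channel2 -> [33], mirroring the unit test above.
	fmt.Println(groupChannelsByName(1, byNode, []int64{11, 22, 33}))
}
```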
@@ -148,6 +148,47 @@ func (suite *ChannelDistManagerSuite) TestGetShardLeader() {
	suite.Equal(leaders["dmc1"], suite.nodes[1])
}

func (suite *ChannelDistManagerSuite) TestGetChannelDistByReplica() {
	replica := NewReplica(
		&querypb.Replica{
			CollectionID: suite.collection,
		},
		typeutil.NewUniqueSet(11, 22, 33),
	)

	ch1 := &DmChannel{
		VchannelInfo: &datapb.VchannelInfo{
			CollectionID: suite.collection,
			ChannelName:  "test-channel1",
		},
		Node:    11,
		Version: 1,
	}
	ch2 := &DmChannel{
		VchannelInfo: &datapb.VchannelInfo{
			CollectionID: suite.collection,
			ChannelName:  "test-channel1",
		},
		Node:    22,
		Version: 1,
	}
	ch3 := &DmChannel{
		VchannelInfo: &datapb.VchannelInfo{
			CollectionID: suite.collection,
			ChannelName:  "test-channel2",
		},
		Node:    33,
		Version: 1,
	}
	suite.dist.Update(11, ch1)
	suite.dist.Update(22, ch2)
	suite.dist.Update(33, ch3)

	dist := suite.dist.GetChannelDistByReplica(replica)
	suite.Len(dist["test-channel1"], 2)
	suite.Len(dist["test-channel2"], 1)
}

func (suite *ChannelDistManagerSuite) AssertNames(channels []*DmChannel, names ...string) bool {
	for _, channel := range channels {
		hasChannel := false