mirror of https://github.com/milvus-io/milvus.git
Fix observers may update current target to a unfinished next target (#21107)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/21144/head
parent
b850c4ae2c
commit
5ba1a94858
|
@ -395,6 +395,12 @@ func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, sco
|
|||
return collectionTarget.GetAllSegments()[id]
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
|
||||
newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)
|
||||
|
||||
return len(newChannels) > 0
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
|
||||
newChannels := mgr.GetDmChannelsByCollection(collectionID, NextTarget)
|
||||
|
||||
|
|
|
@ -127,7 +127,6 @@ func (suite *TargetManagerSuite) SetupTest() {
|
|||
}
|
||||
|
||||
suite.mgr.UpdateCollectionNextTargetWithPartitions(collection, suite.partitions[collection]...)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -133,7 +133,13 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
|
|||
}
|
||||
|
||||
func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
|
||||
replicaNum := len(ob.meta.ReplicaManager.GetByCollection(collectionID))
|
||||
// Collection observer will update the current target as loading done,
|
||||
// avoid double updating, which will cause update current target to a unfinished next target
|
||||
if !ob.targetMgr.IsCurrentTargetExist(collectionID) {
|
||||
return false
|
||||
}
|
||||
|
||||
replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID)
|
||||
|
||||
// check channel first
|
||||
channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
|
||||
|
@ -146,7 +152,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
|
|||
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
|
||||
collectionID,
|
||||
ob.distMgr.LeaderViewManager.GetChannelDist(channel.GetChannelName()))
|
||||
if len(group) < replicaNum {
|
||||
if int32(len(group)) < replicaNum {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -157,7 +163,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
|
|||
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
|
||||
collectionID,
|
||||
ob.distMgr.LeaderViewManager.GetSealedSegmentDist(segment.GetID()))
|
||||
if len(group) < replicaNum {
|
||||
if int32(len(group)) < replicaNum {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
|
@ -79,15 +80,16 @@ func (suite *TargetObserverSuite) SetupTest() {
|
|||
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.distMgr = meta.NewDistributionManager()
|
||||
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker)
|
||||
|
||||
suite.observer.Start(context.TODO())
|
||||
|
||||
suite.collectionID = int64(1000)
|
||||
suite.partitionID = int64(100)
|
||||
|
||||
err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
|
||||
suite.NoError(err)
|
||||
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
|
||||
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1)
|
||||
suite.NoError(err)
|
||||
replicas[0].AddNode(2)
|
||||
err = suite.meta.ReplicaManager.Put(replicas...)
|
||||
suite.NoError(err)
|
||||
|
||||
suite.nextTargetChannels = []*datapb.VchannelInfo{
|
||||
|
@ -118,25 +120,75 @@ func (suite *TargetObserverSuite) SetupTest() {
|
|||
|
||||
func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 2
|
||||
return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 2 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
|
||||
suite.distMgr.LeaderViewManager.Update(2,
|
||||
&meta.LeaderView{
|
||||
ID: 2,
|
||||
CollectionID: suite.collectionID,
|
||||
Channel: "channel-1",
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
11: {NodeID: 2},
|
||||
},
|
||||
},
|
||||
&meta.LeaderView{
|
||||
ID: 2,
|
||||
CollectionID: suite.collectionID,
|
||||
Channel: "channel-2",
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
12: {NodeID: 2},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// Never update current target if it's empty, even the next target is ready
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
return len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.CurrentTarget)) == 0
|
||||
}, 3*time.Second, 1*time.Second)
|
||||
|
||||
suite.distMgr.SegmentDistManager.Update(2, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 11, 2, 0, "channel-1"))
|
||||
suite.distMgr.SegmentDistManager.Update(2, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 12, 2, 1, "channel-2"))
|
||||
suite.distMgr.ChannelDistManager.Update(2, utils.CreateTestChannel(suite.collectionID, 2, 0, "channel-1"))
|
||||
suite.distMgr.ChannelDistManager.Update(2, utils.CreateTestChannel(suite.collectionID, 2, 1, "channel-2"))
|
||||
suite.broker.AssertExpectations(suite.T())
|
||||
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
|
||||
suite.nextTargetSegments = append(suite.nextTargetSegments, &datapb.SegmentBinlogs{
|
||||
SegmentID: 13,
|
||||
InsertChannel: "channel-1",
|
||||
})
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partitionID}, nil)
|
||||
suite.targetMgr.UpdateCollectionCurrentTarget(suite.collectionID)
|
||||
|
||||
// Pull next again
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.CurrentTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 7*time.Second, 1*time.Second)
|
||||
|
||||
suite.distMgr.LeaderViewManager.Update(2,
|
||||
&meta.LeaderView{
|
||||
ID: 2,
|
||||
CollectionID: suite.collectionID,
|
||||
Channel: "channel-1",
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
11: {NodeID: 2},
|
||||
13: {NodeID: 2},
|
||||
},
|
||||
},
|
||||
&meta.LeaderView{
|
||||
ID: 2,
|
||||
CollectionID: suite.collectionID,
|
||||
Channel: "channel-2",
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
12: {NodeID: 2},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// Able to update current if it's not empty
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.CurrentTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.CurrentTarget)) == 3 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.CurrentTarget)) == 2
|
||||
}, 7*time.Second, 1*time.Second)
|
||||
}
|
||||
|
||||
func (suite *TargetObserverSuite) TearDownSuite() {
|
||||
|
|
Loading…
Reference in New Issue