mirror of https://github.com/milvus-io/milvus.git
Load should base on the accurate loaded number (#21133)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com>pull/21163/head
parent
2823d23db0
commit
124ff014da
|
@ -35,11 +35,13 @@ import (
|
|||
type CollectionObserver struct {
|
||||
stopCh chan struct{}
|
||||
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
broker meta.Broker
|
||||
handoffOb *HandoffObserver
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
broker meta.Broker
|
||||
handoffOb *HandoffObserver
|
||||
collectionLoadedCount map[int64]int
|
||||
partitionLoadedCount map[int64]int
|
||||
|
||||
refreshed map[int64]time.Time
|
||||
|
||||
|
@ -54,12 +56,14 @@ func NewCollectionObserver(
|
|||
handoffObserver *HandoffObserver,
|
||||
) *CollectionObserver {
|
||||
return &CollectionObserver{
|
||||
stopCh: make(chan struct{}),
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
broker: broker,
|
||||
handoffOb: handoffObserver,
|
||||
stopCh: make(chan struct{}),
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
broker: broker,
|
||||
handoffOb: handoffObserver,
|
||||
collectionLoadedCount: make(map[int64]int),
|
||||
partitionLoadedCount: make(map[int64]int),
|
||||
|
||||
refreshed: make(map[int64]time.Time),
|
||||
}
|
||||
|
@ -266,11 +270,13 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
|
|||
|
||||
updated := collection.Clone()
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
|
||||
if updated.LoadPercentage <= collection.LoadPercentage {
|
||||
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] {
|
||||
return
|
||||
}
|
||||
|
||||
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
|
||||
if loadedCount >= len(segmentTargets)+len(channelTargets) {
|
||||
delete(ob.collectionLoadedCount, collection.GetCollectionID())
|
||||
updated.Status = querypb.LoadStatus_Loaded
|
||||
ob.meta.CollectionManager.UpdateCollection(updated)
|
||||
ob.handoffOb.StartHandoff(updated.GetCollectionID())
|
||||
|
@ -329,11 +335,13 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
|
|||
|
||||
updated := partition.Clone()
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
|
||||
if updated.LoadPercentage <= partition.LoadPercentage {
|
||||
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] {
|
||||
return
|
||||
}
|
||||
|
||||
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||
if loadedCount >= len(segmentTargets)+len(channelTargets) {
|
||||
delete(ob.partitionLoadedCount, partition.GetPartitionID())
|
||||
updated.Status = querypb.LoadStatus_Loaded
|
||||
ob.meta.CollectionManager.UpdatePartition(updated)
|
||||
ob.handoffOb.StartHandoff(updated.GetCollectionID())
|
||||
|
|
|
@ -67,10 +67,11 @@ type CollectionObserverSuite struct {
|
|||
func (suite *CollectionObserverSuite) SetupSuite() {
|
||||
Params.Init()
|
||||
|
||||
suite.collections = []int64{100, 101}
|
||||
suite.collections = []int64{100, 101, 102}
|
||||
suite.partitions = map[int64][]int64{
|
||||
100: {10},
|
||||
101: {11, 12},
|
||||
102: {13},
|
||||
}
|
||||
suite.channels = map[int64][]*meta.DmChannel{
|
||||
100: {
|
||||
|
@ -93,6 +94,12 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||
ChannelName: "101-dmc1",
|
||||
}),
|
||||
},
|
||||
102: {
|
||||
meta.DmChannelFromVChannel(&datapb.VchannelInfo{
|
||||
CollectionID: 102,
|
||||
ChannelName: "102-dmc0",
|
||||
}),
|
||||
},
|
||||
}
|
||||
suite.segments = map[int64][]*datapb.SegmentInfo{
|
||||
100: {
|
||||
|
@ -123,18 +130,22 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||
InsertChannel: "101-dmc1",
|
||||
},
|
||||
},
|
||||
102: genSegmentsInfo(999, 5, 102, 13, "102-dmc0"),
|
||||
}
|
||||
suite.loadTypes = map[int64]querypb.LoadType{
|
||||
100: querypb.LoadType_LoadCollection,
|
||||
101: querypb.LoadType_LoadPartition,
|
||||
102: querypb.LoadType_LoadCollection,
|
||||
}
|
||||
suite.replicaNumber = map[int64]int32{
|
||||
100: 1,
|
||||
101: 1,
|
||||
102: 1,
|
||||
}
|
||||
suite.loadPercentage = map[int64]int32{
|
||||
100: 0,
|
||||
101: 50,
|
||||
102: 0,
|
||||
}
|
||||
suite.nodes = []int64{1, 2, 3}
|
||||
}
|
||||
|
@ -224,6 +235,8 @@ func (suite *CollectionObserverSuite) TestObservePartitionsTimeout() {
|
|||
const (
|
||||
timeout = 2 * time.Second
|
||||
)
|
||||
// time before load
|
||||
time := suite.meta.GetCollection(suite.collections[2]).UpdatedAt
|
||||
// Not timeout
|
||||
Params.QueryCoordCfg.LoadTimeoutSeconds = timeout
|
||||
suite.ob.Start(context.Background())
|
||||
|
@ -242,6 +255,17 @@ func (suite *CollectionObserverSuite) TestObservePartitionsTimeout() {
|
|||
Channel: "100-dmc1",
|
||||
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
|
||||
})
|
||||
suite.dist.LeaderViewManager.Update(3, &meta.LeaderView{
|
||||
ID: 3,
|
||||
CollectionID: 102,
|
||||
Channel: "102-dmc0",
|
||||
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 5, Version: 0}},
|
||||
})
|
||||
|
||||
suite.Eventually(func() bool {
|
||||
return suite.isCollectionLoadedContinue(suite.collections[2], time)
|
||||
}, timeout*2, timeout/10)
|
||||
|
||||
suite.Eventually(func() bool {
|
||||
return suite.isCollectionLoaded(suite.collections[0]) &&
|
||||
suite.isCollectionTimeout(suite.collections[1])
|
||||
|
@ -258,6 +282,10 @@ func (suite *CollectionObserverSuite) TestObserveCollectionRefresh() {
|
|||
for _, partition := range suite.partitions[100] {
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(100), partition).Return(nil, nil, nil)
|
||||
}
|
||||
for _, partition := range suite.partitions[102] {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(102)).Return(suite.partitions[102], nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(102), partition).Return(nil, nil, nil)
|
||||
}
|
||||
suite.ob.Start(context.Background())
|
||||
|
||||
// Collection 100 refreshed,
|
||||
|
@ -286,6 +314,10 @@ func (suite *CollectionObserverSuite) TestObservePartitionsRefresh() {
|
|||
for _, partition := range suite.partitions[101] {
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(101), partition).Return(nil, nil, nil)
|
||||
}
|
||||
for _, partition := range suite.partitions[102] {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, int64(102)).Return(suite.partitions[102], nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, int64(102), partition).Return(nil, nil, nil)
|
||||
}
|
||||
suite.ob.Start(context.Background())
|
||||
|
||||
// Collection 100 loaded,
|
||||
|
@ -333,6 +365,11 @@ func (suite *CollectionObserverSuite) isCollectionTimeout(collection int64) bool
|
|||
len(segments) > 0)
|
||||
}
|
||||
|
||||
func (suite *CollectionObserverSuite) isCollectionLoadedContinue(collection int64, beforeTime time.Time) bool {
|
||||
return suite.meta.GetCollection(collection).UpdatedAt.After(beforeTime)
|
||||
|
||||
}
|
||||
|
||||
func (suite *CollectionObserverSuite) loadAll() {
|
||||
for _, collection := range suite.collections {
|
||||
suite.load(collection)
|
||||
|
@ -381,3 +418,17 @@ func (suite *CollectionObserverSuite) load(collection int64) {
|
|||
func TestCollectionObserver(t *testing.T) {
|
||||
suite.Run(t, new(CollectionObserverSuite))
|
||||
}
|
||||
|
||||
func genSegmentsInfo(count int, start int, collID int64, partitionID int64, insertChannel string) []*datapb.SegmentInfo {
|
||||
ret := make([]*datapb.SegmentInfo, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
segment := &datapb.SegmentInfo{
|
||||
ID: int64(start + i),
|
||||
CollectionID: collID,
|
||||
PartitionID: partitionID,
|
||||
InsertChannel: insertChannel,
|
||||
}
|
||||
ret = append(ret, segment)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue