mirror of https://github.com/milvus-io/milvus.git
Fix load progress bug (#21069)
issue: #21023 Signed-off-by: sunby <bingyi.sun@zilliz.com> Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/21082/head
parent
18cad3a1fb
commit
27219ce9c0
|
@ -196,14 +196,14 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
|
|||
)
|
||||
}
|
||||
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / targetNum * int(collection.GetReplicaNumber()))
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(collection.GetReplicaNumber())))
|
||||
}
|
||||
|
||||
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && updated.LoadPercentage != 100 {
|
||||
return
|
||||
}
|
||||
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
|
||||
if loadedCount >= targetNum {
|
||||
if updated.LoadPercentage == 100 {
|
||||
ob.targetMgr.UpdateCollectionCurrentTarget(updated.CollectionID)
|
||||
updated.Status = querypb.LoadStatus_Loaded
|
||||
ob.meta.CollectionManager.UpdateCollection(updated)
|
||||
|
@ -258,7 +258,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
|
|||
zap.Int("subChannelCount", subChannelCount),
|
||||
zap.Int("loadSegmentCount", loadedCount-subChannelCount))
|
||||
}
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / targetNum * int(partition.GetReplicaNumber()))
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(partition.GetReplicaNumber())))
|
||||
|
||||
}
|
||||
|
||||
|
@ -266,7 +266,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
|
|||
return
|
||||
}
|
||||
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||
if loadedCount >= targetNum {
|
||||
if updated.LoadPercentage == 100 {
|
||||
ob.targetMgr.UpdateCollectionCurrentTarget(partition.GetCollectionID(), partition.GetPartitionID())
|
||||
updated.Status = querypb.LoadStatus_Loaded
|
||||
ob.meta.CollectionManager.PutPartition(updated)
|
||||
|
|
|
@ -40,14 +40,13 @@ type CollectionObserverSuite struct {
|
|||
suite.Suite
|
||||
|
||||
// Data
|
||||
collections []int64
|
||||
partitions map[int64][]int64 // CollectionID -> PartitionIDs
|
||||
channels map[int64][]*meta.DmChannel
|
||||
segments map[int64][]*datapb.SegmentInfo // CollectionID -> []datapb.SegmentInfo
|
||||
loadTypes map[int64]querypb.LoadType
|
||||
replicaNumber map[int64]int32
|
||||
loadPercentage map[int64]int32
|
||||
nodes []int64
|
||||
collections []int64
|
||||
partitions map[int64][]int64 // CollectionID -> PartitionIDs
|
||||
channels map[int64][]*meta.DmChannel
|
||||
segments map[int64][]*datapb.SegmentInfo // CollectionID -> []datapb.SegmentInfo
|
||||
loadTypes map[int64]querypb.LoadType
|
||||
replicaNumber map[int64]int32
|
||||
nodes []int64
|
||||
|
||||
// Mocks
|
||||
idAllocator func() (int64, error)
|
||||
|
@ -68,11 +67,12 @@ type CollectionObserverSuite struct {
|
|||
func (suite *CollectionObserverSuite) SetupSuite() {
|
||||
Params.Init()
|
||||
|
||||
suite.collections = []int64{100, 101, 102}
|
||||
suite.collections = []int64{100, 101, 102, 103}
|
||||
suite.partitions = map[int64][]int64{
|
||||
100: {10},
|
||||
101: {11, 12},
|
||||
102: {13},
|
||||
103: {14},
|
||||
}
|
||||
suite.channels = map[int64][]*meta.DmChannel{
|
||||
100: {
|
||||
|
@ -101,6 +101,12 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||
ChannelName: "102-dmc0",
|
||||
}),
|
||||
},
|
||||
103: {
|
||||
meta.DmChannelFromVChannel(&datapb.VchannelInfo{
|
||||
CollectionID: 103,
|
||||
ChannelName: "103-dmc0",
|
||||
}),
|
||||
},
|
||||
}
|
||||
suite.segments = map[int64][]*datapb.SegmentInfo{
|
||||
100: {
|
||||
|
@ -132,21 +138,19 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||
},
|
||||
},
|
||||
102: genSegmentsInfo(999, 5, 102, 13, "102-dmc0"),
|
||||
103: genSegmentsInfo(10, 2000, 103, 14, "103-dmc0"),
|
||||
}
|
||||
suite.loadTypes = map[int64]querypb.LoadType{
|
||||
100: querypb.LoadType_LoadCollection,
|
||||
101: querypb.LoadType_LoadPartition,
|
||||
102: querypb.LoadType_LoadCollection,
|
||||
103: querypb.LoadType_LoadCollection,
|
||||
}
|
||||
suite.replicaNumber = map[int64]int32{
|
||||
100: 1,
|
||||
101: 1,
|
||||
102: 1,
|
||||
}
|
||||
suite.loadPercentage = map[int64]int32{
|
||||
100: 0,
|
||||
101: 50,
|
||||
102: 0,
|
||||
103: 2,
|
||||
}
|
||||
suite.nodes = []int64{1, 2, 3}
|
||||
}
|
||||
|
@ -231,6 +235,22 @@ func (suite *CollectionObserverSuite) TestObserve() {
|
|||
Channel: "102-dmc0",
|
||||
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 5, Version: 0}},
|
||||
})
|
||||
|
||||
segmentsInfo, ok := suite.segments[103]
|
||||
suite.True(ok)
|
||||
lview := &meta.LeaderView{
|
||||
ID: 3,
|
||||
CollectionID: 13,
|
||||
Channel: "103-dmc0",
|
||||
Segments: make(map[int64]*querypb.SegmentDist),
|
||||
}
|
||||
for _, segment := range segmentsInfo {
|
||||
lview.Segments[segment.GetID()] = &querypb.SegmentDist{
|
||||
NodeID: 3, Version: 0,
|
||||
}
|
||||
}
|
||||
suite.dist.LeaderViewManager.Update(3, lview)
|
||||
|
||||
suite.Eventually(func() bool {
|
||||
return suite.isCollectionLoaded(suite.collections[0])
|
||||
}, timeout*2, timeout/10)
|
||||
|
@ -242,6 +262,10 @@ func (suite *CollectionObserverSuite) TestObserve() {
|
|||
suite.Eventually(func() bool {
|
||||
return suite.isCollectionTimeout(suite.collections[1])
|
||||
}, timeout*2, timeout/10)
|
||||
|
||||
suite.Eventually(func() bool {
|
||||
return suite.isCollectionLoaded(suite.collections[3])
|
||||
}, timeout*2, timeout/10)
|
||||
}
|
||||
|
||||
func (suite *CollectionObserverSuite) isCollectionLoaded(collection int64) bool {
|
||||
|
|
Loading…
Reference in New Issue