Make progress if any channel/segment was loaded on node (#20775) (#21163)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/21174/head
yah01 2022-12-12 19:33:22 +08:00 committed by GitHub
parent 124ff014da
commit f868144c46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 21 deletions

View File

@ -159,7 +159,7 @@ func (mgr *LeaderViewManager) GetLeadersByGrowingSegment(segmentID int64) *Leade
return nil
}
// GetSegmentDist returns the list of nodes the given segment on
// GetSegmentDist returns the list of nodes the given channel on
func (mgr *LeaderViewManager) GetChannelDist(channel string) []int64 {
mgr.rwmutex.RLock()
defer mgr.rwmutex.RUnlock()

View File

@ -237,7 +237,9 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
log.Info("collection targets",
zap.Int("segmentTargetNum", len(segmentTargets)),
zap.Int("channelTargetNum", len(channelTargets)),
zap.Int("totalTargetNum", targetNum))
zap.Int("totalTargetNum", targetNum),
zap.Int32("replicaNum", collection.GetReplicaNumber()),
)
if targetNum == 0 {
log.Info("collection released, skip it")
return
@ -248,34 +250,31 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
collection.GetCollectionID(),
ob.dist.LeaderViewManager.GetChannelDist(channel.GetChannelName()))
if len(group) >= int(collection.GetReplicaNumber()) {
loadedCount++
}
loadedCount += len(group)
}
subChannelCount := loadedCount
for _, segment := range segmentTargets {
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
collection.GetCollectionID(),
ob.dist.LeaderViewManager.GetSealedSegmentDist(segment.GetID()))
if len(group) >= int(collection.GetReplicaNumber()) {
loadedCount++
}
loadedCount += len(group)
}
if loadedCount > 0 {
log.Info("collection load progress",
zap.Int("sub-channel-count", subChannelCount),
zap.Int("load-segment-count", loadedCount-subChannelCount),
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount),
)
}
updated := collection.Clone()
targetNum *= int(collection.GetReplicaNumber())
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] {
return
}
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
if loadedCount >= len(segmentTargets)+len(channelTargets) {
if loadedCount >= targetNum {
delete(ob.collectionLoadedCount, collection.GetCollectionID())
updated.Status = querypb.LoadStatus_Loaded
ob.meta.CollectionManager.UpdateCollection(updated)
@ -303,7 +302,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
log.Info("partition targets",
zap.Int("segmentTargetNum", len(segmentTargets)),
zap.Int("channelTargetNum", len(channelTargets)),
zap.Int("totalTargetNum", targetNum))
zap.Int("totalTargetNum", targetNum),
zap.Int32("replicaNum", partition.GetReplicaNumber()),
)
if targetNum == 0 {
log.Info("partition released, skip it")
return
@ -314,33 +315,30 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
partition.GetCollectionID(),
ob.dist.LeaderViewManager.GetChannelDist(channel.GetChannelName()))
if len(group) >= int(partition.GetReplicaNumber()) {
loadedCount++
}
loadedCount += len(group)
}
subChannelCount := loadedCount
for _, segment := range segmentTargets {
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager,
partition.GetCollectionID(),
ob.dist.LeaderViewManager.GetSealedSegmentDist(segment.GetID()))
if len(group) >= int(partition.GetReplicaNumber()) {
loadedCount++
}
loadedCount += len(group)
}
if loadedCount > 0 {
log.Info("partition load progress",
zap.Int("sub-channel-count", subChannelCount),
zap.Int("load-segment-count", loadedCount-subChannelCount))
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount))
}
updated := partition.Clone()
targetNum *= int(partition.GetReplicaNumber())
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] {
return
}
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if loadedCount >= len(segmentTargets)+len(channelTargets) {
if loadedCount >= targetNum {
delete(ob.partitionLoadedCount, partition.GetPartitionID())
updated.Status = querypb.LoadStatus_Loaded
ob.meta.CollectionManager.UpdatePartition(updated)