Fix load collection blocks when memory is limited. (#21990) (#22112)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/22333/head
Bingyi Sun 2023-02-10 14:14:32 +08:00 committed by GitHub
parent 24a4d231eb
commit f7ff6ab5ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 47 deletions

View File

@ -17,8 +17,6 @@
package meta
import (
"time"
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -28,14 +26,12 @@ import (
type CollectionTarget struct {
segments map[int64]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
createTime time.Time
}
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget {
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
createTime: time.Now(),
}
}
@ -55,10 +51,6 @@ func (p *CollectionTarget) GetAllDmChannelNames() []string {
return lo.Keys(p.dmChannels)
}
func (p *CollectionTarget) GetCreateTime() time.Time {
return p.createTime
}
func (p *CollectionTarget) IsEmpty() bool {
return len(p.dmChannels)+len(p.segments) == 0
}

View File

@ -20,7 +20,6 @@ import (
"context"
"errors"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -396,17 +395,6 @@ func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, sco
return collectionTarget.GetAllSegments()[id]
}
func (mgr *TargetManager) GetNextTargetCreateTime(collectionID int64) time.Time {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targetMap := mgr.getTarget(NextTarget)
collectionTarget := targetMap.getCollectionTarget(collectionID)
if collectionTarget == nil {
return time.Time{}
}
return collectionTarget.GetCreateTime()
}
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)

View File

@ -33,13 +33,12 @@ import (
type CollectionObserver struct {
stopCh chan struct{}
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
targetObserver *TargetObserver
collectionLoadedCount map[int64]int
partitionLoadedCount map[int64]int
collectionNextTargetTime map[int64]time.Time
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
targetObserver *TargetObserver
collectionLoadedCount map[int64]int
partitionLoadedCount map[int64]int
stopOnce sync.Once
}
@ -51,14 +50,13 @@ func NewCollectionObserver(
targetObserver *TargetObserver,
) *CollectionObserver {
return &CollectionObserver{
stopCh: make(chan struct{}),
dist: dist,
meta: meta,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionLoadedCount: make(map[int64]int),
partitionLoadedCount: make(map[int64]int),
collectionNextTargetTime: make(map[int64]time.Time),
stopCh: make(chan struct{}),
dist: dist,
meta: meta,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionLoadedCount: make(map[int64]int),
partitionLoadedCount: make(map[int64]int),
}
}
@ -201,16 +199,12 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(collection.GetReplicaNumber())))
}
targetTime := ob.targetMgr.GetNextTargetCreateTime(collection.CollectionID)
lastTime, ok := ob.collectionNextTargetTime[collection.CollectionID]
if ok && targetTime.Equal(lastTime) &&
loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] &&
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] &&
updated.LoadPercentage != 100 {
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
return
}
ob.collectionNextTargetTime[collection.GetCollectionID()] = targetTime
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
delete(ob.collectionLoadedCount, collection.GetCollectionID())
@ -270,15 +264,11 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(partition.GetReplicaNumber())))
}
targetTime := ob.targetMgr.GetNextTargetCreateTime(partition.GetCollectionID())
lastTime, ok := ob.collectionNextTargetTime[partition.GetCollectionID()]
if ok && targetTime.Equal(lastTime) &&
loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] &&
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] &&
updated.LoadPercentage != 100 {
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
return
}
ob.collectionNextTargetTime[partition.GetCollectionID()] = targetTime
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
delete(ob.partitionLoadedCount, partition.GetPartitionID())