mirror of https://github.com/milvus-io/milvus.git
Fix load timeout after next target updates (#21759)
Signed-off-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/21772/head
parent
e64f55cd27
commit
b91bb5a729
|
@ -304,13 +304,6 @@ func (m *CollectionManager) PutCollection(collection *Collection) error {
|
|||
return m.putCollection(collection, true)
|
||||
}
|
||||
|
||||
func (m *CollectionManager) PutCollectionWithoutSave(collection *Collection) {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
|
||||
m.putCollection(collection, false)
|
||||
}
|
||||
|
||||
func (m *CollectionManager) UpdateCollection(collection *Collection) error {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -26,12 +28,14 @@ import (
|
|||
type CollectionTarget struct {
|
||||
segments map[int64]*datapb.SegmentInfo
|
||||
dmChannels map[string]*DmChannel
|
||||
createTime time.Time
|
||||
}
|
||||
|
||||
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget {
|
||||
return &CollectionTarget{
|
||||
segments: segments,
|
||||
dmChannels: dmChannels,
|
||||
createTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,6 +55,10 @@ func (p *CollectionTarget) GetAllDmChannelNames() []string {
|
|||
return lo.Keys(p.dmChannels)
|
||||
}
|
||||
|
||||
func (p *CollectionTarget) GetCreateTime() time.Time {
|
||||
return p.createTime
|
||||
}
|
||||
|
||||
func (p *CollectionTarget) IsEmpty() bool {
|
||||
return len(p.dmChannels)+len(p.segments) == 0
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -402,6 +403,17 @@ func (mgr *TargetManager) GetHistoricalSegment(collectionID int64, id int64, sco
|
|||
return collectionTarget.GetAllSegments()[id]
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) GetNextTargetCreateTime(collectionID int64) time.Time {
|
||||
mgr.rwMutex.RLock()
|
||||
defer mgr.rwMutex.RUnlock()
|
||||
targetMap := mgr.getTarget(NextTarget)
|
||||
collectionTarget := targetMap.getCollectionTarget(collectionID)
|
||||
if collectionTarget == nil {
|
||||
return time.Time{}
|
||||
}
|
||||
return collectionTarget.GetCreateTime()
|
||||
}
|
||||
|
||||
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
|
||||
newChannels := mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)
|
||||
|
||||
|
|
|
@ -34,12 +34,13 @@ import (
|
|||
type CollectionObserver struct {
|
||||
stopCh chan struct{}
|
||||
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetObserver *TargetObserver
|
||||
collectionLoadedCount map[int64]int
|
||||
partitionLoadedCount map[int64]int
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetObserver *TargetObserver
|
||||
collectionLoadedCount map[int64]int
|
||||
partitionLoadedCount map[int64]int
|
||||
collectionNextTargetTime map[int64]time.Time
|
||||
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
@ -51,13 +52,14 @@ func NewCollectionObserver(
|
|||
targetObserver *TargetObserver,
|
||||
) *CollectionObserver {
|
||||
return &CollectionObserver{
|
||||
stopCh: make(chan struct{}),
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
targetObserver: targetObserver,
|
||||
collectionLoadedCount: make(map[int64]int),
|
||||
partitionLoadedCount: make(map[int64]int),
|
||||
stopCh: make(chan struct{}),
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
targetMgr: targetMgr,
|
||||
targetObserver: targetObserver,
|
||||
collectionLoadedCount: make(map[int64]int),
|
||||
partitionLoadedCount: make(map[int64]int),
|
||||
collectionNextTargetTime: make(map[int64]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -200,9 +202,16 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
|
|||
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(collection.GetReplicaNumber())))
|
||||
}
|
||||
|
||||
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && updated.LoadPercentage != 100 {
|
||||
targetTime := ob.targetMgr.GetNextTargetCreateTime(collection.CollectionID)
|
||||
lastTime, ok := ob.collectionNextTargetTime[collection.CollectionID]
|
||||
|
||||
if ok && targetTime.Equal(lastTime) &&
|
||||
loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] &&
|
||||
updated.LoadPercentage != 100 {
|
||||
return
|
||||
}
|
||||
|
||||
ob.collectionNextTargetTime[collection.GetCollectionID()] = targetTime
|
||||
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
|
||||
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
|
||||
delete(ob.collectionLoadedCount, collection.GetCollectionID())
|
||||
|
@ -260,12 +269,17 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
|
|||
zap.Int("loadSegmentCount", loadedCount-subChannelCount))
|
||||
}
|
||||
updated.LoadPercentage = int32(loadedCount * 100 / (targetNum * int(partition.GetReplicaNumber())))
|
||||
|
||||
}
|
||||
|
||||
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && updated.LoadPercentage != 100 {
|
||||
targetTime := ob.targetMgr.GetNextTargetCreateTime(partition.GetCollectionID())
|
||||
lastTime, ok := ob.collectionNextTargetTime[partition.GetCollectionID()]
|
||||
|
||||
if ok && targetTime.Equal(lastTime) &&
|
||||
loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] &&
|
||||
updated.LoadPercentage != 100 {
|
||||
return
|
||||
}
|
||||
ob.collectionNextTargetTime[partition.GetCollectionID()] = targetTime
|
||||
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
|
||||
delete(ob.partitionLoadedCount, partition.GetPartitionID())
|
||||
|
|
Loading…
Reference in New Issue