mirror of https://github.com/milvus-io/milvus.git
fix: Fix load slowly (#37454)
When there're a lot of loaded collections, they would occupy the target observer scheduler’s pool. This prevents loading collections from updating the current target in time, slowing down the load process. This PR adds a separate target dispatcher for loading collections. issue: https://github.com/milvus-io/milvus/issues/37166 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/37558/head
parent
ce2fa3de01
commit
ff9bdf7029
|
@ -159,7 +159,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
|
||||||
partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
|
partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
|
||||||
return partition.PartitionID
|
return partition.PartitionID
|
||||||
})
|
})
|
||||||
allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs)
|
|
||||||
|
|
||||||
channelInfos := make(map[string][]*datapb.VchannelInfo)
|
channelInfos := make(map[string][]*datapb.VchannelInfo)
|
||||||
segments := make(map[int64]*datapb.SegmentInfo, 0)
|
segments := make(map[int64]*datapb.SegmentInfo, 0)
|
||||||
|
@ -194,7 +193,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs))
|
allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
|
||||||
|
mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
|
||||||
log.Debug("finish to update next targets for collection",
|
log.Debug("finish to update next targets for collection",
|
||||||
zap.Int64("collectionID", collectionID),
|
zap.Int64("collectionID", collectionID),
|
||||||
zap.Int64s("PartitionIDs", partitionIDs),
|
zap.Int64s("PartitionIDs", partitionIDs),
|
||||||
|
|
|
@ -87,8 +87,12 @@ type TargetObserver struct {
|
||||||
mut sync.Mutex // Guard readyNotifiers
|
mut sync.Mutex // Guard readyNotifiers
|
||||||
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers
|
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers
|
||||||
|
|
||||||
dispatcher *taskDispatcher[int64]
|
// loadingDispatcher updates targets for collections that are loading (also collections without a current target).
|
||||||
keylocks *lock.KeyLock[int64]
|
loadingDispatcher *taskDispatcher[int64]
|
||||||
|
// loadedDispatcher updates targets for loaded collections.
|
||||||
|
loadedDispatcher *taskDispatcher[int64]
|
||||||
|
|
||||||
|
keylocks *lock.KeyLock[int64]
|
||||||
|
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
|
@ -114,8 +118,8 @@ func NewTargetObserver(
|
||||||
keylocks: lock.NewKeyLock[int64](),
|
keylocks: lock.NewKeyLock[int64](),
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher := newTaskDispatcher(result.check)
|
result.loadingDispatcher = newTaskDispatcher(result.check)
|
||||||
result.dispatcher = dispatcher
|
result.loadedDispatcher = newTaskDispatcher(result.check)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +128,8 @@ func (ob *TargetObserver) Start() {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
ob.cancel = cancel
|
ob.cancel = cancel
|
||||||
|
|
||||||
ob.dispatcher.Start()
|
ob.loadingDispatcher.Start()
|
||||||
|
ob.loadedDispatcher.Start()
|
||||||
|
|
||||||
ob.wg.Add(1)
|
ob.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -144,7 +149,8 @@ func (ob *TargetObserver) Stop() {
|
||||||
}
|
}
|
||||||
ob.wg.Wait()
|
ob.wg.Wait()
|
||||||
|
|
||||||
ob.dispatcher.Stop()
|
ob.loadingDispatcher.Stop()
|
||||||
|
ob.loadedDispatcher.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +173,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
ob.clean()
|
ob.clean()
|
||||||
ob.dispatcher.AddTask(ob.meta.GetAll()...)
|
loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) {
|
||||||
|
if collection.GetStatus() == querypb.LoadStatus_Loaded {
|
||||||
|
return collection.GetCollectionID(), true
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
})
|
||||||
|
ob.loadedDispatcher.AddTask(loaded...)
|
||||||
|
|
||||||
case req := <-ob.updateChan:
|
case req := <-ob.updateChan:
|
||||||
log.Info("manually trigger update target",
|
log.Info("manually trigger update target",
|
||||||
|
@ -217,7 +229,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
|
||||||
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
|
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
|
||||||
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
|
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
|
||||||
if !result {
|
if !result {
|
||||||
ob.dispatcher.AddTask(collectionID)
|
ob.loadingDispatcher.AddTask(collectionID)
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,9 @@ func (suite *TargetObserverSuite) SetupTest() {
|
||||||
suite.collectionID = int64(1000)
|
suite.collectionID = int64(1000)
|
||||||
suite.partitionID = int64(100)
|
suite.partitionID = int64(100)
|
||||||
|
|
||||||
err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
|
testCollection := utils.CreateTestCollection(suite.collectionID, 1)
|
||||||
|
testCollection.Status = querypb.LoadStatus_Loaded
|
||||||
|
err = suite.meta.CollectionManager.PutCollection(testCollection)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
|
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
@ -302,7 +304,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
|
||||||
func (s *TargetObserverCheckSuite) TestCheck() {
|
func (s *TargetObserverCheckSuite) TestCheck() {
|
||||||
r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID)
|
r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID)
|
||||||
s.False(r)
|
s.False(r)
|
||||||
s.True(s.observer.dispatcher.tasks.Contain(s.collectionID))
|
s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID))
|
||||||
|
s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetObserver(t *testing.T) {
|
func TestTargetObserver(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue