mirror of https://github.com/milvus-io/milvus.git
enhance: [10kcp] Accelerate observe collection (#38058)
issue: https://github.com/milvus-io/milvus/issues/37630
---------
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38062/head
parent
312475d1f1
commit
635d161109
|
@ -565,13 +565,13 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int32) (int32, error) {
|
||||
func (m *CollectionManager) UpdatePartitionLoadPercent(partitionID int64, loadPercent int32) error {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
|
||||
oldPartition, ok := m.partitions[partitionID]
|
||||
if !ok {
|
||||
return 0, merr.WrapErrPartitionNotFound(partitionID)
|
||||
return merr.WrapErrPartitionNotFound(partitionID)
|
||||
}
|
||||
|
||||
// update partition load percentage
|
||||
|
@ -579,7 +579,7 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
|
|||
newPartition.LoadPercentage = loadPercent
|
||||
savePartition := false
|
||||
if loadPercent == 100 {
|
||||
savePartition = true
|
||||
savePartition = newPartition.Status != querypb.LoadStatus_Loaded || newPartition.RecoverTimes != 0
|
||||
newPartition.Status = querypb.LoadStatus_Loaded
|
||||
// if partition becomes loaded, clear its recoverTimes in load info
|
||||
newPartition.RecoverTimes = 0
|
||||
|
@ -587,22 +587,24 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
|
|||
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
|
||||
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Partition %d loaded", partitionID)))
|
||||
}
|
||||
err := m.putPartition([]*Partition{newPartition}, savePartition)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return m.putPartition([]*Partition{newPartition}, savePartition)
|
||||
}
|
||||
|
||||
func (m *CollectionManager) UpdateCollectionLoadPercent(collectionID int64) (int32, error) {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
|
||||
// update collection load percentage
|
||||
oldCollection, ok := m.collections[newPartition.CollectionID]
|
||||
oldCollection, ok := m.collections[collectionID]
|
||||
if !ok {
|
||||
return 0, merr.WrapErrCollectionNotFound(newPartition.CollectionID)
|
||||
return 0, merr.WrapErrCollectionNotFound(collectionID)
|
||||
}
|
||||
collectionPercent := m.calculateLoadPercentage(oldCollection.CollectionID)
|
||||
newCollection := oldCollection.Clone()
|
||||
newCollection.LoadPercentage = collectionPercent
|
||||
saveCollection := false
|
||||
if collectionPercent == 100 {
|
||||
saveCollection = true
|
||||
saveCollection = newCollection.Status != querypb.LoadStatus_Loaded || newCollection.RecoverTimes != 0
|
||||
if newCollection.LoadSpan != nil {
|
||||
newCollection.LoadSpan.End()
|
||||
newCollection.LoadSpan = nil
|
||||
|
|
|
@ -372,8 +372,11 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
|
|||
// update load percent, then recover for second time
|
||||
for _, collectionID := range suite.collections {
|
||||
for _, partitionID := range suite.partitions[collectionID] {
|
||||
mgr.UpdateLoadPercent(partitionID, 10)
|
||||
err = mgr.UpdatePartitionLoadPercent(partitionID, 10)
|
||||
suite.NoError(err)
|
||||
}
|
||||
_, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.clearMemory()
|
||||
err = mgr.Recover(suite.broker)
|
||||
|
@ -432,27 +435,32 @@ func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() {
|
|||
})
|
||||
}
|
||||
// test update partition load percentage
|
||||
mgr.UpdateLoadPercent(1, 30)
|
||||
err := mgr.UpdatePartitionLoadPercent(1, 30)
|
||||
suite.NoError(err)
|
||||
partition := mgr.GetPartition(1)
|
||||
suite.Equal(int32(30), partition.LoadPercentage)
|
||||
suite.Equal(int32(30), mgr.GetPartitionLoadPercentage(partition.PartitionID))
|
||||
suite.Equal(querypb.LoadStatus_Loading, partition.Status)
|
||||
collection := mgr.GetCollection(1)
|
||||
suite.Equal(int32(15), collection.LoadPercentage)
|
||||
suite.Equal(int32(0), collection.LoadPercentage)
|
||||
suite.Equal(querypb.LoadStatus_Loading, collection.Status)
|
||||
// test update partition load percentage to 100
|
||||
mgr.UpdateLoadPercent(1, 100)
|
||||
err = mgr.UpdatePartitionLoadPercent(1, 100)
|
||||
suite.NoError(err)
|
||||
partition = mgr.GetPartition(1)
|
||||
suite.Equal(int32(100), partition.LoadPercentage)
|
||||
suite.Equal(querypb.LoadStatus_Loaded, partition.Status)
|
||||
collection = mgr.GetCollection(1)
|
||||
suite.Equal(int32(50), collection.LoadPercentage)
|
||||
suite.Equal(int32(0), collection.LoadPercentage)
|
||||
suite.Equal(querypb.LoadStatus_Loading, collection.Status)
|
||||
// test update collection load percentage
|
||||
mgr.UpdateLoadPercent(2, 100)
|
||||
err = mgr.UpdatePartitionLoadPercent(2, 100)
|
||||
suite.NoError(err)
|
||||
partition = mgr.GetPartition(1)
|
||||
suite.Equal(int32(100), partition.LoadPercentage)
|
||||
suite.Equal(querypb.LoadStatus_Loaded, partition.Status)
|
||||
_, err = mgr.UpdateCollectionLoadPercent(1)
|
||||
suite.NoError(err)
|
||||
collection = mgr.GetCollection(1)
|
||||
suite.Equal(int32(100), collection.LoadPercentage)
|
||||
suite.Equal(querypb.LoadStatus_Loaded, collection.Status)
|
||||
|
|
|
@ -261,19 +261,31 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
|
|||
}
|
||||
|
||||
loaded := true
|
||||
hasUpdate := false
|
||||
|
||||
channelTargetNum, subChannelCount := ob.observeChannelStatus(task.CollectionID)
|
||||
|
||||
for _, partition := range partitions {
|
||||
if partition.LoadPercentage == 100 {
|
||||
continue
|
||||
}
|
||||
if ob.readyToObserve(partition.CollectionID) {
|
||||
replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID())
|
||||
ob.observePartitionLoadStatus(ctx, partition, replicaNum)
|
||||
has := ob.observePartitionLoadStatus(ctx, partition, replicaNum, channelTargetNum, subChannelCount)
|
||||
if has {
|
||||
hasUpdate = true
|
||||
}
|
||||
}
|
||||
partition = ob.meta.GetPartition(partition.PartitionID)
|
||||
if partition != nil && partition.LoadPercentage != 100 {
|
||||
loaded = false
|
||||
}
|
||||
}
|
||||
|
||||
if hasUpdate {
|
||||
ob.observeCollectionLoadStatus(ctx, task.CollectionID)
|
||||
}
|
||||
|
||||
// all partition loaded, finish task
|
||||
if len(partitions) > 0 && loaded {
|
||||
log.Info("Load task finish",
|
||||
|
@ -293,37 +305,48 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32) {
|
||||
func (ob *CollectionObserver) observeChannelStatus(collectionID int64) (int, int) {
|
||||
channelTargets := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
|
||||
|
||||
channelTargetNum := len(channelTargets)
|
||||
if channelTargetNum == 0 {
|
||||
log.Info("channels in target is empty, waiting for new target content")
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
subChannelCount := 0
|
||||
for _, channel := range channelTargets {
|
||||
views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
|
||||
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
|
||||
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
|
||||
subChannelCount += len(group)
|
||||
}
|
||||
return channelTargetNum, subChannelCount
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
|
||||
log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
|
||||
zap.Int64("collectionID", partition.GetCollectionID()),
|
||||
zap.Int64("partitionID", partition.GetPartitionID()),
|
||||
)
|
||||
|
||||
segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
|
||||
channelTargets := ob.targetMgr.GetDmChannelsByCollection(partition.GetCollectionID(), meta.NextTarget)
|
||||
|
||||
targetNum := len(segmentTargets) + len(channelTargets)
|
||||
targetNum := len(segmentTargets) + channelTargetNum
|
||||
if targetNum == 0 {
|
||||
log.Info("segments and channels in target are both empty, waiting for new target content")
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
log.RatedInfo(10, "partition targets",
|
||||
zap.Int("segmentTargetNum", len(segmentTargets)),
|
||||
zap.Int("channelTargetNum", len(channelTargets)),
|
||||
zap.Int("channelTargetNum", channelTargetNum),
|
||||
zap.Int("totalTargetNum", targetNum),
|
||||
zap.Int32("replicaNum", replicaNum),
|
||||
)
|
||||
loadedCount := 0
|
||||
loadedCount := subChannelCount
|
||||
loadPercentage := int32(0)
|
||||
|
||||
for _, channel := range channelTargets {
|
||||
views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
|
||||
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
|
||||
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
|
||||
loadedCount += len(group)
|
||||
}
|
||||
subChannelCount := loadedCount
|
||||
for _, segment := range segmentTargets {
|
||||
views := ob.dist.LeaderViewManager.GetByFilter(
|
||||
meta.WithChannelName2LeaderView(segment.GetInsertChannel()),
|
||||
|
@ -341,29 +364,42 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
|
|||
|
||||
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
|
||||
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||
if loadPercentage == 100 {
|
||||
if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
|
||||
log.Warn("failed to manual check current target, skip update load status")
|
||||
return
|
||||
return false
|
||||
}
|
||||
delete(ob.partitionLoadedCount, partition.GetPartitionID())
|
||||
}
|
||||
collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage)
|
||||
err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(partition.PartitionID, loadPercentage)
|
||||
if err != nil {
|
||||
log.Warn("failed to update load percentage")
|
||||
log.Warn("failed to update partition load percentage")
|
||||
}
|
||||
log.Info("load status updated",
|
||||
log.Info("partition load status updated",
|
||||
zap.Int32("partitionLoadPercentage", loadPercentage),
|
||||
)
|
||||
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
|
||||
return true
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
|
||||
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
|
||||
|
||||
collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(collectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to update collection load percentage")
|
||||
}
|
||||
log.Info("collection load status updated",
|
||||
zap.Int32("collectionLoadPercentage", collectionPercentage),
|
||||
)
|
||||
if collectionPercentage == 100 {
|
||||
ob.invalidateCache(ctx, partition.GetCollectionID())
|
||||
ob.invalidateCache(ctx, collectionID)
|
||||
}
|
||||
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", partition.CollectionID, loadPercentage)))
|
||||
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", collectionID, collectionPercentage)))
|
||||
}
|
||||
|
||||
func (ob *CollectionObserver) invalidateCache(ctx context.Context, collectionID int64) {
|
||||
|
|
Loading…
Reference in New Issue