fix: Fix slow dist handle and slow observe (#38566)

1. Provide partition- and channel-level indexing in the collection target (see the sketch below).
2. Make `SegmentAction` not wait for distribution.
3. Remove the scheduler and target manager mutexes (see the second sketch below).
4. Optimize logging to reduce CPU overhead.

issue: https://github.com/milvus-io/milvus/issues/37630
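
A minimal, self-contained sketch of the indexing idea in item 1 (not the actual Milvus code; `SegmentInfo` and `CollectionTarget` here are simplified stand-ins for the types in the query coordinator's meta package): build channel- and partition-keyed indexes once when the target is constructed, so per-channel and per-partition reads become map lookups instead of scans over every segment.

```go
package main

import "fmt"

// SegmentInfo is a simplified stand-in for datapb.SegmentInfo.
type SegmentInfo struct {
	ID          int64
	PartitionID int64
	Channel     string
}

// CollectionTarget keeps the original segment map plus two derived indexes.
type CollectionTarget struct {
	segments           map[int64]*SegmentInfo
	channel2Segments   map[string][]*SegmentInfo
	partition2Segments map[int64][]*SegmentInfo
}

// NewCollectionTarget builds the indexes once, at construction time.
func NewCollectionTarget(segments map[int64]*SegmentInfo) *CollectionTarget {
	t := &CollectionTarget{
		segments:           segments,
		channel2Segments:   make(map[string][]*SegmentInfo),
		partition2Segments: make(map[int64][]*SegmentInfo),
	}
	for _, s := range segments {
		t.channel2Segments[s.Channel] = append(t.channel2Segments[s.Channel], s)
		t.partition2Segments[s.PartitionID] = append(t.partition2Segments[s.PartitionID], s)
	}
	return t
}

// GetChannelSegments becomes a single map lookup instead of filtering all segments.
func (t *CollectionTarget) GetChannelSegments(ch string) []*SegmentInfo {
	return t.channel2Segments[ch]
}

// GetPartitionSegments likewise avoids scanning the full segment map.
func (t *CollectionTarget) GetPartitionSegments(partitionID int64) []*SegmentInfo {
	return t.partition2Segments[partitionID]
}

func main() {
	target := NewCollectionTarget(map[int64]*SegmentInfo{
		1: {ID: 1, PartitionID: 10, Channel: "dml_0"},
		2: {ID: 2, PartitionID: 10, Channel: "dml_1"},
	})
	fmt.Println(len(target.GetChannelSegments("dml_0"))) // 1
}
```

With the index in place, the `GetSealedSegmentsByChannel` and `GetSealedSegmentsByPartition` changes in the diff below only touch the segments of the requested channel or partition.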

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-01-15 20:17:00 +08:00 committed by GitHub
parent 38881bf591
commit 657550cf06
11 changed files with 324 additions and 394 deletions
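
Item 3 replaces the coarse `rwMutex` in the target manager and scheduler with lock-free concurrent maps plus per-key locks for updates. Below is a rough, standard-library-only sketch of that pattern (the real code uses Milvus' internal `typeutil.ConcurrentMap` and `lock.KeyLock`; `sync.Map` and a lazily created per-key mutex stand in for them here, so names are illustrative, not the actual implementation):

```go
package main

import "sync"

// CollectionTarget is a placeholder; only the version matters for the sketch.
type CollectionTarget struct{ version int64 }

// target replaces the old RWMutex-guarded map: reads go straight to a
// concurrent map, and updates are serialized only per collection.
type target struct {
	collectionTargets sync.Map // collectionID -> *CollectionTarget
	keyLocks          sync.Map // collectionID -> *sync.Mutex
}

func (t *target) keyLock(collectionID int64) *sync.Mutex {
	mu, _ := t.keyLocks.LoadOrStore(collectionID, &sync.Mutex{})
	return mu.(*sync.Mutex)
}

// updateCollectionTarget blocks only concurrent writers of the same collection;
// writers of other collections and all readers never wait on a global mutex.
func (t *target) updateCollectionTarget(collectionID int64, ct *CollectionTarget) {
	mu := t.keyLock(collectionID)
	mu.Lock()
	defer mu.Unlock()
	if old, ok := t.collectionTargets.Load(collectionID); ok && ct.version <= old.(*CollectionTarget).version {
		return // keep the newer target
	}
	t.collectionTargets.Store(collectionID, ct)
}

func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
	if v, ok := t.collectionTargets.Load(collectionID); ok {
		return v.(*CollectionTarget)
	}
	return nil
}

func main() {
	tg := &target{}
	tg.updateCollectionTarget(1000, &CollectionTarget{version: 2})
	tg.updateCollectionTarget(1000, &CollectionTarget{version: 1}) // ignored: older version
	_ = tg.getCollectionTarget(1000)
}
```

The effect mirrors the diff: readers such as `getCollectionTarget` no longer contend on a global mutex, and only writers of the same collection serialize against each other.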


@ -24,6 +24,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
}
func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
tr := timerecord.NewTimeRecorder("")
resp, err := dh.getDistribution(ctx)
d1 := tr.RecordSpan()
if err != nil {
node := dh.nodeManager.Get(dh.nodeID)
*failures = *failures + 1
@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
}
fields = append(fields, zap.Error(err))
log.RatedWarn(30.0, "failed to get data distribution", fields...)
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
RatedWarn(30.0, "failed to get data distribution", fields...)
} else {
*failures = 0
dh.handleDistResp(ctx, resp, dispatchTask)
}
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
RatedInfo(120.0, "pull and handle distribution done",
zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
}
func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) {


@ -26,43 +26,70 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// CollectionTarget is immutable once constructed.
type CollectionTarget struct {
segments map[int64]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64
segments map[int64]*datapb.SegmentInfo
channel2Segments map[string][]*datapb.SegmentInfo
partition2Segments map[int64][]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64
// record target status; if the target was saved before milvus v2.4.19, it will lack segment info.
lackSegmentInfo bool
}
func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
for _, segment := range segments {
channel := segment.GetInsertChannel()
if _, ok := channel2Segments[channel]; !ok {
channel2Segments[channel] = make([]*datapb.SegmentInfo, 0)
}
channel2Segments[channel] = append(channel2Segments[channel], segment)
partitionID := segment.GetPartitionID()
if _, ok := partition2Segments[partitionID]; !ok {
partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
}
partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
}
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitionIDs...),
version: time.Now().UnixNano(),
segments: segments,
channel2Segments: channel2Segments,
partition2Segments: partition2Segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitionIDs...),
version: time.Now().UnixNano(),
}
}
func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
segments := make(map[int64]*datapb.SegmentInfo)
dmChannels := make(map[string]*DmChannel)
channel2Segments := make(map[string][]*datapb.SegmentInfo)
partition2Segments := make(map[int64][]*datapb.SegmentInfo)
var partitions []int64
lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
if _, ok := channel2Segments[t.GetChannelName()]; !ok {
channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
}
for _, partition := range t.GetPartitionTargets() {
if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
}
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
info := &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
CollectionID: target.GetCollectionID(),
@ -70,6 +97,9 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
InsertChannel: t.GetChannelName(),
NumOfRows: segment.GetNumOfRows(),
}
segments[segment.GetID()] = info
channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
}
partitions = append(partitions, partition.GetPartitionID())
}
@ -90,11 +120,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
segments: segments,
channel2Segments: channel2Segments,
partition2Segments: partition2Segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}
@ -155,6 +187,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
return p.segments
}
func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
return p.channel2Segments[channel]
}
func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
return p.partition2Segments[partitionID]
}
func (p *CollectionTarget) GetTargetVersion() int64 {
return p.version
}
@ -181,39 +221,43 @@ func (p *CollectionTarget) Ready() bool {
}
type target struct {
keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
}
func newTarget() *target {
return &target{
collectionTargetMap: make(map[int64]*CollectionTarget),
keyLock: lock.NewKeyLock[int64](),
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
}
}
func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
t.keyLock.Lock(collectionID)
defer t.keyLock.Unlock(collectionID)
if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
return
}
t.collectionTargetMap[collectionID] = target
t.collectionTargetMap.Insert(collectionID, target)
}
func (t *target) removeCollectionTarget(collectionID int64) {
delete(t.collectionTargetMap, collectionID)
t.collectionTargetMap.Remove(collectionID)
}
func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
return t.collectionTargetMap[collectionID]
ret, _ := t.collectionTargetMap.Get(collectionID)
return ret
}
func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget {
var ret []*metricsinfo.QueryCoordTarget
for k, v := range t.collectionTargetMap {
targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len())
t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool {
if collectionID > 0 && collectionID != k {
continue
return true
}
segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
return metrics.NewSegmentFrom(s)
})
@ -222,12 +266,13 @@ func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinf
return metrics.NewDMChannelFrom(ch.VchannelInfo)
})
ret = append(ret, &metricsinfo.QueryCoordTarget{
qct := &metricsinfo.QueryCoordTarget{
CollectionID: k,
Segments: segments,
DMChannels: dmChannels,
})
}
return ret
}
targets = append(targets, qct)
return true
})
return targets
}


@ -76,9 +76,8 @@ type TargetManagerInterface interface {
}
type TargetManager struct {
rwMutex sync.RWMutex
broker Broker
meta *Meta
broker Broker
meta *Meta
// all read segment/channel operations happen on current -> only the current target is visible to the outside
// all add segment/channel operations happen on next -> changes can only happen on the next target
@ -100,8 +99,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
// WARN: DO NOT call this method for an existing collection while the target observer is running, or it will lead to a double update,
// which may make the current target unavailable
func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, collectionID int64) bool {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID))
log.Debug("start to update current target for collection")
@ -157,8 +154,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
return err
}
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
partitions := mgr.meta.GetPartitionsByCollection(ctx, collectionID)
partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
return partition.PartitionID
@ -188,7 +183,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
}
for _, infos := range channelInfos {
merged := mgr.mergeDmChannelInfo(infos)
merged := mergeDmChannelInfo(infos)
dmChannels[merged.GetChannelName()] = merged
}
@ -198,7 +193,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
}
allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs))
@ -206,7 +203,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
return nil
}
func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
var dmChannel *DmChannel
for _, info := range infos {
@ -228,8 +225,6 @@ func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmCh
// RemoveCollection removes all channels and segments in the given collection
func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int64) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log.Info("remove collection from targets",
zap.Int64("collectionID", collectionID))
@ -250,9 +245,6 @@ func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int
// RemovePartition removes all segment in the given partition,
// NOTE: this doesn't remove any channel even the given one is the only partition
func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs))
@ -359,9 +351,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in
func (mgr *TargetManager) GetGrowingSegmentsByCollection(ctx context.Context, collectionID int64,
scope TargetScope,
) typeutil.UniqueSet {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -382,9 +371,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle
channelName string,
scope TargetScope,
) typeutil.UniqueSet {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := typeutil.NewUniqueSet()
@ -405,9 +391,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle
func (mgr *TargetManager) GetSealedSegmentsByCollection(ctx context.Context, collectionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -421,17 +404,11 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec
channelName string,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
ret := make(map[int64]*datapb.SegmentInfo)
for k, v := range t.GetAllSegments() {
if v.GetInsertChannel() == channelName {
ret[k] = v
}
}
ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
return s.GetID()
})
if len(ret) > 0 {
return ret
@ -445,9 +422,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(ctx context.Context, colle
channelName string,
scope TargetScope,
) []int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if channel, ok := t.dmChannels[channelName]; ok {
@ -462,16 +436,11 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
partitionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := make(map[int64]*datapb.SegmentInfo)
for _, s := range t.GetAllSegments() {
if s.GetPartitionID() == partitionID {
segments[s.GetID()] = s
}
for _, s := range t.GetPartitionSegments(partitionID) {
segments[s.GetID()] = s
}
if len(segments) > 0 {
@ -483,9 +452,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
}
func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collectionID int64, scope TargetScope) map[string]*DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -496,9 +462,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collect
}
func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, channel string, scope TargetScope) *DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if ch, ok := t.GetAllDmChannels()[channel]; ok {
@ -509,9 +472,6 @@ func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64,
}
func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if s, ok := t.GetAllSegments()[id]; ok {
@ -523,9 +483,6 @@ func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int
}
func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collectionID int64, scope TargetScope) int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if t.GetTargetVersion() > 0 {
@ -537,9 +494,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collec
}
func (mgr *TargetManager) IsCurrentTargetExist(ctx context.Context, collectionID int64, partitionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(CurrentTarget, collectionID)
return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
@ -552,8 +506,6 @@ func (mgr *TargetManager) IsNextTargetExist(ctx context.Context, collectionID in
}
func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
if mgr.current != nil {
// use pool here to control maximal writer used by save target
pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
@ -577,13 +529,14 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto
})
}
tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
for id, target := range mgr.current.collectionTargetMap {
mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool {
tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
if len(tasks) >= batchSize {
submit(tasks)
tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
}
}
return true
})
if len(tasks) > 0 {
submit(tasks)
}
@ -592,9 +545,6 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto
}
func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
targets, err := catalog.GetCollectionTargets(ctx)
if err != nil {
log.Warn("failed to recover collection target from etcd", zap.Error(err))
@ -623,8 +573,6 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo
// if the segment isn't an L0 segment and exists in the current/next target, then it can be moved
func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
current := mgr.current.getCollectionTarget(collectionID)
if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
return true
@ -639,9 +587,6 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s
}
func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
ret := mgr.getTarget(scope)
if ret == nil {
return ""
@ -656,9 +601,6 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope,
}
func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
ret := mgr.getCollectionTarget(scope, collectionID)
if len(ret) == 0 {
return nil, merr.WrapErrCollectionNotLoaded(collectionID)
@ -676,9 +618,7 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target {
}
func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
target, ok := mgr.current.collectionTargetMap.Get(collectionID)
if !ok {
return false
}


@ -425,33 +425,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
current := &CollectionTarget{}
next := &CollectionTarget{}
t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t1.Insert(1000, current)
t2.Insert(1000, next)
t3.Insert(1000, current)
t4.Insert(1000, current)
bothMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t1,
},
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: next,
},
collectionTargetMap: t2,
},
}
currentMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t3,
},
next: &target{
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
},
next: &target{},
}
nextMgr := &TargetManager{
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t4,
},
current: &target{
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
},
current: &target{},
}
cases := []testCase{
@ -728,7 +733,7 @@ func BenchmarkTargetManager(b *testing.B) {
collectionNum := 10000
for i := 0; i < collectionNum; i++ {
mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {


@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(ctx context.Context, collectionID i
func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
loading := false
observeTaskNum := 0
observeStart := time.Now()
ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
loading = true
observeTaskNum++
start := time.Now()
collection := ob.meta.CollectionManager.GetCollection(ctx, task.CollectionID)
if collection == nil {
return true
@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
ob.loadTasks.Remove(traceID)
}
log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
return true
})
log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))
// trigger check logic when loading collections/partitions
if loading {
ob.checkerController.Check()
@ -325,11 +332,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti
}
func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
)
segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
targetNum := len(segmentTargets) + channelTargetNum
@ -338,7 +340,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
return false
}
log.RatedInfo(10, "partition targets",
log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.Int("segmentTargetNum", len(segmentTargets)),
zap.Int("channelTargetNum", channelTargetNum),
zap.Int("totalTargetNum", targetNum),
@ -355,11 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
loadedCount += len(group)
}
if loadedCount > 0 {
log.Info("partition load progress",
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount))
}
loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
@ -370,30 +369,37 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if loadPercentage == 100 {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
log.Warn("failed to manual check current target, skip update load status")
log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()))
return false
}
delete(ob.partitionLoadedCount, partition.GetPartitionID())
}
err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
if err != nil {
log.Warn("failed to update partition load percentage")
log.Ctx(ctx).Warn("failed to update partition load percentage",
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()))
}
log.Info("partition load status updated",
log.Ctx(ctx).Info("partition load status updated",
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.Int32("partitionLoadPercentage", loadPercentage),
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount),
)
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
return true
}
func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID)
if err != nil {
log.Warn("failed to update collection load percentage")
log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
}
log.Info("collection load status updated",
log.Ctx(ctx).Info("collection load status updated",
zap.Int64("collectionID", collectionID),
zap.Int32("collectionLoadPercentage", collectionPercentage),
)
if collectionPercentage == 100 {


@ -24,7 +24,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -116,49 +115,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope {
}
func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
if action.Type() == ActionTypeGrow {
// rpc finished
if !action.rpcReturned.Load() {
return false
}
// segment found in leader view
views := distMgr.LeaderViewManager.GetByFilter(
meta.WithChannelName2LeaderView(action.Shard),
meta.WithSegment2LeaderView(action.SegmentID, false))
if len(views) == 0 {
return false
}
// segment found in dist
segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID))
return len(segmentInTargetNode) > 0
} else if action.Type() == ActionTypeReduce {
// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
// loading segment replaces the node ID with the new one,
// which confuses the condition of finishing,
// the leader should return a map of segment ID to list of nodes,
// now, we just always commit the release task to executor once.
// NOTE: DO NOT create a task containing release action and the action is not the last action
sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
return lo.Keys(view.GrowingSegments)
})
segments := make([]int64, 0, len(sealed)+len(growing))
for _, segment := range sealed {
segments = append(segments, segment.GetID())
}
segments = append(segments, growing...)
if !funcutil.SliceContain(segments, action.GetSegmentID()) {
return true
}
return action.rpcReturned.Load()
} else if action.Type() == ActionTypeUpdate {
return action.rpcReturned.Load()
}
return true
return action.rpcReturned.Load()
}
func (action *SegmentAction) Desc() string {


@ -38,7 +38,9 @@ import (
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/timerecord"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -91,6 +93,7 @@ type replicaChannelIndex struct {
}
type taskQueue struct {
mu sync.RWMutex
// TaskPriority -> TaskID -> Task
buckets []map[int64]Task
}
@ -106,6 +109,8 @@ func newTaskQueue() *taskQueue {
}
func (queue *taskQueue) Len() int {
queue.mu.RLock()
defer queue.mu.RUnlock()
taskNum := 0
for _, tasks := range queue.buckets {
taskNum += len(tasks)
@ -115,17 +120,23 @@ func (queue *taskQueue) Len() int {
}
func (queue *taskQueue) Add(task Task) {
queue.mu.Lock()
defer queue.mu.Unlock()
bucket := queue.buckets[task.Priority()]
bucket[task.ID()] = task
}
func (queue *taskQueue) Remove(task Task) {
queue.mu.Lock()
defer queue.mu.Unlock()
bucket := queue.buckets[task.Priority()]
delete(bucket, task.ID())
}
// Range iterates all tasks in the queue ordered by priority from high to low
func (queue *taskQueue) Range(fn func(task Task) bool) {
queue.mu.RLock()
defer queue.mu.RUnlock()
for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
for _, task := range queue.buckets[priority] {
if !fn(task) {
@ -153,9 +164,8 @@ type Scheduler interface {
}
type taskScheduler struct {
rwmutex sync.RWMutex
ctx context.Context
executors map[int64]*Executor // NodeID -> Executor
executors *ConcurrentMap[int64, *Executor] // NodeID -> Executor
idAllocator func() UniqueID
distMgr *meta.DistributionManager
@ -165,9 +175,11 @@ type taskScheduler struct {
cluster session.Cluster
nodeMgr *session.NodeManager
tasks UniqueSet
segmentTasks map[replicaSegmentIndex]Task
channelTasks map[replicaChannelIndex]Task
scheduleMu sync.Mutex // guards schedule()
collKeyLock *lock.KeyLock[int64] // guards Add()
tasks *ConcurrentMap[UniqueID, struct{}]
segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
channelTasks *ConcurrentMap[replicaChannelIndex, Task]
processQueue *taskQueue
waitQueue *taskQueue
taskStats *expirable.LRU[UniqueID, Task]
@ -184,7 +196,7 @@ func NewScheduler(ctx context.Context,
id := time.Now().UnixMilli()
return &taskScheduler{
ctx: ctx,
executors: make(map[int64]*Executor),
executors: NewConcurrentMap[int64, *Executor](),
idAllocator: func() UniqueID {
id++
return id
@ -197,9 +209,10 @@ func NewScheduler(ctx context.Context,
cluster: cluster,
nodeMgr: nodeMgr,
tasks: make(UniqueSet),
segmentTasks: make(map[replicaSegmentIndex]Task),
channelTasks: make(map[replicaChannelIndex]Task),
collKeyLock: lock.NewKeyLock[int64](),
tasks: NewConcurrentMap[UniqueID, struct{}](),
segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](),
channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
processQueue: newTaskQueue(),
waitQueue: newTaskQueue(),
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
@ -209,30 +222,22 @@ func NewScheduler(ctx context.Context,
func (scheduler *taskScheduler) Start() {}
func (scheduler *taskScheduler) Stop() {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
for nodeID, executor := range scheduler.executors {
scheduler.executors.Range(func(nodeID int64, executor *Executor) bool {
executor.Stop()
delete(scheduler.executors, nodeID)
}
return true
})
for _, task := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
scheduler.remove(task)
}
for _, task := range scheduler.channelTasks {
return true
})
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
scheduler.remove(task)
}
return true
})
}
func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
if _, exist := scheduler.executors[nodeID]; exist {
return
}
executor := NewExecutor(scheduler.meta,
scheduler.distMgr,
scheduler.broker,
@ -240,27 +245,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.cluster,
scheduler.nodeMgr)
scheduler.executors[nodeID] = executor
if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
return
}
executor.Start(scheduler.ctx)
log.Ctx(scheduler.ctx).Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
}
func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
executor, ok := scheduler.executors[nodeID]
executor, ok := scheduler.executors.GetAndRemove(nodeID)
if ok {
executor.Stop()
delete(scheduler.executors, nodeID)
log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
}
}
func (scheduler *taskScheduler) Add(task Task) error {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
scheduler.collKeyLock.Lock(task.CollectionID())
defer scheduler.collKeyLock.Unlock(task.CollectionID())
err := scheduler.preAdd(task)
if err != nil {
task.Cancel(err)
@ -269,19 +271,19 @@ func (scheduler *taskScheduler) Add(task Task) error {
task.SetID(scheduler.idAllocator())
scheduler.waitQueue.Add(task)
scheduler.tasks.Insert(task.ID())
scheduler.tasks.Insert(task.ID(), struct{}{})
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
scheduler.segmentTasks[index] = task
scheduler.segmentTasks.Insert(index, task)
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
scheduler.channelTasks[index] = task
scheduler.channelTasks.Insert(index, task)
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
scheduler.segmentTasks[index] = task
scheduler.segmentTasks.Insert(index, task)
}
scheduler.taskStats.Add(task.ID(), task)
@ -292,21 +294,39 @@ func (scheduler *taskScheduler) Add(task Task) error {
}
func (scheduler *taskScheduler) updateTaskMetrics() {
segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
for _, task := range scheduler.segmentTasks {
taskType := GetTaskType(task)
switch taskType {
case TaskTypeGrow:
segmentGrowNum++
case TaskTypeReduce:
segmentReduceNum++
case TaskTypeMove:
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
switch {
case len(task.Actions()) > 1:
segmentMoveNum++
case task.Actions()[0].Type() == ActionTypeGrow:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentGrowNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderGrowNum++
}
case task.Actions()[0].Type() == ActionTypeReduce:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentReduceNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderReduceNum++
}
case task.Actions()[0].Type() == ActionTypeUpdate:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentUpdateNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderUpdateNum++
}
}
}
return true
})
for _, task := range scheduler.channelTasks {
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
taskType := GetTaskType(task)
switch taskType {
case TaskTypeGrow:
@ -316,11 +336,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
case TaskTypeMove:
channelMoveNum++
}
}
return true
})
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@ -332,7 +359,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
if old, ok := scheduler.segmentTasks[index]; ok {
if old, ok := scheduler.segmentTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -365,7 +392,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
if old, ok := scheduler.channelTasks[index]; ok {
if old, ok := scheduler.channelTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -398,7 +425,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
}
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
if old, ok := scheduler.segmentTasks[index]; ok {
if old, ok := scheduler.segmentTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -477,46 +504,42 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
log.Ctx(scheduler.ctx).Info("scheduler stopped")
default:
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
scheduler.scheduleMu.Lock()
defer scheduler.scheduleMu.Unlock()
scheduler.schedule(node)
}
}
func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
targetActions := make(map[int64][]Action)
for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
taskCollID := task.CollectionID()
if collectionID != -1 && collectionID != taskCollID {
continue
return true
}
actions := filterActions(task.Actions(), nodeID)
if len(actions) > 0 {
targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
}
}
return true
})
return scheduler.calculateTaskDelta(targetActions)
}
func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
targetActions := make(map[int64][]Action)
for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
taskCollID := task.CollectionID()
if collectionID != -1 && collectionID != taskCollID {
continue
return true
}
actions := filterActions(task.Actions(), nodeID)
if len(actions) > 0 {
targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
}
}
return true
})
return scheduler.calculateTaskDelta(targetActions)
}
@ -561,10 +584,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Act
}
func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
executor, ok := scheduler.executors[nodeID]
executor, ok := scheduler.executors.Get(nodeID)
if !ok {
return nil
}
@ -587,16 +607,13 @@ func WithTaskTypeFilter(taskType Type) TaskFilter {
}
func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
if len(filters) == 0 {
return len(scheduler.channelTasks)
return scheduler.channelTasks.Len()
}
// rewrite this with for loop
counter := 0
for _, task := range scheduler.channelTasks {
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
allMatch := true
for _, filter := range filters {
if !filter(task) {
@ -607,21 +624,19 @@ func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
if allMatch {
counter++
}
}
return true
})
return counter
}
func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
if len(filters) == 0 {
return len(scheduler.segmentTasks)
return scheduler.segmentTasks.Len()
}
// rewrite this with for loop
counter := 0
for _, task := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
allMatch := true
for _, filter := range filters {
if !filter(task) {
@ -632,7 +647,8 @@ func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
if allMatch {
counter++
}
}
return true
})
return counter
}
@ -657,17 +673,19 @@ func (scheduler *taskScheduler) schedule(node int64) {
return
}
tr := timerecord.NewTimeRecorder("")
log := log.Ctx(scheduler.ctx).With(
zap.Int64("nodeID", node),
)
scheduler.tryPromoteAll()
promoteDur := tr.RecordSpan()
log.Debug("process tasks related to node",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
zap.Int("channelTaskNum", len(scheduler.channelTasks)),
zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
)
// Process tasks
@ -683,6 +701,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
return true
})
preprocessDur := tr.RecordSpan()
// The scheduler doesn't limit the number of tasks,
// to commit tasks to executors as soon as possible, to reach higher merge possibility
@ -693,22 +712,29 @@ func (scheduler *taskScheduler) schedule(node int64) {
}
return nil
}, "process")
processDur := tr.RecordSpan()
for _, task := range toRemove {
scheduler.remove(task)
}
scheduler.updateTaskMetrics()
log.Info("processed tasks",
zap.Int("toProcessNum", len(toProcess)),
zap.Int32("committedNum", commmittedNum.Load()),
zap.Int("toRemoveNum", len(toRemove)),
zap.Duration("promoteDur", promoteDur),
zap.Duration("preprocessDUr", preprocessDur),
zap.Duration("processDUr", processDur),
zap.Duration("totalDur", tr.ElapseSpan()),
)
log.Info("process tasks related to node done",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
zap.Int("channelTaskNum", len(scheduler.channelTasks)),
zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
)
}
@ -749,10 +775,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
// return true if the task should be executed,
// false otherwise
func (scheduler *taskScheduler) preProcess(task Task) bool {
log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("taskID", task.ID()),
)
if task.Status() != TaskStatusStarted {
return false
}
@ -775,7 +797,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
}
if !ready {
log.RatedInfo(30, "Blocking reduce action in balance channel task")
log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("taskID", task.ID()))
break
}
}
@ -806,7 +830,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
)
actions, step := task.Actions(), task.Step()
executor, ok := scheduler.executors[actions[step].Node()]
executor, ok := scheduler.executors.Get(actions[step].Node())
if !ok {
log.Warn("no executor for QueryNode",
zap.Int("step", step),
@ -827,19 +851,18 @@ func (scheduler *taskScheduler) check(task Task) error {
}
func (scheduler *taskScheduler) RemoveByNode(node int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
for _, task := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
if scheduler.isRelated(task, node) {
scheduler.remove(task)
}
}
for _, task := range scheduler.channelTasks {
return true
})
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
if scheduler.isRelated(task, node) {
scheduler.remove(task)
}
}
return true
})
}
func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@ -875,7 +898,7 @@ func (scheduler *taskScheduler) remove(task Task) {
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
delete(scheduler.segmentTasks, index)
scheduler.segmentTasks.Remove(index)
log = log.With(zap.Int64("segmentID", task.SegmentID()))
if task.Status() == TaskStatusFailed &&
task.Err() != nil &&
@ -885,16 +908,15 @@ func (scheduler *taskScheduler) remove(task Task) {
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
delete(scheduler.channelTasks, index)
scheduler.channelTasks.Remove(index)
log = log.With(zap.String("channel", task.Channel()))
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
delete(scheduler.segmentTasks, index)
scheduler.segmentTasks.Remove(index)
log = log.With(zap.Int64("segmentID", task.SegmentID()))
}
scheduler.updateTaskMetrics()
log.Info("task removed")
if scheduler.meta.Exist(task.Context(), task.CollectionID()) {
@ -940,14 +962,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string {
return metrics.UnknownTaskLabel
}
func (scheduler *taskScheduler) checkStale(task Task) error {
log := log.Ctx(task.Context()).With(
func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
res := []zap.Field{
zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("source", task.Source().String()),
)
}
res = append(res, fields...)
return res
}
func (scheduler *taskScheduler) checkStale(task Task) error {
switch task := task.(type) {
case *SegmentTask:
if err := scheduler.checkSegmentTaskStale(task); err != nil {
@ -974,7 +1000,9 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
zap.Int("step", step))
if scheduler.nodeMgr.Get(action.Node()) == nil {
log.Warn("the task is stale, the target node is offline")
log.Warn("the task is stale, the target node is offline", WrapTaskLog(task,
zap.Int64("nodeID", action.Node()),
zap.Int("step", step))...)
return merr.WrapErrNodeNotFound(action.Node())
}
}
@ -983,38 +1011,30 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
}
func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
log := log.Ctx(task.Context()).With(
zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("source", task.Source().String()),
)
for _, action := range task.Actions() {
switch action.Type() {
case ActionTypeGrow:
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...)
return merr.WrapErrNodeOffline(action.Node())
}
taskType := GetTaskType(task)
segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
if segment == nil {
log.Warn("task stale due to the segment to load not exists in targets",
zap.Int64("segment", task.segmentID),
zap.String("taskType", taskType.String()),
)
log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
WrapTaskLog(task, zap.Int64("segment", task.segmentID),
zap.String("taskType", taskType.String()))...)
return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
}
replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node())
if replica == nil {
log.Warn("task stale due to replica not found")
log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task)...)
return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
}
_, ok := scheduler.distMgr.GetShardLeader(replica, segment.GetInsertChannel())
if !ok {
log.Warn("task stale due to leader not found")
log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...)
return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator")
}
@ -1026,23 +1046,16 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
}
func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
log := log.Ctx(task.Context()).With(
zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("source", task.Source().String()),
)
for _, action := range task.Actions() {
switch action.Type() {
case ActionTypeGrow:
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
log.Warn("task stale due to node offline", zap.String("channel", task.Channel()))
log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...)
return merr.WrapErrNodeOffline(action.Node())
}
if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
log.Warn("the task is stale, the channel to subscribe not exists in targets",
zap.String("channel", task.Channel()))
log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets",
WrapTaskLog(task, zap.String("channel", task.Channel()))...)
return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")
}
@ -1054,48 +1067,41 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
}
func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
log := log.Ctx(task.Context()).With(
zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("source", task.Source().String()),
zap.Int64("leaderID", task.leaderID),
)
for _, action := range task.Actions() {
switch action.Type() {
case ActionTypeGrow:
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
log.Ctx(task.Context()).Warn("task stale due to node offline",
WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...)
return merr.WrapErrNodeOffline(action.Node())
}
taskType := GetTaskType(task)
segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
if segment == nil {
log.Warn("task stale due to the segment to load not exists in targets",
zap.Int64("segment", task.segmentID),
zap.String("taskType", taskType.String()),
)
log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
WrapTaskLog(task, zap.Int64("leaderID", task.leaderID),
zap.Int64("segment", task.segmentID),
zap.String("taskType", taskType.String()))...)
return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
}
replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node())
if replica == nil {
log.Warn("task stale due to replica not found")
log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
}
view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
if view == nil {
log.Warn("task stale due to leader not found")
log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
}
case ActionTypeReduce:
view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
if view == nil {
log.Warn("task stale due to leader not found")
log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
}
}


@ -134,12 +134,14 @@ type baseTask struct {
name string
// startTs
startTs time.Time
startTs atomic.Time
}
func newBaseTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, shard string, taskTag string) *baseTask {
ctx, cancel := context.WithCancel(ctx)
ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag)
startTs := atomic.Time{}
startTs.Store(time.Now())
return &baseTask{
source: source,
@ -154,7 +156,7 @@ func newBaseTask(ctx context.Context, source Source, collectionID typeutil.Uniqu
doneCh: make(chan struct{}),
canceled: atomic.NewBool(false),
span: span,
startTs: time.Now(),
startTs: startTs,
}
}
@ -217,11 +219,11 @@ func (task *baseTask) Index() string {
}
func (task *baseTask) RecordStartTs() {
task.startTs = time.Now()
task.startTs.Store(time.Now())
}
func (task *baseTask) GetTaskLatency() int64 {
return time.Since(task.startTs).Milliseconds()
return time.Since(task.startTs.Load()).Milliseconds()
}
func (task *baseTask) Err() error {


@ -771,26 +771,14 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
suite.NoError(err)
}
growings := map[int64]*meta.Segment{}
for _, segment := range suite.releaseSegments[1:] {
growings[segment] = utils.CreateTestSegment(suite.collection, 1, segment, targetNode, 1, "")
}
suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{
ID: targetNode,
GrowingSegments: growings,
})
segmentsNum := len(suite.releaseSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
// Process tasks
// Process tasks and Release done
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Release done
suite.dist.LeaderViewManager.Update(targetNode)
// Process tasks done
// Tasks removed
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
@ -1090,7 +1078,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
CollectionID: suite.collection,
},
}, nil)
for _, segment := range suite.loadSegments {
for _, segment := range suite.loadSegments[1:] {
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{
{
ID: segment,
@ -1111,13 +1099,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
segments = append(segments, &datapb.SegmentInfo{
ID: segment,
PartitionID: 1,
InsertChannel: channel.GetChannelName(),
})
task, err := NewSegmentTask(
ctx,
timeout,
@ -1131,33 +1113,8 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, partition))
suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
// Process tasks
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Process tasks done
// Dist contains channels, first task stale
view := &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Segments: map[int64]*querypb.SegmentDist{},
Channel: channel.ChannelName,
}
for _, segment := range suite.loadSegments[1:] {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(targetNode, view)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
segments = make([]*datapb.SegmentInfo, 0)
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments[1:] {
segments = append(segments, &datapb.SegmentInfo{
ID: segment,
@ -1165,13 +1122,16 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
InsertChannel: channel.GetChannelName(),
})
}
bakExpectations := suite.broker.ExpectedCalls
suite.broker.AssertExpectations(suite.T())
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2))
suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
// process done
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(1, 0, 0, 1)
// task removed
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
@ -1184,7 +1144,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
suite.NoError(task.Err())
}
}
suite.broker.ExpectedCalls = bakExpectations
}
func (suite *TaskSuite) TestChannelTaskReplace() {
@ -1497,10 +1456,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
suite.Equal(process, scheduler.processQueue.Len())
suite.Equal(wait, scheduler.waitQueue.Len())
suite.Len(scheduler.segmentTasks, segment)
suite.Len(scheduler.channelTasks, channel)
suite.Equal(len(scheduler.tasks), process+wait)
suite.Equal(len(scheduler.tasks), segment+channel)
suite.Equal(scheduler.segmentTasks.Len(), segment)
suite.Equal(scheduler.channelTasks.Len(), channel)
suite.Equal(scheduler.tasks.Len(), process+wait)
suite.Equal(scheduler.tasks.Len(), segment+channel)
}
func (suite *TaskSuite) dispatchAndWait(node int64) {
@ -1512,13 +1471,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
count = 0
keys = make([]any, 0)
for _, executor := range suite.scheduler.executors {
suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
executor.executingTasks.Range(func(taskIndex string) bool {
keys = append(keys, taskIndex)
count++
return true
})
}
return true
})
if count == 0 {
return


@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
log := log.Ctx(context.TODO()).
WithRateGroup(fmt.Sprintf("util.CheckDelegatorDataReady-%d", leader.CollectionID), 1, 60).
With(zap.Int64("leaderID", leader.ID))
With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
info := nodeMgr.Get(leader.ID)
// Check whether leader is online


@ -36,6 +36,7 @@ const (
LeaderGrowTaskLabel = "leader_grow"
LeaderReduceTaskLabel = "leader_reduce"
LeaderUpdateTaskLabel = "leader_update"
UnknownTaskLabel = "unknown"