Make qcv2 target&leader observer execute in parallel (#27844)

- Add `taskDispatcher` to submit and run task async safely
- Change `LeaderObeserver` and `TargetObserver` schedule and manual check action to submitting task into dispatcher
- Fix logic problem in collection observer when manual check return false

See also #27494

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27861/head
congqixia 2023-10-24 10:14:11 +08:00 committed by GitHub
parent 351c64b606
commit 93a877f55e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 239 additions and 162 deletions

View File

@ -226,7 +226,15 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
}
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if loadPercentage == 100 && ob.targetObserver.Check(ctx, partition.GetCollectionID()) && ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) {
if loadPercentage == 100 {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) {
log.Warn("failed to manual check current target, skip update load status")
return
}
if !ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) {
log.Warn("failed to manual check leader target version ,skip update load status")
return
}
delete(ob.partitionLoadedCount, partition.GetPartitionID())
}
collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage)

View File

@ -41,14 +41,15 @@ const (
// LeaderObserver is to sync the distribution with leader
type LeaderObserver struct {
wg sync.WaitGroup
cancel context.CancelFunc
dist *meta.DistributionManager
meta *meta.Meta
target *meta.TargetManager
broker meta.Broker
cluster session.Cluster
manualCheck chan checkRequest
wg sync.WaitGroup
cancel context.CancelFunc
dist *meta.DistributionManager
meta *meta.Meta
target *meta.TargetManager
broker meta.Broker
cluster session.Cluster
dispatcher *taskDispatcher[int64]
stopOnce sync.Once
}
@ -57,27 +58,12 @@ func (o *LeaderObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
o.cancel = cancel
o.dispatcher.Start()
o.wg.Add(1)
go func() {
defer o.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("stop leader observer")
return
case req := <-o.manualCheck:
log.Info("triggering manual check")
ret := o.observeCollection(ctx, req.CollectionID)
req.Notifier <- ret
log.Info("manual check done", zap.Bool("result", ret))
case <-ticker.C:
o.observe(ctx)
}
}
o.schedule(ctx)
}()
}
@ -87,9 +73,26 @@ func (o *LeaderObserver) Stop() {
o.cancel()
}
o.wg.Wait()
o.dispatcher.Stop()
})
}
func (o *LeaderObserver) schedule(ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("stop leader observer")
return
case <-ticker.C:
o.observe(ctx)
}
}
}
func (o *LeaderObserver) observe(ctx context.Context) {
o.observeSegmentsDist(ctx)
}
@ -105,14 +108,13 @@ func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) {
collectionIDs := o.meta.CollectionManager.GetAll()
for _, cid := range collectionIDs {
if o.readyToObserve(cid) {
o.observeCollection(ctx, cid)
o.dispatcher.AddTask(cid)
}
}
}
func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) bool {
func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) {
replicas := o.meta.ReplicaManager.GetByCollection(collection)
result := true
for _, replica := range replicas {
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
@ -128,29 +130,42 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
success := o.sync(ctx, replica.GetID(), leaderView, actions)
if !success {
result = false
}
o.sync(ctx, replica.GetID(), leaderView, actions)
}
}
}
func (o *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool {
// if not ready to observer, skip add task
if !o.readyToObserve(collectionID) {
return false
}
result := o.checkCollectionLeaderVersionIsCurrent(ctx, collectionID)
if !result {
o.dispatcher.AddTask(collectionID)
}
return result
}
func (ob *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool {
notifier := make(chan bool)
select {
case ob.manualCheck <- checkRequest{CollectionID: collectionID, Notifier: notifier}:
case <-ctx.Done():
return false
}
func (o *LeaderObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool {
replicas := o.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
return false
}
select {
case result := <-notifier:
return result
case <-ctx.Done():
return false
action := o.checkNeedUpdateTargetVersion(ctx, leaderView)
if action != nil {
return false
}
}
}
return true
}
func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
@ -312,12 +327,16 @@ func NewLeaderObserver(
broker meta.Broker,
cluster session.Cluster,
) *LeaderObserver {
return &LeaderObserver{
dist: dist,
meta: meta,
target: targetMgr,
broker: broker,
cluster: cluster,
manualCheck: make(chan checkRequest, 10),
ob := &LeaderObserver{
dist: dist,
meta: meta,
target: targetMgr,
broker: broker,
cluster: cluster,
}
dispatcher := newTaskDispatcher[int64](ob.observeCollection)
ob.dispatcher = dispatcher
return ob
}

View File

@ -591,44 +591,6 @@ func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() {
suite.Len(action.SealedInTarget, 1)
}
func (suite *LeaderObserverTestSuite) TestCheckTargetVersion() {
collectionID := int64(1001)
observer := suite.observer
suite.Run("check_channel_blocked", func() {
oldCh := observer.manualCheck
defer func() {
observer.manualCheck = oldCh
}()
// zero-length channel
observer.manualCheck = make(chan checkRequest)
ctx, cancel := context.WithCancel(context.Background())
// cancel context, make test return fast
cancel()
result := observer.CheckTargetVersion(ctx, collectionID)
suite.False(result)
})
suite.Run("check_return_ctx_timeout", func() {
oldCh := observer.manualCheck
defer func() {
observer.manualCheck = oldCh
}()
// make channel length = 1, task received
observer.manualCheck = make(chan checkRequest, 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
result := observer.CheckTargetVersion(ctx, collectionID)
suite.False(result)
})
}
func TestLeaderObserverSuite(t *testing.T) {
suite.Run(t, new(LeaderObserverTestSuite))
}

View File

@ -51,36 +51,48 @@ type TargetObserver struct {
distMgr *meta.DistributionManager
broker meta.Broker
initChan chan initRequest
manualCheck chan checkRequest
nextTargetLastUpdate map[int64]time.Time
initChan chan initRequest
manualCheck chan checkRequest
// nextTargetLastUpdate map[int64]time.Time
nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time]
updateChan chan targetUpdateRequest
mut sync.Mutex // Guard readyNotifiers
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers
dispatcher *taskDispatcher[int64]
stopOnce sync.Once
}
func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *meta.DistributionManager, broker meta.Broker) *TargetObserver {
return &TargetObserver{
result := &TargetObserver{
meta: meta,
targetMgr: targetMgr,
distMgr: distMgr,
broker: broker,
manualCheck: make(chan checkRequest, 10),
nextTargetLastUpdate: make(map[int64]time.Time),
nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](),
updateChan: make(chan targetUpdateRequest),
readyNotifiers: make(map[int64][]chan struct{}),
initChan: make(chan initRequest),
}
dispatcher := newTaskDispatcher(result.check)
result.dispatcher = dispatcher
return result
}
func (ob *TargetObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.dispatcher.Start()
ob.wg.Add(1)
go ob.schedule(ctx)
go func() {
defer ob.wg.Done()
ob.schedule(ctx)
}()
// after target observer start, update target for all collection
ob.initChan <- initRequest{}
@ -92,11 +104,12 @@ func (ob *TargetObserver) Stop() {
ob.cancel()
}
ob.wg.Wait()
ob.dispatcher.Stop()
})
}
func (ob *TargetObserver) schedule(ctx context.Context) {
defer ob.wg.Done()
log.Info("Start update next target loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.UpdateNextTargetInterval.GetAsDuration(time.Second))
@ -111,16 +124,11 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
for _, collectionID := range ob.meta.GetAll() {
ob.init(collectionID)
}
log.Info("target observer init done")
case <-ticker.C:
ob.clean()
for _, collectionID := range ob.meta.GetAll() {
ob.check(collectionID)
}
case req := <-ob.manualCheck:
ob.check(req.CollectionID)
req.Notifier <- ob.targetMgr.IsCurrentTargetExist(req.CollectionID)
ob.dispatcher.AddTask(ob.meta.GetAll()...)
case req := <-ob.updateChan:
err := ob.updateNextTarget(req.CollectionID)
@ -137,26 +145,17 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
}
}
// Check checks whether the next target is ready,
// and updates the current target if it is,
// returns true if current target is not nil
// Check whether provided collection is has current target.
// If not, submit a async task into dispatcher.
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool {
notifier := make(chan bool)
select {
case ob.manualCheck <- checkRequest{CollectionID: collectionID, Notifier: notifier}:
case <-ctx.Done():
return false
}
select {
case result := <-notifier:
return result
case <-ctx.Done():
return false
result := ob.targetMgr.IsCurrentTargetExist(collectionID)
if !result {
ob.dispatcher.AddTask(collectionID)
}
return result
}
func (ob *TargetObserver) check(collectionID int64) {
func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
if !ob.meta.Exist(collectionID) {
ob.ReleaseCollection(collectionID)
ob.targetMgr.RemoveCollection(collectionID)
@ -215,11 +214,12 @@ func (ob *TargetObserver) ReleaseCollection(collectionID int64) {
func (ob *TargetObserver) clean() {
collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...)
// for collection which has been removed from target, try to clear nextTargetLastUpdate
for collection := range ob.nextTargetLastUpdate {
if !collectionSet.Contain(collection) {
delete(ob.nextTargetLastUpdate, collection)
ob.nextTargetLastUpdate.Range(func(collectionID int64, _ time.Time) bool {
if !collectionSet.Contain(collectionID) {
ob.nextTargetLastUpdate.Remove(collectionID)
}
}
return true
})
ob.mut.Lock()
defer ob.mut.Unlock()
@ -238,7 +238,11 @@ func (ob *TargetObserver) shouldUpdateNextTarget(collectionID int64) bool {
}
func (ob *TargetObserver) isNextTargetExpired(collectionID int64) bool {
return time.Since(ob.nextTargetLastUpdate[collectionID]) > params.Params.QueryCoordCfg.NextTargetSurviveTime.GetAsDuration(time.Second)
lastUpdated, has := ob.nextTargetLastUpdate.Get(collectionID)
if !has {
return true
}
return time.Since(lastUpdated) > params.Params.QueryCoordCfg.NextTargetSurviveTime.GetAsDuration(time.Second)
}
func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
@ -256,7 +260,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
}
func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
ob.nextTargetLastUpdate[collectionID] = time.Now()
ob.nextTargetLastUpdate.Insert(collectionID, time.Now())
}
func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {

View File

@ -273,41 +273,10 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
suite.NoError(err)
}
func (suite *TargetObserverCheckSuite) TestCheckCtxDone() {
observer := suite.observer
suite.Run("check_channel_blocked", func() {
oldCh := observer.manualCheck
defer func() {
observer.manualCheck = oldCh
}()
// zero-length channel
observer.manualCheck = make(chan checkRequest)
ctx, cancel := context.WithCancel(context.Background())
// cancel context, make test return fast
cancel()
result := observer.Check(ctx, suite.collectionID)
suite.False(result)
})
suite.Run("check_return_ctx_timeout", func() {
oldCh := observer.manualCheck
defer func() {
observer.manualCheck = oldCh
}()
// make channel length = 1, task received
observer.manualCheck = make(chan checkRequest, 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
result := observer.Check(ctx, suite.collectionID)
suite.False(result)
})
func (s *TargetObserverCheckSuite) TestCheck() {
r := s.observer.Check(context.Background(), s.collectionID)
s.False(r)
s.True(s.observer.dispatcher.tasks.Contain(s.collectionID))
}
func TestTargetObserver(t *testing.T) {

View File

@ -0,0 +1,104 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package observers
import (
"context"
"sync"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// taskDispatcher is the utility to provide task dedup and dispatch feature
type taskDispatcher[K comparable] struct {
tasks *typeutil.ConcurrentSet[K]
pool *conc.Pool[any]
notifyCh chan struct{}
taskRunner task[K]
wg sync.WaitGroup
cancel context.CancelFunc
stopOnce sync.Once
}
type task[K comparable] func(context.Context, K)
func newTaskDispatcher[K comparable](runner task[K]) *taskDispatcher[K] {
return &taskDispatcher[K]{
tasks: typeutil.NewConcurrentSet[K](),
pool: conc.NewPool[any](paramtable.Get().QueryCoordCfg.ObserverTaskParallel.GetAsInt()),
notifyCh: make(chan struct{}, 1),
taskRunner: runner,
}
}
func (d *taskDispatcher[K]) Start() {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.schedule(ctx)
}()
}
func (d *taskDispatcher[K]) Stop() {
d.stopOnce.Do(func() {
if d.cancel != nil {
d.cancel()
}
d.wg.Wait()
})
}
func (d *taskDispatcher[K]) AddTask(keys ...K) {
var added bool
for _, key := range keys {
added = added || d.tasks.Insert(key)
}
if added {
d.notify()
}
}
func (d *taskDispatcher[K]) notify() {
select {
case d.notifyCh <- struct{}{}:
default:
}
}
func (d *taskDispatcher[K]) schedule(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-d.notifyCh:
d.tasks.Range(func(k K) bool {
d.tasks.Insert(k)
d.pool.Submit(func() (any, error) {
d.taskRunner(ctx, k)
d.tasks.Remove(k)
return struct{}{}, nil
})
return true
})
}
}
}

View File

@ -1204,6 +1204,7 @@ type queryCoordConfig struct {
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
CollectionRecoverTimesLimit ParamItem `refreshable:"true"`
ObserverTaskParallel ParamItem `refreshable:"false"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@ -1515,6 +1516,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: true,
}
p.CollectionRecoverTimesLimit.Init(base.mgr)
p.ObserverTaskParallel = ParamItem{
Key: "queryCoord.observerTaskParallel",
Version: "2.3.2",
DefaultValue: "16",
PanicIfEmpty: true,
Doc: "the parallel observer dispatcher task number",
Export: true,
}
p.ObserverTaskParallel.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////