fix: [2.4] Make sure querycoord observers started once (#35811) (#35817)

Cherry-pick from master
pr: #35811
Related to #35809

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/35837/head
congqixia 2024-08-29 19:15:01 +08:00 committed by GitHub
parent 8d3685fadf
commit cfc99e63b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 56 additions and 43 deletions

View File

@ -56,7 +56,8 @@ type CollectionObserver struct {
proxyManager proxyutil.ProxyClientManagerInterface
stopOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
}
type LoadTask struct {
@ -94,27 +95,29 @@ func NewCollectionObserver(
}
func (ob *CollectionObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond)
ob.wg.Add(1)
go func() {
defer ob.wg.Done()
observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond)
ob.wg.Add(1)
go func() {
defer ob.wg.Done()
ticker := time.NewTicker(observePeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("CollectionObserver stopped")
return
ticker := time.NewTicker(observePeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("CollectionObserver stopped")
return
case <-ticker.C:
ob.Observe(ctx)
case <-ticker.C:
ob.Observe(ctx)
}
}
}
}()
}()
})
}
func (ob *CollectionObserver) Stop() {

View File

@ -36,6 +36,7 @@ type CollectionShardLeaderCache = map[string]*querypb.ShardLeadersList
type LeaderCacheObserver struct {
wg sync.WaitGroup
proxyManager proxyutil.ProxyClientManagerInterface
startOnce sync.Once
stopOnce sync.Once
closeCh chan struct{}
@ -44,8 +45,10 @@ type LeaderCacheObserver struct {
}
func (o *LeaderCacheObserver) Start(ctx context.Context) {
o.wg.Add(1)
go o.schedule(ctx)
o.startOnce.Do(func() {
o.wg.Add(1)
go o.schedule(ctx)
})
}
func (o *LeaderCacheObserver) Stop() {

View File

@ -37,7 +37,8 @@ type ReplicaObserver struct {
meta *meta.Meta
distMgr *meta.DistributionManager
stopOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
}
func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *ReplicaObserver {
@ -48,11 +49,13 @@ func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *Rep
}
func (ob *ReplicaObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.wg.Add(1)
go ob.schedule(ctx)
ob.wg.Add(1)
go ob.schedule(ctx)
})
}
func (ob *ReplicaObserver) Stop() {

View File

@ -36,7 +36,8 @@ type ResourceObserver struct {
wg sync.WaitGroup
meta *meta.Meta
stopOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
}
func NewResourceObserver(meta *meta.Meta) *ResourceObserver {
@ -46,11 +47,13 @@ func NewResourceObserver(meta *meta.Meta) *ResourceObserver {
}
func (ob *ResourceObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.wg.Add(1)
go ob.schedule(ctx)
ob.wg.Add(1)
go ob.schedule(ctx)
})
}
func (ob *ResourceObserver) Stop() {

View File

@ -71,7 +71,8 @@ type TargetObserver struct {
dispatcher *taskDispatcher[int64]
keylocks *lock.KeyLock[int64]
stopOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
}
func NewTargetObserver(
@ -101,19 +102,21 @@ func NewTargetObserver(
}
func (ob *TargetObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel
ob.dispatcher.Start()
ob.dispatcher.Start()
ob.wg.Add(1)
go func() {
defer ob.wg.Done()
ob.schedule(ctx)
}()
ob.wg.Add(1)
go func() {
defer ob.wg.Done()
ob.schedule(ctx)
}()
// after target observer start, update target for all collection
ob.initChan <- initRequest{}
// after target observer start, update target for all collection
ob.initChan <- initRequest{}
})
}
func (ob *TargetObserver) Stop() {

View File

@ -1084,8 +1084,6 @@ func (suite *ServiceSuite) TestReleasePartition() {
func (suite *ServiceSuite) TestRefreshCollection() {
server := suite.server
server.collectionObserver.Start()
// Test refresh all collections.
for _, collection := range suite.collections {
err := server.refreshCollection(collection)