mirror of https://github.com/milvus-io/milvus.git
				
				
				
			fix: Make sure querycoord observers started once (#35811)
Related to #35809 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/35815/head
							parent
							
								
									70bc0e4538
								
							
						
					
					
						commit
						09ef3f1b4f
					
				| 
						 | 
				
			
			@ -56,7 +56,8 @@ type CollectionObserver struct {
 | 
			
		|||
 | 
			
		||||
	proxyManager proxyutil.ProxyClientManagerInterface
 | 
			
		||||
 | 
			
		||||
	stopOnce sync.Once
 | 
			
		||||
	startOnce sync.Once
 | 
			
		||||
	stopOnce  sync.Once
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type LoadTask struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -94,27 +95,29 @@ func NewCollectionObserver(
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (ob *CollectionObserver) Start() {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	ob.cancel = cancel
 | 
			
		||||
	ob.startOnce.Do(func() {
 | 
			
		||||
		ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
		ob.cancel = cancel
 | 
			
		||||
 | 
			
		||||
	observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond)
 | 
			
		||||
	ob.wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer ob.wg.Done()
 | 
			
		||||
		observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond)
 | 
			
		||||
		ob.wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer ob.wg.Done()
 | 
			
		||||
 | 
			
		||||
		ticker := time.NewTicker(observePeriod)
 | 
			
		||||
		defer ticker.Stop()
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-ctx.Done():
 | 
			
		||||
				log.Info("CollectionObserver stopped")
 | 
			
		||||
				return
 | 
			
		||||
			ticker := time.NewTicker(observePeriod)
 | 
			
		||||
			defer ticker.Stop()
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case <-ctx.Done():
 | 
			
		||||
					log.Info("CollectionObserver stopped")
 | 
			
		||||
					return
 | 
			
		||||
 | 
			
		||||
			case <-ticker.C:
 | 
			
		||||
				ob.Observe(ctx)
 | 
			
		||||
				case <-ticker.C:
 | 
			
		||||
					ob.Observe(ctx)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
		}()
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ob *CollectionObserver) Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,6 +36,7 @@ type CollectionShardLeaderCache = map[string]*querypb.ShardLeadersList
 | 
			
		|||
type LeaderCacheObserver struct {
 | 
			
		||||
	wg           sync.WaitGroup
 | 
			
		||||
	proxyManager proxyutil.ProxyClientManagerInterface
 | 
			
		||||
	startOnce    sync.Once
 | 
			
		||||
	stopOnce     sync.Once
 | 
			
		||||
	closeCh      chan struct{}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -44,8 +45,10 @@ type LeaderCacheObserver struct {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (o *LeaderCacheObserver) Start(ctx context.Context) {
 | 
			
		||||
	o.wg.Add(1)
 | 
			
		||||
	go o.schedule(ctx)
 | 
			
		||||
	o.startOnce.Do(func() {
 | 
			
		||||
		o.wg.Add(1)
 | 
			
		||||
		go o.schedule(ctx)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *LeaderCacheObserver) Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,7 +37,8 @@ type ReplicaObserver struct {
 | 
			
		|||
	meta    *meta.Meta
 | 
			
		||||
	distMgr *meta.DistributionManager
 | 
			
		||||
 | 
			
		||||
	stopOnce sync.Once
 | 
			
		||||
	startOnce sync.Once
 | 
			
		||||
	stopOnce  sync.Once
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *ReplicaObserver {
 | 
			
		||||
| 
						 | 
				
			
			@ -48,11 +49,13 @@ func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *Rep
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (ob *ReplicaObserver) Start() {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	ob.cancel = cancel
 | 
			
		||||
	ob.startOnce.Do(func() {
 | 
			
		||||
		ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
		ob.cancel = cancel
 | 
			
		||||
 | 
			
		||||
	ob.wg.Add(1)
 | 
			
		||||
	go ob.schedule(ctx)
 | 
			
		||||
		ob.wg.Add(1)
 | 
			
		||||
		go ob.schedule(ctx)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ob *ReplicaObserver) Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,7 +36,8 @@ type ResourceObserver struct {
 | 
			
		|||
	wg     sync.WaitGroup
 | 
			
		||||
	meta   *meta.Meta
 | 
			
		||||
 | 
			
		||||
	stopOnce sync.Once
 | 
			
		||||
	startOnce sync.Once
 | 
			
		||||
	stopOnce  sync.Once
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewResourceObserver(meta *meta.Meta) *ResourceObserver {
 | 
			
		||||
| 
						 | 
				
			
			@ -46,11 +47,13 @@ func NewResourceObserver(meta *meta.Meta) *ResourceObserver {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (ob *ResourceObserver) Start() {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	ob.cancel = cancel
 | 
			
		||||
	ob.startOnce.Do(func() {
 | 
			
		||||
		ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
		ob.cancel = cancel
 | 
			
		||||
 | 
			
		||||
	ob.wg.Add(1)
 | 
			
		||||
	go ob.schedule(ctx)
 | 
			
		||||
		ob.wg.Add(1)
 | 
			
		||||
		go ob.schedule(ctx)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ob *ResourceObserver) Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -71,7 +71,8 @@ type TargetObserver struct {
 | 
			
		|||
	dispatcher *taskDispatcher[int64]
 | 
			
		||||
	keylocks   *lock.KeyLock[int64]
 | 
			
		||||
 | 
			
		||||
	stopOnce sync.Once
 | 
			
		||||
	startOnce sync.Once
 | 
			
		||||
	stopOnce  sync.Once
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewTargetObserver(
 | 
			
		||||
| 
						 | 
				
			
			@ -101,19 +102,21 @@ func NewTargetObserver(
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (ob *TargetObserver) Start() {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	ob.cancel = cancel
 | 
			
		||||
	ob.startOnce.Do(func() {
 | 
			
		||||
		ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
		ob.cancel = cancel
 | 
			
		||||
 | 
			
		||||
	ob.dispatcher.Start()
 | 
			
		||||
		ob.dispatcher.Start()
 | 
			
		||||
 | 
			
		||||
	ob.wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer ob.wg.Done()
 | 
			
		||||
		ob.schedule(ctx)
 | 
			
		||||
	}()
 | 
			
		||||
		ob.wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer ob.wg.Done()
 | 
			
		||||
			ob.schedule(ctx)
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
	// after target observer start, update target for all collection
 | 
			
		||||
	ob.initChan <- initRequest{}
 | 
			
		||||
		// after target observer start, update target for all collection
 | 
			
		||||
		ob.initChan <- initRequest{}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ob *TargetObserver) Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1084,8 +1084,6 @@ func (suite *ServiceSuite) TestReleasePartition() {
 | 
			
		|||
func (suite *ServiceSuite) TestRefreshCollection() {
 | 
			
		||||
	server := suite.server
 | 
			
		||||
 | 
			
		||||
	server.collectionObserver.Start()
 | 
			
		||||
 | 
			
		||||
	// Test refresh all collections.
 | 
			
		||||
	for _, collection := range suite.collections {
 | 
			
		||||
		err := server.refreshCollection(collection)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue