diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 2bc9528020..b7db42a756 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1409,11 +1409,16 @@ func (suite *ServiceSuite) TestGetReplicasFailed() { ctx := context.Background() server := suite.server - suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100001, 100000, []int64{})) - suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100002, 100000, []int64{1})) + replicas := suite.meta.ReplicaManager.GetByCollection(suite.collections[0]) + for _, replica := range replicas { + suite.updateSegmentDist(suite.collections[0], replica.GetNodes()[0]) + } + suite.updateChannelDist(suite.collections[0]) + + suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100001, suite.collections[0], []int64{})) req := &milvuspb.GetReplicasRequest{ - CollectionID: 100000, + CollectionID: suite.collections[0], WithShardNodes: true, } resp, err := server.GetReplicas(ctx, req) diff --git a/pkg/config/refresher.go b/pkg/config/refresher.go index 6b4d237e9e..4b49796fd3 100644 --- a/pkg/config/refresher.go +++ b/pkg/config/refresher.go @@ -26,17 +26,19 @@ import ( type refresher struct { refreshInterval time.Duration - intervalDone chan bool + intervalDone chan struct{} intervalInitOnce sync.Once eh EventHandler fetchFunc func() error + stopOnce sync.Once + wg sync.WaitGroup } func newRefresher(interval time.Duration, fetchFunc func() error) *refresher { return &refresher{ refreshInterval: interval, - intervalDone: make(chan bool, 1), + intervalDone: make(chan struct{}), fetchFunc: fetchFunc, } } @@ -44,16 +46,21 @@ func newRefresher(interval time.Duration, fetchFunc func() error) *refresher { func (r *refresher) start(name string) { if r.refreshInterval > 0 { r.intervalInitOnce.Do(func() { + r.wg.Add(1) go r.refreshPeriodically(name) }) } } func (r *refresher) stop() { - r.intervalDone <- true + r.stopOnce.Do(func() { + close(r.intervalDone) + r.wg.Wait() + }) } func (r *refresher) refreshPeriodically(name string) { + defer r.wg.Done() ticker := time.NewTicker(r.refreshInterval) defer ticker.Stop() log.Info("start refreshing configurations", zap.String("source", name)) @@ -63,7 +70,7 @@ func (r *refresher) refreshPeriodically(name string) { err := r.fetchFunc() if err != nil { log.Error("can not pull configs", zap.Error(err)) - r.intervalDone <- true + r.stop() } case <-r.intervalDone: log.Info("stop refreshing configurations")