fix qc service unstable ut (#24340)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/24334/head
wei liu 2023-05-24 18:49:25 +08:00 committed by GitHub
parent b06e88153c
commit 8e3ba74648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 7 deletions

View File

@ -1409,11 +1409,16 @@ func (suite *ServiceSuite) TestGetReplicasFailed() {
ctx := context.Background()
server := suite.server
suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100001, 100000, []int64{}))
suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100002, 100000, []int64{1}))
replicas := suite.meta.ReplicaManager.GetByCollection(suite.collections[0])
for _, replica := range replicas {
suite.updateSegmentDist(suite.collections[0], replica.GetNodes()[0])
}
suite.updateChannelDist(suite.collections[0])
suite.meta.ReplicaManager.Put(utils.CreateTestReplica(100001, suite.collections[0], []int64{}))
req := &milvuspb.GetReplicasRequest{
CollectionID: 100000,
CollectionID: suite.collections[0],
WithShardNodes: true,
}
resp, err := server.GetReplicas(ctx, req)

View File

@ -26,17 +26,19 @@ import (
type refresher struct {
refreshInterval time.Duration
intervalDone chan bool
intervalDone chan struct{}
intervalInitOnce sync.Once
eh EventHandler
fetchFunc func() error
stopOnce sync.Once
wg sync.WaitGroup
}
func newRefresher(interval time.Duration, fetchFunc func() error) *refresher {
return &refresher{
refreshInterval: interval,
intervalDone: make(chan bool, 1),
intervalDone: make(chan struct{}),
fetchFunc: fetchFunc,
}
}
@ -44,16 +46,21 @@ func newRefresher(interval time.Duration, fetchFunc func() error) *refresher {
func (r *refresher) start(name string) {
if r.refreshInterval > 0 {
r.intervalInitOnce.Do(func() {
r.wg.Add(1)
go r.refreshPeriodically(name)
})
}
}
func (r *refresher) stop() {
r.intervalDone <- true
r.stopOnce.Do(func() {
close(r.intervalDone)
r.wg.Wait()
})
}
func (r *refresher) refreshPeriodically(name string) {
defer r.wg.Done()
ticker := time.NewTicker(r.refreshInterval)
defer ticker.Stop()
log.Info("start refreshing configurations", zap.String("source", name))
@ -63,7 +70,7 @@ func (r *refresher) refreshPeriodically(name string) {
err := r.fetchFunc()
if err != nil {
log.Error("can not pull configs", zap.Error(err))
r.intervalDone <- true
r.stop()
}
case <-r.intervalDone:
log.Info("stop refreshing configurations")