fix: use SafeChan preventing close channel multiple times (#30022)

See also #29935

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/30010/head^2
congqixia 2024-01-16 17:34:54 +08:00 committed by GitHub
parent 7cb6bebd96
commit c0f0548702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 5 additions and 4 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -53,7 +54,7 @@ type ProxyWatcher struct {
addSessionsFunc []func(*sessionutil.Session)
delSessionsFunc []func(*sessionutil.Session)
closeCh chan struct{}
closeCh lifetime.SafeChan
}
// NewProxyWatcher helper function to create a proxyWatcher
@ -62,7 +63,7 @@ func NewProxyWatcher(client *clientv3.Client, fns ...func([]*sessionutil.Session
p := &ProxyWatcher{
lock: sync.Mutex{},
etcdCli: client,
closeCh: make(chan struct{}),
closeCh: lifetime.NewSafeChan(),
}
p.initSessionsFunc = append(p.initSessionsFunc, fns...)
return p
@ -121,7 +122,7 @@ func (p *ProxyWatcher) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc
log.Warn("stop watching etcd loop")
return
case <-p.closeCh:
case <-p.closeCh.CloseCh():
log.Warn("stop watching etcd loop")
return
@ -218,6 +219,6 @@ func (p *ProxyWatcher) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se
// Stop stops the ProxyManager
func (p *ProxyWatcher) Stop() {
close(p.closeCh)
p.closeCh.Close()
p.wg.Wait()
}