mirror of https://github.com/milvus-io/milvus.git
parent
3d2283415f
commit
cd29b863d0
|
@ -23,7 +23,6 @@ import (
|
|||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
|
@ -31,6 +30,7 @@ import (
|
|||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
|
@ -108,7 +108,7 @@ type Session struct {
|
|||
registered atomic.Value
|
||||
disconnected atomic.Value
|
||||
retryKeepAlive atomic.Value
|
||||
enableRetryKeepAlive bool
|
||||
enableRetryKeepAlive *atomic.Bool
|
||||
|
||||
isStandby atomic.Value
|
||||
enableActiveStandBy bool
|
||||
|
@ -209,7 +209,7 @@ func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, o
|
|||
sessionTTL: paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(),
|
||||
sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(),
|
||||
reuseNodeID: true,
|
||||
enableRetryKeepAlive: true,
|
||||
enableRetryKeepAlive: atomic.NewBool(true),
|
||||
}
|
||||
|
||||
// integration test create cluster with different nodeId in one process
|
||||
|
@ -464,7 +464,7 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
|
|||
case resp, ok := <-ch:
|
||||
if !ok {
|
||||
log.Warn("session keepalive channel closed")
|
||||
if !s.enableRetryKeepAlive {
|
||||
if !s.enableRetryKeepAlive.Load() {
|
||||
s.safeCloseLiveCh()
|
||||
return
|
||||
}
|
||||
|
@ -784,7 +784,7 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
|
|||
return
|
||||
case <-ctx.Done():
|
||||
log.Debug("liveness exits due to context done")
|
||||
s.enableRetryKeepAlive = false
|
||||
s.enableRetryKeepAlive.Store(false)
|
||||
// cancel the etcd keepAlive context
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
|
@ -900,7 +900,7 @@ func (s *Session) updateStandby(b bool) {
|
|||
}
|
||||
|
||||
func (s *Session) SetEnableRetryKeepAlive(enable bool) {
|
||||
s.enableRetryKeepAlive = enable
|
||||
s.enableRetryKeepAlive.Store(enable)
|
||||
}
|
||||
|
||||
func (s *Session) isRetryingKeepAlive() bool {
|
||||
|
|
|
@ -193,10 +193,7 @@ func TestSessionLivenessCheck(t *testing.T) {
|
|||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
|
||||
require.NoError(t, err)
|
||||
s := &Session{
|
||||
etcdCli: etcdCli,
|
||||
metaRoot: metaRoot,
|
||||
}
|
||||
s := NewSession(context.Background(), metaRoot, etcdCli)
|
||||
ctx := context.Background()
|
||||
ch := make(chan bool)
|
||||
s.liveCh = ch
|
||||
|
|
Loading…
Reference in New Issue