fix component not exit when liveness check failed (#27236)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/27329/head
wei liu 2023-09-22 19:13:25 +08:00 committed by GitHub
parent 4b12cb8847
commit 9433a24f5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 23 deletions

View File

@ -372,10 +372,10 @@ func (s *Session) initWatchSessionCh(ctx context.Context) error {
err = retry.Do(ctx, func() error {
getResp, err = s.etcdCli.Get(ctx, s.getSessionKey())
log.Warn("fail to get the session key from the etcd", zap.Error(err))
return err
}, retry.Attempts(uint(s.sessionRetryTimes)))
if err != nil {
log.Warn("fail to get the session key from the etcd", zap.Error(err))
return err
}
s.watchSessionKeyCh = s.etcdCli.Watch(ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
@ -785,18 +785,31 @@ func (w *sessionWatcher) handleWatchErr(err error) error {
// LivenessCheck performs liveness check with provided context and channel
// ctx controls the liveness check loop
// ch is the liveness signal channel, ch is closed only when the session is expired
// callback is the function to call when ch is closed, note that callback will not be invoked when loop exits due to context
// callback must be called before liveness check exit, to close the session's owner component
func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
err := s.initWatchSessionCh(ctx)
if err != nil {
log.Error("failed to get session for liveness check", zap.Error(err))
s.cancelKeepAlive()
if callback != nil {
go callback()
}
return
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
if callback != nil {
// before exit liveness check, callback to exit the session owner
defer func() {
if ctx.Err() == nil {
go callback()
}
}()
}
defer s.SetDisconnected(true)
for {
defer s.SetDisconnected(true)
select {
case _, ok := <-s.liveCh:
// ok, still alive
@ -805,9 +818,6 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
}
// not ok, connection lost
log.Warn("connection lost detected, shuting down")
if callback != nil {
go callback()
}
return
case <-ctx.Done():
log.Warn("liveness exits due to context done")

View File

@ -24,6 +24,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.uber.org/atomic"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -194,39 +195,55 @@ func TestSessionLivenessCheck(t *testing.T) {
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
require.NoError(t, err)
s := NewSession(context.Background(), metaRoot, etcdCli)
ctx := context.Background()
s.Register()
ch := make(chan struct{})
s.liveCh = ch
signal := make(chan struct{}, 1)
flag := false
s.LivenessCheck(ctx, func() {
flag = true
flag := atomic.NewBool(false)
s.LivenessCheck(context.Background(), func() {
flag.Store(true)
signal <- struct{}{}
})
assert.False(t, flag.Load())
assert.False(t, flag)
// test liveCh receive event, liveness won't exit, callback won't trigger
ch <- struct{}{}
assert.False(t, flag.Load())
assert.False(t, flag)
// test close liveCh, liveness exit, callback should trigger
close(ch)
<-signal
assert.True(t, flag)
assert.True(t, flag.Load())
ctx, cancel := context.WithCancel(ctx)
cancel()
ch = make(chan struct{})
s.liveCh = ch
flag = false
// test context done, liveness exit, callback shouldn't trigger
metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
s1 := NewSession(context.Background(), metaRoot, etcdCli)
s1.Register()
ctx, cancel := context.WithCancel(context.Background())
flag.Store(false)
s.LivenessCheck(ctx, func() {
flag = true
s1.LivenessCheck(ctx, func() {
flag.Store(true)
signal <- struct{}{}
})
cancel()
assert.False(t, flag.Load())
assert.False(t, flag)
// test context done, liveness start failed, callback should trigger
metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
s2 := NewSession(context.Background(), metaRoot, etcdCli)
s2.Register()
ctx, cancel = context.WithCancel(context.Background())
signal = make(chan struct{}, 1)
flag.Store(false)
cancel()
s2.LivenessCheck(ctx, func() {
flag.Store(true)
signal <- struct{}{}
})
<-signal
assert.True(t, flag.Load())
}
func TestWatcherHandleWatchResp(t *testing.T) {