mirror of https://github.com/milvus-io/milvus.git
Add timeout for keepalive in session (#25077)
Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/25139/head
parent
2d029b7fd3
commit
b752a29995
|
@ -86,6 +86,7 @@ type Session struct {
|
|||
// When the outside context is done, Session cancels its goroutines first, then uses
|
||||
// keepAliveCancel to cancel the etcd KeepAlive
|
||||
keepAliveCancel context.CancelFunc
|
||||
keepAliveCtx context.Context
|
||||
|
||||
ServerID int64 `json:"ServerID,omitempty"`
|
||||
ServerName string `json:"ServerName,omitempty"`
|
||||
|
@ -429,6 +430,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
|
|||
log.Debug("put session key into etcd", zap.String("key", completeKey), zap.String("value", string(sessionJSON)))
|
||||
|
||||
keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background())
|
||||
s.keepAliveCtx = keepAliveCtx
|
||||
s.keepAliveCancel = keepAliveCancel
|
||||
ch, err = s.etcdCli.KeepAlive(keepAliveCtx, resp.ID)
|
||||
if err != nil {
|
||||
|
@ -469,15 +471,33 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
|
|||
log.Info("start try to KeepAliveOnce", zap.String("serverName", s.ServerName))
|
||||
s.retryKeepAlive.Store(true)
|
||||
// KeepAliveOnce must be called before KeepAlive, because KeepAlive does not return an error even when the lease has timed out
|
||||
keepAliveOnceResp, err := s.etcdCli.KeepAliveOnce(s.ctx, *s.leaseID)
|
||||
var keepAliveOnceResp *clientv3.LeaseKeepAliveResponse
|
||||
s.keepAliveCtx.Done()
|
||||
s.keepAliveCtx = context.Background()
|
||||
err := retry.Do(s.keepAliveCtx, func() error {
|
||||
ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*5)
|
||||
defer cancel()
|
||||
resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.leaseID)
|
||||
keepAliveOnceResp = resp
|
||||
return err
|
||||
}, retry.Attempts(3))
|
||||
|
||||
if err != nil {
|
||||
// error="etcdserver: requested lease not found"
|
||||
log.Warn("fail to keepAliveOnce", zap.Error(err))
|
||||
log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Error(err))
|
||||
s.safeCloseLiveCh()
|
||||
return
|
||||
}
|
||||
log.Info("succeed to KeepAliveOnce", zap.Any("resp", keepAliveOnceResp))
|
||||
chNew, err := s.etcdCli.KeepAlive(s.ctx, *s.leaseID)
|
||||
log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Any("resp", keepAliveOnceResp))
|
||||
|
||||
var chNew <-chan *clientv3.LeaseKeepAliveResponse
|
||||
err = retry.Do(s.keepAliveCtx, func() error {
|
||||
ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*5)
|
||||
defer cancel()
|
||||
ch, err := s.etcdCli.KeepAlive(ctx, *s.leaseID)
|
||||
chNew = ch
|
||||
return err
|
||||
}, retry.Attempts(3))
|
||||
|
||||
if err != nil {
|
||||
log.Warn("fail to retry keepAlive", zap.Error(err))
|
||||
s.safeCloseLiveCh()
|
||||
|
|
Loading…
Reference in New Issue