From b752a299953bf7cddd0e4e894d22a0757b573868 Mon Sep 17 00:00:00 2001 From: wayblink Date: Mon, 26 Jun 2023 12:30:44 +0800 Subject: [PATCH] Add timeout for keepalive in session (#25077) Signed-off-by: wayblink --- internal/util/sessionutil/session_util.go | 30 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 57e89b38a8..a55ecc9923 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -86,6 +86,7 @@ type Session struct { // When outside context done, Session cancels its goroutines first, then uses // keepAliveCancel to cancel the etcd KeepAlive keepAliveCancel context.CancelFunc + keepAliveCtx context.Context ServerID int64 `json:"ServerID,omitempty"` ServerName string `json:"ServerName,omitempty"` @@ -429,6 +430,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er log.Debug("put session key into etcd", zap.String("key", completeKey), zap.String("value", string(sessionJSON))) keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background()) + s.keepAliveCtx = keepAliveCtx s.keepAliveCancel = keepAliveCancel ch, err = s.etcdCli.KeepAlive(keepAliveCtx, resp.ID) if err != nil { @@ -469,15 +471,33 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes log.Info("start try to KeepAliveOnce", zap.String("serverName", s.ServerName)) s.retryKeepAlive.Store(true) // have to KeepAliveOnce before KeepAlive because KeepAlive won't throw error even when lease OT - keepAliveOnceResp, err := s.etcdCli.KeepAliveOnce(s.ctx, *s.leaseID) + var keepAliveOnceResp *clientv3.LeaseKeepAliveResponse + s.keepAliveCtx.Done() + s.keepAliveCtx = context.Background() + err := retry.Do(s.keepAliveCtx, func() error { + ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*5) + defer cancel() + resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.leaseID) + keepAliveOnceResp = resp + return err + }, retry.Attempts(3)) + if err != nil { - // error="etcdserver: requested lease not found" - log.Warn("fail to keepAliveOnce", zap.Error(err)) + log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Error(err)) s.safeCloseLiveCh() return } - log.Info("succeed to KeepAliveOnce", zap.Any("resp", keepAliveOnceResp)) - chNew, err := s.etcdCli.KeepAlive(s.ctx, *s.leaseID) + log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Any("resp", keepAliveOnceResp)) + + var chNew <-chan *clientv3.LeaseKeepAliveResponse + err = retry.Do(s.keepAliveCtx, func() error { + ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*5) + defer cancel() + ch, err := s.etcdCli.KeepAlive(ctx, *s.leaseID) + chNew = ch + return err + }, retry.Attempts(3)) + if err != nil { log.Warn("fail to retry keepAlive", zap.Error(err)) s.safeCloseLiveCh()