From f0d0651989f02ea647a57c39392dfc62098575f6 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 13 Sep 2023 15:01:18 +0800 Subject: [PATCH] Do not reset connection immediately if grpc code is `Canceled` or `DeadlineExceeded` (#27014) We found lots of connection resets & cancellations due to the recent retry change. The current implementation resets the connection no matter what the error code is. To sync behavior with the previous retry logic, skip resetting the connection for these codes and reset only if cancel errors happen too often. Also adds config items (minResetInterval, minSessionCheckInterval, maxCancelError) for grpc connection reset. Signed-off-by: Congqi Xia --- .../distributed/datanode/client/client.go | 3 +- .../distributed/indexnode/client/client.go | 3 +- internal/distributed/proxy/client/client.go | 3 +- .../distributed/querynode/client/client.go | 3 +- internal/util/grpcclient/client.go | 144 +++++++++++++----- internal/util/grpcclient/client_test.go | 20 ++- internal/util/sessionutil/session_util.go | 5 +- pkg/util/paramtable/grpc_param.go | 69 ++++++++- pkg/util/paramtable/grpc_param_test.go | 18 +++ 9 files changed, 223 insertions(+), 45 deletions(-) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index bbaaa16a86..7c571827da 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -50,7 +50,8 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) addr: addr, grpcClient: grpcclient.NewClientBase[datapb.DataNodeClient](config, "milvus.proto.data.DataNode"), } - client.grpcClient.SetRole(typeutil.DataNodeRole) + // node shall specify node id + client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 999f6dddc9..41f7aaba13
100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -51,7 +51,8 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool) addr: addr, grpcClient: grpcclient.NewClientBase[indexpb.IndexNodeClient](config, "milvus.proto.index.IndexNode"), } - client.grpcClient.SetRole(typeutil.IndexNodeRole) + // node shall specify node id + client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.IndexNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 18f3249994..c33e123d25 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -50,7 +50,8 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) addr: addr, grpcClient: grpcclient.NewClientBase[proxypb.ProxyClient](config, "milvus.proto.proxy.Proxy"), } - client.grpcClient.SetRole(typeutil.ProxyRole) + // node shall specify node id + client.grpcClient.SetRole(fmt.Sprintf("%s-%d", typeutil.ProxyRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index eaf3cc602c..e8853db7d8 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -50,7 +50,8 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error) addr: addr, grpcClient: grpcclient.NewClientBase[querypb.QueryNodeClient](config, "milvus.proto.query.QueryNode"), } - client.grpcClient.SetRole(typeutil.QueryNodeRole) + // node shall specify node id + client.grpcClient.SetRole(fmt.Sprintf("%s-%d", 
typeutil.QueryNodeRole, nodeID)) client.grpcClient.SetGetAddrFunc(client.getAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 7c95b0c668..838e8c73dd 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -43,6 +43,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" @@ -91,24 +92,38 @@ type ClientBase[T interface { MaxAttempts int InitialBackoff float64 MaxBackoff float64 - NodeID atomic.Int64 - sess *sessionutil.Session + // minResetInterval is the minimal duration between two connection resets + minResetInterval time.Duration + lastReset atomic.Time + // minSessionCheckInterval is the minimal duration between two session checks, preventing too many etcd pulls + minSessionCheckInterval time.Duration + lastSessionCheck atomic.Time + + // counter for canceled or deadline exceeded + ctxCounter atomic.Int32 + maxCancelError int32 + + NodeID atomic.Int64 + sess *sessionutil.Session } func NewClientBase[T interface { GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) }](config *paramtable.GrpcClientConfig, serviceName string) *ClientBase[T] { return &ClientBase[T]{ - ClientMaxRecvSize: config.ClientMaxRecvSize.GetAsInt(), - ClientMaxSendSize: config.ClientMaxSendSize.GetAsInt(), - DialTimeout: config.DialTimeout.GetAsDuration(time.Millisecond), - KeepAliveTime: config.KeepAliveTime.GetAsDuration(time.Millisecond), - KeepAliveTimeout: config.KeepAliveTimeout.GetAsDuration(time.Millisecond), - RetryServiceNameConfig: serviceName, - MaxAttempts: config.MaxAttempts.GetAsInt(), - InitialBackoff: config.InitialBackoff.GetAsFloat(), - MaxBackoff:
config.MaxBackoff.GetAsFloat(), - CompressionEnabled: config.CompressionEnabled.GetAsBool(), + ClientMaxRecvSize: config.ClientMaxRecvSize.GetAsInt(), + ClientMaxSendSize: config.ClientMaxSendSize.GetAsInt(), + DialTimeout: config.DialTimeout.GetAsDuration(time.Millisecond), + KeepAliveTime: config.KeepAliveTime.GetAsDuration(time.Millisecond), + KeepAliveTimeout: config.KeepAliveTimeout.GetAsDuration(time.Millisecond), + RetryServiceNameConfig: serviceName, + MaxAttempts: config.MaxAttempts.GetAsInt(), + InitialBackoff: config.InitialBackoff.GetAsFloat(), + MaxBackoff: config.MaxBackoff.GetAsFloat(), + CompressionEnabled: config.CompressionEnabled.GetAsBool(), + minResetInterval: config.MinResetInterval.GetAsDuration(time.Millisecond), + minSessionCheckInterval: config.MinSessionCheckInterval.GetAsDuration(time.Millisecond), + maxCancelError: config.MaxCancelError.GetAsInt32(), } } @@ -167,8 +182,14 @@ func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (T, error) { } func (c *ClientBase[T]) resetConnection(client T) { + if time.Since(c.lastReset.Load()) < c.minResetInterval { + return + } c.grpcClientMtx.Lock() defer c.grpcClientMtx.Unlock() + if time.Since(c.lastReset.Load()) < c.minResetInterval { + return + } if generic.IsZero(c.grpcClient) { return } @@ -181,6 +202,7 @@ func (c *ClientBase[T]) resetConnection(client T) { c.conn = nil c.addr.Store("") c.grpcClient = generic.Zero[T]() + c.lastReset.Store(time.Now()) } func (c *ClientBase[T]) connect(ctx context.Context) error { @@ -291,10 +313,72 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { c.conn = conn c.addr.Store(addr) + c.ctxCounter.Store(0) c.grpcClient = c.newGrpcClient(c.conn) return nil } +func (c *ClientBase[T]) verifySession(ctx context.Context) error { + if funcutil.CheckCtxValid(ctx) { + return nil + } + log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole())) + if time.Since(c.lastSessionCheck.Load()) < c.minSessionCheckInterval { + log.Debug("skip session check, 
verify too frequent") + return nil + } + c.lastSessionCheck.Store(time.Now()) + if c.sess != nil { + sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole()) + if getSessionErr != nil { + // Only log but not handle this error as it is an auxiliary logic + log.Warn("fail to get session", zap.Error(getSessionErr)) + } + if coordSess, exist := sessions[c.GetRole()]; exist { + if c.GetNodeID() != coordSess.ServerID { + log.Warn("server id mismatch, may connected to a old server, start to reset connection", + zap.Int64("client_node", c.GetNodeID()), + zap.Int64("current_node", coordSess.ServerID)) + return merr.WrapErrNodeNotMatch(c.GetNodeID(), coordSess.ServerID) + } + } else { + return merr.WrapErrNodeNotFound(c.GetNodeID(), "session not found", c.GetRole()) + } + } + return nil +} + +func (c *ClientBase[T]) recordCtxError() (needReset bool) { + val := c.ctxCounter.Add(1) + if val > c.maxCancelError { + c.ctxCounter.Store(0) + return true + } + return false +} + +func (c *ClientBase[T]) checkErr(ctx context.Context, err error) (needRetry, needReset bool) { + log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole())) + switch { + case funcutil.IsGrpcErr(err): + // grpc err + log.Warn("call received grpc error", zap.Error(err)) + if funcutil.IsGrpcErr(err, codes.Canceled, codes.DeadlineExceeded) { + // canceled or deadline exceeded: still retryable, reset only once the counter exceeds maxCancelError + return true, c.recordCtxError() + } + return true, true + case IsServerIDMismatchErr(err): + fallthrough + case IsCrossClusterRoutingErr(err): + return true, true + default: + log.Warn("fail to grpc call because of unknown error", zap.Error(err)) + // Unknown err + return false, false + } +} + func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, error)) (any, error) { log := log.Ctx(ctx).With(zap.String("client_role", c.GetRole())) var ( @@ -328,33 +412,25 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er } ret, callErr = caller(client) if callErr != nil { - if
funcutil.IsGrpcErr(callErr) || - IsCrossClusterRoutingErr(callErr) || IsServerIDMismatchErr(callErr) { + needRetry, needReset := c.checkErr(ctx, callErr) + if !needRetry { + // stop retry + callErr = retry.Unrecoverable(callErr) + } + if needReset { log.Warn("start to reset connection because of specific reasons", zap.Error(callErr)) resetClientFunc() - return callErr - } - if !funcutil.CheckCtxValid(ctx) { - if c.sess != nil { - sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole()) - if getSessionErr != nil { - // Only log but not handle this error as it is an auxiliary logic - log.Warn("fail to get session", zap.Error(getSessionErr)) - } - if coordSess, exist := sessions[c.GetRole()]; exist { - if c.GetNodeID() != coordSess.ServerID { - log.Warn("server id mismatch, may connected to a old server, start to reset connection", - zap.Int64("client_node", c.GetNodeID()), zap.Int64("current_node", coordSess.ServerID)) - resetClientFunc() - return callErr - } - } + } else { + err := c.verifySession(ctx) + if err != nil { + log.Warn("failed to verify session, reset connection", zap.Error(err)) + resetClientFunc() } } - log.Warn("fail to grpc call because of unknown error", zap.Error(callErr)) - // not rpc error, it will stop to retry - return retry.Unrecoverable(callErr) + return callErr } + // reset counter + c.ctxCounter.Store(0) var status *commonpb.Status switch res := ret.(type) { diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index 280d38f116..6214c658fb 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -169,11 +169,11 @@ func testCall(t *testing.T, compressed bool) { base.grpcClientMtx.RUnlock() }) - t.Run("Call returns grpc error", func(t *testing.T) { + t.Run("Call returns Unavailable grpc error", func(t *testing.T) { initClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - errGrpc := status.Error(codes.Unknown, "mocked") + 
errGrpc := status.Error(codes.Unavailable, "mocked") _, err := base.Call(ctx, func(client *mockClient) (any, error) { return nil, errGrpc }) @@ -184,7 +184,23 @@ func testCall(t *testing.T, compressed bool) { // client shall not be reset assert.Nil(t, base.grpcClient) base.grpcClientMtx.RUnlock() + }) + t.Run("Call returns canceled grpc error", func(t *testing.T) { + initClient() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errGrpc := status.Error(codes.Canceled, "mocked") + _, err := base.Call(ctx, func(client *mockClient) (any, error) { + return nil, errGrpc + }) + + assert.Error(t, err) + assert.True(t, errors.Is(err, errGrpc)) + base.grpcClientMtx.RLock() + // client shall not be reset + assert.NotNil(t, base.grpcClient) + base.grpcClientMtx.RUnlock() }) base.grpcClientMtx.Lock() diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 30d1ac1a48..5665f43e83 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -560,9 +560,10 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) return nil, 0, err } _, mapKey := path.Split(string(kv.Key)) - log.Debug("SessionUtil GetSessions ", zap.Any("prefix", prefix), + log.Debug("SessionUtil GetSessions", + zap.String("prefix", prefix), zap.String("key", mapKey), - zap.Any("address", session.Address)) + zap.String("address", session.Address)) res[mapKey] = session } return res, resp.Header.Revision, nil diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go index 2371d57feb..e47a67f5e0 100644 --- a/pkg/util/paramtable/grpc_param.go +++ b/pkg/util/paramtable/grpc_param.go @@ -192,9 +192,12 @@ type GrpcClientConfig struct { KeepAliveTime ParamItem `refreshable:"false"` KeepAliveTimeout ParamItem `refreshable:"false"` - MaxAttempts ParamItem `refreshable:"false"` - InitialBackoff ParamItem `refreshable:"false"` - MaxBackoff ParamItem 
`refreshable:"false"` + MaxAttempts ParamItem `refreshable:"false"` + InitialBackoff ParamItem `refreshable:"false"` + MaxBackoff ParamItem `refreshable:"false"` + MinResetInterval ParamItem `refreshable:"false"` + MaxCancelError ParamItem `refreshable:"false"` + MinSessionCheckInterval ParamItem `refreshable:"false"` } func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { @@ -390,4 +393,64 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { Export: true, } p.CompressionEnabled.Init(base.mgr) + + p.MinResetInterval = ParamItem{ + Key: "grpc.client.minResetInterval", + DefaultValue: "1000", + Formatter: func(v string) string { + if v == "" { + return "1000" + } + _, err := strconv.Atoi(v) + if err != nil { + log.Warn("Failed to parse grpc.client.minResetInterval, set to default", + zap.String("role", p.Domain), zap.String("grpc.client.minResetInterval", v), + zap.Error(err)) + return "1000" + } + return v + }, + Export: true, + } + p.MinResetInterval.Init(base.mgr) + + p.MinSessionCheckInterval = ParamItem{ + Key: "grpc.client.minSessionCheckInterval", + DefaultValue: "200", + Formatter: func(v string) string { + if v == "" { + return "200" + } + _, err := strconv.Atoi(v) + if err != nil { + log.Warn("Failed to parse grpc.client.minSessionCheckInterval, set to default", + zap.String("role", p.Domain), zap.String("grpc.client.minSessionCheckInterval", v), + zap.Error(err)) + return "200" + } + return v + }, + Export: true, + } + p.MinSessionCheckInterval.Init(base.mgr) + + p.MaxCancelError = ParamItem{ + Key: "grpc.client.maxCancelError", + DefaultValue: "32", + Formatter: func(v string) string { + if v == "" { + return "32" + } + _, err := strconv.Atoi(v) + if err != nil { + log.Warn("Failed to parse grpc.client.maxCancelError, set to default", + zap.String("role", p.Domain), zap.String("grpc.client.maxCancelError", v), + zap.Error(err)) + return "32" + } + return v + }, + Export: true, + } + p.MaxCancelError.Init(base.mgr) } diff --git 
a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go index 9baeef30bf..e141025669 100644 --- a/pkg/util/paramtable/grpc_param_test.go +++ b/pkg/util/paramtable/grpc_param_test.go @@ -143,6 +143,24 @@ func TestGrpcClientParams(t *testing.T) { base.Save("grpc.client.CompressionEnabled", "true") assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), true) + assert.Equal(t, clientConfig.MinResetInterval.GetValue(), "1000") + base.Save("grpc.client.minResetInterval", "abc") + assert.Equal(t, clientConfig.MinResetInterval.GetValue(), "1000") + base.Save("grpc.client.minResetInterval", "5000") + assert.Equal(t, clientConfig.MinResetInterval.GetValue(), "5000") + + assert.Equal(t, clientConfig.MinSessionCheckInterval.GetValue(), "200") + base.Save("grpc.client.minSessionCheckInterval", "abc") + assert.Equal(t, clientConfig.MinSessionCheckInterval.GetValue(), "200") + base.Save("grpc.client.minSessionCheckInterval", "500") + assert.Equal(t, clientConfig.MinSessionCheckInterval.GetValue(), "500") + + assert.Equal(t, clientConfig.MaxCancelError.GetValue(), "32") + base.Save("grpc.client.maxCancelError", "abc") + assert.Equal(t, clientConfig.MaxCancelError.GetValue(), "32") + base.Save("grpc.client.maxCancelError", "64") + assert.Equal(t, clientConfig.MaxCancelError.GetValue(), "64") + base.Save("common.security.tlsMode", "1") base.Save("tls.serverPemPath", "/pem") base.Save("tls.serverKeyPath", "/key")