mirror of https://github.com/milvus-io/milvus.git
Add grpc connect backoff policy and reduce the dial timeout (#14227)
issue: #13256 Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/13878/head
parent
7623a648ae
commit
741f9b81b1
|
@ -11,14 +11,20 @@ import (
|
|||
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
const (
|
||||
dialTimeout = 5 * time.Second
|
||||
keepAliveTime = 10 * time.Second
|
||||
keepAliveTimeout = 3 * time.Second
|
||||
)
|
||||
|
||||
// GrpcClient abstracts client of grpc
|
||||
type GrpcClient interface {
|
||||
SetRole(string)
|
||||
|
@ -81,7 +87,7 @@ func (c *ClientBase) GetGrpcClient(ctx context.Context) (interface{}, error) {
|
|||
return c.grpcClient, nil
|
||||
}
|
||||
|
||||
err := c.connect(ctx, retry.Attempts(5))
|
||||
err := c.connect(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -106,61 +112,62 @@ func (c *ClientBase) resetConnection(client interface{}) {
|
|||
c.grpcClient = nil
|
||||
}
|
||||
|
||||
func (c *ClientBase) connect(ctx context.Context, retryOptions ...retry.Option) error {
|
||||
var kacp = keepalive.ClientParameters{
|
||||
Time: 60 * time.Second, // send pings every 60 seconds if there is no activity
|
||||
Timeout: 6 * time.Second, // wait 6 second for ping ack before considering the connection dead
|
||||
PermitWithoutStream: true, // send pings even without active streams
|
||||
}
|
||||
|
||||
var err error
|
||||
var addr string
|
||||
connectServiceFunc := func() error {
|
||||
addr, err = c.getAddrFunc()
|
||||
if err != nil {
|
||||
log.Debug(c.GetRole()+" client getAddr failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
opts := trace.GetInterceptorOpts()
|
||||
ctx1, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
conn, err2 := grpc.DialContext(ctx1, addr,
|
||||
grpc.WithKeepaliveParams(kacp),
|
||||
grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize),
|
||||
grpc.MaxCallSendMsgSize(c.ClientMaxSendSize)),
|
||||
grpc.WithUnaryInterceptor(
|
||||
grpcmiddleware.ChainUnaryClient(
|
||||
grpcretry.UnaryClientInterceptor(
|
||||
grpcretry.WithMax(3),
|
||||
grpcretry.WithCodes(codes.Aborted, codes.Unavailable),
|
||||
),
|
||||
grpcopentracing.UnaryClientInterceptor(opts...),
|
||||
)),
|
||||
grpc.WithStreamInterceptor(
|
||||
grpcmiddleware.ChainStreamClient(
|
||||
grpcretry.StreamClientInterceptor(grpcretry.WithMax(3),
|
||||
grpcretry.WithCodes(codes.Aborted, codes.Unavailable),
|
||||
),
|
||||
grpcopentracing.StreamClientInterceptor(opts...),
|
||||
)),
|
||||
)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
if c.conn != nil {
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
err = retry.Do(ctx, connectServiceFunc, retryOptions...)
|
||||
func (c *ClientBase) connect(ctx context.Context) error {
|
||||
addr, err := c.getAddrFunc()
|
||||
if err != nil {
|
||||
log.Debug(c.GetRole()+" client try reconnect failed", zap.Error(err))
|
||||
log.Error("failed to get cclient address", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
opts := trace.GetInterceptorOpts()
|
||||
dialContext, cancel := context.WithTimeout(ctx, dialTimeout)
|
||||
conn, err := grpc.DialContext(
|
||||
dialContext,
|
||||
addr,
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize),
|
||||
grpc.MaxCallSendMsgSize(c.ClientMaxSendSize)),
|
||||
grpc.WithUnaryInterceptor(
|
||||
grpcmiddleware.ChainUnaryClient(
|
||||
grpcretry.UnaryClientInterceptor(
|
||||
grpcretry.WithMax(3),
|
||||
grpcretry.WithCodes(codes.Aborted, codes.Unavailable),
|
||||
),
|
||||
grpcopentracing.UnaryClientInterceptor(opts...),
|
||||
)),
|
||||
grpc.WithStreamInterceptor(
|
||||
grpcmiddleware.ChainStreamClient(
|
||||
grpcretry.StreamClientInterceptor(grpcretry.WithMax(3),
|
||||
grpcretry.WithCodes(codes.Aborted, codes.Unavailable),
|
||||
),
|
||||
grpcopentracing.StreamClientInterceptor(opts...),
|
||||
)),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: keepAliveTime, // send pings every 60 seconds if there is no activity
|
||||
Timeout: keepAliveTimeout, // wait 6 second for ping ack before considering the connection dead
|
||||
PermitWithoutStream: true, // send pings even without active streams
|
||||
}),
|
||||
grpc.WithConnectParams(grpc.ConnectParams{
|
||||
Backoff: backoff.Config{
|
||||
BaseDelay: 100 * time.Millisecond,
|
||||
Multiplier: 1.6,
|
||||
Jitter: 0.2,
|
||||
MaxDelay: 3 * time.Second,
|
||||
},
|
||||
MinConnectTimeout: dialTimeout,
|
||||
}),
|
||||
)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if c.conn != nil {
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
|
||||
c.conn = conn
|
||||
c.grpcClient = c.newGrpcClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue