Change default grpc retry config (#6233)

* Change default grpc retry config

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change timeout

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* add retry code

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* change time

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/6252/head
godchen 2021-07-01 15:24:17 +08:00 committed by GitHub
parent 6019c193fc
commit 2a07124885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 82 additions and 24 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -93,12 +94,19 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc/codes"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -67,19 +68,25 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
connectGrpcFunc := func() error {
opts := trace.GetInterceptorOpts()
log.Debug("DataNode connect ", zap.String("address", c.addr))
ctx, cancel := context.WithTimeout(c.ctx, time.Millisecond*200)
defer cancel()
conn, err := grpc.DialContext(ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(200*time.Millisecond),
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDisableRetry(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
//grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(3), grpc_retry.WithPerRetryTimeout(time.Millisecond*50)),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*5),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*5),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -95,12 +95,12 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(3), grpc_retry.WithPerRetryTimeout(time.Second*3)),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3), grpc_retry.WithPerRetryTimeout(time.Second*3)),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -67,12 +68,20 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type Client struct {
@ -66,12 +67,20 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -94,12 +95,20 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*10),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*10),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
type Client struct {
@ -67,12 +68,19 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*10),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*10),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// GrpcClient grpc client
@ -102,12 +103,19 @@ func (c *GrpcClient) connect(retryOptions ...retry.Option) error {
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Second*3),
grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)

View File

@ -179,7 +179,7 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
},
CollectionID: collectionID,
}
showPartitionResponse, err := lct.rootCoord.ShowPartitions(lct.ctx, showPartitionRequest)
showPartitionResponse, err := lct.rootCoord.ShowPartitions(ctx, showPartitionRequest)
if err != nil {
status.Reason = err.Error()
lct.result = status
@ -675,8 +675,8 @@ func (rpt *ReleasePartitionTask) Execute(ctx context.Context) error {
req.NodeID = nodeID
releasePartitionTask := &ReleasePartitionTask{
BaseTask: BaseTask{
ctx: rpt.ctx,
Condition: NewTaskCondition(rpt.ctx),
ctx: ctx,
Condition: NewTaskCondition(ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
@ -1188,7 +1188,7 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
PartitionID: partitionID,
}
recoveryInfo, err := lbt.dataCoord.GetRecoveryInfo(lbt.ctx, getRecoveryInfo)
recoveryInfo, err := lbt.dataCoord.GetRecoveryInfo(ctx, getRecoveryInfo)
if err != nil {
status.Reason = err.Error()
lbt.result = status