Change grpc retry (#5925)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5954/head
godchen 2021-06-22 09:46:06 +08:00 committed by GitHub
parent cdaadca40c
commit 8643127e99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 161 additions and 56 deletions

View File

@ -19,6 +19,8 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -76,9 +78,16 @@ func (c *Client) connect() error {
log.Debug("DataNode connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -101,15 +110,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -16,6 +16,8 @@ import (
"fmt"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -100,9 +102,16 @@ func (c *Client) connect() error {
log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -124,15 +133,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -18,6 +18,8 @@ import (
"google.golang.org/grpc"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
@ -105,9 +107,16 @@ func (c *Client) connect() error {
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -124,20 +133,25 @@ func (c *Client) connect() error {
c.grpcClient = indexpb.NewIndexServiceClient(c.conn)
return nil
}
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller()
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -16,6 +16,8 @@ import (
"fmt"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -71,9 +73,16 @@ func (c *Client) connect() error {
log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -96,15 +105,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -15,6 +15,8 @@ import (
"context"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -65,9 +67,16 @@ func (c *Client) connect() error {
log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -90,15 +99,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -18,6 +18,8 @@ import (
"google.golang.org/grpc"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -71,9 +73,16 @@ func (c *Client) connect() error {
log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -96,15 +105,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -16,6 +16,8 @@ import (
"fmt"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -103,9 +105,16 @@ func (c *Client) connect() error {
log.Debug("QueryServiceClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
if err != nil {
return err
}
@ -127,15 +136,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}

View File

@ -17,6 +17,8 @@ import (
"fmt"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -122,9 +124,16 @@ func (c *GrpcClient) connect() error {
go func() {
conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(opts...)),
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
grpc_opentracing.UnaryClientInterceptor(opts...),
)),
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(opts...)))
grpc_middleware.ChainStreamClient(
grpc_retry.StreamClientInterceptor(),
grpc_opentracing.StreamClientInterceptor(opts...),
)),
)
ch <- struct{}{}
}()
select {
@ -174,15 +183,19 @@ func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, er
if err == nil {
return ret, nil
}
for i := 0; i < c.recallTry; i++ {
for i := 0; i < c.reconnTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
}
break
}
}
if err != nil {
return nil, err
}
ret, err = caller()
if err == nil {
return ret, nil
}
return ret, err
}