diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 766de58681..4492f3c207 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -19,6 +19,8 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -76,9 +78,16 @@ func (c *Client) connect() error { log.Debug("DataNode connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -101,15 +110,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 91410ee7cf..fb37c5aaca 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -16,6 +16,8 @@ import ( "fmt" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" @@ -100,9 +102,16 @@ func (c *Client) connect() error { log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -124,15 +133,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 3b24c0fb89..d5b8e1e605 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -18,6 +18,8 @@ import ( "google.golang.org/grpc" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" @@ -105,9 +107,16 @@ func (c *Client) connect() error { log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -124,20 +133,25 @@ func (c *Client) connect() error { c.grpcClient = indexpb.NewIndexServiceClient(c.conn) return nil } + func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) { ret, err := caller() if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 7610907e5e..4e6edb76b2 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -16,6 +16,8 @@ import ( "fmt" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/milvuspb" @@ -71,9 +73,16 @@ func (c *Client) connect() error { log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -96,15 +105,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/proxynode/client/client.go b/internal/distributed/proxynode/client/client.go index 4810b14118..cd6806e5f5 100644 --- a/internal/distributed/proxynode/client/client.go +++ b/internal/distributed/proxynode/client/client.go @@ -15,6 +15,8 @@ import ( "context" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -65,9 +67,16 @@ func (c *Client) connect() error { log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -90,15 +99,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index d1ac09c20e..7c338766eb 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -18,6 +18,8 @@ import ( "google.golang.org/grpc" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -71,9 +73,16 @@ func (c *Client) connect() error { log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -96,15 +105,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index 3826a97da4..8ca141d461 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -16,6 +16,8 @@ import ( "fmt" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -103,9 +105,16 @@ func (c *Client) connect() error { log.Debug("QueryServiceClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) if err != nil { return err } @@ -127,15 +136,19 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err } diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index e610b299e1..3a37335e48 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -17,6 +17,8 @@ import ( "fmt" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -122,9 +124,16 @@ func (c *GrpcClient) connect() error { go func() { conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( - grpc_opentracing.UnaryClientInterceptor(opts...)), + grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(), + grpc_opentracing.UnaryClientInterceptor(opts...), + )), grpc.WithStreamInterceptor( - grpc_opentracing.StreamClientInterceptor(opts...))) + grpc_middleware.ChainStreamClient( + grpc_retry.StreamClientInterceptor(), + grpc_opentracing.StreamClientInterceptor(opts...), + )), + ) ch <- struct{}{} }() select { @@ -174,15 +183,19 @@ func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, er if err == nil { return ret, nil } - for i := 0; i < c.recallTry; i++ { + for i := 0; i < c.reconnTry; i++ { err = c.connect() if err == nil { - ret, err = caller() - if err == nil { - return ret, nil - } + break } } + if err != nil { + return nil, err + } + ret, err = caller() + if err == nil { + return ret, nil + } return ret, err }