mirror of https://github.com/milvus-io/milvus.git
[Cherry-Pick] Add background health check if ctx err returned (#22470)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/22514/head
parent
21cd958383
commit
e27907cd29
|
@ -24,6 +24,14 @@ import (
|
|||
"time"
|
||||
|
||||
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/milvus-io/milvus/internal/util/crypto"
|
||||
|
@ -31,15 +39,12 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/generic"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
// GrpcClient abstracts client of grpc
|
||||
type GrpcClient[T any] interface {
|
||||
type GrpcClient[T interface {
|
||||
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
|
||||
}] interface {
|
||||
SetRole(string)
|
||||
GetRole() string
|
||||
SetGetAddrFunc(func() (string, error))
|
||||
|
@ -54,7 +59,9 @@ type GrpcClient[T any] interface {
|
|||
}
|
||||
|
||||
// ClientBase is a base of grpc client
|
||||
type ClientBase[T any] struct {
|
||||
type ClientBase[T interface {
|
||||
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
|
||||
}] struct {
|
||||
getAddrFunc func() (string, error)
|
||||
newGrpcClient func(cc *grpc.ClientConn) T
|
||||
|
||||
|
@ -76,6 +83,8 @@ type ClientBase[T any] struct {
|
|||
MaxBackoff float32
|
||||
BackoffMultiplier float32
|
||||
NodeID int64
|
||||
|
||||
sf singleflight.Group
|
||||
}
|
||||
|
||||
// SetRole sets role of client
|
||||
|
@ -249,29 +258,31 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
|
|||
return generic.Zero[T](), err
|
||||
}
|
||||
|
||||
var (
|
||||
ret any
|
||||
err2 error
|
||||
)
|
||||
var ret any
|
||||
|
||||
_, _ = retry.DoGrpc(ctx, uint(c.MaxAttempts*2), func() (any, error) {
|
||||
ret, err2 = caller(client)
|
||||
return ret, err2
|
||||
ret, err = caller(client)
|
||||
return ret, err
|
||||
})
|
||||
if err2 == nil {
|
||||
if err == nil {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return generic.Zero[T](), err2
|
||||
// start bg check in case of https://github.com/milvus-io/milvus/issues/22435
|
||||
go c.bgHealthCheck(client)
|
||||
return generic.Zero[T](), err
|
||||
}
|
||||
if !funcutil.IsGrpcErr(err2) {
|
||||
log.Debug("ClientBase:isNotGrpcErr", zap.Error(err2))
|
||||
return generic.Zero[T](), err2
|
||||
if !funcutil.IsGrpcErr(err) {
|
||||
log.Warn("ClientBase:isNotGrpcErr", zap.Error(err))
|
||||
return generic.Zero[T](), err
|
||||
}
|
||||
log.Debug(c.GetRole()+" ClientBase grpc error, start to reset connection", zap.Error(err2))
|
||||
log.Info("ClientBase grpc error, start to reset connection",
|
||||
zap.String("role", c.GetRole()),
|
||||
zap.Error(err),
|
||||
)
|
||||
c.resetConnection(client)
|
||||
return ret, err2
|
||||
return ret, err
|
||||
}
|
||||
|
||||
// Call does a grpc call
|
||||
|
@ -283,7 +294,10 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er
|
|||
ret, err := c.callOnce(ctx, caller)
|
||||
if err != nil {
|
||||
traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace())
|
||||
log.Error("ClientBase Call grpc first call get error", zap.String("role", c.GetRole()), zap.Error(traceErr))
|
||||
log.Warn("ClientBase Call grpc first call get error",
|
||||
zap.String("role", c.GetRole()),
|
||||
zap.Error(traceErr),
|
||||
)
|
||||
return generic.Zero[T](), traceErr
|
||||
}
|
||||
return ret, err
|
||||
|
@ -301,7 +315,10 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any,
|
|||
}
|
||||
|
||||
traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace())
|
||||
log.Warn(c.GetRole()+" ClientBase ReCall grpc first call get error ", zap.Error(traceErr))
|
||||
log.Warn("ClientBase ReCall grpc first call get error",
|
||||
zap.String("role", c.GetRole()),
|
||||
zap.Error(traceErr),
|
||||
)
|
||||
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return generic.Zero[T](), ctx.Err()
|
||||
|
@ -316,6 +333,21 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any,
|
|||
return ret, err
|
||||
}
|
||||
|
||||
func (c *ClientBase[T]) bgHealthCheck(client T) {
|
||||
c.sf.Do("healthcheck", func() (any, error) {
|
||||
// v2.2.0 does not has paramtable, use magic nubmer here
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
|
||||
if err != nil {
|
||||
c.resetConnection(client)
|
||||
}
|
||||
|
||||
return struct{}{}, nil
|
||||
})
|
||||
}
|
||||
|
||||
// Close close the client connection
|
||||
func (c *ClientBase[T]) Close() error {
|
||||
c.grpcClientMtx.Lock()
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -30,6 +31,8 @@ import (
|
|||
"google.golang.org/grpc/examples/helloworld/helloworld"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
@ -39,21 +42,27 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type mockClient struct{}
|
||||
|
||||
func (c *mockClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) {
|
||||
return &milvuspb.ComponentStates{}, nil
|
||||
}
|
||||
|
||||
func TestClientBase_SetRole(t *testing.T) {
|
||||
base := ClientBase[any]{}
|
||||
base := ClientBase[*mockClient]{}
|
||||
expect := "abc"
|
||||
base.SetRole("abc")
|
||||
assert.Equal(t, expect, base.GetRole())
|
||||
}
|
||||
|
||||
func TestClientBase_GetRole(t *testing.T) {
|
||||
base := ClientBase[any]{}
|
||||
base := ClientBase[*mockClient]{}
|
||||
assert.Equal(t, "", base.GetRole())
|
||||
}
|
||||
|
||||
func TestClientBase_connect(t *testing.T) {
|
||||
t.Run("failed to connect", func(t *testing.T) {
|
||||
base := ClientBase[any]{
|
||||
base := ClientBase[*mockClient]{
|
||||
getAddrFunc: func() (string, error) {
|
||||
return "", nil
|
||||
},
|
||||
|
@ -66,7 +75,7 @@ func TestClientBase_connect(t *testing.T) {
|
|||
|
||||
t.Run("failed to get addr", func(t *testing.T) {
|
||||
errMock := errors.New("mocked")
|
||||
base := ClientBase[any]{
|
||||
base := ClientBase[*mockClient]{
|
||||
getAddrFunc: func() (string, error) {
|
||||
return "", errMock
|
||||
},
|
||||
|
@ -80,13 +89,13 @@ func TestClientBase_connect(t *testing.T) {
|
|||
|
||||
func TestClientBase_Call(t *testing.T) {
|
||||
// mock client with nothing
|
||||
base := ClientBase[any]{}
|
||||
base := ClientBase[*mockClient]{}
|
||||
base.grpcClientMtx.Lock()
|
||||
base.grpcClient = struct{}{}
|
||||
base.grpcClient = &mockClient{}
|
||||
base.grpcClientMtx.Unlock()
|
||||
|
||||
t.Run("Call normal return", func(t *testing.T) {
|
||||
_, err := base.Call(context.Background(), func(client any) (any, error) {
|
||||
_, err := base.Call(context.Background(), func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
@ -95,7 +104,7 @@ func TestClientBase_Call(t *testing.T) {
|
|||
t.Run("Call with canceled context", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.Error(t, err)
|
||||
|
@ -105,23 +114,7 @@ func TestClientBase_Call(t *testing.T) {
|
|||
t.Run("Call canceled in caller func", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errMock := errors.New("mocked")
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
cancel()
|
||||
return nil, errMock
|
||||
})
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, errMock))
|
||||
base.grpcClientMtx.RLock()
|
||||
// client shall not be reset
|
||||
assert.NotNil(t, base.grpcClient)
|
||||
base.grpcClientMtx.RUnlock()
|
||||
})
|
||||
|
||||
t.Run("Call canceled in caller func", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errMock := errors.New("mocked")
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
|
||||
cancel()
|
||||
return nil, errMock
|
||||
})
|
||||
|
@ -138,7 +131,7 @@ func TestClientBase_Call(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
errMock := errors.New("mocked")
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
|
||||
return nil, errMock
|
||||
})
|
||||
|
||||
|
@ -154,7 +147,7 @@ func TestClientBase_Call(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
errGrpc := status.Error(codes.Unknown, "mocked")
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
|
||||
return nil, errGrpc
|
||||
})
|
||||
|
||||
|
@ -175,7 +168,7 @@ func TestClientBase_Call(t *testing.T) {
|
|||
t.Run("Call with connect failure", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
_, err := base.Call(ctx, func(client any) (any, error) {
|
||||
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.Error(t, err)
|
||||
|
@ -185,13 +178,13 @@ func TestClientBase_Call(t *testing.T) {
|
|||
|
||||
func TestClientBase_Recall(t *testing.T) {
|
||||
// mock client with nothing
|
||||
base := ClientBase[any]{}
|
||||
base := ClientBase[*mockClient]{}
|
||||
base.grpcClientMtx.Lock()
|
||||
base.grpcClient = struct{}{}
|
||||
base.grpcClient = &mockClient{}
|
||||
base.grpcClientMtx.Unlock()
|
||||
|
||||
t.Run("Recall normal return", func(t *testing.T) {
|
||||
_, err := base.ReCall(context.Background(), func(client any) (any, error) {
|
||||
_, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
@ -200,7 +193,7 @@ func TestClientBase_Recall(t *testing.T) {
|
|||
t.Run("ReCall with canceled context", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
_, err := base.ReCall(ctx, func(client any) (any, error) {
|
||||
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.Error(t, err)
|
||||
|
@ -212,7 +205,7 @@ func TestClientBase_Recall(t *testing.T) {
|
|||
defer cancel()
|
||||
flag := false
|
||||
var mut sync.Mutex
|
||||
_, err := base.ReCall(ctx, func(client any) (any, error) {
|
||||
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
|
||||
mut.Lock()
|
||||
defer mut.Unlock()
|
||||
if flag {
|
||||
|
@ -227,7 +220,7 @@ func TestClientBase_Recall(t *testing.T) {
|
|||
t.Run("ReCall canceled in caller func", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errMock := errors.New("mocked")
|
||||
_, err := base.ReCall(ctx, func(client any) (any, error) {
|
||||
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
|
||||
cancel()
|
||||
return nil, errMock
|
||||
})
|
||||
|
@ -248,7 +241,7 @@ func TestClientBase_Recall(t *testing.T) {
|
|||
t.Run("ReCall with connect failure", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
_, err := base.ReCall(ctx, func(client any) (any, error) {
|
||||
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
|
||||
return struct{}{}, nil
|
||||
})
|
||||
assert.Error(t, err)
|
||||
|
@ -304,13 +297,13 @@ func TestClientBase_RetryPolicy(t *testing.T) {
|
|||
}()
|
||||
defer s.Stop()
|
||||
|
||||
clientBase := ClientBase[helloworld.GreeterClient]{
|
||||
clientBase := ClientBase[rootcoordpb.RootCoordClient]{
|
||||
ClientMaxRecvSize: 1 * 1024 * 1024,
|
||||
ClientMaxSendSize: 1 * 1024 * 1024,
|
||||
DialTimeout: 60 * time.Second,
|
||||
KeepAliveTime: 60 * time.Second,
|
||||
KeepAliveTimeout: 60 * time.Second,
|
||||
RetryServiceNameConfig: "helloworld.Greeter",
|
||||
RetryServiceNameConfig: "rootcoordpb.GetComponentStates",
|
||||
MaxAttempts: maxAttempts,
|
||||
InitialBackoff: 10.0,
|
||||
MaxBackoff: 60.0,
|
||||
|
@ -320,17 +313,19 @@ func TestClientBase_RetryPolicy(t *testing.T) {
|
|||
clientBase.SetGetAddrFunc(func() (string, error) {
|
||||
return address, nil
|
||||
})
|
||||
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient {
|
||||
return helloworld.NewGreeterClient(cc)
|
||||
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) rootcoordpb.RootCoordClient {
|
||||
return rootcoordpb.NewRootCoordClient(cc)
|
||||
})
|
||||
defer clientBase.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
name := fmt.Sprintf("hello world %d", time.Now().Second())
|
||||
res, err := clientBase.Call(ctx, func(client helloworld.GreeterClient) (any, error) {
|
||||
randID := rand.Int63()
|
||||
res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
fmt.Println("client base...")
|
||||
return client.SayHello(ctx, &helloworld.HelloRequest{Name: name})
|
||||
return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{
|
||||
NodeID: randID,
|
||||
}}, nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, res.(*helloworld.HelloReply).Message, strings.ToUpper(name))
|
||||
assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue