Add background health check if ctx err returned (#22439)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/22479/head
congqixia 2023-02-28 14:11:47 +08:00 committed by GitHub
parent 256aaa1944
commit 608615e5bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 46 deletions

View File

@ -25,21 +25,26 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/tracer"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/crypto"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/generic"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// GrpcClient abstracts client of grpc
type GrpcClient[T any] interface {
type GrpcClient[T interface {
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
}] interface {
SetRole(string)
GetRole() string
SetGetAddrFunc(func() (string, error))
@ -54,7 +59,9 @@ type GrpcClient[T any] interface {
}
// ClientBase is a base of grpc client
type ClientBase[T any] struct {
type ClientBase[T interface {
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
}] struct {
getAddrFunc func() (string, error)
newGrpcClient func(cc *grpc.ClientConn) T
@ -77,6 +84,8 @@ type ClientBase[T any] struct {
MaxBackoff float32
BackoffMultiplier float32
NodeID int64
sf singleflight.Group
}
// SetRole sets role of client
@ -255,21 +264,26 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any
return generic.Zero[T](), err
}
ret, err2 := caller(client)
if err2 == nil {
ret, err := caller(client)
if err == nil {
return ret, nil
}
if !funcutil.CheckCtxValid(ctx) {
return generic.Zero[T](), err2
// start bg check in case of https://github.com/milvus-io/milvus/issues/22435
go c.bgHealthCheck(client)
return generic.Zero[T](), err
}
if !funcutil.IsGrpcErr(err2) {
log.Debug("ClientBase:isNotGrpcErr", zap.Error(err2))
return generic.Zero[T](), err2
if !funcutil.IsGrpcErr(err) {
log.Warn("ClientBase:isNotGrpcErr", zap.Error(err))
return generic.Zero[T](), err
}
log.Debug(c.GetRole()+" ClientBase grpc error, start to reset connection", zap.Error(err2))
log.Info("ClientBase grpc error, start to reset connection",
zap.String("role", c.GetRole()),
zap.Error(err),
)
c.resetConnection(client)
return ret, err2
return ret, err
}
// Call does a grpc call
@ -281,7 +295,10 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er
ret, err := c.callOnce(ctx, caller)
if err != nil {
traceErr := fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace())
log.Warn("ClientBase Call grpc first call get error", zap.String("role", c.GetRole()), zap.Error(traceErr))
log.Warn("ClientBase Call grpc first call get error",
zap.String("role", c.GetRole()),
zap.Error(traceErr),
)
return generic.Zero[T](), traceErr
}
return ret, err
@ -314,6 +331,20 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any,
return ret, err
}
func (c *ClientBase[T]) bgHealthCheck(client T) {
c.sf.Do("healthcheck", func() (any, error) {
ctx, cancel := context.WithTimeout(context.Background(), paramtable.Get().CommonCfg.SessionTTL.GetAsDuration(time.Second))
defer cancel()
_, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
c.resetConnection(c.grpcClient)
}
return struct{}{}, nil
})
}
// Close close the client connection
func (c *ClientBase[T]) Close() error {
c.grpcClientMtx.Lock()

View File

@ -20,7 +20,9 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net"
"os"
"strings"
"sync"
"testing"
@ -31,6 +33,9 @@ import (
"google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
@ -40,21 +45,32 @@ import (
"google.golang.org/grpc/status"
)
func TestMain(m *testing.M) {
paramtable.Init()
os.Exit(m.Run())
}
type mockClient struct{}
func (c *mockClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) {
return &milvuspb.ComponentStates{}, nil
}
func TestClientBase_SetRole(t *testing.T) {
base := ClientBase[any]{}
base := ClientBase[*mockClient]{}
expect := "abc"
base.SetRole("abc")
assert.Equal(t, expect, base.GetRole())
}
func TestClientBase_GetRole(t *testing.T) {
base := ClientBase[any]{}
base := ClientBase[*mockClient]{}
assert.Equal(t, "", base.GetRole())
}
func TestClientBase_connect(t *testing.T) {
t.Run("failed to connect", func(t *testing.T) {
base := ClientBase[any]{
base := ClientBase[*mockClient]{
getAddrFunc: func() (string, error) {
return "", nil
},
@ -67,7 +83,7 @@ func TestClientBase_connect(t *testing.T) {
t.Run("failed to get addr", func(t *testing.T) {
errMock := errors.New("mocked")
base := ClientBase[any]{
base := ClientBase[*mockClient]{
getAddrFunc: func() (string, error) {
return "", errMock
},
@ -89,14 +105,14 @@ func TestClientBase_CompressCall(t *testing.T) {
func testCall(t *testing.T, compressed bool) {
// mock client with nothing
base := ClientBase[any]{}
base := ClientBase[*mockClient]{}
base.CompressionEnabled = compressed
base.grpcClientMtx.Lock()
base.grpcClient = struct{}{}
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
t.Run("Call normal return", func(t *testing.T) {
_, err := base.Call(context.Background(), func(client any) (any, error) {
_, err := base.Call(context.Background(), func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.NoError(t, err)
@ -105,7 +121,7 @@ func testCall(t *testing.T, compressed bool) {
t.Run("Call with canceled context", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.Error(t, err)
@ -115,7 +131,7 @@ func testCall(t *testing.T, compressed bool) {
t.Run("Call canceled in caller func", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
cancel()
return nil, errMock
})
@ -131,7 +147,7 @@ func testCall(t *testing.T, compressed bool) {
t.Run("Call canceled in caller func", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
cancel()
return nil, errMock
})
@ -148,7 +164,7 @@ func testCall(t *testing.T, compressed bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errMock := errors.New("mocked")
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
return nil, errMock
})
@ -164,7 +180,7 @@ func testCall(t *testing.T, compressed bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errGrpc := status.Error(codes.Unknown, "mocked")
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
return nil, errGrpc
})
@ -185,7 +201,7 @@ func testCall(t *testing.T, compressed bool) {
t.Run("Call with connect failure", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := base.Call(ctx, func(client any) (any, error) {
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.Error(t, err)
@ -195,13 +211,13 @@ func testCall(t *testing.T, compressed bool) {
func TestClientBase_Recall(t *testing.T) {
// mock client with nothing
base := ClientBase[any]{}
base := ClientBase[*mockClient]{}
base.grpcClientMtx.Lock()
base.grpcClient = struct{}{}
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
t.Run("Recall normal return", func(t *testing.T) {
_, err := base.ReCall(context.Background(), func(client any) (any, error) {
_, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.NoError(t, err)
@ -210,7 +226,7 @@ func TestClientBase_Recall(t *testing.T) {
t.Run("ReCall with canceled context", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := base.ReCall(ctx, func(client any) (any, error) {
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.Error(t, err)
@ -222,7 +238,7 @@ func TestClientBase_Recall(t *testing.T) {
defer cancel()
flag := false
var mut sync.Mutex
_, err := base.ReCall(ctx, func(client any) (any, error) {
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
mut.Lock()
defer mut.Unlock()
if flag {
@ -237,7 +253,7 @@ func TestClientBase_Recall(t *testing.T) {
t.Run("ReCall canceled in caller func", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.ReCall(ctx, func(client any) (any, error) {
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
cancel()
return nil, errMock
})
@ -258,7 +274,7 @@ func TestClientBase_Recall(t *testing.T) {
t.Run("ReCall with connect failure", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := base.ReCall(ctx, func(client any) (any, error) {
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.Error(t, err)
@ -313,13 +329,13 @@ func TestClientBase_RetryPolicy(t *testing.T) {
}()
defer s.Stop()
clientBase := ClientBase[helloworld.GreeterClient]{
clientBase := ClientBase[rootcoordpb.RootCoordClient]{
ClientMaxRecvSize: 1 * 1024 * 1024,
ClientMaxSendSize: 1 * 1024 * 1024,
DialTimeout: 60 * time.Second,
KeepAliveTime: 60 * time.Second,
KeepAliveTimeout: 60 * time.Second,
RetryServiceNameConfig: "helloworld.Greeter",
RetryServiceNameConfig: "rootcoordpb.GetComponentStates",
MaxAttempts: maxAttempts,
InitialBackoff: 10.0,
MaxBackoff: 60.0,
@ -329,19 +345,21 @@ func TestClientBase_RetryPolicy(t *testing.T) {
clientBase.SetGetAddrFunc(func() (string, error) {
return address.String(), nil
})
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient {
return helloworld.NewGreeterClient(cc)
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) rootcoordpb.RootCoordClient {
return rootcoordpb.NewRootCoordClient(cc)
})
defer clientBase.Close()
ctx := context.Background()
name := fmt.Sprintf("hello world %d", time.Now().Second())
res, err := clientBase.Call(ctx, func(client helloworld.GreeterClient) (any, error) {
randID := rand.Int63()
res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
fmt.Println("client base...")
return client.SayHello(ctx, &helloworld.HelloRequest{Name: name})
return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{
NodeID: randID,
}}, nil
})
assert.Nil(t, err)
assert.Equal(t, res.(*helloworld.HelloReply).Message, strings.ToUpper(name))
assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID)
}
func TestClientBase_Compression(t *testing.T) {
@ -373,7 +391,7 @@ func TestClientBase_Compression(t *testing.T) {
}()
defer s.Stop()
clientBase := ClientBase[helloworld.GreeterClient]{
clientBase := ClientBase[rootcoordpb.RootCoordClient]{
ClientMaxRecvSize: 1 * 1024 * 1024,
ClientMaxSendSize: 1 * 1024 * 1024,
DialTimeout: 60 * time.Second,
@ -390,17 +408,19 @@ func TestClientBase_Compression(t *testing.T) {
clientBase.SetGetAddrFunc(func() (string, error) {
return address.String(), nil
})
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient {
return helloworld.NewGreeterClient(cc)
clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) rootcoordpb.RootCoordClient {
return rootcoordpb.NewRootCoordClient(cc)
})
defer clientBase.Close()
ctx := context.Background()
name := fmt.Sprintf("hello world %d", time.Now().Second())
res, err := clientBase.Call(ctx, func(client helloworld.GreeterClient) (any, error) {
randID := rand.Int63()
res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
fmt.Println("client base...")
return client.SayHello(ctx, &helloworld.HelloRequest{Name: name})
return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{
NodeID: randID,
}}, nil
})
assert.Nil(t, err)
assert.Equal(t, res.(*helloworld.HelloReply).Message, strings.ToUpper(name))
assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID)
}