Improve the retry of the rpc client (#26795)

Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG 2023-09-06 17:43:14 +08:00 committed by GitHub
parent 61c7b0990d
commit 28681276e2
18 changed files with 212 additions and 242 deletions


@ -406,10 +406,10 @@ grpc:
dialTimeout: 200
keepAliveTime: 10000
keepAliveTimeout: 20000
maxMaxAttempts: 5
initialBackoff: 1
maxBackoff: 10
backoffMultiplier: 2
maxMaxAttempts: 10
initialBackOff: 0.2 # seconds
maxBackoff: 10 # seconds
backoffMultiplier: 2.0 # deprecated
clientMaxSendSize: 268435456
clientMaxRecvSize: 268435456
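These values now feed an application-level retry loop inside the grpc client wrapper instead of a gRPC service config; backoffMultiplier is kept only so existing config files still parse and is no longer read. A minimal sketch of how the values map onto the retry options used later in this diff (doRPC is a hypothetical stand-in for a real call; the literals just spell out the new defaults):

package example

import (
    "context"
    "time"

    "github.com/milvus-io/milvus/pkg/util/retry"
)

// doRPC stands in for an actual grpc call; hypothetical.
func doRPC() error { return nil }

func callWithRetry(ctx context.Context) error {
    maxAttempts := 10     // grpc.client.maxMaxAttempts
    initialBackoff := 0.2 // grpc.client.initialBackOff, seconds
    maxBackoff := 10.0    // grpc.client.maxBackoff, seconds

    return retry.Do(ctx, doRPC,
        retry.Attempts(uint(maxAttempts)),
        retry.Sleep(time.Duration(initialBackoff*1000)*time.Millisecond),
        retry.MaxSleepTime(time.Duration(maxBackoff*1000)*time.Millisecond),
    )
}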


@ -19,6 +19,7 @@ package meta
import (
"context"
"fmt"
"strconv"
"sync"
"time"
@ -123,20 +124,25 @@ func (m *CollectionManager) Recover(broker Broker) error {
return err
}
ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
ctxLog := log.Ctx(ctx)
ctxLog.Info("recover collections and partitions from kv store")
for _, collection := range collections {
// Dropped collections should be released and skipped
_, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID())
_, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID())
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
m.catalog.ReleaseCollection(collection.GetCollectionID())
continue
}
if err != nil {
ctxLog.Warn("failed to get collection schema", zap.Error(err))
return err
}
// Collections whose load has not completed should be released and skipped
if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
log.Info("skip recovery and release collection",
ctxLog.Info("skip recovery and release collection",
zap.Int64("collectionID", collection.GetCollectionID()),
zap.String("status", collection.GetStatus().String()),
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
@ -150,13 +156,14 @@ func (m *CollectionManager) Recover(broker Broker) error {
}
for collection, partitions := range partitions {
existPartitions, err := broker.GetPartitions(context.Background(), collection)
existPartitions, err := broker.GetPartitions(ctx, collection)
if errors.Is(err, merr.ErrCollectionNotFound) {
log.Info("skip dropped collection during recovery", zap.Int64("collection", collection))
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection))
m.catalog.ReleaseCollection(collection)
continue
}
if err != nil {
ctxLog.Warn("failed to get partitions", zap.Error(err))
return err
}
omitPartitions := make([]int64, 0)
@ -168,7 +175,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
return true
})
if len(omitPartitions) > 0 {
log.Info("skip dropped partitions during recovery",
ctxLog.Info("skip dropped partitions during recovery",
zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
m.catalog.ReleasePartition(collection, omitPartitions...)
}
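The recovery path above now stamps a single trace ID onto the context so every log line from one recovery run can be correlated, and the same ctx is passed into the broker calls. A minimal sketch of the pattern, using the log.WithTraceID and log.Ctx helpers seen in the hunk (the function itself is illustrative):

package example

import (
    "context"
    "strconv"
    "time"

    "github.com/milvus-io/milvus/pkg/log"
)

func recoverWithTraceID() {
    // one trace ID per recovery run; every ctxLog line below carries it
    ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
    ctxLog := log.Ctx(ctx)
    ctxLog.Info("recover collections and partitions from kv store")
    // pass ctx into broker calls so downstream logs share the same trace ID
}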


@ -80,7 +80,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
err = merr.Error(resp.GetStatus())
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
log.Ctx(ctx).Warn("failed to get collection schema", zap.Error(err))
return nil, err
}
return resp.GetSchema(), nil


@ -117,7 +117,9 @@ func (node *MockQueryNode) Start() error {
case <-node.ctx.Done():
return nil
default:
return &milvuspb.ComponentStates{}
return &milvuspb.ComponentStates{
Status: successStatus,
}
}
}, func(context.Context, *milvuspb.GetComponentStatesRequest) error {
select {


@ -310,7 +310,7 @@ func (s *Server) initMeta() error {
log.Info("recover meta...")
err := s.meta.CollectionManager.Recover(s.broker)
if err != nil {
log.Warn("failed to recover collections")
log.Warn("failed to recover collections", zap.Error(err))
return err
}
collections := s.meta.GetAll()
@ -323,13 +323,13 @@ func (s *Server) initMeta() error {
err = s.meta.ReplicaManager.Recover(collections)
if err != nil {
log.Warn("failed to recover replicas")
log.Warn("failed to recover replicas", zap.Error(err))
return err
}
err = s.meta.ResourceManager.Recover()
if err != nil {
log.Warn("failed to recover resource groups")
log.Warn("failed to recover resource groups", zap.Error(err))
return err
}


@ -19,6 +19,7 @@ package session
import (
"context"
"net"
"strconv"
"testing"
"time"
@ -45,10 +46,12 @@ type ClusterTestSuite struct {
func (suite *ClusterTestSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save("grpc.client.maxMaxAttempts", "1")
suite.setupServers()
}
func (suite *ClusterTestSuite) TearDownSuite() {
paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10))
for _, svr := range suite.svrs {
svr.GracefulStop()
}


@ -219,7 +219,7 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
))
//
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
ret.Status = merr.Status(nil)
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc()


@ -245,7 +245,8 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
)
var (
ret = &internalpb.RetrieveResults{
Ids: &schemapb.IDs{},
Status: merr.Status(nil),
Ids: &schemapb.IDs{},
}
skipDupCnt int64
loopEnd int


@ -491,7 +491,7 @@ func (c *Core) Init() error {
log.Error("RootCoord start failed", zap.Error(err))
}
})
log.Info("RootCoord startup success")
log.Info("RootCoord startup success", zap.String("address", c.session.Address))
return err
}
c.UpdateStateCode(commonpb.StateCode_StandBy)


@ -19,22 +19,13 @@ package grpcclient
import (
"context"
"crypto/tls"
"fmt"
"strings"
"sync"
"time"
"github.com/cockroachdb/errors"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
@ -46,6 +37,15 @@ import (
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// GrpcClient abstracts client of grpc
@ -88,14 +88,11 @@ type ClientBase[T interface {
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
MaxAttempts int
InitialBackoff float32
MaxBackoff float32
BackoffMultiplier float32
NodeID atomic.Int64
sess *sessionutil.Session
sf singleflight.Group
MaxAttempts int
InitialBackoff float64
MaxBackoff float64
NodeID atomic.Int64
sess *sessionutil.Session
}
func NewClientBase[T interface {
@ -109,9 +106,8 @@ func NewClientBase[T interface {
KeepAliveTimeout: config.KeepAliveTimeout.GetAsDuration(time.Millisecond),
RetryServiceNameConfig: serviceName,
MaxAttempts: config.MaxAttempts.GetAsInt(),
InitialBackoff: float32(config.InitialBackoff.GetAsFloat()),
MaxBackoff: float32(config.MaxBackoff.GetAsFloat()),
BackoffMultiplier: float32(config.BackoffMultiplier.GetAsFloat()),
InitialBackoff: config.InitialBackoff.GetAsFloat(),
MaxBackoff: config.MaxBackoff.GetAsFloat(),
CompressionEnabled: config.CompressionEnabled.GetAsBool(),
}
}
@ -196,18 +192,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
opts := tracer.GetInterceptorOpts()
dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout)
// refer to https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
retryPolicy := fmt.Sprintf(`{
"methodConfig": [{
"name": [{"service": "%s"}],
"retryPolicy": {
"MaxAttempts": %d,
"InitialBackoff": "%fs",
"MaxBackoff": "%fs",
"BackoffMultiplier": %f,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier)
var conn *grpc.ClientConn
compress := None
@ -236,7 +220,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
interceptor.ClusterInjectionStreamClientInterceptor(),
interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
)),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.KeepAliveTime,
Timeout: c.KeepAliveTimeout,
@ -254,6 +237,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithDisableRetry(),
)
} else {
conn, err = grpc.DialContext(
@ -276,7 +260,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
interceptor.ClusterInjectionStreamClientInterceptor(),
interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
)),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.KeepAliveTime,
Timeout: c.KeepAliveTimeout,
@ -294,6 +277,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithDisableRetry(),
)
}
@ -311,55 +295,101 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
return nil
}
func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any, error)) (any, error) {
log := log.Ctx(ctx).With(zap.String("role", c.GetRole()))
client, err := c.GetGrpcClient(ctx)
if err != nil {
return generic.Zero[T](), err
func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, error)) (any, error) {
log := log.Ctx(ctx).With(zap.String("client_role", c.GetRole()))
var (
ret any
clientErr error
callErr error
client T
)
client, clientErr = c.GetGrpcClient(ctx)
if clientErr != nil {
log.Warn("fail to get grpc client", zap.Error(clientErr))
}
ret, err := caller(client)
if err == nil {
return ret, nil
resetClientFunc := func() {
c.resetConnection(client)
client, clientErr = c.GetGrpcClient(ctx)
if clientErr != nil {
log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr))
}
}
if IsCrossClusterRoutingErr(err) {
log.Warn("CrossClusterRoutingErr, start to reset connection", zap.Error(err))
c.resetConnection(client)
return ret, merr.ErrServiceUnavailable // For concealing ErrCrossClusterRouting from the client
}
if IsServerIDMismatchErr(err) {
log.Warn("Server ID mismatch, start to reset connection", zap.Error(err))
c.resetConnection(client)
return ret, err
}
if !funcutil.CheckCtxValid(ctx) {
// check if server ID matches coord session, if not, reset connection
if c.sess != nil {
sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole())
if getSessionErr != nil {
// Only log but not handle this error as it is an auxiliary logic
log.Warn("Fail to GetSessions", zap.Error(getSessionErr))
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
_ = retry.Do(innerCtx, func() error {
if generic.IsZero(client) {
callErr = errors.Wrap(clientErr, "empty grpc client")
log.Warn("grpc client is nil, maybe fail to get client in the retry state")
resetClientFunc()
return callErr
}
ret, callErr = caller(client)
if callErr != nil {
if funcutil.IsGrpcErr(callErr) ||
IsCrossClusterRoutingErr(callErr) || IsServerIDMismatchErr(callErr) {
log.Warn("start to reset connection because of specific reasons", zap.Error(callErr))
resetClientFunc()
return callErr
}
if coordSess, exist := sessions[c.GetRole()]; exist {
if c.GetNodeID() != coordSess.ServerID {
log.Warn("Server ID mismatch, may connected to a old server, start to reset connection", zap.Error(err))
c.resetConnection(client)
return ret, err
if !funcutil.CheckCtxValid(ctx) {
if c.sess != nil {
sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole())
if getSessionErr != nil {
// Only log this error without handling it, since the session check is auxiliary logic
log.Warn("fail to get session", zap.Error(getSessionErr))
}
if coordSess, exist := sessions[c.GetRole()]; exist {
if c.GetNodeID() != coordSess.ServerID {
log.Warn("server id mismatch, may connected to a old server, start to reset connection",
zap.Int64("client_node", c.GetNodeID()), zap.Int64("current_node", coordSess.ServerID))
resetClientFunc()
return callErr
}
}
}
}
log.Warn("fail to grpc call because of unknown error", zap.Error(callErr))
// not an rpc error, so stop retrying
return retry.Unrecoverable(callErr)
}
// start bg check in case of https://github.com/milvus-io/milvus/issues/22435
go c.bgHealthCheck(client)
return generic.Zero[T](), err
var status *commonpb.Status
switch res := ret.(type) {
case *commonpb.Status:
status = res
case interface{ GetStatus() *commonpb.Status }:
status = res.GetStatus()
default:
// unknown response type: return it directly without retrying
log.Warn("unknown return type", zap.Any("return", ret))
return nil
}
if merr.Ok(status) || !merr.IsRetryableCode(status.GetCode()) {
return nil
}
return errors.Newf("error code: %d, reason: %s", status.GetCode(), status.GetReason())
}, retry.Attempts(uint(c.MaxAttempts)),
// The previous InitialBackoff and MaxBackoff were floats with seconds as the unit,
// so multiply by 1000 here to build millisecond durations and stay compatible.
retry.Sleep(time.Duration(c.InitialBackoff*1000)*time.Millisecond),
retry.MaxSleepTime(time.Duration(c.MaxBackoff*1000)*time.Millisecond))
// With the default values (MaxAttempts 10, InitialBackoff 0.2s, MaxBackoff 10s),
// a call that fails every attempt spends about 52.8s retrying in total.
if callErr != nil {
// make the error friendlier to the user
if IsCrossClusterRoutingErr(callErr) {
callErr = merr.ErrServiceUnavailable
}
return generic.Zero[T](), callErr
}
if !funcutil.IsGrpcErr(err) {
log.Warn("ClientBase:isNotGrpcErr", zap.Error(err))
return generic.Zero[T](), err
}
log.Info("ClientBase grpc error, start to reset connection", zap.Error(err))
c.resetConnection(client)
return ret, err
return ret, nil
}
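For a rough sense of the ~52.8s figure mentioned above: with ten attempts, the sleep starts at 0.2s, roughly doubles after each failure, and is capped at 10s. A self-contained back-of-the-envelope sketch (it assumes a simple doubling-with-cap schedule, which is what the options above configure; it is not an exact reproduction of the retry package):

package main

import (
    "fmt"
    "time"
)

func main() {
    attempts := 10
    sleep := 200 * time.Millisecond // InitialBackoff 0.2s
    maxSleep := 10 * time.Second    // MaxBackoff 10s

    var total time.Duration
    for i := 0; i < attempts; i++ {
        total += sleep // backoff charged to a failed attempt
        sleep *= 2
        if sleep > maxSleep {
            sleep = maxSleep
        }
    }
    fmt.Println(total) // prints 52.6s, in the same ballpark as the comment above
}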
// Call does a grpc call
@ -368,10 +398,10 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er
return generic.Zero[T](), ctx.Err()
}
ret, err := c.callOnce(ctx, caller)
ret, err := c.call(ctx, caller)
if err != nil {
traceErr := fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace())
log.Ctx(ctx).Warn("ClientBase Call grpc first call get error",
traceErr := errors.Wrapf(err, "stack trace: %s", tracer.StackTrace())
log.Ctx(ctx).Warn("ClientBase Call grpc call get error",
zap.String("role", c.GetRole()),
zap.String("address", c.GetAddr()),
zap.Error(traceErr),
@ -383,44 +413,8 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er
// ReCall does the grpc call twice
func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, error)) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return generic.Zero[T](), ctx.Err()
}
ret, err := c.callOnce(ctx, caller)
if err == nil {
return ret, nil
}
log := log.Ctx(ctx).With(zap.String("role", c.GetRole()), zap.String("address", c.GetAddr()))
traceErr := fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace())
log.Warn("ClientBase ReCall grpc first call get error ", zap.Error(traceErr))
if !funcutil.CheckCtxValid(ctx) {
return generic.Zero[T](), ctx.Err()
}
ret, err = c.callOnce(ctx, caller)
if err != nil {
traceErr = fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace())
log.Warn("ClientBase ReCall grpc second call get error", zap.Error(traceErr))
return generic.Zero[T](), traceErr
}
return ret, err
}
func (c *ClientBase[T]) bgHealthCheck(client T) {
c.sf.Do("healthcheck", func() (any, error) {
ctx, cancel := context.WithTimeout(context.Background(), paramtable.Get().CommonCfg.SessionTTL.GetAsDuration(time.Second))
defer cancel()
_, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
c.resetConnection(client)
}
return struct{}{}, nil
})
// All retry operations are done in `call` function.
return c.Call(ctx, caller)
}
// Close close the client connection


@ -23,11 +23,11 @@ import (
"net"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/merr"
"google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/keepalive"
@ -106,11 +106,18 @@ func testCall(t *testing.T, compressed bool) {
// mock client with nothing
base := ClientBase[*mockClient]{}
base.CompressionEnabled = compressed
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
initClient := func() {
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
}
base.MaxAttempts = 1
base.SetGetAddrFunc(func() (string, error) {
return "", errors.New("mocked address error")
})
t.Run("Call normal return", func(t *testing.T) {
initClient()
_, err := base.Call(context.Background(), func(client *mockClient) (any, error) {
return struct{}{}, nil
})
@ -118,6 +125,7 @@ func testCall(t *testing.T, compressed bool) {
})
t.Run("Call with canceled context", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
@ -128,22 +136,7 @@ func testCall(t *testing.T, compressed bool) {
})
t.Run("Call canceled in caller func", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
cancel()
return nil, errMock
})
assert.Error(t, err)
assert.True(t, errors.Is(err, errMock))
base.grpcClientMtx.RLock()
// client shall not be reset
assert.NotNil(t, base.grpcClient)
base.grpcClientMtx.RUnlock()
})
t.Run("Call canceled in caller func", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
@ -160,6 +153,7 @@ func testCall(t *testing.T, compressed bool) {
})
t.Run("Call returns non-grpc error", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errMock := errors.New("mocked")
@ -176,6 +170,7 @@ func testCall(t *testing.T, compressed bool) {
})
t.Run("Call returns grpc error", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errGrpc := status.Error(codes.Unknown, "mocked")
@ -211,11 +206,18 @@ func testCall(t *testing.T, compressed bool) {
func TestClientBase_Recall(t *testing.T) {
// mock client with nothing
base := ClientBase[*mockClient]{}
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
initClient := func() {
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
}
base.MaxAttempts = 1
base.SetGetAddrFunc(func() (string, error) {
return "", errors.New("mocked address error")
})
t.Run("Recall normal return", func(t *testing.T) {
initClient()
_, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) {
return struct{}{}, nil
})
@ -223,6 +225,7 @@ func TestClientBase_Recall(t *testing.T) {
})
t.Run("ReCall with canceled context", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
@ -232,24 +235,8 @@ func TestClientBase_Recall(t *testing.T) {
assert.True(t, errors.Is(err, context.Canceled))
})
t.Run("ReCall fails first and success second", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
flag := false
var mut sync.Mutex
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
mut.Lock()
defer mut.Unlock()
if flag {
return struct{}{}, nil
}
flag = true
return nil, errors.New("mock first")
})
assert.NoError(t, err)
})
t.Run("ReCall canceled in caller func", func(t *testing.T) {
initClient()
ctx, cancel := context.WithCancel(context.Background())
errMock := errors.New("mocked")
_, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
@ -258,7 +245,7 @@ func TestClientBase_Recall(t *testing.T) {
})
assert.Error(t, err)
assert.True(t, errors.Is(err, context.Canceled))
assert.True(t, errors.Is(err, errMock))
base.grpcClientMtx.RLock()
// client shall not be reset
assert.NotNil(t, base.grpcClient)
@ -314,7 +301,7 @@ func TestClientBase_RetryPolicy(t *testing.T) {
Timeout: 60 * time.Second,
}
maxAttempts := 5
maxAttempts := 1
s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
@ -338,7 +325,6 @@ func TestClientBase_RetryPolicy(t *testing.T) {
MaxAttempts: maxAttempts,
InitialBackoff: 10.0,
MaxBackoff: 60.0,
BackoffMultiplier: 2.0,
}
clientBase.SetRole(typeutil.DataCoordRole)
clientBase.SetGetAddrFunc(func() (string, error) {
@ -352,9 +338,12 @@ func TestClientBase_RetryPolicy(t *testing.T) {
ctx := context.Background()
randID := rand.Int63()
res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{
NodeID: randID,
}}, nil
return &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
NodeID: randID,
},
Status: merr.Status(nil),
}, nil
})
assert.NoError(t, err)
assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID)
@ -375,7 +364,7 @@ func TestClientBase_Compression(t *testing.T) {
Timeout: 60 * time.Second,
}
maxAttempts := 5
maxAttempts := 1
s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
@ -399,7 +388,6 @@ func TestClientBase_Compression(t *testing.T) {
MaxAttempts: maxAttempts,
InitialBackoff: 10.0,
MaxBackoff: 60.0,
BackoffMultiplier: 2.0,
CompressionEnabled: true,
}
clientBase.SetRole(typeutil.DataCoordRole)
@ -414,9 +402,12 @@ func TestClientBase_Compression(t *testing.T) {
ctx := context.Background()
randID := rand.Int63()
res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{
NodeID: randID,
}}, nil
return &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
NodeID: randID,
},
Status: merr.Status(nil),
}, nil
})
assert.NoError(t, err)
assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID)


@ -211,6 +211,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=


@ -22,7 +22,7 @@ import (
)
const (
retriableFlag = 1 << 16
retryableFlag = 1 << 16
CanceledCode int32 = 10000
TimeoutCode int32 = 10001
)
@ -124,7 +124,7 @@ type milvusError struct {
func newMilvusError(msg string, code int32, retriable bool) milvusError {
if retriable {
code |= retriableFlag
code |= retryableFlag
}
return milvusError{
msg: msg,


@ -56,8 +56,12 @@ func Code(err error) int32 {
}
}
func IsRetriable(err error) bool {
return Code(err)&retriableFlag != 0
func IsRetryableErr(err error) bool {
return IsRetryableCode(Code(err))
}
func IsRetryableCode(code int32) bool {
return code&retryableFlag != 0
}
func IsCanceledOrTimeout(err error) bool {
@ -130,7 +134,7 @@ func Error(status *commonpb.Status) error {
return newMilvusError(fmt.Sprintf("legacy error code:%d, reason: %s", status.GetErrorCode(), status.GetReason()), errUnexpected.errCode, false)
}
return newMilvusError(status.GetReason(), code, code&retriableFlag != 0)
return newMilvusError(status.GetReason(), code, code&retryableFlag != 0)
}
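The rename from retriable to retryable does not change the encoding: retryability is still a single bit OR-ed into the error code, which is what IsRetryableCode tests. A tiny self-contained illustration (flag value taken from the hunk above; the sample code 103 is arbitrary):

package main

import "fmt"

const retryableFlag = 1 << 16

func isRetryableCode(code int32) bool {
    return code&retryableFlag != 0
}

func main() {
    var plain int32 = 103               // an error code without the flag
    coded := plain | retryableFlag      // 103 | 65536 = 65639
    fmt.Println(isRetryableCode(plain)) // false
    fmt.Println(isRetryableCode(coded)) // true
}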
// CheckHealthy checks whether the state is healthy,


@ -43,12 +43,10 @@ const (
DefaultKeepAliveTimeout = 20000
// Grpc retry policy
DefaultMaxAttempts = 5
DefaultInitialBackoff float64 = 1.0
DefaultMaxBackoff float64 = 10.0
DefaultBackoffMultiplier float64 = 2.0
DefaultCompressionEnabled bool = false
DefaultMaxAttempts = 10
DefaultInitialBackoff float64 = 0.2
DefaultMaxBackoff float64 = 10
DefaultCompressionEnabled bool = false
ProxyInternalPort = 19529
ProxyExternalPort = 19530
@ -194,10 +192,9 @@ type GrpcClientConfig struct {
KeepAliveTime ParamItem `refreshable:"false"`
KeepAliveTimeout ParamItem `refreshable:"false"`
MaxAttempts ParamItem `refreshable:"false"`
InitialBackoff ParamItem `refreshable:"false"`
MaxBackoff ParamItem `refreshable:"false"`
BackoffMultiplier ParamItem `refreshable:"false"`
MaxAttempts ParamItem `refreshable:"false"`
InitialBackoff ParamItem `refreshable:"false"`
MaxBackoff ParamItem `refreshable:"false"`
}
func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
@ -318,19 +315,13 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
if v == "" {
return maxAttempts
}
iv, err := strconv.Atoi(v)
_, err := strconv.Atoi(v)
if err != nil {
log.Warn("Failed to convert int when parsing grpc.client.maxMaxAttempts, set to default",
zap.String("role", p.Domain),
zap.String("grpc.client.maxMaxAttempts", v))
return maxAttempts
}
if iv < 2 || iv > 5 {
log.Warn("The value of %s should be greater than 1 and less than 6, set to default",
zap.String("role", p.Domain),
zap.String("grpc.client.maxMaxAttempts", v))
return maxAttempts
}
return v
},
Export: true,
@ -345,7 +336,7 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
if v == "" {
return initialBackoff
}
_, err := strconv.Atoi(v)
_, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Warn("Failed to convert int when parsing grpc.client.initialBackoff, set to default",
zap.String("role", p.Domain),
@ -379,27 +370,6 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
}
p.MaxBackoff.Init(base.mgr)
backoffMultiplier := fmt.Sprintf("%f", DefaultBackoffMultiplier)
p.BackoffMultiplier = ParamItem{
Key: "grpc.client.backoffMultiplier",
Version: "2.0.0",
Formatter: func(v string) string {
if v == "" {
return backoffMultiplier
}
_, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Warn("Failed to convert int when parsing grpc.client.backoffMultiplier, set to default",
zap.String("role", p.Domain),
zap.String("grpc.client.backoffMultiplier", v))
return backoffMultiplier
}
return v
},
Export: true,
}
p.BackoffMultiplier.Init(base.mgr)
compressionEnabled := fmt.Sprintf("%t", DefaultCompressionEnabled)
p.CompressionEnabled = ParamItem{
Key: "grpc.client.compressionEnabled",
@ -413,7 +383,7 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
log.Warn("Failed to convert int when parsing grpc.client.compressionEnabled, set to default",
zap.String("role", p.Domain),
zap.String("grpc.client.compressionEnabled", v))
return backoffMultiplier
return compressionEnabled
}
return v
},


@ -122,15 +122,14 @@ func TestGrpcClientParams(t *testing.T) {
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts)
base.Save("grpc.client.maxMaxAttempts", "a")
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts)
base.Save("grpc.client.maxMaxAttempts", "1")
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts)
base.Save("grpc.client.maxMaxAttempts", "10")
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts)
base.Save("grpc.client.maxMaxAttempts", "4")
assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), 4)
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff)
base.Save("grpc.client.initialBackOff", "a")
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff)
base.Save("grpc.client.initialBackOff", "2.0")
assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), 2.0)
assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff)
base.Save("grpc.client.maxBackOff", "a")
@ -138,12 +137,6 @@ func TestGrpcClientParams(t *testing.T) {
base.Save("grpc.client.maxBackOff", "50.0")
assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), 50.0)
assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), DefaultBackoffMultiplier)
base.Save("grpc.client.backoffMultiplier", "a")
assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), DefaultBackoffMultiplier)
base.Save("grpc.client.backoffMultiplier", "3.0")
assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), 3.0)
assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled)
base.Save("grpc.client.CompressionEnabled", "a")
assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled)


@ -38,7 +38,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
for i := uint(0); i < c.attempts; i++ {
if err := fn(); err != nil {
if i%10 == 0 {
if i%4 == 0 {
log.Error("retry func failed", zap.Uint("retry time", i), zap.Error(err))
}
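The grpc client's new call loop relies on two behaviours of this retry helper: it re-runs the closure up to Attempts times with exponential sleep between failures, and an error wrapped with retry.Unrecoverable stops the loop immediately, which is how non-rpc errors in the client code earlier in this diff stop retrying. A minimal, self-contained sketch of that contract (the error messages are made up for illustration):

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/milvus-io/milvus/pkg/util/retry"
)

func main() {
    calls := 0
    err := retry.Do(context.Background(), func() error {
        calls++
        if calls < 3 {
            return errors.New("transient failure") // retried with backoff
        }
        return retry.Unrecoverable(errors.New("fatal failure")) // stops the loop
    },
        retry.Attempts(5),
        retry.Sleep(200*time.Millisecond),
        retry.MaxSleepTime(10*time.Second),
    )
    fmt.Println(calls, err != nil) // 3 true: retried twice, then gave up
}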


@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
@ -88,10 +89,13 @@ func (s *CrossClusterRoutingSuite) SetupSuite() {
rand.Seed(time.Now().UnixNano())
paramtable.Init()
paramtable.Get().Save("grpc.client.maxMaxAttempts", "1")
s.factory = dependency.NewDefaultFactory(true)
}
func (s *CrossClusterRoutingSuite) TearDownSuite() {
paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10))
}
func (s *CrossClusterRoutingSuite) SetupTest() {