mirror of https://github.com/milvus-io/milvus.git
enhance: Force to reset coord connection for unavailable error (#33910)
pr: #33908
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/33900/head
parent
4513569207
commit
aafa86095f
|
@ -218,13 +218,13 @@ func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (*clientConnWrapper[T
|
|||
return c.grpcClient, nil
|
||||
}
|
||||
|
||||
func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T]) {
|
||||
if time.Since(c.lastReset.Load()) < c.minResetInterval {
|
||||
func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T], forceReset bool) {
|
||||
if !forceReset && time.Since(c.lastReset.Load()) < c.minResetInterval {
|
||||
return
|
||||
}
|
||||
c.grpcClientMtx.Lock()
|
||||
defer c.grpcClientMtx.Unlock()
|
||||
if time.Since(c.lastReset.Load()) < c.minResetInterval {
|
||||
if !forceReset && time.Since(c.lastReset.Load()) < c.minResetInterval {
|
||||
return
|
||||
}
|
||||
if generic.IsZero(c.grpcClient) {
|
||||
|
@ -396,12 +396,12 @@ func (c *ClientBase[T]) needResetCancel() (needReset bool) {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, needReset bool, retErr error) {
|
||||
func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, needReset, forceReset bool, retErr error) {
|
||||
log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole()))
|
||||
// Unknown err
|
||||
if !funcutil.IsGrpcErr(err) {
|
||||
log.Warn("fail to grpc call because of unknown error", zap.Error(err))
|
||||
return false, false, err
|
||||
return false, false, false, err
|
||||
}
|
||||
|
||||
// grpc err
|
||||
|
@ -409,22 +409,25 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry,
|
|||
switch {
|
||||
case funcutil.IsGrpcErr(err, codes.Canceled, codes.DeadlineExceeded):
|
||||
// canceled or deadline exceeded
|
||||
return true, c.needResetCancel(), err
|
||||
return true, c.needResetCancel(), false, err
|
||||
case funcutil.IsGrpcErr(err, codes.Unimplemented):
|
||||
// for unimplemented error, reset coord connection to avoid old coord's side effect.
|
||||
// old coord's side effect: when coord changed, the connection in coord's client won't reset automatically.
|
||||
// so if new interface appear in new coord, will got a unimplemented error
|
||||
return false, true, merr.WrapErrServiceUnimplemented(err)
|
||||
return false, true, true, merr.WrapErrServiceUnimplemented(err)
|
||||
case IsServerIDMismatchErr(err):
|
||||
if ok := c.checkNodeSessionExist(ctx); !ok {
|
||||
// if session doesn't exist, no need to retry for datanode/indexnode/querynode/proxy
|
||||
return false, false, err
|
||||
return false, false, false, err
|
||||
}
|
||||
return true, true, err
|
||||
return true, true, true, err
|
||||
case IsCrossClusterRoutingErr(err):
|
||||
return true, true, err
|
||||
return true, true, true, err
|
||||
case funcutil.IsGrpcErr(err, codes.Unavailable):
|
||||
// for unavailable error in coord, force to reset coord connection
|
||||
return true, true, !c.isNode, err
|
||||
default:
|
||||
return true, true, err
|
||||
return true, true, false, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -454,8 +457,8 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
|
|||
log.Warn("fail to get grpc client", zap.Error(clientErr))
|
||||
}
|
||||
|
||||
resetClientFunc := func() {
|
||||
c.resetConnection(wrapper)
|
||||
resetClientFunc := func(forceReset bool) {
|
||||
c.resetConnection(wrapper, forceReset)
|
||||
wrapper, clientErr = c.GetGrpcClient(ctx)
|
||||
if clientErr != nil {
|
||||
log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr))
|
||||
|
@ -473,7 +476,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
|
|||
|
||||
err := errors.Wrap(clientErr, "empty grpc client")
|
||||
log.Warn("grpc client is nil, maybe fail to get client in the retry state", zap.Error(err))
|
||||
resetClientFunc()
|
||||
resetClientFunc(false)
|
||||
return true, err
|
||||
}
|
||||
|
||||
|
@ -483,17 +486,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
|
|||
wrapper.Unpin()
|
||||
|
||||
if err != nil {
|
||||
var needRetry, needReset bool
|
||||
needRetry, needReset, err = c.checkGrpcErr(ctx, err)
|
||||
var needRetry, needReset, forceReset bool
|
||||
needRetry, needReset, forceReset, err = c.checkGrpcErr(ctx, err)
|
||||
if needReset {
|
||||
log.Warn("start to reset connection because of specific reasons", zap.Error(err))
|
||||
resetClientFunc()
|
||||
resetClientFunc(forceReset)
|
||||
} else {
|
||||
// err occurs but no need to reset connection, try to verify session
|
||||
err := c.verifySession(ctx)
|
||||
if err != nil {
|
||||
log.Warn("failed to verify session, reset connection", zap.Error(err))
|
||||
resetClientFunc()
|
||||
resetClientFunc(forceReset)
|
||||
}
|
||||
}
|
||||
return needRetry, err
|
||||
|
|
|
@ -370,28 +370,38 @@ func TestClientBase_CheckGrpcError(t *testing.T) {
|
|||
base.MaxAttempts = 1
|
||||
|
||||
ctx := context.Background()
|
||||
retry, reset, _ := base.checkGrpcErr(ctx, status.Errorf(codes.Canceled, "fake context canceled"))
|
||||
retry, reset, forceReset, _ := base.checkGrpcErr(ctx, status.Errorf(codes.Canceled, "fake context canceled"))
|
||||
assert.True(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.False(t, forceReset)
|
||||
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unimplemented, "fake context canceled"))
|
||||
retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unimplemented, "fake context canceled"))
|
||||
assert.False(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.True(t, forceReset)
|
||||
|
||||
retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unavailable, "fake context canceled"))
|
||||
assert.True(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.True(t, forceReset)
|
||||
|
||||
// test serverId mismatch
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error()))
|
||||
retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error()))
|
||||
assert.True(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.True(t, forceReset)
|
||||
|
||||
// test cross cluster
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrServiceCrossClusterRouting.Error()))
|
||||
retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrServiceCrossClusterRouting.Error()))
|
||||
assert.True(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.True(t, forceReset)
|
||||
|
||||
// test default
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotFound.Error()))
|
||||
retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotFound.Error()))
|
||||
assert.True(t, retry)
|
||||
assert.True(t, reset)
|
||||
assert.False(t, forceReset)
|
||||
}
|
||||
|
||||
type server struct {
|
||||
|
|
Loading…
Reference in New Issue