Fix retry when proxy stopped (#28263)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/28296/head
wei liu 2023-11-09 10:10:19 +08:00 committed by GitHub
parent 39c24fe07b
commit 16dc26833b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 3 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -155,6 +156,10 @@ func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context,
group.Go(func() error {
sta, err := v.InvalidateCollectionMetaCache(ctx, request)
if err != nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("InvalidateCollectionMetaCache failed due to proxy service not found", zap.Error(err))
return nil
}
return fmt.Errorf("InvalidateCollectionMetaCache failed, proxyID = %d, err = %s", k, err)
}
if sta.ErrorCode != commonpb.ErrorCode_Success {

View File

@ -196,6 +196,20 @@ func TestProxyClientManager_InvalidateCollectionMetaCache(t *testing.T) {
assert.Error(t, err)
})
t.Run("mock proxy service down", func(t *testing.T) {
ctx := context.Background()
p1 := newMockProxy()
p1.InvalidateCollectionMetaCacheFunc = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return nil, merr.ErrNodeNotFound
}
pcm := &proxyClientManager{proxyClient: map[int64]types.ProxyClient{
TestProxyID: p1,
}}
err := pcm.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{})
assert.NoError(t, err)
})
t.Run("normal case", func(t *testing.T) {
ctx := context.Background()
p1 := newMockProxy()

View File

@ -392,9 +392,9 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry,
func (c *ClientBase[T]) checkNodeSessionExist(ctx context.Context) (bool, error) {
switch c.GetRole() {
case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole:
case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole, typeutil.ProxyRole:
err := c.verifySession(ctx)
if err != nil && errors.Is(err, merr.ErrNodeNotFound) {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("failed to verify node session", zap.Error(err))
// stop retry
return false, err

View File

@ -116,7 +116,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) {
})
assert.True(t, errors.Is(err, merr.ErrNodeNotFound))
// test node already down, but new node start up with same ip and port
// test querynode/datanode/indexnode/proxy already down, but new node start up with same ip and port
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
@ -124,6 +124,15 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) {
return struct{}{}, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error())
})
assert.True(t, errors.Is(err, merr.ErrNodeNotFound))
// test querynode/datanode/indexnode/proxy down, return unavailable error
base.grpcClientMtx.Lock()
base.grpcClient = &mockClient{}
base.grpcClientMtx.Unlock()
_, err = base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, status.Errorf(codes.Unavailable, "fake error")
})
assert.True(t, errors.Is(err, merr.ErrNodeNotFound))
}
func TestClientBase_Call(t *testing.T) {