mirror of https://github.com/milvus-io/milvus.git
fix: Make datacoord client retry on index api (#30654)
issue: #20553 This PR adds retry to all interfaces that belonged to IndexCoord in Milvus 2.2 and moved to DataCoord in Milvus 2.3, to prevent `unimplemented` errors during a rolling upgrade from Milvus 2.2 to 2.3. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/31002/head
parent
882b50c5c1
commit
fa73520b57
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
@ -34,7 +35,9 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -559,9 +562,24 @@ func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest, op
|
|||
|
||||
// CreateIndex sends the build index request to IndexCoord.
|
||||
func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateIndex(ctx, req)
|
||||
var resp *commonpb.Status
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// AlterIndex sends the alter index request to IndexCoord.
|
||||
|
@ -573,51 +591,155 @@ func (c *Client) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest,
|
|||
|
||||
// GetIndexState gets the index states from IndexCoord.
|
||||
func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStateResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
|
||||
return client.GetIndexState(ctx, req)
|
||||
var resp *indexpb.GetIndexStateResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
|
||||
return client.GetIndexState(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// GetSegmentIndexState gets the index states from IndexCoord.
|
||||
func (c *Client) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetSegmentIndexStateResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
|
||||
return client.GetSegmentIndexState(ctx, req)
|
||||
var resp *indexpb.GetSegmentIndexStateResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
|
||||
return client.GetSegmentIndexState(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// GetIndexInfos gets the index file paths from IndexCoord.
|
||||
func (c *Client) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest, opts ...grpc.CallOption) (*indexpb.GetIndexInfoResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
|
||||
return client.GetIndexInfos(ctx, req)
|
||||
var resp *indexpb.GetIndexInfoResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
|
||||
return client.GetIndexInfos(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// DescribeIndex describe the index info of the collection.
|
||||
func (c *Client) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest, opts ...grpc.CallOption) (*indexpb.DescribeIndexResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
|
||||
return client.DescribeIndex(ctx, req)
|
||||
var resp *indexpb.DescribeIndexResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
|
||||
return client.DescribeIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// GetIndexStatistics get the statistics of the index.
|
||||
func (c *Client) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStatisticsResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
|
||||
return client.GetIndexStatistics(ctx, req)
|
||||
var resp *indexpb.GetIndexStatisticsResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
|
||||
return client.GetIndexStatistics(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// GetIndexBuildProgress describe the progress of the index.
|
||||
func (c *Client) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest, opts ...grpc.CallOption) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
return client.GetIndexBuildProgress(ctx, req)
|
||||
var resp *indexpb.GetIndexBuildProgressResponse
|
||||
var err error
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
return client.GetIndexBuildProgress(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// DropIndex sends the drop index request to IndexCoord.
|
||||
func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropIndex(ctx, req)
|
||||
var resp *commonpb.Status
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *Client) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -411,7 +411,10 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry,
|
|||
// canceled or deadline exceeded
|
||||
return true, c.needResetCancel(), err
|
||||
case funcutil.IsGrpcErr(err, codes.Unimplemented):
|
||||
return false, false, merr.WrapErrServiceUnimplemented(err)
|
||||
// for unimplemented error, reset coord connection to avoid old coord's side effect.
|
||||
// old coord's side effect: when coord changed, the connection in coord's client won't reset automatically.
|
||||
// so if new interface appear in new coord, will got a unimplemented error
|
||||
return false, true, merr.WrapErrServiceUnimplemented(err)
|
||||
case IsServerIDMismatchErr(err):
|
||||
if ok := c.checkNodeSessionExist(ctx); !ok {
|
||||
// if session doesn't exist, no need to retry for datanode/indexnode/querynode/proxy
|
||||
|
|
|
@ -367,7 +367,7 @@ func TestClientBase_CheckGrpcError(t *testing.T) {
|
|||
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unimplemented, "fake context canceled"))
|
||||
assert.False(t, retry)
|
||||
assert.False(t, reset)
|
||||
assert.True(t, reset)
|
||||
|
||||
// test serverId mismatch
|
||||
retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error()))
|
||||
|
|
Loading…
Reference in New Issue