mirror of https://github.com/milvus-io/milvus.git
In Index service APIs, return error if occurs instead of always returning nil. Additionally, add more tests to cover this scenario. issue: https://github.com/milvus-io/milvus/issues/31069, https://github.com/milvus-io/milvus/issues/31027 pr: https://github.com/milvus-io/milvus/pull/31077 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/31114/head
parent
53f5a67112
commit
3eeeae8519
|
@ -562,19 +562,20 @@ func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
var resp *commonpb.Status
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -584,19 +585,20 @@ func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
|
|||
var resp *indexpb.GetIndexStateResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
|
||||
return client.GetIndexState(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -606,19 +608,20 @@ func (c *Client) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
|
|||
var resp *indexpb.GetSegmentIndexStateResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
|
||||
return client.GetSegmentIndexState(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -628,19 +631,20 @@ func (c *Client) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
|
|||
var resp *indexpb.GetIndexInfoResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
|
||||
return client.GetIndexInfos(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -650,19 +654,20 @@ func (c *Client) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
|
|||
var resp *indexpb.DescribeIndexResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
|
||||
return client.DescribeIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -672,19 +677,20 @@ func (c *Client) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
|
|||
var resp *indexpb.GetIndexStatisticsResponse
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
|
||||
return client.GetIndexStatistics(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -693,19 +699,20 @@ func (c *Client) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
|
|||
func (c *Client) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest, opts ...grpc.CallOption) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
var resp *indexpb.GetIndexBuildProgressResponse
|
||||
var err error
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
|
||||
return client.GetIndexBuildProgress(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
@ -715,19 +722,20 @@ func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest, o
|
|||
var resp *commonpb.Status
|
||||
var err error
|
||||
|
||||
err = retry.Do(ctx, func() error {
|
||||
var retryErr error
|
||||
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropIndex(ctx, req)
|
||||
})
|
||||
|
||||
// retry on un implemented, to be compatible with 2.2.x
|
||||
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
|
||||
return retryErr
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
err = retryErr
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -190,7 +190,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
|
|||
// we add retry here to retry the request until context done, and if new data coord start up, it will success
|
||||
var resp *indexpb.GetIndexInfoResponse
|
||||
var err error
|
||||
retry.Do(ctx, func() error {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
|
||||
CollectionID: collectionID,
|
||||
SegmentIDs: []int64{segmentID},
|
||||
|
@ -201,6 +201,10 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
|
|||
}
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
log.Warn("failed to get segment index info", zap.Error(err))
|
||||
return nil, retryErr
|
||||
}
|
||||
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
log.Warn("failed to get segment index info", zap.Error(err))
|
||||
|
@ -247,7 +251,7 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID
|
|||
// we add retry here to retry the request until context done, and if new data coord start up, it will success
|
||||
var resp *indexpb.DescribeIndexResponse
|
||||
var err error
|
||||
retry.Do(ctx, func() error {
|
||||
retryErr := retry.Do(ctx, func() error {
|
||||
resp, err = broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
|
@ -256,6 +260,10 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID
|
|||
}
|
||||
return nil
|
||||
})
|
||||
if retryErr != nil {
|
||||
log.Warn("failed to fetch index meta", zap.Int64("collection", collectionID), zap.Error(err))
|
||||
return nil, retryErr
|
||||
}
|
||||
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
log.Error("failed to fetch index meta",
|
||||
|
|
|
@ -19,6 +19,7 @@ package meta
|
|||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
|
@ -275,6 +276,18 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_context_exceeded", func() {
|
||||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.DescribeIndexResponse{Status: merr.Success()}, nil)
|
||||
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*1)
|
||||
defer cancel2()
|
||||
time.Sleep(time.Millisecond * 2)
|
||||
_, err := s.broker.DescribeIndex(ctx2, collectionID)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_error", func() {
|
||||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mock"))
|
||||
|
@ -388,6 +401,18 @@ func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {
|
|||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_context_exceeded", func() {
|
||||
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.GetIndexInfoResponse{Status: merr.Success()}, nil)
|
||||
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*1)
|
||||
defer cancel2()
|
||||
time.Sleep(time.Millisecond * 2)
|
||||
_, err := s.broker.GetIndexInfo(ctx2, collectionID, segmentID)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_error", func() {
|
||||
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mock"))
|
||||
|
|
Loading…
Reference in New Issue