mirror of https://github.com/milvus-io/milvus.git
fix: Add retry on unimplemented error for datacoord (#30554)
issue: #30553 when datacoord with version 2.2 and querycoord with version 2.3 coexist during rolling upgrade, `DescribeIndex/GetIndexInfo` will return `unimplemented` error This PR add retry on `DescribeIndex/GetIndexInfo`, to prevent load collection failed during rolling upgrade from milvus 2.2 to 2.3. --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/29830/head
parent
a6d9eb7f20
commit
99297ab81b
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
|
@ -34,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
. "github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -190,9 +192,20 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
|
|||
zap.Int64("segmentID", segmentID),
|
||||
)
|
||||
|
||||
resp, err := broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
|
||||
CollectionID: collectionID,
|
||||
SegmentIDs: []int64{segmentID},
|
||||
// during rolling upgrade, query coord may connect to datacoord with version 2.2, which will return merr.ErrServiceUnimplemented
|
||||
// we add retry here to retry the request until context done, and if new data coord start up, it will success
|
||||
var resp *indexpb.GetIndexInfoResponse
|
||||
var err error
|
||||
retry.Do(ctx, func() error {
|
||||
resp, err = broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
|
||||
CollectionID: collectionID,
|
||||
SegmentIDs: []int64{segmentID},
|
||||
})
|
||||
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
|
@ -236,8 +249,18 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID
|
|||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
resp, err := broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
||||
CollectionID: collectionID,
|
||||
// during rolling upgrade, query coord may connect to datacoord with version 2.2, which will return merr.ErrServiceUnimplemented
|
||||
// we add retry here to retry the request until context done, and if new data coord start up, it will success
|
||||
var resp *indexpb.DescribeIndexResponse
|
||||
var err error
|
||||
retry.Do(ctx, func() error {
|
||||
resp, err = broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
|
|
|
@ -295,6 +295,26 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_unimplemented", func() {
|
||||
// mock old version datacoord return unimplemented
|
||||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(nil, merr.ErrServiceUnimplemented).Times(1)
|
||||
|
||||
// mock retry on new version datacoord return success
|
||||
indexIDs := []int64{1, 2}
|
||||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.DescribeIndexResponse{
|
||||
Status: merr.Status(nil),
|
||||
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo {
|
||||
return &indexpb.IndexInfo{IndexID: id}
|
||||
}),
|
||||
}, nil)
|
||||
|
||||
_, err := s.broker.DescribeIndex(ctx, collectionID)
|
||||
s.NoError(err)
|
||||
s.resetMock()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() {
|
||||
|
@ -386,6 +406,31 @@ func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {
|
|||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_unimplemented", func() {
|
||||
// mock old version datacoord return unimplemented
|
||||
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
|
||||
Return(nil, merr.ErrServiceUnimplemented).Times(1)
|
||||
|
||||
// mock retry on new version datacoord return success
|
||||
indexIDs := []int64{1, 2, 3}
|
||||
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.GetIndexInfoResponse{
|
||||
Status: merr.Status(nil),
|
||||
SegmentInfo: map[int64]*indexpb.SegmentInfo{
|
||||
segmentID: {
|
||||
SegmentID: segmentID,
|
||||
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexFilePathInfo {
|
||||
return &indexpb.IndexFilePathInfo{IndexID: id}
|
||||
}),
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
_, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
|
||||
s.NoError(err)
|
||||
s.resetMock()
|
||||
})
|
||||
}
|
||||
|
||||
func TestCoordinatorBroker(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue