Refine log and err handling in querycoord broker (#27546)

- Add log.Ctx(ctx) for all log occurences
- Use `merr.CheckRPCErr` for all grpc response error handling

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27631/head
congqixia 2023-10-10 11:49:32 +08:00 committed by GitHub
parent cb71a3e235
commit b91a5ef42c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 366 additions and 139 deletions

View File

@ -74,12 +74,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
CollectionID: collectionID,
}
resp, err := broker.rootCoord.DescribeCollection(ctx, req)
if err != nil {
return nil, err
}
err = merr.Error(resp.GetStatus())
if err != nil {
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("failed to get collection schema", zap.Error(err))
return nil, err
}
@ -89,6 +84,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
req := &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
@ -97,13 +93,7 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID
CollectionID: collectionID,
}
resp, err := broker.rootCoord.ShowPartitions(ctx, req)
if err != nil {
log.Warn("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
err = merr.Error(resp.GetStatus())
if err != nil {
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return nil, err
}
@ -114,6 +104,10 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID
func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
)
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
Base: commonpbutil.NewMsgBase(
@ -123,14 +117,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
PartitionID: partitionID,
}
recoveryInfo, err := broker.dataCoord.GetRecoveryInfo(ctx, getRecoveryInfoRequest)
if err != nil {
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
return nil, nil, err
}
if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = merr.Error(recoveryInfo.GetStatus())
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
if err := merr.CheckRPCCall(recoveryInfo, err); err != nil {
log.Warn("get recovery info failed", zap.Error(err))
return nil, nil, err
}
@ -140,6 +128,10 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDis", partitionIDs),
)
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequestV2{
Base: commonpbutil.NewMsgBase(
@ -149,15 +141,9 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
PartitionIDs: partitionIDs,
}
recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest)
if err != nil {
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
return nil, nil, err
}
if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = merr.Error(recoveryInfo.GetStatus())
log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
if err := merr.CheckRPCCall(recoveryInfo, err); err != nil {
log.Warn("get recovery info failed", zap.Error(err))
return nil, nil, err
}
@ -167,22 +153,22 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(
zap.Int64s("segments", ids),
)
req := &datapb.GetSegmentInfoRequest{
SegmentIDs: ids,
IncludeUnHealthy: true,
}
resp, err := broker.dataCoord.GetSegmentInfo(ctx, req)
if err != nil {
log.Warn("failed to get segment info from DataCoord",
zap.Int64s("segments", ids),
zap.Error(err))
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
return nil, err
}
if len(resp.Infos) == 0 {
log.Warn("No such segment in DataCoord",
zap.Int64s("segments", ids))
log.Warn("No such segment in DataCoord")
return nil, fmt.Errorf("no such segment in DataCoord")
}
@ -202,15 +188,12 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
CollectionID: collectionID,
SegmentIDs: []int64{segmentID},
})
if err == nil {
err = merr.Error(resp.GetStatus())
}
if err != nil {
log.Warn("failed to get segment index info",
zap.Error(err))
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get segment index info", zap.Error(err))
return nil, err
}
if resp.GetSegmentInfo() == nil {
err = merr.WrapErrIndexNotFoundForSegment(segmentID)
log.Warn("failed to get segment index info",
@ -251,11 +234,11 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID
CollectionID: collectionID,
})
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Error("failed to fetch index meta",
zap.Int64("collection", collectionID),
zap.Error(err))
return nil, err
}
return resp.IndexInfos, nil
return resp.GetIndexInfos(), nil
}

View File

@ -21,129 +21,373 @@ import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestCoordinatorBroker_GetCollectionSchema(t *testing.T) {
t.Run("got error on DescribeCollection", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.On("DescribeCollection",
mock.Anything,
mock.Anything,
).Return(nil, errors.New("error mock DescribeCollection"))
ctx := context.Background()
broker := &CoordinatorBroker{rootCoord: rootCoord}
_, err := broker.GetCollectionSchema(ctx, 100)
assert.Error(t, err)
type CoordinatorBrokerRootCoordSuite struct {
suite.Suite
rootcoord *mocks.MockRootCoordClient
broker *CoordinatorBroker
}
func (s *CoordinatorBrokerRootCoordSuite) SetupSuite() {
paramtable.Init()
}
func (s *CoordinatorBrokerRootCoordSuite) SetupTest() {
s.rootcoord = mocks.NewMockRootCoordClient(s.T())
s.broker = NewCoordinatorBroker(nil, s.rootcoord)
}
func (s *CoordinatorBrokerRootCoordSuite) resetMock() {
s.rootcoord.AssertExpectations(s.T())
s.rootcoord.ExpectedCalls = nil
}
func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionSchema() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
collectionID := int64(100)
s.Run("normal case", func() {
s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Schema: &schemapb.CollectionSchema{Name: "test_schema"},
}, nil)
schema, err := s.broker.GetCollectionSchema(ctx, collectionID)
s.NoError(err)
s.Equal("test_schema", schema.GetName())
s.resetMock()
})
t.Run("non-success code", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.On("DescribeCollection",
mock.Anything,
mock.Anything,
).Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{rootCoord: rootCoord}
_, err := broker.GetCollectionSchema(ctx, 100)
assert.Error(t, err)
s.Run("rootcoord_return_error", func() {
s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(nil, errors.New("mock error"))
_, err := s.broker.GetCollectionSchema(ctx, collectionID)
s.Error(err)
s.resetMock()
})
t.Run("normal case", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.On("DescribeCollection",
mock.Anything,
mock.Anything,
).Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Schema: &schemapb.CollectionSchema{Name: "test_schema"},
}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{rootCoord: rootCoord}
schema, err := broker.GetCollectionSchema(ctx, 100)
assert.NoError(t, err)
assert.Equal(t, "test_schema", schema.GetName())
s.Run("return_failure_status", func() {
s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists},
}, nil)
_, err := s.broker.GetCollectionSchema(ctx, collectionID)
s.Error(err)
s.resetMock()
})
}
func TestCoordinatorBroker_GetRecoveryInfo(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
dc := mocks.NewMockDataCoordClient(t)
dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponseV2{}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{dataCoord: dc}
_, _, err := broker.GetRecoveryInfoV2(ctx, 1)
assert.NoError(t, err)
})
t.Run("get error", func(t *testing.T) {
dc := mocks.NewMockDataCoordClient(t)
fakeErr := errors.New("fake error")
dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(nil, fakeErr)
ctx := context.Background()
broker := &CoordinatorBroker{dataCoord: dc}
_, _, err := broker.GetRecoveryInfoV2(ctx, 1)
assert.ErrorIs(t, err, fakeErr)
})
t.Run("return non-success code", func(t *testing.T) {
dc := mocks.NewMockDataCoordClient(t)
dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponseV2{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{dataCoord: dc}
_, _, err := broker.GetRecoveryInfoV2(ctx, 1)
assert.Error(t, err)
})
}
func TestCoordinatorBroker_GetPartitions(t *testing.T) {
func (s *CoordinatorBrokerRootCoordSuite) TestGetPartitions() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
collection := int64(100)
partitions := []int64{10, 11, 12}
t.Run("normal case", func(t *testing.T) {
rc := mocks.NewMockRootCoordClient(t)
rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{},
s.Run("normal_case", func() {
s.rootcoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Status(nil),
PartitionIDs: partitions,
}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{rootCoord: rc}
retPartitions, err := broker.GetPartitions(ctx, collection)
assert.NoError(t, err)
assert.ElementsMatch(t, partitions, retPartitions)
retPartitions, err := s.broker.GetPartitions(ctx, collection)
s.NoError(err)
s.ElementsMatch(partitions, retPartitions)
s.resetMock()
})
t.Run("collection not exist", func(t *testing.T) {
rc := mocks.NewMockRootCoordClient(t)
rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
s.Run("collection_not_exist", func() {
s.rootcoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Status(merr.WrapErrCollectionNotFound("mock")),
}, nil)
ctx := context.Background()
broker := &CoordinatorBroker{rootCoord: rc}
_, err := broker.GetPartitions(ctx, collection)
assert.ErrorIs(t, err, merr.ErrCollectionNotFound)
_, err := s.broker.GetPartitions(ctx, collection)
s.Error(err)
s.ErrorIs(err, merr.ErrCollectionNotFound)
s.resetMock()
})
}
type CoordinatorBrokerDataCoordSuite struct {
suite.Suite
datacoord *mocks.MockDataCoordClient
broker *CoordinatorBroker
}
func (s *CoordinatorBrokerDataCoordSuite) SetupSuite() {
paramtable.Init()
}
func (s *CoordinatorBrokerDataCoordSuite) SetupTest() {
s.datacoord = mocks.NewMockDataCoordClient(s.T())
s.broker = NewCoordinatorBroker(s.datacoord, nil)
}
func (s *CoordinatorBrokerDataCoordSuite) resetMock() {
s.datacoord.AssertExpectations(s.T())
s.datacoord.ExpectedCalls = nil
}
func (s *CoordinatorBrokerDataCoordSuite) TestGetRecoveryInfo() {
collectionID := int64(100)
partitionID := int64(1000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("normal_case", func() {
channels := []string{"dml_0"}
segmentIDs := []int64{1, 2, 3}
s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).
Return(&datapb.GetRecoveryInfoResponse{
Channels: lo.Map(channels, func(ch string, _ int) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "dml_0",
}
}),
Binlogs: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentBinlogs {
return &datapb.SegmentBinlogs{SegmentID: id}
}),
}, nil)
vchans, segInfos, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID)
s.NoError(err)
s.ElementsMatch(channels, lo.Map(vchans, func(info *datapb.VchannelInfo, _ int) string {
return info.GetChannelName()
}))
s.ElementsMatch(segmentIDs, lo.Map(segInfos, func(info *datapb.SegmentBinlogs, _ int) int64 {
return info.GetSegmentID()
}))
s.resetMock()
})
s.Run("datacoord_return_error", func() {
s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, _, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).
Return(&datapb.GetRecoveryInfoResponse{
Status: merr.Status(errors.New("mocked")),
}, nil)
_, _, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID)
s.Error(err)
s.resetMock()
})
}
func (s *CoordinatorBrokerDataCoordSuite) TestGetRecoveryInfoV2() {
collectionID := int64(100)
partitionID := int64(1000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("normal_case", func() {
channels := []string{"dml_0"}
segmentIDs := []int64{1, 2, 3}
s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(&datapb.GetRecoveryInfoResponseV2{
Channels: lo.Map(channels, func(ch string, _ int) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "dml_0",
}
}),
Segments: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentInfo {
return &datapb.SegmentInfo{ID: id}
}),
}, nil)
vchans, segInfos, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID)
s.NoError(err)
s.ElementsMatch(channels, lo.Map(vchans, func(info *datapb.VchannelInfo, _ int) string {
return info.GetChannelName()
}))
s.ElementsMatch(segmentIDs, lo.Map(segInfos, func(info *datapb.SegmentInfo, _ int) int64 {
return info.GetID()
}))
s.resetMock()
})
s.Run("datacoord_return_error", func() {
s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, _, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(&datapb.GetRecoveryInfoResponseV2{
Status: merr.Status(errors.New("mocked")),
}, nil)
_, _, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID)
s.Error(err)
s.resetMock()
})
}
func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
collectionID := int64(100)
s.Run("normal_case", func() {
indexIDs := []int64{1, 2}
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
Return(&indexpb.DescribeIndexResponse{
Status: merr.Status(nil),
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo {
return &indexpb.IndexInfo{IndexID: id}
}),
}, nil)
infos, err := s.broker.DescribeIndex(ctx, collectionID)
s.NoError(err)
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() }))
s.resetMock()
})
s.Run("datacoord_return_error", func() {
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, err := s.broker.DescribeIndex(ctx, collectionID)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
Return(&indexpb.DescribeIndexResponse{
Status: merr.Status(errors.New("mocked")),
}, nil)
_, err := s.broker.DescribeIndex(ctx, collectionID)
s.Error(err)
s.resetMock()
})
}
func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
collectionID := int64(100)
segmentIDs := []int64{10000, 10001, 10002}
s.Run("normal_case", func() {
s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return(&datapb.GetSegmentInfoResponse{
Status: merr.Status(nil),
Infos: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentInfo {
return &datapb.SegmentInfo{ID: id, CollectionID: collectionID}
}),
}, nil)
resp, err := s.broker.GetSegmentInfo(ctx, segmentIDs...)
s.NoError(err)
s.ElementsMatch(segmentIDs, lo.Map(resp.GetInfos(), func(info *datapb.SegmentInfo, _ int) int64 {
return info.GetID()
}))
s.resetMock()
})
s.Run("datacoord_return_error", func() {
s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, err := s.broker.GetSegmentInfo(ctx, segmentIDs...)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return(&datapb.GetSegmentInfoResponse{Status: merr.Status(errors.New("mocked"))}, nil)
_, err := s.broker.GetSegmentInfo(ctx, segmentIDs...)
s.Error(err)
s.resetMock()
})
}
func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
collectionID := int64(100)
segmentID := int64(10000)
s.Run("normal_case", func() {
indexIDs := []int64{1, 2, 3}
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
Return(&indexpb.GetIndexInfoResponse{
Status: merr.Status(nil),
SegmentInfo: map[int64]*indexpb.SegmentInfo{
segmentID: {
SegmentID: segmentID,
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexFilePathInfo {
return &indexpb.IndexFilePathInfo{IndexID: id}
}),
},
},
}, nil)
infos, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
s.NoError(err)
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *querypb.FieldIndexInfo, _ int) int64 {
return info.GetIndexID()
}))
s.resetMock()
})
s.Run("datacoord_return_error", func() {
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
_, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything).
Return(&indexpb.GetIndexInfoResponse{Status: merr.Status(errors.New("mock"))}, nil)
_, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
s.Error(err)
s.resetMock()
})
}
func TestCoordinatorBroker(t *testing.T) {
suite.Run(t, new(CoordinatorBrokerRootCoordSuite))
suite.Run(t, new(CoordinatorBrokerDataCoordSuite))
}