diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 2b986757b3..1eaa0babe4 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -74,12 +74,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec CollectionID: collectionID, } resp, err := broker.rootCoord.DescribeCollection(ctx, req) - if err != nil { - return nil, err - } - - err = merr.Error(resp.GetStatus()) - if err != nil { + if err := merr.CheckRPCCall(resp, err); err != nil { log.Ctx(ctx).Warn("failed to get collection schema", zap.Error(err)) return nil, err } @@ -89,6 +84,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() + log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) req := &milvuspb.ShowPartitionsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), @@ -97,13 +93,7 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID CollectionID: collectionID, } resp, err := broker.rootCoord.ShowPartitions(ctx, req) - if err != nil { - log.Warn("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - return nil, err - } - - err = merr.Error(resp.GetStatus()) - if err != nil { + if err := merr.CheckRPCCall(resp, err); err != nil { log.Warn("failed to get partitions", zap.Error(err)) return nil, err } @@ -114,6 +104,10 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collectionID), + zap.Int64("partitionID", partitionID), + ) getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{ Base: commonpbutil.NewMsgBase( @@ -123,14 +117,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection PartitionID: partitionID, } recoveryInfo, err := broker.dataCoord.GetRecoveryInfo(ctx, getRecoveryInfoRequest) - if err != nil { - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) - return nil, nil, err - } - - if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - err = merr.Error(recoveryInfo.GetStatus()) - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) + if err := merr.CheckRPCCall(recoveryInfo, err); err != nil { + log.Warn("get recovery info failed", zap.Error(err)) return nil, nil, err } @@ -140,6 +128,10 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collectionID), + zap.Int64s("partitionIDis", partitionIDs), + ) getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequestV2{ Base: commonpbutil.NewMsgBase( @@ -149,15 +141,9 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti PartitionIDs: partitionIDs, } recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest) - if err != nil { - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), - zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) - return nil, nil, err - } - if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - err = merr.Error(recoveryInfo.GetStatus()) - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) + if err := merr.CheckRPCCall(recoveryInfo, err); err != nil { + log.Warn("get recovery info failed", zap.Error(err)) return nil, nil, err } @@ -167,22 +153,22 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() + log := log.Ctx(ctx).With( + zap.Int64s("segments", ids), + ) req := &datapb.GetSegmentInfoRequest{ SegmentIDs: ids, IncludeUnHealthy: true, } resp, err := broker.dataCoord.GetSegmentInfo(ctx, req) - if err != nil { - log.Warn("failed to get segment info from DataCoord", - zap.Int64s("segments", ids), - zap.Error(err)) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to get segment info from DataCoord", zap.Error(err)) return nil, err } if len(resp.Infos) == 0 { - log.Warn("No such segment in DataCoord", - zap.Int64s("segments", ids)) + log.Warn("No such segment in DataCoord") return nil, fmt.Errorf("no such segment in DataCoord") } @@ -202,15 +188,12 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID CollectionID: collectionID, SegmentIDs: []int64{segmentID}, }) - if err == nil { - err = merr.Error(resp.GetStatus()) - } - if err != nil { - log.Warn("failed to get segment index info", - zap.Error(err)) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to get segment index info", zap.Error(err)) return nil, err } + if resp.GetSegmentInfo() == nil { err = merr.WrapErrIndexNotFoundForSegment(segmentID) log.Warn("failed to get segment index info", @@ -251,11 +234,11 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID CollectionID: collectionID, }) - if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + if err := merr.CheckRPCCall(resp, err); err != nil { log.Error("failed to fetch index meta", zap.Int64("collection", collectionID), zap.Error(err)) return nil, err } - return resp.IndexInfos, nil + return resp.GetIndexInfos(), nil } diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go index f3e372d784..a5e7f8abd5 100644 --- a/internal/querycoordv2/meta/coordinator_broker_test.go +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -21,129 +21,373 @@ import ( "testing" "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" + "github.com/samber/lo" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) -func TestCoordinatorBroker_GetCollectionSchema(t *testing.T) { - t.Run("got error on DescribeCollection", func(t *testing.T) { - rootCoord := mocks.NewMockRootCoordClient(t) - rootCoord.On("DescribeCollection", - mock.Anything, - mock.Anything, - ).Return(nil, errors.New("error mock DescribeCollection")) - ctx := context.Background() - broker := &CoordinatorBroker{rootCoord: rootCoord} - _, err := broker.GetCollectionSchema(ctx, 100) - assert.Error(t, err) +type CoordinatorBrokerRootCoordSuite struct { + suite.Suite + + rootcoord *mocks.MockRootCoordClient + broker *CoordinatorBroker +} + +func (s *CoordinatorBrokerRootCoordSuite) SetupSuite() { + paramtable.Init() +} + +func (s *CoordinatorBrokerRootCoordSuite) SetupTest() { + s.rootcoord = mocks.NewMockRootCoordClient(s.T()) + s.broker = NewCoordinatorBroker(nil, s.rootcoord) +} + +func (s *CoordinatorBrokerRootCoordSuite) resetMock() { + s.rootcoord.AssertExpectations(s.T()) + s.rootcoord.ExpectedCalls = nil +} + +func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionSchema() { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + collectionID := int64(100) + + s.Run("normal case", func() { + s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything). + Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Schema: &schemapb.CollectionSchema{Name: "test_schema"}, + }, nil) + + schema, err := s.broker.GetCollectionSchema(ctx, collectionID) + s.NoError(err) + s.Equal("test_schema", schema.GetName()) + s.resetMock() }) - t.Run("non-success code", func(t *testing.T) { - rootCoord := mocks.NewMockRootCoordClient(t) - rootCoord.On("DescribeCollection", - mock.Anything, - mock.Anything, - ).Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}, - }, nil) - ctx := context.Background() - broker := &CoordinatorBroker{rootCoord: rootCoord} - _, err := broker.GetCollectionSchema(ctx, 100) - assert.Error(t, err) + s.Run("rootcoord_return_error", func() { + s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything). + Return(nil, errors.New("mock error")) + + _, err := s.broker.GetCollectionSchema(ctx, collectionID) + s.Error(err) + s.resetMock() }) - t.Run("normal case", func(t *testing.T) { - rootCoord := mocks.NewMockRootCoordClient(t) - rootCoord.On("DescribeCollection", - mock.Anything, - mock.Anything, - ).Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - Schema: &schemapb.CollectionSchema{Name: "test_schema"}, - }, nil) - ctx := context.Background() - broker := &CoordinatorBroker{rootCoord: rootCoord} - schema, err := broker.GetCollectionSchema(ctx, 100) - assert.NoError(t, err) - assert.Equal(t, "test_schema", schema.GetName()) + s.Run("return_failure_status", func() { + s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything). + Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}, + }, nil) + + _, err := s.broker.GetCollectionSchema(ctx, collectionID) + s.Error(err) + s.resetMock() }) } -func TestCoordinatorBroker_GetRecoveryInfo(t *testing.T) { - t.Run("normal case", func(t *testing.T) { - dc := mocks.NewMockDataCoordClient(t) - dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponseV2{}, nil) - - ctx := context.Background() - broker := &CoordinatorBroker{dataCoord: dc} - - _, _, err := broker.GetRecoveryInfoV2(ctx, 1) - assert.NoError(t, err) - }) - - t.Run("get error", func(t *testing.T) { - dc := mocks.NewMockDataCoordClient(t) - fakeErr := errors.New("fake error") - dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(nil, fakeErr) - - ctx := context.Background() - broker := &CoordinatorBroker{dataCoord: dc} - - _, _, err := broker.GetRecoveryInfoV2(ctx, 1) - assert.ErrorIs(t, err, fakeErr) - }) - - t.Run("return non-success code", func(t *testing.T) { - dc := mocks.NewMockDataCoordClient(t) - dc.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponseV2{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil) - - ctx := context.Background() - broker := &CoordinatorBroker{dataCoord: dc} - - _, _, err := broker.GetRecoveryInfoV2(ctx, 1) - assert.Error(t, err) - }) -} - -func TestCoordinatorBroker_GetPartitions(t *testing.T) { +func (s *CoordinatorBrokerRootCoordSuite) TestGetPartitions() { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() collection := int64(100) partitions := []int64{10, 11, 12} - t.Run("normal case", func(t *testing.T) { - rc := mocks.NewMockRootCoordClient(t) - rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{ - Status: &commonpb.Status{}, + s.Run("normal_case", func() { + s.rootcoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{ + Status: merr.Status(nil), PartitionIDs: partitions, }, nil) - ctx := context.Background() - broker := &CoordinatorBroker{rootCoord: rc} - - retPartitions, err := broker.GetPartitions(ctx, collection) - assert.NoError(t, err) - assert.ElementsMatch(t, partitions, retPartitions) + retPartitions, err := s.broker.GetPartitions(ctx, collection) + s.NoError(err) + s.ElementsMatch(partitions, retPartitions) + s.resetMock() }) - t.Run("collection not exist", func(t *testing.T) { - rc := mocks.NewMockRootCoordClient(t) - rc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{ + s.Run("collection_not_exist", func() { + s.rootcoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{ Status: merr.Status(merr.WrapErrCollectionNotFound("mock")), }, nil) - ctx := context.Background() - broker := &CoordinatorBroker{rootCoord: rc} - _, err := broker.GetPartitions(ctx, collection) - assert.ErrorIs(t, err, merr.ErrCollectionNotFound) + _, err := s.broker.GetPartitions(ctx, collection) + s.Error(err) + s.ErrorIs(err, merr.ErrCollectionNotFound) + s.resetMock() }) } + +type CoordinatorBrokerDataCoordSuite struct { + suite.Suite + + datacoord *mocks.MockDataCoordClient + broker *CoordinatorBroker +} + +func (s *CoordinatorBrokerDataCoordSuite) SetupSuite() { + paramtable.Init() +} + +func (s *CoordinatorBrokerDataCoordSuite) SetupTest() { + s.datacoord = mocks.NewMockDataCoordClient(s.T()) + s.broker = NewCoordinatorBroker(s.datacoord, nil) +} + +func (s *CoordinatorBrokerDataCoordSuite) resetMock() { + s.datacoord.AssertExpectations(s.T()) + s.datacoord.ExpectedCalls = nil +} + +func (s *CoordinatorBrokerDataCoordSuite) TestGetRecoveryInfo() { + collectionID := int64(100) + partitionID := int64(1000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("normal_case", func() { + channels := []string{"dml_0"} + segmentIDs := []int64{1, 2, 3} + s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything). + Return(&datapb.GetRecoveryInfoResponse{ + Channels: lo.Map(channels, func(ch string, _ int) *datapb.VchannelInfo { + return &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "dml_0", + } + }), + Binlogs: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentBinlogs { + return &datapb.SegmentBinlogs{SegmentID: id} + }), + }, nil) + + vchans, segInfos, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID) + s.NoError(err) + s.ElementsMatch(channels, lo.Map(vchans, func(info *datapb.VchannelInfo, _ int) string { + return info.GetChannelName() + })) + s.ElementsMatch(segmentIDs, lo.Map(segInfos, func(info *datapb.SegmentBinlogs, _ int) int64 { + return info.GetSegmentID() + })) + s.resetMock() + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything). + Return(nil, errors.New("mock")) + + _, _, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID) + s.Error(err) + s.resetMock() + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything). + Return(&datapb.GetRecoveryInfoResponse{ + Status: merr.Status(errors.New("mocked")), + }, nil) + + _, _, err := s.broker.GetRecoveryInfo(ctx, collectionID, partitionID) + s.Error(err) + s.resetMock() + }) +} + +func (s *CoordinatorBrokerDataCoordSuite) TestGetRecoveryInfoV2() { + collectionID := int64(100) + partitionID := int64(1000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.Run("normal_case", func() { + channels := []string{"dml_0"} + segmentIDs := []int64{1, 2, 3} + s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything). + Return(&datapb.GetRecoveryInfoResponseV2{ + Channels: lo.Map(channels, func(ch string, _ int) *datapb.VchannelInfo { + return &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "dml_0", + } + }), + Segments: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ID: id} + }), + }, nil) + + vchans, segInfos, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID) + s.NoError(err) + s.ElementsMatch(channels, lo.Map(vchans, func(info *datapb.VchannelInfo, _ int) string { + return info.GetChannelName() + })) + s.ElementsMatch(segmentIDs, lo.Map(segInfos, func(info *datapb.SegmentInfo, _ int) int64 { + return info.GetID() + })) + s.resetMock() + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything). + Return(nil, errors.New("mock")) + + _, _, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID) + s.Error(err) + s.resetMock() + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything). + Return(&datapb.GetRecoveryInfoResponseV2{ + Status: merr.Status(errors.New("mocked")), + }, nil) + + _, _, err := s.broker.GetRecoveryInfoV2(ctx, collectionID, partitionID) + s.Error(err) + s.resetMock() + }) +} + +func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + collectionID := int64(100) + + s.Run("normal_case", func() { + indexIDs := []int64{1, 2} + s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(&indexpb.DescribeIndexResponse{ + Status: merr.Status(nil), + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo { + return &indexpb.IndexInfo{IndexID: id} + }), + }, nil) + infos, err := s.broker.DescribeIndex(ctx, collectionID) + s.NoError(err) + s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() })) + s.resetMock() + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(nil, errors.New("mock")) + + _, err := s.broker.DescribeIndex(ctx, collectionID) + s.Error(err) + s.resetMock() + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(&indexpb.DescribeIndexResponse{ + Status: merr.Status(errors.New("mocked")), + }, nil) + + _, err := s.broker.DescribeIndex(ctx, collectionID) + s.Error(err) + s.resetMock() + }) +} + +func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + collectionID := int64(100) + segmentIDs := []int64{10000, 10001, 10002} + + s.Run("normal_case", func() { + s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything). + Return(&datapb.GetSegmentInfoResponse{ + Status: merr.Status(nil), + Infos: lo.Map(segmentIDs, func(id int64, _ int) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ID: id, CollectionID: collectionID} + }), + }, nil) + + resp, err := s.broker.GetSegmentInfo(ctx, segmentIDs...) + s.NoError(err) + s.ElementsMatch(segmentIDs, lo.Map(resp.GetInfos(), func(info *datapb.SegmentInfo, _ int) int64 { + return info.GetID() + })) + s.resetMock() + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything). + Return(nil, errors.New("mock")) + + _, err := s.broker.GetSegmentInfo(ctx, segmentIDs...) + s.Error(err) + s.resetMock() + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything). + Return(&datapb.GetSegmentInfoResponse{Status: merr.Status(errors.New("mocked"))}, nil) + + _, err := s.broker.GetSegmentInfo(ctx, segmentIDs...) + s.Error(err) + s.resetMock() + }) +} + +func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + collectionID := int64(100) + segmentID := int64(10000) + + s.Run("normal_case", func() { + indexIDs := []int64{1, 2, 3} + s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything). + Return(&indexpb.GetIndexInfoResponse{ + Status: merr.Status(nil), + SegmentInfo: map[int64]*indexpb.SegmentInfo{ + segmentID: { + SegmentID: segmentID, + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexFilePathInfo { + return &indexpb.IndexFilePathInfo{IndexID: id} + }), + }, + }, + }, nil) + + infos, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID) + s.NoError(err) + s.ElementsMatch(indexIDs, lo.Map(infos, func(info *querypb.FieldIndexInfo, _ int) int64 { + return info.GetIndexID() + })) + s.resetMock() + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything). + Return(nil, errors.New("mock")) + + _, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID) + s.Error(err) + s.resetMock() + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().GetIndexInfos(mock.Anything, mock.Anything). + Return(&indexpb.GetIndexInfoResponse{Status: merr.Status(errors.New("mock"))}, nil) + + _, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID) + s.Error(err) + s.resetMock() + }) +} + +func TestCoordinatorBroker(t *testing.T) { + suite.Run(t, new(CoordinatorBrokerRootCoordSuite)) + suite.Run(t, new(CoordinatorBrokerDataCoordSuite)) +}