From 3e7f2e8e7d37372f72871c69c4da6223001279c2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 11 Mar 2024 14:41:02 +0800 Subject: [PATCH] enhance: [Cherry-Pick] Use `ListIndexes` instead of `DescribeIndex` for qc broker (#31163) Cherry pick from master pr: #31122 See also #31103 Since querycoord need index meta information from datacoord only, broker shall use `ListIndexes` to skip segment index building check logic in datacoord This PR is also related to #30538, in which DescribeIndex caused lots of memory usage and lead to OOM eventually --------- Signed-off-by: Congqi Xia --- internal/datanode/broker/mock_broker.go | 52 ++++----- .../metastore/mocks/mock_datacoord_catalog.go | 4 +- internal/querycoordv2/job/job_test.go | 10 +- internal/querycoordv2/job/utils.go | 2 +- .../querycoordv2/meta/coordinator_broker.go | 26 ++++- .../meta/coordinator_broker_test.go | 67 ++++++++++- internal/querycoordv2/meta/mock_broker.go | 110 +++++++++--------- .../querycoordv2/observers/leader_observer.go | 2 +- .../observers/leader_observer_test.go | 12 +- .../querycoordv2/observers/target_observer.go | 2 +- internal/querycoordv2/server_test.go | 2 +- internal/querycoordv2/services_test.go | 2 +- internal/querycoordv2/task/executor.go | 4 +- internal/querycoordv2/task/task_test.go | 12 +- 14 files changed, 193 insertions(+), 114 deletions(-) diff --git a/internal/datanode/broker/mock_broker.go b/internal/datanode/broker/mock_broker.go index 7865756463..26e169cfcd 100644 --- a/internal/datanode/broker/mock_broker.go +++ b/internal/datanode/broker/mock_broker.go @@ -65,8 +65,8 @@ type MockBroker_AllocTimestamp_Call struct { } // AllocTimestamp is a helper method to define mock.On call -// - ctx context.Context -// - num uint32 +// - ctx context.Context +// - num uint32 func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call { return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)} } @@ -127,8 +127,8 @@ type MockBroker_AssignSegmentID_Call struct { } // AssignSegmentID is a helper method to define mock.On call -// - ctx context.Context -// - reqs ...*datapb.SegmentIDRequest +// - ctx context.Context +// - reqs ...*datapb.SegmentIDRequest func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call { return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", append([]interface{}{ctx}, reqs...)...)} @@ -189,9 +189,9 @@ type MockBroker_DescribeCollection_Call struct { } // DescribeCollection is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - ts uint64 +// - ctx context.Context +// - collectionID int64 +// - ts uint64 func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call { return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)} } @@ -245,8 +245,8 @@ type MockBroker_DropVirtualChannel_Call struct { } // DropVirtualChannel is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.DropVirtualChannelRequest +// - ctx context.Context +// - req *datapb.DropVirtualChannelRequest func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call { return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)} } @@ -300,8 +300,8 @@ type MockBroker_GetSegmentInfo_Call struct { } // GetSegmentInfo is a helper method to define mock.On call -// - ctx context.Context -// - segmentIDs []int64 +// - ctx context.Context +// - segmentIDs []int64 func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call { return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)} } @@ -343,8 +343,8 @@ type MockBroker_ReportImport_Call struct { } // ReportImport is a helper method to define mock.On call -// - ctx context.Context -// - req *rootcoordpb.ImportResult +// - ctx context.Context +// - req *rootcoordpb.ImportResult func (_e *MockBroker_Expecter) ReportImport(ctx interface{}, req interface{}) *MockBroker_ReportImport_Call { return &MockBroker_ReportImport_Call{Call: _e.mock.On("ReportImport", ctx, req)} } @@ -386,8 +386,8 @@ type MockBroker_ReportTimeTick_Call struct { } // ReportTimeTick is a helper method to define mock.On call -// - ctx context.Context -// - msgs []*msgpb.DataNodeTtMsg +// - ctx context.Context +// - msgs []*msgpb.DataNodeTtMsg func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call { return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)} } @@ -429,8 +429,8 @@ type MockBroker_SaveBinlogPaths_Call struct { } // SaveBinlogPaths is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveBinlogPathsRequest +// - ctx context.Context +// - req *datapb.SaveBinlogPathsRequest func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call { return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)} } @@ -472,8 +472,8 @@ type MockBroker_SaveImportSegment_Call struct { } // SaveImportSegment is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.SaveImportSegmentRequest +// - ctx context.Context +// - req *datapb.SaveImportSegmentRequest func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call { return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)} } @@ -527,9 +527,9 @@ type MockBroker_ShowPartitions_Call struct { } // ShowPartitions is a helper method to define mock.On call -// - ctx context.Context -// - dbName string -// - collectionName string +// - ctx context.Context +// - dbName string +// - collectionName string func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call { return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)} } @@ -571,8 +571,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct { } // UpdateChannelCheckpoint is a helper method to define mock.On call -// - ctx context.Context -// - channelCPs []*msgpb.MsgPosition +// - ctx context.Context +// - channelCPs []*msgpb.MsgPosition func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call { return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)} } @@ -614,8 +614,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct { } // UpdateSegmentStatistics is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.UpdateSegmentStatisticsRequest +// - ctx context.Context +// - req *datapb.UpdateSegmentStatisticsRequest func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} } diff --git a/internal/metastore/mocks/mock_datacoord_catalog.go b/internal/metastore/mocks/mock_datacoord_catalog.go index 0771ce9b12..3a3c9a4c29 100644 --- a/internal/metastore/mocks/mock_datacoord_catalog.go +++ b/internal/metastore/mocks/mock_datacoord_catalog.go @@ -973,8 +973,8 @@ type DataCoordCatalog_SaveChannelCheckpoints_Call struct { } // SaveChannelCheckpoints is a helper method to define mock.On call -// - ctx context.Context -// - positions []*msgpb.MsgPosition +// - ctx context.Context +// - positions []*msgpb.MsgPosition func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoints(ctx interface{}, positions interface{}) *DataCoordCatalog_SaveChannelCheckpoints_Call { return &DataCoordCatalog_SaveChannelCheckpoints_Call{Call: _e.mock.On("SaveChannelCheckpoints", ctx, positions)} } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index b953554839..7929f71054 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -128,7 +128,7 @@ func (suite *JobSuite) SetupSuite() { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). Return(nil, nil) suite.cluster = session.NewMockCluster(suite.T()) @@ -1192,10 +1192,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { // call LoadPartitions failed at get index info getIndexErr := fmt.Errorf("mock get index error") suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeIndex" + return call.Method != "ListIndexes" }) for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeIndex(mock.Anything, collection).Return(nil, getIndexErr) + suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr) loadCollectionReq := &querypb.LoadCollectionRequest{ CollectionID: collection, } @@ -1281,10 +1281,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { } suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeIndex" && call.Method != "GetCollectionSchema" + return call.Method != "ListIndexes" && call.Method != "GetCollectionSchema" }) suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil) + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil) } func (suite *JobSuite) TestCallReleasePartitionFailed() { diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index c6a9b26cfc..e84e7eec4d 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -78,7 +78,7 @@ func loadPartitions(ctx context.Context, return err } } - indexes, err := broker.DescribeIndex(ctx, collection) + indexes, err := broker.ListIndexes(ctx, collection) if err != nil { return err } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 54addb12b3..09ea0bc00b 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -43,7 +43,7 @@ type Broker interface { GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) - DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) + ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) @@ -243,7 +243,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID return indexes, nil } -func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { +func (broker *CoordinatorBroker) describeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() @@ -273,3 +273,25 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID } return resp.GetIndexInfos(), nil } + +func (broker *CoordinatorBroker) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { + log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + + resp, err := broker.dataCoord.ListIndexes(ctx, &indexpb.ListIndexesRequest{ + CollectionID: collectionID, + }) + + err = merr.CheckRPCCall(resp, err) + if err != nil { + if errors.Is(err, merr.ErrServiceUnimplemented) { + log.Warn("datacoord does not implement ListIndex API fallback to DescribeIndex") + return broker.describeIndex(ctx, collectionID) + } + log.Warn("failed to fetch index meta", zap.Error(err)) + return nil, err + } + + return resp.GetIndexInfos(), nil +} diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go index 979fe3302e..28dc18acba 100644 --- a/internal/querycoordv2/meta/coordinator_broker_test.go +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -270,7 +270,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { return &indexpb.IndexInfo{IndexID: id} }), }, nil) - infos, err := s.broker.DescribeIndex(ctx, collectionID) + infos, err := s.broker.describeIndex(ctx, collectionID) s.NoError(err) s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() })) s.resetMock() @@ -283,7 +283,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*1) defer cancel2() time.Sleep(time.Millisecond * 2) - _, err := s.broker.DescribeIndex(ctx2, collectionID) + _, err := s.broker.describeIndex(ctx2, collectionID) s.Error(err) s.resetMock() }) @@ -292,7 +292,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). Return(nil, errors.New("mock")) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.Error(err) s.resetMock() }) @@ -303,7 +303,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { Status: merr.Status(errors.New("mocked")), }, nil) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.Error(err) s.resetMock() }) @@ -323,12 +323,69 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() { }), }, nil) - _, err := s.broker.DescribeIndex(ctx, collectionID) + _, err := s.broker.describeIndex(ctx, collectionID) s.NoError(err) s.resetMock() }) } +func (s *CoordinatorBrokerDataCoordSuite) TestListIndexes() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + collectionID := int64(100) + + s.Run("normal_case", func() { + indexIDs := []int64{1, 2} + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(&indexpb.ListIndexesResponse{ + Status: merr.Status(nil), + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo { + return &indexpb.IndexInfo{IndexID: id} + }), + }, nil).Once() + infos, err := s.broker.ListIndexes(ctx, collectionID) + s.NoError(err) + s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() })) + }) + + s.Run("datacoord_return_error", func() { + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(nil, errors.New("mocked")).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.Error(err) + }) + + s.Run("datacoord_return_failure_status", func() { + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(&indexpb.ListIndexesResponse{ + Status: merr.Status(errors.New("mocked")), + }, nil).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.Error(err) + }) + + s.Run("datacoord_return_unimplemented", func() { + // mock old version datacoord return unimplemented + s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything). + Return(nil, merr.ErrServiceUnimplemented).Once() + + // mock retry on old version datacoord descibe index + indexIDs := []int64{1, 2} + s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(&indexpb.DescribeIndexResponse{ + Status: merr.Status(nil), + IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo { + return &indexpb.IndexInfo{IndexID: id} + }), + }, nil).Once() + + _, err := s.broker.ListIndexes(ctx, collectionID) + s.NoError(err) + }) +} + func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index a05bb3692d..be1a3b8337 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -28,61 +28,6 @@ func (_m *MockBroker) EXPECT() *MockBroker_Expecter { return &MockBroker_Expecter{mock: &_m.Mock} } -// DescribeIndex provides a mock function with given fields: ctx, collectionID -func (_m *MockBroker) DescribeIndex(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) { - ret := _m.Called(ctx, collectionID) - - var r0 []*indexpb.IndexInfo - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok { - return rf(ctx, collectionID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok { - r0 = rf(ctx, collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*indexpb.IndexInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, collectionID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockBroker_DescribeIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeIndex' -type MockBroker_DescribeIndex_Call struct { - *mock.Call -} - -// DescribeIndex is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -func (_e *MockBroker_Expecter) DescribeIndex(ctx interface{}, collectionID interface{}) *MockBroker_DescribeIndex_Call { - return &MockBroker_DescribeIndex_Call{Call: _e.mock.On("DescribeIndex", ctx, collectionID)} -} - -func (_c *MockBroker_DescribeIndex_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_DescribeIndex_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) - }) - return _c -} - -func (_c *MockBroker_DescribeIndex_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_DescribeIndex_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockBroker_DescribeIndex_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_DescribeIndex_Call { - _c.Call.Return(run) - return _c -} - // GetCollectionSchema provides a mock function with given fields: ctx, collectionID func (_m *MockBroker) GetCollectionSchema(ctx context.Context, collectionID int64) (*schemapb.CollectionSchema, error) { ret := _m.Called(ctx, collectionID) @@ -462,6 +407,61 @@ func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context, return _c } +// ListIndexes provides a mock function with given fields: ctx, collectionID +func (_m *MockBroker) ListIndexes(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) { + ret := _m.Called(ctx, collectionID) + + var r0 []*indexpb.IndexInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok { + return rf(ctx, collectionID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok { + r0 = rf(ctx, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*indexpb.IndexInfo) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, collectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBroker_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes' +type MockBroker_ListIndexes_Call struct { + *mock.Call +} + +// ListIndexes is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +func (_e *MockBroker_Expecter) ListIndexes(ctx interface{}, collectionID interface{}) *MockBroker_ListIndexes_Call { + return &MockBroker_ListIndexes_Call{Call: _e.mock.On("ListIndexes", ctx, collectionID)} +} + +func (_c *MockBroker_ListIndexes_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_ListIndexes_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockBroker_ListIndexes_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_ListIndexes_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBroker_ListIndexes_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_ListIndexes_Call { + _c.Call.Return(run) + return _c +} + // NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockBroker(t interface { diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 57d2d261be..7d9596b075 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -221,7 +221,7 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView * } // Get collection index info - indexInfo, err := o.broker.DescribeIndex(ctx, leaderView.CollectionID) + indexInfo, err := o.broker.ListIndexes(ctx, leaderView.CollectionID) if err != nil { log.Warn("fail to get index info of collection", zap.Error(err)) return false diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index 8f1ec27099..c65cb8379b 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -123,8 +123,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return( &datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil) // will cause sync failed once - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once() - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ {IndexName: "test"}, }, nil) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( @@ -220,7 +220,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() { &datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( channels, segments, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ {IndexName: "test"}, }, nil) observer.target.UpdateCollectionNextTarget(int64(1)) @@ -356,7 +356,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() { suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( channels, segments, nil) suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil) + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil) observer.target.UpdateCollectionNextTarget(int64(1)) observer.target.UpdateCollectionCurrentTarget(1) observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) @@ -426,7 +426,7 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() { schema := utils.CreateTestSchema() suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ {IndexName: "test"}, }, nil) channels := []*datapb.VchannelInfo{ @@ -511,7 +511,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( channels, segments, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{ {IndexName: "test"}, }, nil) observer.target.UpdateCollectionNextTarget(int64(1)) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index c634804b2f..ae144f5696 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -381,7 +381,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView } // Get collection index info - indexInfo, err := ob.broker.DescribeIndex(ctx, leaderView.CollectionID) + indexInfo, err := ob.broker.ListIndexes(ctx, leaderView.CollectionID) if err != nil { log.Warn("fail to get index info of collection", zap.Error(err)) return false diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 85a50c0881..0ea351843c 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -587,7 +587,7 @@ func (suite *ServerSuite) hackServer() { ) suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe() - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() for _, collection := range suite.collections { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() suite.expectGetRecoverInfo(collection) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index c18cc886db..c01b433e51 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1732,7 +1732,7 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { func (suite *ServiceSuite) expectLoadPartitions() { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). Return(nil, nil) suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything). Return(merr.Success(), nil) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 4bd7a61792..4c97d191ad 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -209,7 +209,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes) // Get collection index info - indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) + indexInfo, err := ex.broker.ListIndexes(ctx, task.CollectionID()) if err != nil { log.Warn("fail to get index meta of collection") return err @@ -359,7 +359,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error { log.Warn("failed to get partitions of collection") return err } - indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) + indexInfo, err := ex.broker.ListIndexes(ctx, task.CollectionID()) if err != nil { log.Warn("fail to get index meta of collection") return err diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 41cbe69afe..cebd226345 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -216,7 +216,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { }, }, nil) } - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, FieldID: 100, @@ -390,7 +390,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() { {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, }, }, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -486,7 +486,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, }, }, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -780,7 +780,7 @@ func (suite *TaskSuite) TestMoveSegmentTask() { {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, }, }, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -950,7 +950,7 @@ func (suite *TaskSuite) TestTaskCanceled() { {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, }, }, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, }, @@ -1037,7 +1037,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, }, }, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ { CollectionID: suite.collection, },