mirror of https://github.com/milvus-io/milvus.git
enhance: [Cherry-Pick] Use `ListIndexes` instead of `DescribeIndex` for qc broker (#31163)
Cherry pick from master pr: #31122 See also #31103 Since querycoord need index meta information from datacoord only, broker shall use `ListIndexes` to skip segment index building check logic in datacoord This PR is also related to #30538, in which DescribeIndex caused lots of memory usage and lead to OOM eventually --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/31177/head
parent
1dd4f4b4dc
commit
3e7f2e8e7d
|
@ -65,8 +65,8 @@ type MockBroker_AllocTimestamp_Call struct {
|
|||
}
|
||||
|
||||
// AllocTimestamp is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - num uint32
|
||||
// - ctx context.Context
|
||||
// - num uint32
|
||||
func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call {
|
||||
return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)}
|
||||
}
|
||||
|
@ -127,8 +127,8 @@ type MockBroker_AssignSegmentID_Call struct {
|
|||
}
|
||||
|
||||
// AssignSegmentID is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - reqs ...*datapb.SegmentIDRequest
|
||||
// - ctx context.Context
|
||||
// - reqs ...*datapb.SegmentIDRequest
|
||||
func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call {
|
||||
return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID",
|
||||
append([]interface{}{ctx}, reqs...)...)}
|
||||
|
@ -189,9 +189,9 @@ type MockBroker_DescribeCollection_Call struct {
|
|||
}
|
||||
|
||||
// DescribeCollection is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - ts uint64
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - ts uint64
|
||||
func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call {
|
||||
return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)}
|
||||
}
|
||||
|
@ -245,8 +245,8 @@ type MockBroker_DropVirtualChannel_Call struct {
|
|||
}
|
||||
|
||||
// DropVirtualChannel is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *datapb.DropVirtualChannelRequest
|
||||
// - ctx context.Context
|
||||
// - req *datapb.DropVirtualChannelRequest
|
||||
func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call {
|
||||
return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)}
|
||||
}
|
||||
|
@ -300,8 +300,8 @@ type MockBroker_GetSegmentInfo_Call struct {
|
|||
}
|
||||
|
||||
// GetSegmentInfo is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - segmentIDs []int64
|
||||
// - ctx context.Context
|
||||
// - segmentIDs []int64
|
||||
func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call {
|
||||
return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)}
|
||||
}
|
||||
|
@ -343,8 +343,8 @@ type MockBroker_ReportImport_Call struct {
|
|||
}
|
||||
|
||||
// ReportImport is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *rootcoordpb.ImportResult
|
||||
// - ctx context.Context
|
||||
// - req *rootcoordpb.ImportResult
|
||||
func (_e *MockBroker_Expecter) ReportImport(ctx interface{}, req interface{}) *MockBroker_ReportImport_Call {
|
||||
return &MockBroker_ReportImport_Call{Call: _e.mock.On("ReportImport", ctx, req)}
|
||||
}
|
||||
|
@ -386,8 +386,8 @@ type MockBroker_ReportTimeTick_Call struct {
|
|||
}
|
||||
|
||||
// ReportTimeTick is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - msgs []*msgpb.DataNodeTtMsg
|
||||
// - ctx context.Context
|
||||
// - msgs []*msgpb.DataNodeTtMsg
|
||||
func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call {
|
||||
return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)}
|
||||
}
|
||||
|
@ -429,8 +429,8 @@ type MockBroker_SaveBinlogPaths_Call struct {
|
|||
}
|
||||
|
||||
// SaveBinlogPaths is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *datapb.SaveBinlogPathsRequest
|
||||
// - ctx context.Context
|
||||
// - req *datapb.SaveBinlogPathsRequest
|
||||
func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call {
|
||||
return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)}
|
||||
}
|
||||
|
@ -472,8 +472,8 @@ type MockBroker_SaveImportSegment_Call struct {
|
|||
}
|
||||
|
||||
// SaveImportSegment is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *datapb.SaveImportSegmentRequest
|
||||
// - ctx context.Context
|
||||
// - req *datapb.SaveImportSegmentRequest
|
||||
func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call {
|
||||
return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)}
|
||||
}
|
||||
|
@ -527,9 +527,9 @@ type MockBroker_ShowPartitions_Call struct {
|
|||
}
|
||||
|
||||
// ShowPartitions is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - dbName string
|
||||
// - collectionName string
|
||||
// - ctx context.Context
|
||||
// - dbName string
|
||||
// - collectionName string
|
||||
func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call {
|
||||
return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)}
|
||||
}
|
||||
|
@ -571,8 +571,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct {
|
|||
}
|
||||
|
||||
// UpdateChannelCheckpoint is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - channelCPs []*msgpb.MsgPosition
|
||||
// - ctx context.Context
|
||||
// - channelCPs []*msgpb.MsgPosition
|
||||
func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call {
|
||||
return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)}
|
||||
}
|
||||
|
@ -614,8 +614,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct {
|
|||
}
|
||||
|
||||
// UpdateSegmentStatistics is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - req *datapb.UpdateSegmentStatisticsRequest
|
||||
// - ctx context.Context
|
||||
// - req *datapb.UpdateSegmentStatisticsRequest
|
||||
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
|
||||
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
|
||||
}
|
||||
|
|
|
@ -973,8 +973,8 @@ type DataCoordCatalog_SaveChannelCheckpoints_Call struct {
|
|||
}
|
||||
|
||||
// SaveChannelCheckpoints is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - positions []*msgpb.MsgPosition
|
||||
// - ctx context.Context
|
||||
// - positions []*msgpb.MsgPosition
|
||||
func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoints(ctx interface{}, positions interface{}) *DataCoordCatalog_SaveChannelCheckpoints_Call {
|
||||
return &DataCoordCatalog_SaveChannelCheckpoints_Call{Call: _e.mock.On("SaveChannelCheckpoints", ctx, positions)}
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ func (suite *JobSuite) SetupSuite() {
|
|||
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).
|
||||
Return(nil, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(nil, nil)
|
||||
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
|
@ -1192,10 +1192,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
|
|||
// call LoadPartitions failed at get index info
|
||||
getIndexErr := fmt.Errorf("mock get index error")
|
||||
suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool {
|
||||
return call.Method != "DescribeIndex"
|
||||
return call.Method != "ListIndexes"
|
||||
})
|
||||
for _, collection := range suite.collections {
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, collection).Return(nil, getIndexErr)
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr)
|
||||
loadCollectionReq := &querypb.LoadCollectionRequest{
|
||||
CollectionID: collection,
|
||||
}
|
||||
|
@ -1281,10 +1281,10 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() {
|
|||
}
|
||||
|
||||
suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool {
|
||||
return call.Method != "DescribeIndex" && call.Method != "GetCollectionSchema"
|
||||
return call.Method != "ListIndexes" && call.Method != "GetCollectionSchema"
|
||||
})
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
}
|
||||
|
||||
func (suite *JobSuite) TestCallReleasePartitionFailed() {
|
||||
|
|
|
@ -78,7 +78,7 @@ func loadPartitions(ctx context.Context,
|
|||
return err
|
||||
}
|
||||
}
|
||||
indexes, err := broker.DescribeIndex(ctx, collection)
|
||||
indexes, err := broker.ListIndexes(ctx, collection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ type Broker interface {
|
|||
GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error)
|
||||
GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
|
||||
GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)
|
||||
DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
|
||||
ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
|
||||
GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error)
|
||||
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)
|
||||
GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
|
||||
|
@ -243,7 +243,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
|
|||
return indexes, nil
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
|
||||
func (broker *CoordinatorBroker) describeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
|
@ -273,3 +273,25 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID
|
|||
}
|
||||
return resp.GetIndexInfos(), nil
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
|
||||
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
resp, err := broker.dataCoord.ListIndexes(ctx, &indexpb.ListIndexesRequest{
|
||||
CollectionID: collectionID,
|
||||
})
|
||||
|
||||
err = merr.CheckRPCCall(resp, err)
|
||||
if err != nil {
|
||||
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||
log.Warn("datacoord does not implement ListIndex API fallback to DescribeIndex")
|
||||
return broker.describeIndex(ctx, collectionID)
|
||||
}
|
||||
log.Warn("failed to fetch index meta", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.GetIndexInfos(), nil
|
||||
}
|
||||
|
|
|
@ -270,7 +270,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
return &indexpb.IndexInfo{IndexID: id}
|
||||
}),
|
||||
}, nil)
|
||||
infos, err := s.broker.DescribeIndex(ctx, collectionID)
|
||||
infos, err := s.broker.describeIndex(ctx, collectionID)
|
||||
s.NoError(err)
|
||||
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() }))
|
||||
s.resetMock()
|
||||
|
@ -283,7 +283,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*1)
|
||||
defer cancel2()
|
||||
time.Sleep(time.Millisecond * 2)
|
||||
_, err := s.broker.DescribeIndex(ctx2, collectionID)
|
||||
_, err := s.broker.describeIndex(ctx2, collectionID)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -292,7 +292,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mock"))
|
||||
|
||||
_, err := s.broker.DescribeIndex(ctx, collectionID)
|
||||
_, err := s.broker.describeIndex(ctx, collectionID)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -303,7 +303,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
Status: merr.Status(errors.New("mocked")),
|
||||
}, nil)
|
||||
|
||||
_, err := s.broker.DescribeIndex(ctx, collectionID)
|
||||
_, err := s.broker.describeIndex(ctx, collectionID)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -323,12 +323,69 @@ func (s *CoordinatorBrokerDataCoordSuite) TestDescribeIndex() {
|
|||
}),
|
||||
}, nil)
|
||||
|
||||
_, err := s.broker.DescribeIndex(ctx, collectionID)
|
||||
_, err := s.broker.describeIndex(ctx, collectionID)
|
||||
s.NoError(err)
|
||||
s.resetMock()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *CoordinatorBrokerDataCoordSuite) TestListIndexes() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
collectionID := int64(100)
|
||||
|
||||
s.Run("normal_case", func() {
|
||||
indexIDs := []int64{1, 2}
|
||||
s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.ListIndexesResponse{
|
||||
Status: merr.Status(nil),
|
||||
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo {
|
||||
return &indexpb.IndexInfo{IndexID: id}
|
||||
}),
|
||||
}, nil).Once()
|
||||
infos, err := s.broker.ListIndexes(ctx, collectionID)
|
||||
s.NoError(err)
|
||||
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *indexpb.IndexInfo, _ int) int64 { return info.GetIndexID() }))
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_error", func() {
|
||||
s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mocked")).Once()
|
||||
|
||||
_, err := s.broker.ListIndexes(ctx, collectionID)
|
||||
s.Error(err)
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_failure_status", func() {
|
||||
s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.ListIndexesResponse{
|
||||
Status: merr.Status(errors.New("mocked")),
|
||||
}, nil).Once()
|
||||
|
||||
_, err := s.broker.ListIndexes(ctx, collectionID)
|
||||
s.Error(err)
|
||||
})
|
||||
|
||||
s.Run("datacoord_return_unimplemented", func() {
|
||||
// mock old version datacoord return unimplemented
|
||||
s.datacoord.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(nil, merr.ErrServiceUnimplemented).Once()
|
||||
|
||||
// mock retry on old version datacoord descibe index
|
||||
indexIDs := []int64{1, 2}
|
||||
s.datacoord.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
Return(&indexpb.DescribeIndexResponse{
|
||||
Status: merr.Status(nil),
|
||||
IndexInfos: lo.Map(indexIDs, func(id int64, _ int) *indexpb.IndexInfo {
|
||||
return &indexpb.IndexInfo{IndexID: id}
|
||||
}),
|
||||
}, nil).Once()
|
||||
|
||||
_, err := s.broker.ListIndexes(ctx, collectionID)
|
||||
s.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *CoordinatorBrokerDataCoordSuite) TestSegmentInfo() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
|
|
@ -28,61 +28,6 @@ func (_m *MockBroker) EXPECT() *MockBroker_Expecter {
|
|||
return &MockBroker_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// DescribeIndex provides a mock function with given fields: ctx, collectionID
|
||||
func (_m *MockBroker) DescribeIndex(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) {
|
||||
ret := _m.Called(ctx, collectionID)
|
||||
|
||||
var r0 []*indexpb.IndexInfo
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok {
|
||||
return rf(ctx, collectionID)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok {
|
||||
r0 = rf(ctx, collectionID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*indexpb.IndexInfo)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||
r1 = rf(ctx, collectionID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockBroker_DescribeIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeIndex'
|
||||
type MockBroker_DescribeIndex_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DescribeIndex is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
func (_e *MockBroker_Expecter) DescribeIndex(ctx interface{}, collectionID interface{}) *MockBroker_DescribeIndex_Call {
|
||||
return &MockBroker_DescribeIndex_Call{Call: _e.mock.On("DescribeIndex", ctx, collectionID)}
|
||||
}
|
||||
|
||||
func (_c *MockBroker_DescribeIndex_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_DescribeIndex_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_DescribeIndex_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_DescribeIndex_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_DescribeIndex_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_DescribeIndex_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetCollectionSchema provides a mock function with given fields: ctx, collectionID
|
||||
func (_m *MockBroker) GetCollectionSchema(ctx context.Context, collectionID int64) (*schemapb.CollectionSchema, error) {
|
||||
ret := _m.Called(ctx, collectionID)
|
||||
|
@ -462,6 +407,61 @@ func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context,
|
|||
return _c
|
||||
}
|
||||
|
||||
// ListIndexes provides a mock function with given fields: ctx, collectionID
|
||||
func (_m *MockBroker) ListIndexes(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error) {
|
||||
ret := _m.Called(ctx, collectionID)
|
||||
|
||||
var r0 []*indexpb.IndexInfo
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*indexpb.IndexInfo, error)); ok {
|
||||
return rf(ctx, collectionID)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) []*indexpb.IndexInfo); ok {
|
||||
r0 = rf(ctx, collectionID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*indexpb.IndexInfo)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||
r1 = rf(ctx, collectionID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockBroker_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes'
|
||||
type MockBroker_ListIndexes_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// ListIndexes is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
func (_e *MockBroker_Expecter) ListIndexes(ctx interface{}, collectionID interface{}) *MockBroker_ListIndexes_Call {
|
||||
return &MockBroker_ListIndexes_Call{Call: _e.mock.On("ListIndexes", ctx, collectionID)}
|
||||
}
|
||||
|
||||
func (_c *MockBroker_ListIndexes_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_ListIndexes_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_ListIndexes_Call) Return(_a0 []*indexpb.IndexInfo, _a1 error) *MockBroker_ListIndexes_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_ListIndexes_Call) RunAndReturn(run func(context.Context, int64) ([]*indexpb.IndexInfo, error)) *MockBroker_ListIndexes_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockBroker(t interface {
|
||||
|
|
|
@ -221,7 +221,7 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
|
|||
}
|
||||
|
||||
// Get collection index info
|
||||
indexInfo, err := o.broker.DescribeIndex(ctx, leaderView.CollectionID)
|
||||
indexInfo, err := o.broker.ListIndexes(ctx, leaderView.CollectionID)
|
||||
if err != nil {
|
||||
log.Warn("fail to get index info of collection", zap.Error(err))
|
||||
return false
|
||||
|
|
|
@ -123,8 +123,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
|
|||
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
|
||||
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
|
||||
// will cause sync failed once
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once()
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once()
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
{IndexName: "test"},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
|
@ -220,7 +220,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
|
|||
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
{IndexName: "test"},
|
||||
}, nil)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
|
@ -356,7 +356,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
|
|||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
observer.target.UpdateCollectionCurrentTarget(1)
|
||||
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
|
||||
|
@ -426,7 +426,7 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
|
|||
|
||||
schema := utils.CreateTestSchema()
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
{IndexName: "test"},
|
||||
}, nil)
|
||||
channels := []*datapb.VchannelInfo{
|
||||
|
@ -511,7 +511,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
|
|||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
|
||||
channels, segments, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
|
||||
{IndexName: "test"},
|
||||
}, nil)
|
||||
observer.target.UpdateCollectionNextTarget(int64(1))
|
||||
|
|
|
@ -381,7 +381,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView
|
|||
}
|
||||
|
||||
// Get collection index info
|
||||
indexInfo, err := ob.broker.DescribeIndex(ctx, leaderView.CollectionID)
|
||||
indexInfo, err := ob.broker.ListIndexes(ctx, leaderView.CollectionID)
|
||||
if err != nil {
|
||||
log.Warn("fail to get index info of collection", zap.Error(err))
|
||||
return false
|
||||
|
|
|
@ -587,7 +587,7 @@ func (suite *ServerSuite) hackServer() {
|
|||
)
|
||||
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe()
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
for _, collection := range suite.collections {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
|
||||
suite.expectGetRecoverInfo(collection)
|
||||
|
|
|
@ -1732,7 +1732,7 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) {
|
|||
func (suite *ServiceSuite) expectLoadPartitions() {
|
||||
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).
|
||||
Return(nil, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).
|
||||
Return(nil, nil)
|
||||
suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(merr.Success(), nil)
|
||||
|
|
|
@ -209,7 +209,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
|||
loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes)
|
||||
|
||||
// Get collection index info
|
||||
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
|
||||
indexInfo, err := ex.broker.ListIndexes(ctx, task.CollectionID())
|
||||
if err != nil {
|
||||
log.Warn("fail to get index meta of collection")
|
||||
return err
|
||||
|
@ -359,7 +359,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
|
|||
log.Warn("failed to get partitions of collection")
|
||||
return err
|
||||
}
|
||||
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
|
||||
indexInfo, err := ex.broker.ListIndexes(ctx, task.CollectionID())
|
||||
if err != nil {
|
||||
log.Warn("fail to get index meta of collection")
|
||||
return err
|
||||
|
|
|
@ -216,7 +216,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
|
|||
},
|
||||
}, nil)
|
||||
}
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
FieldID: 100,
|
||||
|
@ -390,7 +390,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
|
|||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
|
@ -486,7 +486,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
|
|||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
|
@ -780,7 +780,7 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
|
|||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
|
@ -950,7 +950,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
|
|||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
|
@ -1037,7 +1037,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
|
|||
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
|
||||
},
|
||||
}, nil)
|
||||
suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
|
||||
{
|
||||
CollectionID: suite.collection,
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue