enhance:[10kcp] Reduce GetIndexInfos calls (#37877)

Batch GetIndexInfos calls for segments to reduce RPC calls.

issue: https://github.com/milvus-io/milvus/issues/37634

pr: https://github.com/milvus-io/milvus/pull/37695

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/37891/head
yihao.dai 2024-11-21 15:09:39 +08:00 committed by GitHub
parent 0bd26171d5
commit 92ab65ada0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 124 additions and 106 deletions

View File

@ -542,26 +542,24 @@ func (m *indexMeta) IsUnIndexedSegment(collectionID UniqueID, segID UniqueID) bo
return false
}
func (m *indexMeta) getSegmentIndexes(segID UniqueID) map[UniqueID]*model.SegmentIndex {
func (m *indexMeta) GetSegmentsIndexes(collectionID UniqueID, segIDs []UniqueID) map[int64]map[UniqueID]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()
ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
return ret
segmentsIndexes := make(map[int64]map[UniqueID]*model.SegmentIndex)
for _, segmentID := range segIDs {
segmentsIndexes[segmentID] = m.getSegmentIndexes(collectionID, segmentID)
}
for _, segIdx := range segIndexInfos {
ret[segIdx.IndexID] = model.CloneSegmentIndex(segIdx)
}
return ret
return segmentsIndexes
}
func (m *indexMeta) GetSegmentIndexes(collectionID UniqueID, segID UniqueID) map[UniqueID]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()
return m.getSegmentIndexes(collectionID, segID)
}
// Note: thread-unsafe, don't call it outside indexMeta
func (m *indexMeta) getSegmentIndexes(collectionID UniqueID, segID UniqueID) map[UniqueID]*model.SegmentIndex {
ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {

View File

@ -737,12 +737,12 @@ func TestMeta_GetSegmentIndexes(t *testing.T) {
m := createMeta(catalog, nil, createIndexMeta(catalog))
t.Run("success", func(t *testing.T) {
segIndexes := m.indexMeta.getSegmentIndexes(segID)
segIndexes := m.indexMeta.GetSegmentIndexes(collID, segID)
assert.Equal(t, 1, len(segIndexes))
})
t.Run("segment not exist", func(t *testing.T) {
segIndexes := m.indexMeta.getSegmentIndexes(segID + 100)
segIndexes := m.indexMeta.GetSegmentIndexes(collID, segID+100)
assert.Equal(t, 0, len(segIndexes))
})

View File

@ -846,8 +846,9 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
SegmentInfo: map[int64]*indexpb.SegmentInfo{},
}
segmentsIndexes := s.meta.indexMeta.GetSegmentsIndexes(req.GetCollectionID(), req.GetSegmentIDs())
for _, segID := range req.GetSegmentIDs() {
segIdxes := s.meta.indexMeta.GetSegmentIndexes(req.GetCollectionID(), segID)
segIdxes := segmentsIndexes[segID]
ret.SegmentInfo[segID] = &indexpb.SegmentInfo{
CollectionID: req.GetCollectionID(),
SegmentID: segID,

View File

@ -101,7 +101,7 @@ func (s *taskScheduler) Stop() {
func (s *taskScheduler) reloadFromKV() {
segments := s.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
for _, segIndex := range s.meta.indexMeta.getSegmentIndexes(segment.ID) {
for _, segIndex := range s.meta.indexMeta.GetSegmentIndexes(segment.GetCollectionID(), segment.ID) {
if segIndex.IsDeleted {
continue
}

View File

@ -35,6 +35,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const MaxSegmentNumPerGetIndexInfoRPC = 1024
var _ Checker = (*IndexChecker)(nil)
// IndexChecker perform segment index check.
@ -132,18 +134,21 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
}
segmentsToUpdate := typeutil.NewSet[int64]()
for segment, fields := range targets {
missingFields := typeutil.NewSet(fields...)
infos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segment)
for _, segmentIDs := range lo.Chunk(lo.Keys(idSegments), MaxSegmentNumPerGetIndexInfoRPC) {
segmentIndexInfos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segmentIDs...)
if err != nil {
log.Warn("failed to get indexInfo for segment", zap.Int64("segmentID", segment), zap.Error(err))
log.Warn("failed to get indexInfo for segments", zap.Int64s("segmentIDs", segmentIDs), zap.Error(err))
continue
}
for _, info := range infos {
if missingFields.Contain(info.GetFieldID()) &&
info.GetEnableIndex() &&
len(info.GetIndexFilePaths()) > 0 {
segmentsToUpdate.Insert(segment)
for segmentID, segmentIndexInfo := range segmentIndexInfos {
fields := targets[segmentID]
missingFields := typeutil.NewSet(fields...)
for _, fieldIndexInfo := range segmentIndexInfo {
if missingFields.Contain(fieldIndexInfo.GetFieldID()) &&
fieldIndexInfo.GetEnableIndex() &&
len(fieldIndexInfo.GetIndexFilePaths()) > 0 {
segmentsToUpdate.Insert(segmentID)
}
}
}
}

View File

@ -116,14 +116,14 @@ func (suite *IndexCheckerSuite) TestLoadIndex() {
// broker
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(2)).
Return([]*querypb.FieldIndexInfo{
Return(map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}, nil)
}}, nil)
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
@ -180,28 +180,28 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel"))
// broker
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).Call.
Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
if segmentID == 2 {
return []*querypb.FieldIndexInfo{
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).
RunAndReturn(func(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) {
if segmentIDs[0] == 2 {
return map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: false,
},
}
}}, nil
}
if segmentID == 3 {
return []*querypb.FieldIndexInfo{
if segmentIDs[0] == 3 {
return map[int64][]*querypb.FieldIndexInfo{3: {
{
FieldID: 101,
IndexID: 1002,
EnableIndex: false,
},
}
}}, nil
}
return nil
}, nil)
return nil, nil
})
suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
@ -298,23 +298,21 @@ func (suite *IndexCheckerSuite) TestCreateNewIndex() {
}, nil
},
)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).Call.
Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
return []*querypb.FieldIndexInfo{
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
{
FieldID: 102,
IndexID: 1001,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).
Return(map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
{
FieldID: 102,
IndexID: 1001,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}}, nil)
tasks := checker.Check(context.Background())
suite.Len(tasks, 1)

View File

@ -48,7 +48,7 @@ type Broker interface {
GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)
ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error)
GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error)
GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error)
@ -306,13 +306,13 @@ func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...Uniq
return ret, nil
}
func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) {
func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64("segmentID", segmentID),
zap.Int64s("segmentIDs", segmentIDs),
)
// during rolling upgrade, query coord may connect to datacoord with version 2.2, which will return merr.ErrServiceUnimplemented
@ -322,7 +322,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
retry.Do(ctx, func() error {
resp, err = broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
CollectionID: collectionID,
SegmentIDs: []int64{segmentID},
SegmentIDs: segmentIDs,
})
if errors.Is(err, merr.ErrServiceUnimplemented) {
@ -337,32 +337,30 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
}
if resp.GetSegmentInfo() == nil {
err = merr.WrapErrIndexNotFoundForSegment(segmentID)
log.Warn("failed to get segment index info",
err = merr.WrapErrIndexNotFoundForSegments(segmentIDs)
log.Warn("failed to get segments index info",
zap.Error(err))
return nil, err
}
segmentInfo, ok := resp.GetSegmentInfo()[segmentID]
if !ok || len(segmentInfo.GetIndexInfos()) == 0 {
return nil, merr.WrapErrIndexNotFoundForSegment(segmentID)
}
indexes := make([]*querypb.FieldIndexInfo, 0)
for _, info := range segmentInfo.GetIndexInfos() {
indexes = append(indexes, &querypb.FieldIndexInfo{
FieldID: info.GetFieldID(),
EnableIndex: true, // deprecated, but keep it for compatibility
IndexName: info.GetIndexName(),
IndexID: info.GetIndexID(),
BuildID: info.GetBuildID(),
IndexParams: info.GetIndexParams(),
IndexFilePaths: info.GetIndexFilePaths(),
IndexSize: int64(info.GetSerializedSize()),
IndexVersion: info.GetIndexVersion(),
NumRows: info.GetNumRows(),
CurrentIndexVersion: info.GetCurrentIndexVersion(),
})
indexes := make(map[int64][]*querypb.FieldIndexInfo, 0)
for segmentID, segmentInfo := range resp.GetSegmentInfo() {
indexes[segmentID] = make([]*querypb.FieldIndexInfo, 0)
for _, info := range segmentInfo.GetIndexInfos() {
indexes[segmentID] = append(indexes[segmentID], &querypb.FieldIndexInfo{
FieldID: info.GetFieldID(),
EnableIndex: true, // deprecated, but keep it for compatibility
IndexName: info.GetIndexName(),
IndexID: info.GetIndexID(),
BuildID: info.GetBuildID(),
IndexParams: info.GetIndexParams(),
IndexFilePaths: info.GetIndexFilePaths(),
IndexSize: int64(info.GetSerializedSize()),
IndexVersion: info.GetIndexVersion(),
NumRows: info.GetNumRows(),
CurrentIndexVersion: info.GetCurrentIndexVersion(),
})
}
}
return indexes, nil

View File

@ -443,7 +443,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {
infos, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
s.NoError(err)
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *querypb.FieldIndexInfo, _ int) int64 {
s.ElementsMatch(indexIDs, lo.Map(infos[segmentID], func(info *querypb.FieldIndexInfo, _ int) int64 {
return info.GetIndexID()
}))
s.resetMock()

View File

@ -202,25 +202,36 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C
return _c
}
// GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentID
func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentID int64) ([]*querypb.FieldIndexInfo, error) {
ret := _m.Called(ctx, collectionID, segmentID)
var r0 []*querypb.FieldIndexInfo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, int64) ([]*querypb.FieldIndexInfo, error)); ok {
return rf(ctx, collectionID, segmentID)
// GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentIDs
func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) {
_va := make([]interface{}, len(segmentIDs))
for _i := range segmentIDs {
_va[_i] = segmentIDs[_i]
}
if rf, ok := ret.Get(0).(func(context.Context, int64, int64) []*querypb.FieldIndexInfo); ok {
r0 = rf(ctx, collectionID, segmentID)
var _ca []interface{}
_ca = append(_ca, ctx, collectionID)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetIndexInfo")
}
var r0 map[int64][]*querypb.FieldIndexInfo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, ...int64) (map[int64][]*querypb.FieldIndexInfo, error)); ok {
return rf(ctx, collectionID, segmentIDs...)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, ...int64) map[int64][]*querypb.FieldIndexInfo); ok {
r0 = rf(ctx, collectionID, segmentIDs...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.FieldIndexInfo)
r0 = ret.Get(0).(map[int64][]*querypb.FieldIndexInfo)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, int64) error); ok {
r1 = rf(ctx, collectionID, segmentID)
if rf, ok := ret.Get(1).(func(context.Context, int64, ...int64) error); ok {
r1 = rf(ctx, collectionID, segmentIDs...)
} else {
r1 = ret.Error(1)
}
@ -236,24 +247,31 @@ type MockBroker_GetIndexInfo_Call struct {
// GetIndexInfo is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segmentID int64
func (_e *MockBroker_Expecter) GetIndexInfo(ctx interface{}, collectionID interface{}, segmentID interface{}) *MockBroker_GetIndexInfo_Call {
return &MockBroker_GetIndexInfo_Call{Call: _e.mock.On("GetIndexInfo", ctx, collectionID, segmentID)}
// - segmentIDs ...int64
func (_e *MockBroker_Expecter) GetIndexInfo(ctx interface{}, collectionID interface{}, segmentIDs ...interface{}) *MockBroker_GetIndexInfo_Call {
return &MockBroker_GetIndexInfo_Call{Call: _e.mock.On("GetIndexInfo",
append([]interface{}{ctx, collectionID}, segmentIDs...)...)}
}
func (_c *MockBroker_GetIndexInfo_Call) Run(run func(ctx context.Context, collectionID int64, segmentID int64)) *MockBroker_GetIndexInfo_Call {
func (_c *MockBroker_GetIndexInfo_Call) Run(run func(ctx context.Context, collectionID int64, segmentIDs ...int64)) *MockBroker_GetIndexInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int64))
variadicArgs := make([]int64, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(context.Context), args[1].(int64), variadicArgs...)
})
return _c
}
func (_c *MockBroker_GetIndexInfo_Call) Return(_a0 []*querypb.FieldIndexInfo, _a1 error) *MockBroker_GetIndexInfo_Call {
func (_c *MockBroker_GetIndexInfo_Call) Return(_a0 map[int64][]*querypb.FieldIndexInfo, _a1 error) *MockBroker_GetIndexInfo_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBroker_GetIndexInfo_Call) RunAndReturn(run func(context.Context, int64, int64) ([]*querypb.FieldIndexInfo, error)) *MockBroker_GetIndexInfo_Call {
func (_c *MockBroker_GetIndexInfo_Call) RunAndReturn(run func(context.Context, int64, ...int64) (map[int64][]*querypb.FieldIndexInfo, error)) *MockBroker_GetIndexInfo_Call {
_c.Call.Return(run)
return _c
}

View File

@ -710,7 +710,7 @@ func (ex *Executor) getLoadInfo(ctx context.Context, collectionID, segmentID int
return nil, nil, err
}
// update the field index params
for _, segmentIndex := range indexes {
for _, segmentIndex := range indexes[segment.GetID()] {
index, found := lo.Find(indexInfos, func(indexInfo *indexpb.IndexInfo) bool {
return indexInfo.IndexID == segmentIndex.IndexID
})
@ -727,7 +727,7 @@ func (ex *Executor) getLoadInfo(ctx context.Context, collectionID, segmentID int
segmentIndex.IndexParams = funcutil.Map2KeyValuePair(params)
}
loadInfo := utils.PackSegmentLoadInfo(segment, channel.GetSeekPosition(), indexes)
loadInfo := utils.PackSegmentLoadInfo(segment, channel.GetSeekPosition(), indexes[segment.GetID()])
return loadInfo, indexInfos, nil
}

View File

@ -530,7 +530,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
InsertChannel: channel.ChannelName,
},
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFoundForSegment(segment))
suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFoundForSegments([]int64{segment}))
}
suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Success(), nil)

View File

@ -120,7 +120,7 @@ func (s *ErrSuite) TestWrap() {
// Index related
s.ErrorIs(WrapErrIndexNotFound("failed to get Index"), ErrIndexNotFound)
s.ErrorIs(WrapErrIndexNotFoundForCollection("milvus_hello", "failed to get collection index"), ErrIndexNotFound)
s.ErrorIs(WrapErrIndexNotFoundForSegment(100, "failed to get collection index"), ErrIndexNotFound)
s.ErrorIs(WrapErrIndexNotFoundForSegments([]int64{100}, "failed to get collection index"), ErrIndexNotFound)
s.ErrorIs(WrapErrIndexNotSupported("wsnh", "failed to create index"), ErrIndexNotSupported)
// Node related

View File

@ -772,8 +772,8 @@ func WrapErrIndexNotFound(indexName string, msg ...string) error {
return err
}
func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error {
err := wrapFields(ErrIndexNotFound, value("segmentID", segmentID))
func WrapErrIndexNotFoundForSegments(segmentIDs []int64, msg ...string) error {
err := wrapFields(ErrIndexNotFound, value("segmentIDs", segmentIDs))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}