mirror of https://github.com/milvus-io/milvus.git
Check complete segment indexes for current data view (#22375)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/22395/head
parent
62d0c2faad
commit
ba9443d11e
|
@ -616,7 +616,11 @@ func (i *IndexCoord) completeIndexInfo(indexInfo *indexpb.IndexInfo, segIDs []Un
|
|||
break
|
||||
}
|
||||
|
||||
indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs)
|
||||
segSet := typeutil.NewSet(segIDs...)
|
||||
|
||||
indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs, func(segIdx *model.SegmentIndex) bool {
|
||||
return segSet.Contain(segIdx.SegmentID)
|
||||
})
|
||||
allCnt := len(indexStates)
|
||||
switch {
|
||||
case indexStateCnt.Failed > 0:
|
||||
|
|
|
@ -310,6 +310,7 @@ func testIndexCoord(t *testing.T) {
|
|||
indexs := getSegmentIndexes()
|
||||
mockIndexs := make(map[UniqueID]map[UniqueID]*model.SegmentIndex)
|
||||
progressIndex := &model.SegmentIndex{
|
||||
SegmentID: 222,
|
||||
IndexState: commonpb.IndexState_InProgress,
|
||||
}
|
||||
failedIndex := &model.SegmentIndex{
|
||||
|
@ -318,6 +319,7 @@ func testIndexCoord(t *testing.T) {
|
|||
FailReason: "mock fail",
|
||||
}
|
||||
finishedIndex := &model.SegmentIndex{
|
||||
SegmentID: 111,
|
||||
IndexState: commonpb.IndexState_Finished,
|
||||
NumRows: 2048,
|
||||
}
|
||||
|
@ -397,9 +399,9 @@ func testIndexCoord(t *testing.T) {
|
|||
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
|
||||
return &datapb.GetSegmentInfoResponse{
|
||||
Infos: []*datapb.SegmentInfo{
|
||||
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
{State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
{ID: 222, State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
{ID: 333, State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
{ID: 444, State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -604,7 +604,7 @@ type IndexStateCnt struct {
|
|||
}
|
||||
|
||||
// GetIndexStates gets the index states for indexID from meta table.
|
||||
func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) ([]*IndexState, IndexStateCnt) {
|
||||
func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64, filters ...func(segIdx *model.SegmentIndex) bool) ([]*IndexState, IndexStateCnt) {
|
||||
mt.segmentIndexLock.RLock()
|
||||
defer mt.segmentIndexLock.RUnlock()
|
||||
|
||||
|
@ -630,6 +630,16 @@ func (mt *metaTable) GetIndexStates(indexID int64, createTs uint64) ([]*IndexSta
|
|||
// skip deleted index, deleted by compaction
|
||||
continue
|
||||
}
|
||||
var skip bool
|
||||
for _, f := range filters {
|
||||
if !f(segIdx) {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
switch segIdx.IndexState {
|
||||
case commonpb.IndexState_IndexStateNone:
|
||||
cntNone++
|
||||
|
|
|
@ -631,6 +631,13 @@ func TestMetaTable_GetIndexStates(t *testing.T) {
|
|||
assert.Equal(t, 1, stateCnt.Finished)
|
||||
}
|
||||
|
||||
func TestMetaTable_GetIndexStates_Filter(t *testing.T) {
|
||||
mt := constructMetaTable(&indexcoord.Catalog{})
|
||||
states, stateCnt := mt.GetIndexStates(indexID, 11, func(_ *model.SegmentIndex) bool { return false })
|
||||
assert.Equal(t, 0, len(states))
|
||||
assert.Equal(t, 0, stateCnt.Finished)
|
||||
}
|
||||
|
||||
func TestMetaTable_GetSegmentIndexes(t *testing.T) {
|
||||
mt := constructMetaTable(&indexcoord.Catalog{})
|
||||
segIdxes := mt.GetSegmentIndexes(segID)
|
||||
|
|
Loading…
Reference in New Issue