mirror of https://github.com/milvus-io/milvus.git
enhance: When describing an index, fetch the index info in batches (#31305)
issue: #29313 master pr: #31238 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/31344/head
parent
7e3ac66617
commit
85a1b6c96e
|
@ -854,3 +854,35 @@ func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
|
|||
}
|
||||
return metas
|
||||
}
|
||||
|
||||
func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
ret := make(map[int64]map[int64]*indexpb.SegmentIndexState, 0)
|
||||
fieldIndexes, ok := m.indexes[collectionID]
|
||||
if !ok {
|
||||
return ret
|
||||
}
|
||||
|
||||
for _, segID := range segmentIDs {
|
||||
ret[segID] = make(map[int64]*indexpb.SegmentIndexState)
|
||||
segIndexInfos, ok := m.segmentIndexes[segID]
|
||||
if !ok || len(segIndexInfos) == 0 {
|
||||
return ret
|
||||
}
|
||||
|
||||
for _, segIdx := range segIndexInfos {
|
||||
if index, ok := fieldIndexes[segIdx.IndexID]; ok && !index.IsDeleted {
|
||||
ret[segID][segIdx.IndexID] = &indexpb.SegmentIndexState{
|
||||
SegmentID: segID,
|
||||
State: segIdx.IndexState,
|
||||
FailReason: segIdx.FailReason,
|
||||
IndexName: index.IndexName,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
|
|
@ -398,7 +398,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
|
|||
|
||||
indexInfo := &indexpb.IndexInfo{}
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -450,27 +450,26 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *Server) selectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
|
||||
func (s *Server) selectSegmentIndexesStats(selector SegmentInfoSelector) map[int64]*indexStats {
|
||||
ret := make(map[int64]*indexStats)
|
||||
|
||||
for _, info := range s.meta.SelectSegments(selector) {
|
||||
segments := s.meta.SelectSegments(selector)
|
||||
segmentIDs := lo.Map(segments, func(info *SegmentInfo, i int) int64 {
|
||||
return info.GetID()
|
||||
})
|
||||
if len(segments) == 0 {
|
||||
return ret
|
||||
}
|
||||
segmentsIndexes := s.meta.indexMeta.getSegmentsIndexStates(segments[0].CollectionID, segmentIDs)
|
||||
for _, info := range segments {
|
||||
is := &indexStats{
|
||||
ID: info.GetID(),
|
||||
numRows: info.GetNumOfRows(),
|
||||
compactionFrom: info.GetCompactionFrom(),
|
||||
indexStates: make(map[int64]*indexpb.SegmentIndexState),
|
||||
indexStates: segmentsIndexes[info.GetID()],
|
||||
state: info.GetState(),
|
||||
lastExpireTime: info.GetLastExpireTime(),
|
||||
}
|
||||
|
||||
indexIDToSegIdxes := s.meta.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID())
|
||||
for indexID, segIndex := range indexIDToSegIdxes {
|
||||
is.indexStates[indexID] = &indexpb.SegmentIndexState{
|
||||
SegmentID: segIndex.SegmentID,
|
||||
State: segIndex.IndexState,
|
||||
FailReason: segIndex.FailReason,
|
||||
}
|
||||
}
|
||||
ret[info.GetID()] = is
|
||||
}
|
||||
return ret
|
||||
|
@ -650,7 +649,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -703,7 +702,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -762,7 +761,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in New Issue