mirror of https://github.com/milvus-io/milvus.git
enhance: When describing an index, fetch the index info in batches (#31239)
issue: #29313 master pr: #31238 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/31402/head
parent
e77afcb5d5
commit
ef530a2324
|
@ -834,3 +834,34 @@ func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
|
|||
}
|
||||
return metas
|
||||
}
|
||||
|
||||
func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
ret := make(map[int64]map[int64]*indexpb.SegmentIndexState, 0)
|
||||
fieldIndexes, ok := m.indexes[collectionID]
|
||||
if !ok {
|
||||
return ret
|
||||
}
|
||||
|
||||
for _, segID := range segmentIDs {
|
||||
ret[segID] = make(map[int64]*indexpb.SegmentIndexState)
|
||||
segIndexInfos, ok := m.segmentIndexes[segID]
|
||||
if !ok || len(segIndexInfos) == 0 {
|
||||
return ret
|
||||
}
|
||||
|
||||
for _, segIdx := range segIndexInfos {
|
||||
if index, ok := fieldIndexes[segIdx.IndexID]; ok && !index.IsDeleted {
|
||||
ret[segID][segIdx.IndexID] = &indexpb.SegmentIndexState{
|
||||
SegmentID: segID,
|
||||
State: segIdx.IndexState,
|
||||
FailReason: segIdx.FailReason,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
|
|
@ -276,7 +276,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
|
|||
|
||||
indexInfo := &indexpb.IndexInfo{}
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -330,6 +330,31 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *Server) selectSegmentIndexesStats(selector SegmentInfoSelector) map[int64]*indexStats {
|
||||
ret := make(map[int64]*indexStats)
|
||||
|
||||
segments := s.meta.SelectSegments(selector)
|
||||
segmentIDs := lo.Map(segments, func(info *SegmentInfo, i int) int64 {
|
||||
return info.GetID()
|
||||
})
|
||||
if len(segments) == 0 {
|
||||
return ret
|
||||
}
|
||||
segmentsIndexes := s.meta.indexMeta.getSegmentsIndexStates(segments[0].CollectionID, segmentIDs)
|
||||
for _, info := range segments {
|
||||
is := &indexStats{
|
||||
ID: info.GetID(),
|
||||
numRows: info.GetNumOfRows(),
|
||||
compactionFrom: info.GetCompactionFrom(),
|
||||
indexStates: segmentsIndexes[info.GetID()],
|
||||
state: info.GetState(),
|
||||
lastExpireTime: info.GetLastExpireTime(),
|
||||
}
|
||||
ret[info.GetID()] = is
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 {
|
||||
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
|
||||
for segID, seg := range segments {
|
||||
|
@ -504,7 +529,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -557,7 +582,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -616,7 +641,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
|
|||
}
|
||||
|
||||
// The total rows of all indexes should be based on the current perspective
|
||||
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
|
||||
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
|
||||
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
|
||||
})
|
||||
|
||||
|
@ -765,32 +790,6 @@ func (s *Server) getUnIndexTaskSegments() []*SegmentInfo {
|
|||
return unindexedSegments
|
||||
}
|
||||
|
||||
func (s *Server) selectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
|
||||
ret := make(map[int64]*indexStats)
|
||||
|
||||
for _, info := range s.meta.SelectSegments(selector) {
|
||||
is := &indexStats{
|
||||
ID: info.GetID(),
|
||||
numRows: info.GetNumOfRows(),
|
||||
compactionFrom: info.GetCompactionFrom(),
|
||||
indexStates: make(map[int64]*indexpb.SegmentIndexState),
|
||||
state: info.GetState(),
|
||||
lastExpireTime: info.GetLastExpireTime(),
|
||||
}
|
||||
|
||||
indexIDToSegIdxes := s.meta.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID())
|
||||
for indexID, segIndex := range indexIDToSegIdxes {
|
||||
is.indexStates[indexID] = &indexpb.SegmentIndexState{
|
||||
SegmentID: segIndex.SegmentID,
|
||||
State: segIndex.IndexState,
|
||||
FailReason: segIndex.FailReason,
|
||||
}
|
||||
}
|
||||
ret[info.GetID()] = is
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// ListIndexes returns all indexes created on provided collection.
|
||||
func (s *Server) ListIndexes(ctx context.Context, req *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) {
|
||||
log := log.Ctx(ctx).With(
|
||||
|
|
Loading…
Reference in New Issue