diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index d12db68624..bee9c6215b 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -834,3 +834,34 @@ func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex { } return metas } + +func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState { + m.RLock() + defer m.RUnlock() + + ret := make(map[int64]map[int64]*indexpb.SegmentIndexState, 0) + fieldIndexes, ok := m.indexes[collectionID] + if !ok { + return ret + } + + for _, segID := range segmentIDs { + ret[segID] = make(map[int64]*indexpb.SegmentIndexState) + segIndexInfos, ok := m.segmentIndexes[segID] + if !ok || len(segIndexInfos) == 0 { + return ret + } + + for _, segIdx := range segIndexInfos { + if index, ok := fieldIndexes[segIdx.IndexID]; ok && !index.IsDeleted { + ret[segID][segIdx.IndexID] = &indexpb.SegmentIndexState{ + SegmentID: segID, + State: segIdx.IndexState, + FailReason: segIdx.FailReason, + } + } + } + } + + return ret +} diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index bed52eda88..d23b95fa4b 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -276,7 +276,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe indexInfo := &indexpb.IndexInfo{} // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool { + segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) @@ -330,6 +330,31 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme return ret, nil } +func (s *Server) selectSegmentIndexesStats(selector SegmentInfoSelector) map[int64]*indexStats { + ret := make(map[int64]*indexStats) + + segments := s.meta.SelectSegments(selector) + segmentIDs := lo.Map(segments, func(info *SegmentInfo, i int) int64 { + return info.GetID() + }) + if len(segments) == 0 { + return ret + } + segmentsIndexes := s.meta.indexMeta.getSegmentsIndexStates(segments[0].CollectionID, segmentIDs) + for _, info := range segments { + is := &indexStats{ + ID: info.GetID(), + numRows: info.GetNumOfRows(), + compactionFrom: info.GetCompactionFrom(), + indexStates: segmentsIndexes[info.GetID()], + state: info.GetState(), + lastExpireTime: info.GetLastExpireTime(), + } + ret[info.GetID()] = is + } + return ret +} + func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 { unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]() for segID, seg := range segments { @@ -504,7 +529,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool { + segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) @@ -557,7 +582,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool { + segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) @@ -616,7 +641,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool { + segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) @@ -765,32 +790,6 @@ func (s *Server) getUnIndexTaskSegments() []*SegmentInfo { return unindexedSegments } -func (s *Server) selectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats { - ret := make(map[int64]*indexStats) - - for _, info := range s.meta.SelectSegments(selector) { - is := &indexStats{ - ID: info.GetID(), - numRows: info.GetNumOfRows(), - compactionFrom: info.GetCompactionFrom(), - indexStates: make(map[int64]*indexpb.SegmentIndexState), - state: info.GetState(), - lastExpireTime: info.GetLastExpireTime(), - } - - indexIDToSegIdxes := s.meta.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID()) - for indexID, segIndex := range indexIDToSegIdxes { - is.indexStates[indexID] = &indexpb.SegmentIndexState{ - SegmentID: segIndex.SegmentID, - State: segIndex.IndexState, - FailReason: segIndex.FailReason, - } - } - ret[info.GetID()] = is - } - return ret -} - // ListIndexes returns all indexes created on provided collection. func (s *Server) ListIndexes(ctx context.Context, req *indexpb.ListIndexesRequest) (*indexpb.ListIndexesResponse, error) { log := log.Ctx(ctx).With(