From 4c8cc6ceff0475b6b1f386f7e1e6d4d14d1a478b Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 1 Apr 2024 15:49:13 +0800 Subject: [PATCH] fix: Avoid acquire index meta's lock for each segment (#31723) issue: #31662 #31409 During FilterInIndexedSegments in GetRecoveryInfo, it tries to acquire index meta's read lock for every segment. When a collection has thousands of segments, this may block for more than 10 seconds or even longer, because `AddSegmentIndex` may also be triggered frequently, and it tries to get the write lock. This PR avoids acquiring index meta's lock for each segment. Signed-off-by: Wei Liu --- internal/datacoord/index_meta.go | 51 ++++++++++++------------- internal/datacoord/index_meta_test.go | 19 ++++----- internal/datacoord/util.go | 55 +++++++++++++-------------- 3 files changed, 58 insertions(+), 67 deletions(-) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index c2a9e1d534..c36701f9b1 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type indexMeta struct { @@ -383,42 +384,40 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni return state } -func (m *indexMeta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState { +func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 { m.RLock() defer m.RUnlock() - state := &indexpb.SegmentIndexState{ - SegmentID: segmentID, - State: commonpb.IndexState_IndexStateNone, - FailReason: "", - } - fieldIndexes, ok := m.indexes[collID] + fieldIndexes, ok := m.indexes[collectionID] if !ok { - state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID) - return state + return nil } - indexes, ok := m.segmentIndexes[segmentID] - if
!ok { - state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID) - state.State = commonpb.IndexState_Unissued - return state - } + fieldIDSet := typeutil.NewUniqueSet(fieldIDs...) - for indexID, index := range fieldIndexes { - if index.FieldID == fieldID && !index.IsDeleted { - if segIdx, ok := indexes[indexID]; ok { - state.IndexName = index.IndexName - state.State = segIdx.IndexState - state.FailReason = segIdx.FailReason - return state + checkSegmentState := func(indexes map[int64]*model.SegmentIndex) bool { + indexedFields := 0 + for indexID, index := range fieldIndexes { + if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted { + continue } - state.State = commonpb.IndexState_Unissued - return state + + if segIdx, ok := indexes[indexID]; ok && segIdx.IndexState == commonpb.IndexState_Finished { + indexedFields += 1 + } + } + + return indexedFields == fieldIDSet.Len() + } + + ret := make([]int64, 0) + for sid, indexes := range m.segmentIndexes { + if checkSegmentState(indexes) { + ret = append(ret, sid) } } - state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID) - return state + + return ret } // GetIndexesForCollection gets all indexes info with the specified collection. 
diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index e2c6bcbbbd..022add231d 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -533,7 +533,7 @@ func TestMeta_GetSegmentIndexState(t *testing.T) { }) } -func TestMeta_GetSegmentIndexStateOnField(t *testing.T) { +func TestMeta_GetIndexedSegment(t *testing.T) { var ( collID = UniqueID(1) partID = UniqueID(2) @@ -614,23 +614,18 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) { } t.Run("success", func(t *testing.T) { - state := m.GetSegmentIndexStateOnField(collID, segID, fieldID) - assert.Equal(t, commonpb.IndexState_Finished, state.GetState()) + segments := m.GetIndexedSegments(collID, []int64{fieldID}) + assert.Len(t, segments, 1) }) t.Run("no index on field", func(t *testing.T) { - state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) + segments := m.GetIndexedSegments(collID, []int64{fieldID + 1}) + assert.Len(t, segments, 0) }) t.Run("no index", func(t *testing.T) { - state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1) - assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState()) - }) - - t.Run("segment not exist", func(t *testing.T) { - state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID) - assert.Equal(t, commonpb.IndexState_Unissued, state.GetState()) + segments := m.GetIndexedSegments(collID+1, []int64{fieldID}) + assert.Len(t, segments, 0) }) } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 1880a79a8e..0131a3949e 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -70,16 +71,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo return nil } - segmentMap := 
make(map[int64]*SegmentInfo) - collectionSegments := make(map[int64][]int64) + collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 { + return segment.GetCollectionID() + }) - vecFieldIDs := make(map[int64][]int64) - for _, segment := range segments { - collectionID := segment.GetCollectionID() - segmentMap[segment.GetID()] = segment - collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID()) - } - for collection := range collectionSegments { + ret := make([]*SegmentInfo, 0) + for collection, segmentList := range collectionSegments { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) coll, err := handler.GetCollection(ctx, collection) cancel() @@ -87,32 +84,32 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo log.Warn("failed to get collection schema", zap.Error(err)) continue } + + // get vector field id + vecFieldIDs := make([]int64, 0) for _, field := range coll.Schema.GetFields() { if typeutil.IsVectorType(field.GetDataType()) { - vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID()) + vecFieldIDs = append(vecFieldIDs, field.GetFieldID()) + } + } + + // get indexed segments which finish build index on all vector field + indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs) + if len(indexed) > 0 { + indexedSet := typeutil.NewUniqueSet(indexed...) 
+ for _, segment := range segmentList { + if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped { + continue + } + + if indexedSet.Contain(segment.GetID()) { + ret = append(ret, segment) + } } } } - indexedSegments := make([]*SegmentInfo, 0) - for _, segment := range segments { - if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped { - continue - } - - hasUnindexedVecField := false - for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] { - segmentIndexState := mt.indexMeta.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID) - if segmentIndexState.State != commonpb.IndexState_Finished { - hasUnindexedVecField = true - } - } - if !hasUnindexedVecField { - indexedSegments = append(indexedSegments, segment) - } - } - - return indexedSegments + return ret } func getZeroTime() time.Time {