mirror of https://github.com/milvus-io/milvus.git
fix: Avoid acquiring index meta's lock for each segment (#31723)
issue: #31662 #31409. During `FilterInIndexedSegments` in GetRecoveryInfo, the code tries to acquire the index meta's read lock once for every segment. When a collection has thousands of segments, this may block for more than 10 seconds or even longer, because `AddSegmentIndex` may also be triggered frequently and tries to acquire the write lock. This PR avoids acquiring the index meta's lock for each segment. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/31790/head
parent
b6fefee0cf
commit
4c8cc6ceff
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
type indexMeta struct {
|
||||
|
@ -383,42 +384,40 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni
|
|||
return state
|
||||
}
|
||||
|
||||
func (m *indexMeta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
|
||||
func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
state := &indexpb.SegmentIndexState{
|
||||
SegmentID: segmentID,
|
||||
State: commonpb.IndexState_IndexStateNone,
|
||||
FailReason: "",
|
||||
}
|
||||
fieldIndexes, ok := m.indexes[collID]
|
||||
fieldIndexes, ok := m.indexes[collectionID]
|
||||
if !ok {
|
||||
state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
|
||||
return state
|
||||
return nil
|
||||
}
|
||||
|
||||
indexes, ok := m.segmentIndexes[segmentID]
|
||||
if !ok {
|
||||
state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID)
|
||||
state.State = commonpb.IndexState_Unissued
|
||||
return state
|
||||
}
|
||||
fieldIDSet := typeutil.NewUniqueSet(fieldIDs...)
|
||||
|
||||
for indexID, index := range fieldIndexes {
|
||||
if index.FieldID == fieldID && !index.IsDeleted {
|
||||
if segIdx, ok := indexes[indexID]; ok {
|
||||
state.IndexName = index.IndexName
|
||||
state.State = segIdx.IndexState
|
||||
state.FailReason = segIdx.FailReason
|
||||
return state
|
||||
checkSegmentState := func(indexes map[int64]*model.SegmentIndex) bool {
|
||||
indexedFields := 0
|
||||
for indexID, index := range fieldIndexes {
|
||||
if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted {
|
||||
continue
|
||||
}
|
||||
state.State = commonpb.IndexState_Unissued
|
||||
return state
|
||||
|
||||
if segIdx, ok := indexes[indexID]; ok && segIdx.IndexState == commonpb.IndexState_Finished {
|
||||
indexedFields += 1
|
||||
}
|
||||
}
|
||||
|
||||
return indexedFields == fieldIDSet.Len()
|
||||
}
|
||||
|
||||
ret := make([]int64, 0)
|
||||
for sid, indexes := range m.segmentIndexes {
|
||||
if checkSegmentState(indexes) {
|
||||
ret = append(ret, sid)
|
||||
}
|
||||
}
|
||||
state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID)
|
||||
return state
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// GetIndexesForCollection gets all indexes info with the specified collection.
|
||||
|
|
|
@ -533,7 +533,7 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
|
||||
func TestMeta_GetIndexedSegment(t *testing.T) {
|
||||
var (
|
||||
collID = UniqueID(1)
|
||||
partID = UniqueID(2)
|
||||
|
@ -614,23 +614,18 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID)
|
||||
assert.Equal(t, commonpb.IndexState_Finished, state.GetState())
|
||||
segments := m.GetIndexedSegments(collID, []int64{fieldID})
|
||||
assert.Len(t, segments, 1)
|
||||
})
|
||||
|
||||
t.Run("no index on field", func(t *testing.T) {
|
||||
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1)
|
||||
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
|
||||
segments := m.GetIndexedSegments(collID, []int64{fieldID + 1})
|
||||
assert.Len(t, segments, 0)
|
||||
})
|
||||
|
||||
t.Run("no index", func(t *testing.T) {
|
||||
state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1)
|
||||
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
|
||||
})
|
||||
|
||||
t.Run("segment not exist", func(t *testing.T) {
|
||||
state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID)
|
||||
assert.Equal(t, commonpb.IndexState_Unissued, state.GetState())
|
||||
segments := m.GetIndexedSegments(collID+1, []int64{fieldID})
|
||||
assert.Len(t, segments, 0)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
|
@ -70,16 +71,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
|
|||
return nil
|
||||
}
|
||||
|
||||
segmentMap := make(map[int64]*SegmentInfo)
|
||||
collectionSegments := make(map[int64][]int64)
|
||||
collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 {
|
||||
return segment.GetCollectionID()
|
||||
})
|
||||
|
||||
vecFieldIDs := make(map[int64][]int64)
|
||||
for _, segment := range segments {
|
||||
collectionID := segment.GetCollectionID()
|
||||
segmentMap[segment.GetID()] = segment
|
||||
collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID())
|
||||
}
|
||||
for collection := range collectionSegments {
|
||||
ret := make([]*SegmentInfo, 0)
|
||||
for collection, segmentList := range collectionSegments {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
||||
coll, err := handler.GetCollection(ctx, collection)
|
||||
cancel()
|
||||
|
@ -87,32 +84,32 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
|
|||
log.Warn("failed to get collection schema", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// get vector field id
|
||||
vecFieldIDs := make([]int64, 0)
|
||||
for _, field := range coll.Schema.GetFields() {
|
||||
if typeutil.IsVectorType(field.GetDataType()) {
|
||||
vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID())
|
||||
vecFieldIDs = append(vecFieldIDs, field.GetFieldID())
|
||||
}
|
||||
}
|
||||
|
||||
// get indexed segments which finish build index on all vector field
|
||||
indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs)
|
||||
if len(indexed) > 0 {
|
||||
indexedSet := typeutil.NewUniqueSet(indexed...)
|
||||
for _, segment := range segmentList {
|
||||
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
|
||||
continue
|
||||
}
|
||||
|
||||
if indexedSet.Contain(segment.GetID()) {
|
||||
ret = append(ret, segment)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
indexedSegments := make([]*SegmentInfo, 0)
|
||||
for _, segment := range segments {
|
||||
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
|
||||
continue
|
||||
}
|
||||
|
||||
hasUnindexedVecField := false
|
||||
for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] {
|
||||
segmentIndexState := mt.indexMeta.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
|
||||
if segmentIndexState.State != commonpb.IndexState_Finished {
|
||||
hasUnindexedVecField = true
|
||||
}
|
||||
}
|
||||
if !hasUnindexedVecField {
|
||||
indexedSegments = append(indexedSegments, segment)
|
||||
}
|
||||
}
|
||||
|
||||
return indexedSegments
|
||||
return ret
|
||||
}
|
||||
|
||||
func getZeroTime() time.Time {
|
||||
|
|
Loading…
Reference in New Issue