mirror of https://github.com/milvus-io/milvus.git
fix: add GetSegments optimization to avoid meta mutex competition (#31026)
pr: #31025 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>pull/31035/head
parent
91d17870d6
commit
095c94305c
|
@ -344,6 +344,20 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Get segments By filter function
|
||||
func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
var result []UniqueID
|
||||
for _, id := range segIDs {
|
||||
segment := m.segments.GetSegment(id)
|
||||
if segment != nil && filterFunc(segment) {
|
||||
result = append(result, id)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// GetSegment returns segment info with provided id
|
||||
// include the unhealthy segment
|
||||
// if not segment is found, nil will be returned
|
||||
|
|
|
@ -497,24 +497,16 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
|
|||
if len(segIDs) != 0 {
|
||||
segCandidates = segIDs
|
||||
}
|
||||
for _, id := range segCandidates {
|
||||
info := s.meta.GetHealthySegment(id)
|
||||
if info == nil {
|
||||
log.Warn("failed to get seg info from meta", zap.Int64("segmentID", id))
|
||||
continue
|
||||
}
|
||||
if info.CollectionID != collectionID {
|
||||
continue
|
||||
}
|
||||
// idempotent sealed
|
||||
if info.State == commonpb.SegmentState_Sealed {
|
||||
ret = append(ret, id)
|
||||
continue
|
||||
}
|
||||
// segment can be sealed only if it is growing.
|
||||
if info.State != commonpb.SegmentState_Growing {
|
||||
continue
|
||||
}
|
||||
|
||||
sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
|
||||
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
|
||||
})
|
||||
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
|
||||
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
|
||||
})
|
||||
ret = append(ret, sealedSegments...)
|
||||
|
||||
for _, id := range growingSegments {
|
||||
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue