mirror of https://github.com/milvus-io/milvus.git
fix: [2.5] Segments return both growing&sealed result (#39789)
Cherry-pick from master pr: #39787 Previous PR #38311 Logic bug found in #38348 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/39180/head
parent
82dc57ace0
commit
1b14f5a7ab
|
@ -251,14 +251,10 @@ func (segments segments) Put(_ context.Context, segmentType SegmentType, segment
|
|||
}
|
||||
}
|
||||
|
||||
func (segments segments) Get(segmentID int64) (Segment, bool) {
|
||||
if segment, ok := segments.growingSegments.Get(segmentID); ok {
|
||||
return segment, true
|
||||
} else if segment, ok = segments.sealedSegments.Get(segmentID); ok {
|
||||
return segment, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
func (segments segments) Get(segmentID int64) (growing Segment, sealed Segment) {
|
||||
growing, _ = segments.growingSegments.Get(segmentID)
|
||||
sealed, _ = segments.sealedSegments.Get(segmentID)
|
||||
return growing, sealed
|
||||
}
|
||||
|
||||
func (segments segments) GetWithType(segmentID int64, segmentType SegmentType) (Segment, bool) {
|
||||
|
@ -298,17 +294,29 @@ func (segments segments) Remove(segment Segment) {
|
|||
func (segments segments) RangeWithFilter(criterion *segmentCriterion, process func(id int64, segType SegmentType, segment Segment) bool) {
|
||||
if criterion.segmentIDs != nil {
|
||||
for id := range criterion.segmentIDs {
|
||||
var segment Segment
|
||||
var ok bool
|
||||
// var segment Segment
|
||||
// var ok bool
|
||||
var segs []Segment
|
||||
if criterion.segmentType == commonpb.SegmentState_SegmentStateNone {
|
||||
segment, ok = segments.Get(id)
|
||||
growing, sealed := segments.Get(id)
|
||||
if growing != nil {
|
||||
segs = append(segs, growing)
|
||||
}
|
||||
if sealed != nil {
|
||||
segs = append(segs, sealed)
|
||||
}
|
||||
} else {
|
||||
segment, ok = segments.GetWithType(id, criterion.segmentType)
|
||||
segment, ok := segments.GetWithType(id, criterion.segmentType)
|
||||
if ok {
|
||||
segs = append(segs, segment)
|
||||
}
|
||||
}
|
||||
|
||||
if ok && criterion.Match(segment) {
|
||||
if !process(id, segment.Type(), segment) {
|
||||
return
|
||||
for _, segment := range segs {
|
||||
if criterion.Match(segment) {
|
||||
if !process(id, segment.Type(), segment) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -451,8 +459,11 @@ func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) b
|
|||
}
|
||||
|
||||
func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment {
|
||||
segment, _ := mgr.globalSegments.Get(segmentID)
|
||||
return segment
|
||||
growing, sealed := mgr.globalSegments.Get(segmentID)
|
||||
if growing != nil {
|
||||
return growing
|
||||
}
|
||||
return sealed
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) GetWithType(segmentID typeutil.UniqueID, typ SegmentType) Segment {
|
||||
|
@ -508,32 +519,50 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
|
|||
}
|
||||
}()
|
||||
|
||||
criterion := &segmentCriterion{}
|
||||
for _, filter := range filters {
|
||||
filter.AddFilter(criterion)
|
||||
}
|
||||
|
||||
for _, id := range segments {
|
||||
// growing, growingExist := mgr.growingSegments[id]
|
||||
// sealed, sealedExist := mgr.sealedSegments[id]
|
||||
segment, ok := mgr.globalSegments.Get(id)
|
||||
var segments []Segment
|
||||
if criterion.segmentType == commonpb.SegmentState_SegmentStateNone {
|
||||
growing, sealed := mgr.globalSegments.Get(id)
|
||||
|
||||
if !ok {
|
||||
err = merr.WrapErrSegmentNotLoaded(id, "segment not found")
|
||||
return nil, err
|
||||
if growing == nil && sealed == nil {
|
||||
err = merr.WrapErrSegmentNotLoaded(id, "segment not found")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
segments = []Segment{growing, sealed}
|
||||
} else {
|
||||
segment, ok := mgr.globalSegments.GetWithType(id, criterion.segmentType)
|
||||
if !ok {
|
||||
err = merr.WrapErrSegmentNotLoaded(id, "segment not found")
|
||||
return nil, err
|
||||
}
|
||||
segments = []Segment{segment}
|
||||
}
|
||||
|
||||
// L0 Segment should not be queryable.
|
||||
if segment.Level() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
for _, segment := range segments {
|
||||
if segment == nil {
|
||||
continue
|
||||
}
|
||||
// L0 Segment should not be queryable.
|
||||
if segment.Level() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// growingExist = growingExist && filter(growing, filters...)
|
||||
// sealedExist = sealedExist && filter(sealed, filters...)
|
||||
if !filter(segment, filters...) {
|
||||
continue
|
||||
}
|
||||
if !filter(segment, filters...) {
|
||||
continue
|
||||
}
|
||||
|
||||
err = segment.PinIfNotReleased()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
err = segment.PinIfNotReleased()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockedSegments = append(lockedSegments, segment)
|
||||
}
|
||||
lockedSegments = append(lockedSegments, segment)
|
||||
}
|
||||
|
||||
return lockedSegments, nil
|
||||
|
|
Loading…
Reference in New Issue