mirror of https://github.com/milvus-io/milvus.git
fix dead lock in concurrent search/release segment (#26435)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/26461/head
parent
79ccf06cf6
commit
3b11ad2695
|
@ -338,9 +338,6 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
|
|||
}
|
||||
|
||||
func (mgr *segmentManager) Unpin(segments []Segment) {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
for _, segment := range segments {
|
||||
segment.RUnlock()
|
||||
}
|
||||
|
@ -388,64 +385,116 @@ func (mgr *segmentManager) Empty() bool {
|
|||
// false otherwise
|
||||
func (mgr *segmentManager) Remove(segmentID UniqueID, scope querypb.DataScope) (int, int) {
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
|
||||
var removeGrowing, removeSealed int
|
||||
var growing, sealed *LocalSegment
|
||||
switch scope {
|
||||
case querypb.DataScope_Streaming:
|
||||
if remove(segmentID, mgr.growingSegments) {
|
||||
growing = mgr.removeSegmentWithType(SegmentTypeGrowing, segmentID)
|
||||
if growing != nil {
|
||||
removeGrowing = 1
|
||||
}
|
||||
|
||||
case querypb.DataScope_Historical:
|
||||
if remove(segmentID, mgr.sealedSegments) {
|
||||
sealed = mgr.removeSegmentWithType(SegmentTypeSealed, segmentID)
|
||||
if sealed != nil {
|
||||
removeSealed = 1
|
||||
}
|
||||
|
||||
case querypb.DataScope_All:
|
||||
if remove(segmentID, mgr.growingSegments) {
|
||||
growing = mgr.removeSegmentWithType(SegmentTypeGrowing, segmentID)
|
||||
if growing != nil {
|
||||
removeGrowing = 1
|
||||
}
|
||||
if remove(segmentID, mgr.sealedSegments) {
|
||||
|
||||
sealed = mgr.removeSegmentWithType(SegmentTypeSealed, segmentID)
|
||||
if sealed != nil {
|
||||
removeSealed = 1
|
||||
}
|
||||
}
|
||||
|
||||
mgr.updateMetric()
|
||||
mgr.mu.Unlock()
|
||||
|
||||
if growing != nil {
|
||||
remove(growing)
|
||||
}
|
||||
|
||||
if sealed != nil {
|
||||
remove(sealed)
|
||||
}
|
||||
|
||||
return removeGrowing, removeSealed
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID UniqueID) *LocalSegment {
|
||||
switch typ {
|
||||
case SegmentTypeGrowing:
|
||||
s, ok := mgr.growingSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.growingSegments, segmentID)
|
||||
return s.(*LocalSegment)
|
||||
}
|
||||
|
||||
case SegmentTypeSealed:
|
||||
s, ok := mgr.sealedSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.sealedSegments, segmentID)
|
||||
return s.(*LocalSegment)
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) RemoveBy(filters ...SegmentFilter) (int, int) {
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
|
||||
var removeGrowing, removeSealed int
|
||||
var removeGrowing, removeSealed []*LocalSegment
|
||||
for id, segment := range mgr.growingSegments {
|
||||
if filter(segment, filters...) && remove(id, mgr.growingSegments) {
|
||||
removeGrowing++
|
||||
if filter(segment, filters...) {
|
||||
s := mgr.removeSegmentWithType(SegmentTypeGrowing, id)
|
||||
if s != nil {
|
||||
removeGrowing = append(removeGrowing, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for id, segment := range mgr.sealedSegments {
|
||||
if filter(segment, filters...) && remove(id, mgr.sealedSegments) {
|
||||
removeSealed++
|
||||
if filter(segment, filters...) {
|
||||
s := mgr.removeSegmentWithType(SegmentTypeSealed, id)
|
||||
if s != nil {
|
||||
removeSealed = append(removeSealed, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mgr.updateMetric()
|
||||
return removeGrowing, removeSealed
|
||||
mgr.mu.Unlock()
|
||||
|
||||
for _, s := range removeGrowing {
|
||||
remove(s)
|
||||
}
|
||||
|
||||
for _, s := range removeSealed {
|
||||
remove(s)
|
||||
}
|
||||
|
||||
return len(removeGrowing), len(removeSealed)
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Clear() {
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
|
||||
for id := range mgr.growingSegments {
|
||||
remove(id, mgr.growingSegments)
|
||||
for id, segment := range mgr.growingSegments {
|
||||
delete(mgr.growingSegments, id)
|
||||
remove(segment.(*LocalSegment))
|
||||
}
|
||||
|
||||
for id := range mgr.sealedSegments {
|
||||
remove(id, mgr.sealedSegments)
|
||||
for id, segment := range mgr.sealedSegments {
|
||||
delete(mgr.sealedSegments, id)
|
||||
remove(segment.(*LocalSegment))
|
||||
}
|
||||
mgr.updateMetric()
|
||||
}
|
||||
|
@ -465,17 +514,9 @@ func (mgr *segmentManager) updateMetric() {
|
|||
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len()))
|
||||
}
|
||||
|
||||
// returns true if the segment exists,
|
||||
// false otherwise
|
||||
func remove(segmentID int64, container map[int64]Segment) bool {
|
||||
segment, ok := container[segmentID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
delete(container, segmentID)
|
||||
|
||||
func remove(segment *LocalSegment) bool {
|
||||
rowNum := segment.RowNum()
|
||||
DeleteSegment(segment.(*LocalSegment))
|
||||
DeleteSegment(segment)
|
||||
|
||||
metrics.QueryNodeNumSegments.WithLabelValues(
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
|
|
|
@ -71,6 +71,12 @@ func (s *ManagerSuite) TestGetBy() {
|
|||
segments := s.mgr.GetBy(WithType(typ))
|
||||
s.Contains(segments, s.segments[i])
|
||||
}
|
||||
s.mgr.Clear()
|
||||
|
||||
for _, typ := range s.types {
|
||||
segments := s.mgr.GetBy(WithType(typ))
|
||||
s.Len(segments, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestRemoveGrowing() {
|
||||
|
|
Loading…
Reference in New Issue