diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 59919ce64c..778c96dca9 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -149,9 +149,9 @@ func NewSegmentManager() *segmentManager { } func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { + var replacedSegment []Segment mgr.mu.Lock() defer mgr.mu.Unlock() - targetMap := mgr.growingSegments switch segmentType { case SegmentTypeGrowing: @@ -165,37 +165,51 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { for _, segment := range segments { oldSegment, ok := targetMap[segment.ID()] - if ok && oldSegment.Version() >= segment.Version() { - log.Warn("Invalid segment distribution changed, skip it", - zap.Int64("segmentID", segment.ID()), - zap.Int64("oldVersion", oldSegment.Version()), - zap.Int64("newVersion", segment.Version()), - ) - continue + if ok { + if oldSegment.Version() >= segment.Version() { + log.Warn("Invalid segment distribution changed, skip it", + zap.Int64("segmentID", segment.ID()), + zap.Int64("oldVersion", oldSegment.Version()), + zap.Int64("newVersion", segment.Version()), + ) + // delete redundant segment + if s, ok := segment.(*LocalSegment); ok { + DeleteSegment(s) + } + continue + } + replacedSegment = append(replacedSegment, oldSegment) } targetMap[segment.ID()] = segment - if !ok { - eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) - metrics.QueryNodeNumSegments.WithLabelValues( + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) + metrics.QueryNodeNumSegments.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + fmt.Sprint(len(segment.Indexes())), + ).Inc() + if segment.RowNum() > 0 { + metrics.QueryNodeNumEntities.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), fmt.Sprint(segment.Partition()), segment.Type().String(), fmt.Sprint(len(segment.Indexes())), - ).Inc() - if segment.RowNum() > 0 { - metrics.QueryNodeNumEntities.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Add(float64(segment.RowNum())) - } + ).Add(float64(segment.RowNum())) } } mgr.updateMetric() + + // release replaced segment + if len(replacedSegment) > 0 { + go func() { + for _, segment := range replacedSegment { + remove(segment.(*LocalSegment)) + } + }() + } } func (mgr *segmentManager) UpdateSegmentBy(action SegmentAction, filters ...SegmentFilter) int {