mirror of https://github.com/milvus-io/milvus.git
Stop retry lazy load stats after segment released (#25054)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/25141/head
parent
b5de0160d3
commit
736916222b
|
@ -343,9 +343,10 @@ func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.Fie
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO if not retryable, add rebuild statslog logic
|
// TODO if not retryable, add rebuild statslog logic
|
||||||
log.Warn("failed to lazy load statslog for segment", zap.Int64("segment", s.segmentID), zap.Error(err))
|
log.Warn("failed to lazy load statslog for segment", zap.Int64("segment", s.segmentID), zap.Error(err))
|
||||||
if c.retryableLoadError(err) {
|
if c.retryableLoadError(err) && !s.isReleased() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
log.Warn("failed to lazy load statslog for segment, retrying...", zap.Int64("segment", s.segmentID), zap.Error(err))
|
log.Warn("failed to lazy load statslog for segment, retrying...", zap.Int64("segment", s.segmentID), zap.Error(err))
|
||||||
|
|
||||||
c.submitLoadStatsTask(s, statsBinlogs, ts)
|
c.submitLoadStatsTask(s, statsBinlogs, ts)
|
||||||
}
|
}
|
||||||
return struct{}{}, err
|
return struct{}{}, err
|
||||||
|
@ -565,6 +566,7 @@ func (c *ChannelMeta) removeSegments(segIDs ...UniqueID) {
|
||||||
seg.curDeleteBuf = nil
|
seg.curDeleteBuf = nil
|
||||||
seg.historyInsertBuf = nil
|
seg.historyInsertBuf = nil
|
||||||
seg.historyDeleteBuf = nil
|
seg.historyDeleteBuf = nil
|
||||||
|
seg.setReleased(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(c.segments, segID)
|
delete(c.segments, segID)
|
||||||
|
|
|
@ -55,6 +55,7 @@ type Segment struct {
|
||||||
startPos *msgpb.MsgPosition // TODO readonly
|
startPos *msgpb.MsgPosition // TODO readonly
|
||||||
lazyLoading atomic.Value
|
lazyLoading atomic.Value
|
||||||
syncing atomic.Value
|
syncing atomic.Value
|
||||||
|
released atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Segment) isSyncing() bool {
|
func (s *Segment) isSyncing() bool {
|
||||||
|
@ -85,6 +86,18 @@ func (s *Segment) setLoadingLazy(b bool) {
|
||||||
s.lazyLoading.Store(b)
|
s.lazyLoading.Store(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Segment) isReleased() bool {
|
||||||
|
b, ok := s.released.Load().(bool)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Segment) setReleased(b bool) {
|
||||||
|
s.released.Store(b)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Segment) isValid() bool {
|
func (s *Segment) isValid() bool {
|
||||||
return s.getType() != datapb.SegmentType_Compacted
|
return s.getType() != datapb.SegmentType_Compacted
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue