mirror of https://github.com/milvus-io/milvus.git
Fix channel checkpoint stuck (#26534)
See also: #23621 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/26518/head
parent
f9c060e0d2
commit
27691e843c
|
@ -274,7 +274,8 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
|
|||
|
||||
validSegs := make([]*Segment, 0)
|
||||
for _, seg := range c.segments {
|
||||
if seg.getType() == datapb.SegmentType_Flushed || seg.getType() == datapb.SegmentType_Compacted {
|
||||
// flushed segments also need to update cp
|
||||
if !seg.isValid() {
|
||||
continue
|
||||
}
|
||||
validSegs = append(validSegs, seg)
|
||||
|
@ -701,6 +702,7 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
|
|||
if !c.hasSegment(ID, true) || c.hasSegment(ID, false) {
|
||||
inValidSegments = append(inValidSegments, ID)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(inValidSegments) > 0 {
|
||||
|
@ -717,6 +719,7 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
|
|||
return errors.New("invalid context")
|
||||
default:
|
||||
}
|
||||
|
||||
for _, ID := range compactedFrom {
|
||||
// the existent of the segments are already checked
|
||||
s := c.segments[ID]
|
||||
|
@ -725,6 +728,11 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
|
|||
// release bloom filter
|
||||
s.currentStat = nil
|
||||
s.historyStats = nil
|
||||
|
||||
// set correct lastSyncTs for 10-mins channelCP force sync.
|
||||
if s.lastSyncTs < seg.lastSyncTs {
|
||||
seg.lastSyncTs = s.lastSyncTs
|
||||
}
|
||||
}
|
||||
|
||||
// only store segments with numRows > 0
|
||||
|
|
Loading…
Reference in New Issue