mirror of https://github.com/milvus-io/milvus.git
fix: Use correct ts to avoid exclude segment list leak (#31991)
issue: #31990
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/32197/head
parent 43a9be2cce
commit 68dec7dcd4
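The "leak" in the title refers to the delegator's excluded-segment list: when a segment is released or dropped, it is added to the pipeline's exclude list so its growing data is not consumed again. Before this fix, entries were recorded with typeutil.MaxTimestamp, which no consume position can ever pass, so under a timestamp-based cleanup they could never be discarded and only accumulated. Recording the channel checkpoint's timestamp instead gives each entry a bound. Below is a minimal, hypothetical sketch of that pruning idea; the names and structure are illustrative, not the actual shardDelegator code.

```go
package main

import "fmt"

const maxTimestamp = ^uint64(0) // stand-in for typeutil.MaxTimestamp

// excludeList is an illustrative stand-in for the pipeline's excluded-segment
// bookkeeping: segment ID -> timestamp up to which its data must be ignored.
type excludeList map[int64]uint64

// prune drops entries whose exclude timestamp has been passed by the channel's
// consumed position. Entries recorded with maxTimestamp never satisfy this
// condition, so they stay forever -- the leak this commit fixes.
func (e excludeList) prune(consumedTs uint64) {
	for segID, ts := range e {
		if ts < consumedTs {
			delete(e, segID)
		}
	}
}

func main() {
	ex := excludeList{
		1001: 42,           // bounded by a channel checkpoint ts: prunable
		1002: maxTimestamp, // old behavior: never prunable
	}
	ex.prune(100)
	fmt.Println(ex) // map[1002:18446744073709551615]
}
```

The diff below therefore adds a checkpoint to ReleaseSegmentsRequest so QueryCoord can supply that bound to the QueryNode.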
@@ -403,6 +403,7 @@ message ReleaseSegmentsRequest {
   DataScope scope = 7; // All, Streaming, Historical
   string shard = 8;
   bool need_transfer = 11;
+  msg.MsgPosition checkpoint = 12; // channel's check point
 }

 message SearchRequest {
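For context, the new field carries the channel's seek position, and only its timestamp is consumed by the exclude list. The following is a sketch of how the generated Go request might be filled in, assuming standard protoc-gen-go naming for the generated structs (querypb.ReleaseSegmentsRequest, msgpb.MsgPosition); the import paths, channel name, and timestamp value are assumptions for illustration, not taken from the commit. When the channel is not in the current target, the field simply stays nil.

```go
package example

// Assumed import paths; adjust to the actual generated packages.
import (
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
)

func buildReleaseRequest() *querypb.ReleaseSegmentsRequest {
	return &querypb.ReleaseSegmentsRequest{
		Shard:        "hypothetical-dml-channel_v0",
		NeedTransfer: true,
		// Field 12: left nil when the channel is missing from the current
		// target, in which case readers fall back to typeutil.MaxTimestamp.
		Checkpoint: &msgpb.MsgPosition{
			ChannelName: "hypothetical-dml-channel_v0",
			Timestamp:   449000000000000, // illustrative checkpoint ts
		},
	}
}
```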
@@ -246,7 +246,14 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 	ctx := task.Context()

 	dstNode := action.Node()

 	req := packReleaseSegmentRequest(task, action)
+	channel := ex.targetMgr.GetDmChannel(task.CollectionID(), task.Shard(), meta.CurrentTarget)
+	if channel != nil {
+		// if channel exists in current target, set cp to ReleaseSegmentRequest, need to use it as growing segment's exclude ts
+		req.Checkpoint = channel.GetSeekPosition()
+	}
+
 	if action.Scope() == querypb.DataScope_Streaming {
 		// Any modification to the segment distribution have to set NeedTransfer true,
 		// to protect the version, which serves search/query
@@ -270,6 +277,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
 			log.Warn(msg, zap.Error(err))
 			return
 		}

 		dstNode = view.ID
+		log = log.With(zap.Int64("shardLeader", view.ID))
 		req.NeedTransfer = true
@@ -839,7 +839,11 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 	// when we try to release a segment, add it to pipeline's exclude list first
 	// in case of consumed it's growing segment again
 	droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
-		return id, typeutil.MaxTimestamp
+		if req.GetCheckpoint() == nil {
+			return id, typeutil.MaxTimestamp
+		}
+
+		return id, req.GetCheckpoint().GetTimestamp()
 	})
 	sd.AddExcludedSegments(droppedInfos)
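Both this hunk and the SyncDistribution hunk further down apply the same rule when building the exclude map: use the attached checkpoint's timestamp when one was sent, and fall back to the old MaxTimestamp behavior when it is nil (for example, when the channel was no longer in the current target on the sending side). Below is a hedged restatement of that rule as a standalone helper; the repository inlines this logic at both call sites rather than factoring it out.

```go
// excludeTsFor picks the exclude timestamp for a released or dropped segment:
// the channel checkpoint's timestamp when a checkpoint is attached, otherwise
// typeutil.MaxTimestamp (the pre-fix behavior, kept as a fallback).
// Illustrative helper only; package names follow the code in the hunks above.
func excludeTsFor(checkpoint *msgpb.MsgPosition) uint64 {
	if checkpoint == nil {
		return typeutil.MaxTimestamp
	}
	return checkpoint.GetTimestamp()
}
```

With such a helper, each call site would reduce to lo.SliceToMap(ids, func(id int64) (int64, uint64) { return id, excludeTsFor(req.GetCheckpoint()) }).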
@@ -293,17 +293,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	})
 	delegator.AddExcludedSegments(growingInfo)

-	flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
-		return id, typeutil.MaxTimestamp
-	})
-	delegator.AddExcludedSegments(flushedInfo)
-	for _, channelInfo := range req.GetInfos() {
-		droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
-			return id, typeutil.MaxTimestamp
-		})
-		delegator.AddExcludedSegments(droppedInfos)
-	}
-
 	err = loadL0Segments(ctx, delegator, req)
 	if err != nil {
 		log.Warn("failed to load l0 segments", zap.Error(err))
@@ -1287,7 +1276,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
 	case querypb.SyncType_UpdateVersion:
 		log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
 		droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
-			return id, typeutil.MaxTimestamp
+			if action.GetCheckpoint() == nil {
+				return id, typeutil.MaxTimestamp
+			}
+			return id, action.GetCheckpoint().Timestamp
 		})
 		shardDelegator.AddExcludedSegments(droppedInfos)
 		shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),