mirror of https://github.com/milvus-io/milvus.git
Trim segmentinfo binlog for VChaninfo usage (#10425)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/10438/head
parent
73c9ab43e0
commit
285c3f63e3
|
@ -683,7 +683,7 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
var useUnflushedPosition bool
|
||||
for _, s := range segments {
|
||||
if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
|
||||
flushed = append(flushed, s.SegmentInfo)
|
||||
flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
|
||||
if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
|
||||
seekPosition = s.DmlPosition
|
||||
}
|
||||
|
@ -694,7 +694,7 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
continue
|
||||
}
|
||||
|
||||
unflushed = append(unflushed, s.SegmentInfo)
|
||||
unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo))
|
||||
|
||||
if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
|
||||
useUnflushedPosition = true
|
||||
|
@ -728,3 +728,19 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
UnflushedSegments: unflushed,
|
||||
}
|
||||
}
|
||||
|
||||
// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
|
||||
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
|
||||
return &datapb.SegmentInfo{
|
||||
ID: info.ID,
|
||||
CollectionID: info.CollectionID,
|
||||
PartitionID: info.PartitionID,
|
||||
InsertChannel: info.InsertChannel,
|
||||
NumOfRows: info.NumOfRows,
|
||||
State: info.State,
|
||||
MaxRowNum: info.MaxRowNum,
|
||||
LastExpireTime: info.LastExpireTime,
|
||||
StartPosition: info.StartPosition,
|
||||
DmlPosition: info.DmlPosition,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1196,7 +1196,7 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.EqualValues(t, 1, len(resp.GetChannels()))
|
||||
assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegments()))
|
||||
assert.ElementsMatch(t, []*datapb.SegmentInfo{seg1, seg2}, resp.GetChannels()[0].GetFlushedSegments())
|
||||
assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments())
|
||||
assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in New Issue