mirror of https://github.com/milvus-io/milvus.git
Change SeekPosition to earliest of all segments (#10771)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/10800/head
parent
4186e785ab
commit
c019e80dbb
|
@ -250,7 +250,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
|
|||
|
||||
func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
|
||||
for _, ch := range update.Channels {
|
||||
vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, true)
|
||||
vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, false)
|
||||
info := &datapb.ChannelWatchInfo{
|
||||
Vchan: vchan,
|
||||
StartTs: time.Now().Unix(),
|
||||
|
|
|
@ -683,29 +683,30 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
flushed := make([]*datapb.SegmentInfo, 0)
|
||||
unflushed := make([]*datapb.SegmentInfo, 0)
|
||||
var seekPosition *internalpb.MsgPosition
|
||||
var useUnflushedPosition bool
|
||||
for _, s := range segments {
|
||||
if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
|
||||
flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
|
||||
if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
|
||||
if seekPosition == nil || (s.DmlPosition.Timestamp < seekPosition.Timestamp) {
|
||||
seekPosition = s.DmlPosition
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if s.DmlPosition == nil {
|
||||
if s.DmlPosition == nil { // segment position all nil
|
||||
continue
|
||||
}
|
||||
|
||||
unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo))
|
||||
|
||||
if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
|
||||
useUnflushedPosition = true
|
||||
if !seekFromStartPosition {
|
||||
seekPosition = s.DmlPosition
|
||||
} else {
|
||||
seekPosition = s.StartPosition
|
||||
}
|
||||
segmentPosition := s.DmlPosition
|
||||
if seekFromStartPosition {
|
||||
// need to use start position when load collection/partition, querynode does not support seek from checkpoint yet
|
||||
// TODO silverxia remove seek from start logic after checkpoint supported in querynode
|
||||
segmentPosition = s.StartPosition
|
||||
}
|
||||
|
||||
if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
|
||||
seekPosition = segmentPosition
|
||||
}
|
||||
}
|
||||
// use collection start position when segment position is not found
|
||||
|
|
|
@ -1179,7 +1179,7 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) {
|
||||
t.Run("test get earliest position of flushed segments as seek position", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
|
||||
|
@ -1204,7 +1204,7 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||
assert.EqualValues(t, 1, len(resp.GetChannels()))
|
||||
assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegments()))
|
||||
assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments())
|
||||
assert.EqualValues(t, 20, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||
assert.EqualValues(t, 10, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||
})
|
||||
|
||||
t.Run("test get recovery of unflushed segments ", func(t *testing.T) {
|
||||
|
@ -1232,6 +1232,7 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||
assert.EqualValues(t, 0, len(resp.GetBinlogs()))
|
||||
assert.EqualValues(t, 1, len(resp.GetChannels()))
|
||||
assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
|
||||
assert.EqualValues(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||
})
|
||||
|
||||
t.Run("test get binlogs", func(t *testing.T) {
|
||||
|
|
|
@ -476,7 +476,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
|
|||
channels := dresp.GetVirtualChannelNames()
|
||||
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
|
||||
for _, c := range channels {
|
||||
channelInfo := s.GetVChanPositions(c, collectionID, false)
|
||||
channelInfo := s.GetVChanPositions(c, collectionID, true)
|
||||
channelInfos = append(channelInfos, channelInfo)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue