From 9dd2e1dcd8644d22c1b40d72807cac36d56ecdf2 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 5 Jan 2023 11:03:35 +0800 Subject: [PATCH] Use tt msg stream for consume delete msg (#21478) (#21500) Signed-off-by: cai.zhang --- internal/querynode/mock_test.go | 2 +- internal/querynode/segment_loader.go | 31 +++++++++++------------ internal/querynode/segment_loader_test.go | 4 ++- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index ddf9a594fd..8864f1205e 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -1903,7 +1903,7 @@ func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.Msg } func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return nil, nil + return mm.mockMqStream, nil } func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 7ec01f465e..1cc7b19df2 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -710,7 +710,7 @@ func (loader *segmentLoader) loadDeltaLogs(ctx context.Context, segment *Segment func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition, segmentIDs []int64) error { startTs := time.Now() - stream, err := loader.factory.NewMsgStream(ctx) + stream, err := loader.factory.NewTtMsgStream(ctx) if err != nil { return err } @@ -764,6 +764,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection zap.String("seekPos", position.String()), zap.Any("lastMsg", lastMsgID), // use any in case of nil ) + hasMore := true for hasMore { select { @@ -803,21 +804,19 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection return err } } - - ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID) - if err != nil { - log.Warn("check whether current MsgID less than last MsgID failed", - zap.Int64("collectionID", collectionID), - zap.String("channel", pChannelName), - zap.Error(err), - ) - return err - } - - if ret { - hasMore = false - break - } + } + ret, err := lastMsgID.LessOrEqualThan(msgPack.EndPositions[0].MsgID) + if err != nil { + log.Warn("check whether current MsgID less than last MsgID failed", + zap.Int64("collectionID", collectionID), + zap.String("channel", pChannelName), + zap.Error(err), + ) + return err + } + if ret { + hasMore = false + break } } } diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index 74c70b0bb3..2d8b2fedd1 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -849,7 +849,9 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea msgChan <- nil deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength) deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}} + msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}, + StartPositions: []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition}, + EndPositions: []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition}} } if closedStream { close(msgChan)