From f93ba3e37c602169b63c93d83e1c6fad9a7eb64f Mon Sep 17 00:00:00 2001
From: "cai.zhang" <cai.zhang@zilliz.com>
Date: Wed, 4 Jan 2023 10:57:34 +0800
Subject: [PATCH] Use tt msg stream for consume delete msg (#21478)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
---
 internal/querynode/mock_test.go           |  2 +-
 internal/querynode/segment_loader.go      | 31 +++++++++++------------
 internal/querynode/segment_loader_test.go |  4 ++-
 3 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go
index df26e79ba5..0401edd54c 100644
--- a/internal/querynode/mock_test.go
+++ b/internal/querynode/mock_test.go
@@ -1903,7 +1903,7 @@ func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.Msg
 }
 
 func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return nil, nil
+	return mm.mockMqStream, nil
 }
 
 func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 2469698635..6735c83e51 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -710,7 +710,7 @@ func (loader *segmentLoader) loadDeltaLogs(ctx context.Context, segment *Segment
 func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition,
 	segmentIDs []int64) error {
 	startTs := time.Now()
-	stream, err := loader.factory.NewMsgStream(ctx)
+	stream, err := loader.factory.NewTtMsgStream(ctx)
 	if err != nil {
 		return err
 	}
@@ -765,6 +765,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 		zap.String("seekPos", position.String()),
 		zap.Any("lastMsg", lastMsgID), // use any in case of nil
 	)
+
 	hasMore := true
 	for hasMore {
 		select {
@@ -804,21 +805,19 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
 						return err
 					}
 				}
-
-				ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID)
-				if err != nil {
-					log.Warn("check whether current MsgID less than last MsgID failed",
-						zap.Int64("collectionID", collectionID),
-						zap.String("channel", pChannelName),
-						zap.Error(err),
-					)
-					return err
-				}
-
-				if ret {
-					hasMore = false
-					break
-				}
+			}
+			ret, err := lastMsgID.LessOrEqualThan(msgPack.EndPositions[0].MsgID)
+			if err != nil {
+				log.Warn("check whether current MsgID less than last MsgID failed",
+					zap.Int64("collectionID", collectionID),
+					zap.String("channel", pChannelName),
+					zap.Error(err),
+				)
+				return err
+			}
+			if ret {
+				hasMore = false
+				break
 			}
 		}
 	}
diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go
index 359fcb92c8..6089f40049 100644
--- a/internal/querynode/segment_loader_test.go
+++ b/internal/querynode/segment_loader_test.go
@@ -827,7 +827,9 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea
 		msgChan <- nil
 		deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength)
 		deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
-		msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}}
+		msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2},
+			StartPositions: []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition},
+			EndPositions:   []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition}}
 	}
 	if closedStream {
 		close(msgChan)