Use tt msg stream for consume delete msg (#21478) (#21500)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/21477/head
cai.zhang 2023-01-05 11:03:35 +08:00 committed by GitHub
parent 58b79eb74c
commit 9dd2e1dcd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 18 deletions

View File

@ -1903,7 +1903,7 @@ func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.Msg
} }
func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return nil, nil return mm.mockMqStream, nil
} }
func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {

View File

@ -710,7 +710,7 @@ func (loader *segmentLoader) loadDeltaLogs(ctx context.Context, segment *Segment
func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition, func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition,
segmentIDs []int64) error { segmentIDs []int64) error {
startTs := time.Now() startTs := time.Now()
stream, err := loader.factory.NewMsgStream(ctx) stream, err := loader.factory.NewTtMsgStream(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -764,6 +764,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
zap.String("seekPos", position.String()), zap.String("seekPos", position.String()),
zap.Any("lastMsg", lastMsgID), // use any in case of nil zap.Any("lastMsg", lastMsgID), // use any in case of nil
) )
hasMore := true hasMore := true
for hasMore { for hasMore {
select { select {
@ -803,21 +804,19 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
return err return err
} }
} }
}
ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID) ret, err := lastMsgID.LessOrEqualThan(msgPack.EndPositions[0].MsgID)
if err != nil { if err != nil {
log.Warn("check whether current MsgID less than last MsgID failed", log.Warn("check whether current MsgID less than last MsgID failed",
zap.Int64("collectionID", collectionID), zap.Int64("collectionID", collectionID),
zap.String("channel", pChannelName), zap.String("channel", pChannelName),
zap.Error(err), zap.Error(err),
) )
return err return err
} }
if ret {
if ret { hasMore = false
hasMore = false break
break
}
} }
} }
} }

View File

@ -849,7 +849,9 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea
msgChan <- nil msgChan <- nil
deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength) deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength)
deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}} msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2},
StartPositions: []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition},
EndPositions: []*internalpb.MsgPosition{genMsgStreamBaseMsg().MsgPosition}}
} }
if closedStream { if closedStream {
close(msgChan) close(msgChan)