mirror of https://github.com/milvus-io/milvus.git
fix insert meets vChannels reuse pChannel (#18695)
Signed-off-by: Wei Liu <wei.liu@zilliz.com> Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/18800/head
parent
6fd83464cd
commit
9338ad45c5
|
@ -157,7 +157,8 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||
log.Info("filter insert messages",
|
||||
zap.Int64("filter segment ID", imsg.GetSegmentID()),
|
||||
zap.Uint64("message timestamp", msg.EndTs()),
|
||||
)
|
||||
zap.String("segment's vChannel", imsg.GetShardName()),
|
||||
zap.String("current vChannel", ddn.vChannelName))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -206,6 +207,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||
}
|
||||
|
||||
func (ddn *ddNode) tryToFilterSegmentInsertMessages(msg *msgstream.InsertMsg) bool {
|
||||
if msg.GetShardName() != ddn.vChannelName {
|
||||
return true
|
||||
}
|
||||
|
||||
// Filter all dropped segments
|
||||
if ddn.isDropped(msg.GetSegmentID()) {
|
||||
return true
|
||||
|
|
|
@ -34,6 +34,11 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
ddNodeChannelName = ""
|
||||
anotherChannelName = "another_channel_name"
|
||||
)
|
||||
|
||||
func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
|
||||
tests := []struct {
|
||||
description string
|
||||
|
@ -402,6 +407,13 @@ func TestFlowGraph_DDNode_filterMessages(t *testing.T) {
|
|||
},
|
||||
getInsertMsg(111, 70000),
|
||||
false},
|
||||
// for pChannel reuse on same collection
|
||||
{"test insert msg with different channel name",
|
||||
[]UniqueID{100},
|
||||
nil,
|
||||
nil,
|
||||
getInsertMsgWithChannel(100, 10000, anotherChannelName),
|
||||
true},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
@ -570,7 +582,7 @@ func TestFlowGraph_DDNode_isDropped(t *testing.T) {
|
|||
factory := mockMsgStreamFactory{true, true}
|
||||
deltaStream, err := factory.NewMsgStream(context.Background())
|
||||
assert.Nil(t, err)
|
||||
ddn := &ddNode{droppedSegmentIDs: dsIDs, deltaMsgStream: deltaStream}
|
||||
ddn := &ddNode{droppedSegmentIDs: dsIDs, deltaMsgStream: deltaStream, vChannelName: ddNodeChannelName}
|
||||
assert.Equal(t, test.expectedOut, ddn.isDropped(test.inSeg))
|
||||
})
|
||||
}
|
||||
|
@ -584,12 +596,17 @@ func getSegmentInfo(segmentID UniqueID, ts Timestamp) *datapb.SegmentInfo {
|
|||
}
|
||||
|
||||
func getInsertMsg(segmentID UniqueID, ts Timestamp) *msgstream.InsertMsg {
|
||||
return getInsertMsgWithChannel(segmentID, ts, ddNodeChannelName)
|
||||
}
|
||||
|
||||
func getInsertMsgWithChannel(segmentID UniqueID, ts Timestamp, vChannelName string) *msgstream.InsertMsg {
|
||||
return &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{EndTimestamp: ts},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert},
|
||||
SegmentID: segmentID,
|
||||
CollectionID: 1,
|
||||
ShardName: vChannelName,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue