Add deleteMessage in flow_graph_filter_dm_node (#9504)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/9973/head
yukun 2021-10-15 16:54:43 +08:00 committed by GitHub
parent bb721f1cf4
commit 55f148f17e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 0 deletions

View File

@ -63,11 +63,13 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
deleteMessages: make([]*msgstream.DeleteMsg, 0),
timeRange: TimeRange{
timestampMin: msgStreamMsg.TimestampMin(),
timestampMax: msgStreamMsg.TimestampMax(),
},
}
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_Insert:
@ -75,6 +77,11 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
case commonpb.MsgType_Delete:
resMsg := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
if resMsg != nil {
iMsg.deleteMessages = append(iMsg.deleteMessages, resMsg)
}
default:
log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type())))
}
@ -87,6 +94,66 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
// check if collection and partition exist
collection := fdmNode.replica.hasCollection(msg.CollectionID)
partition := fdmNode.replica.hasPartition(msg.PartitionID)
if fdmNode.loadType == loadTypeCollection && !collection {
log.Debug("filter invalid delete message, collection dose not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
if fdmNode.loadType == loadTypePartition && !partition {
log.Debug("filter invalid delete message, partition dose not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
if msg.CollectionID != fdmNode.collectionID {
return nil
}
// if the flow graph type is partition, check if the partition is target partition
if fdmNode.loadType == loadTypePartition && msg.PartitionID != fdmNode.partitionID {
log.Debug("filter invalid delete message, partition is not the target partition",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
// check if partition has been released
if fdmNode.loadType == loadTypeCollection {
col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
if err != nil {
log.Warn(err.Error())
return nil
}
if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil {
log.Warn(err.Error())
return nil
}
}
if len(msg.PrimaryKeys) != len(msg.Timestamps) {
log.Warn("Error, misaligned messages detected")
return nil
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
return msg
}
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)

View File

@ -168,6 +168,95 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
})
}
func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("delete valid test", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.NotNil(t, res)
})
t.Run("test delete no collection", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msg.CollectionID = UniqueID(1000)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete no partition", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msg.PartitionID = UniqueID(1000)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
fg.loadType = loadTypePartition
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete not target collection", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
fg.collectionID = UniqueID(1000)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete not target partition", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
fg.loadType = loadTypePartition
fg.partitionID = UniqueID(1000)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete released partition", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
col, err := fg.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.addReleasedPartition(defaultPartitionID)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete misaligned messages", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete no data", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
msg.PrimaryKeys = make([]IntPrimaryKey, 0)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
}
func TestFlowGraphFilterDmNode_Operate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -24,6 +24,7 @@ type MsgStreamMsg = flowgraph.MsgStreamMsg
type insertMsg struct {
insertMessages []*msgstream.InsertMsg
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}