mirror of https://github.com/milvus-io/milvus.git
Add ut for flow_graph_dd_node to 100% (#7511)
This PR: - Change mutex to sync.Map - Add ut for flow_graph_dd_node.go to 100% See also: #6357 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/7537/head
parent
f00f882d10
commit
29756c6ce8
|
@ -21,7 +21,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
@ -32,33 +31,26 @@ type ddNode struct {
|
|||
clearSignal chan<- UniqueID
|
||||
collectionID UniqueID
|
||||
|
||||
mu sync.RWMutex
|
||||
seg2SegInfo map[UniqueID]*datapb.SegmentInfo // Segment ID to UnFlushed Segment
|
||||
vchanInfo *datapb.VchannelInfo
|
||||
segID2SegInfo sync.Map // segment ID to *SegmentInfo
|
||||
flushedSegments []UniqueID
|
||||
}
|
||||
|
||||
func (ddn *ddNode) Name() string {
|
||||
return "ddNode"
|
||||
}
|
||||
|
||||
func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
||||
|
||||
func (ddn *ddNode) Operate(in []Msg) []Msg {
|
||||
// log.Debug("DDNode Operating")
|
||||
|
||||
if len(in) != 1 {
|
||||
log.Error("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
|
||||
// TODO: add error handling
|
||||
}
|
||||
|
||||
if len(in) == 0 {
|
||||
return []flowgraph.Msg{}
|
||||
log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
msMsg, ok := in[0].(*MsgStreamMsg)
|
||||
if !ok {
|
||||
log.Error("type assertion failed for MsgStreamMsg")
|
||||
return []flowgraph.Msg{}
|
||||
// TODO: add error handling
|
||||
log.Warn("Type assertion failed for MsgStreamMsg")
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
var spans []opentracing.Span
|
||||
|
@ -68,10 +60,6 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
msg.SetTraceCtx(ctx)
|
||||
}
|
||||
|
||||
if msMsg == nil {
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
var iMsg = insertMsg{
|
||||
insertMessages: make([]*msgstream.InsertMsg, 0),
|
||||
timeRange: TimeRange{
|
||||
|
@ -93,6 +81,12 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
case commonpb.MsgType_Insert:
|
||||
log.Debug("DDNode with insert messages")
|
||||
imsg := msg.(*msgstream.InsertMsg)
|
||||
if imsg.CollectionID != ddn.collectionID {
|
||||
//log.Debug("filter invalid InsertMsg, collection mis-match",
|
||||
// zap.Int64("Get msg collID", imsg.CollectionID),
|
||||
// zap.Int64("Expected collID", ddn.collectionID))
|
||||
continue
|
||||
}
|
||||
if msg.EndTs() < FilterThreshold {
|
||||
log.Info("Filtering Insert Messages",
|
||||
zap.Uint64("Message endts", msg.EndTs()),
|
||||
|
@ -102,12 +96,6 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
continue
|
||||
}
|
||||
}
|
||||
if imsg.CollectionID != ddn.collectionID {
|
||||
//log.Debug("filter invalid InsertMsg, collection mis-match",
|
||||
// zap.Int64("msg collID", imsg.CollectionID),
|
||||
// zap.Int64("ddn collID", ddn.collectionID))
|
||||
continue
|
||||
}
|
||||
iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
|
||||
}
|
||||
}
|
||||
|
@ -129,23 +117,18 @@ func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg)
|
|||
return true
|
||||
}
|
||||
|
||||
ddn.mu.Lock()
|
||||
if si, ok := ddn.seg2SegInfo[msg.GetSegmentID()]; ok {
|
||||
if msg.EndTs() <= si.GetDmlPosition().GetTimestamp() {
|
||||
if si, ok := ddn.segID2SegInfo.Load(msg.GetSegmentID()); ok {
|
||||
if msg.EndTs() <= si.(*datapb.SegmentInfo).GetDmlPosition().GetTimestamp() {
|
||||
return true
|
||||
}
|
||||
delete(ddn.seg2SegInfo, msg.GetSegmentID())
|
||||
}
|
||||
|
||||
ddn.mu.Unlock()
|
||||
ddn.segID2SegInfo.Delete(msg.GetSegmentID())
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
|
||||
ddn.mu.Lock()
|
||||
defer ddn.mu.Unlock()
|
||||
|
||||
for _, id := range ddn.vchanInfo.GetFlushedSegments() {
|
||||
for _, id := range ddn.flushedSegments {
|
||||
if id == segmentID {
|
||||
return true
|
||||
}
|
||||
|
@ -157,16 +140,28 @@ func newDDNode(clearSignal chan<- UniqueID, collID UniqueID, vchanInfo *datapb.V
|
|||
baseNode := BaseNode{}
|
||||
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
|
||||
|
||||
si := make(map[UniqueID]*datapb.SegmentInfo)
|
||||
for _, us := range vchanInfo.GetUnflushedSegments() {
|
||||
si[us.GetID()] = us
|
||||
fs := make([]UniqueID, 0, len(vchanInfo.GetFlushedSegments()))
|
||||
fs = append(fs, vchanInfo.GetFlushedSegments()...)
|
||||
log.Debug("ddNode add flushed segment",
|
||||
zap.Int64("collectionID", vchanInfo.GetCollectionID()),
|
||||
zap.Int("No. Segment", len(vchanInfo.GetFlushedSegments())),
|
||||
)
|
||||
|
||||
dd := &ddNode{
|
||||
BaseNode: baseNode,
|
||||
clearSignal: clearSignal,
|
||||
collectionID: collID,
|
||||
flushedSegments: fs,
|
||||
}
|
||||
|
||||
return &ddNode{
|
||||
BaseNode: baseNode,
|
||||
clearSignal: clearSignal,
|
||||
collectionID: collID,
|
||||
seg2SegInfo: si,
|
||||
vchanInfo: vchanInfo,
|
||||
for _, us := range vchanInfo.GetUnflushedSegments() {
|
||||
dd.segID2SegInfo.Store(us.GetID(), us)
|
||||
}
|
||||
|
||||
log.Debug("ddNode add unflushed segment",
|
||||
zap.Int64("collectionID", collID),
|
||||
zap.Int("No. Segment", len(vchanInfo.GetUnflushedSegments())),
|
||||
)
|
||||
|
||||
return dd
|
||||
}
|
||||
|
|
|
@ -13,18 +13,276 @@ package datanode
|
|||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
func TestFlowGraphDDNode_Operate(t *testing.T) {
|
||||
// clearSignal := make(chan UniqueID)
|
||||
// collectionID := UniqueID(1)
|
||||
// vchanInfo := &datapb.VchannelInfo{
|
||||
// CollectionID: collectionID,
|
||||
// }
|
||||
// ddNode := newDDNode(clearSignal)
|
||||
// ddNode := newDDNode()
|
||||
func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
|
||||
tests := []struct {
|
||||
inCollID UniqueID
|
||||
|
||||
// var inMsg Msg = msgStream
|
||||
// ddNode.Operate([]Msg{inMsg})
|
||||
inFlushedSegs []UniqueID
|
||||
inUnFlushedSegID UniqueID
|
||||
inUnFlushedChannelTs Timestamp
|
||||
|
||||
description string
|
||||
}{
|
||||
{UniqueID(1), []UniqueID{100, 101, 102}, 200, 666666,
|
||||
"Input VchannelInfo with 3 flushed segs and 1 unflushed seg"},
|
||||
{UniqueID(2), []UniqueID{103}, 200, 666666,
|
||||
"Input VchannelInfo with 1 flushed seg and 1 unflushed seg"},
|
||||
{UniqueID(3), []UniqueID{}, 200, 666666,
|
||||
"Input VchannelInfo with 0 flushed segs and 1 unflushed seg"},
|
||||
{UniqueID(3), []UniqueID{104}, 0, 0,
|
||||
"Input VchannelInfo with 1 flushed seg and empty unflushed seg"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
di := &datapb.SegmentInfo{}
|
||||
|
||||
if test.inUnFlushedSegID != 0 {
|
||||
di.ID = test.inUnFlushedSegID
|
||||
di.DmlPosition = &internalpb.MsgPosition{Timestamp: test.inUnFlushedChannelTs}
|
||||
}
|
||||
|
||||
ddNode := newDDNode(
|
||||
make(chan UniqueID),
|
||||
test.inCollID,
|
||||
&datapb.VchannelInfo{
|
||||
FlushedSegments: test.inFlushedSegs,
|
||||
UnflushedSegments: []*datapb.SegmentInfo{di},
|
||||
},
|
||||
)
|
||||
|
||||
assert.Equal(t, "ddNode", ddNode.Name())
|
||||
assert.Equal(t, test.inCollID, ddNode.collectionID)
|
||||
assert.Equal(t, len(test.inFlushedSegs), len(ddNode.flushedSegments))
|
||||
assert.ElementsMatch(t, test.inFlushedSegs, ddNode.flushedSegments)
|
||||
|
||||
si, ok := ddNode.segID2SegInfo.Load(test.inUnFlushedSegID)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, test.inUnFlushedSegID, si.(*datapb.SegmentInfo).GetID())
|
||||
assert.Equal(t, test.inUnFlushedChannelTs, si.(*datapb.SegmentInfo).GetDmlPosition().GetTimestamp())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlowGraph_DDNode_Operate(to *testing.T) {
|
||||
to.Run("Test DDNode Operate DropCollection Msg", func(te *testing.T) {
|
||||
// invalid inputs
|
||||
invalidInTests := []struct {
|
||||
in []Msg
|
||||
description string
|
||||
}{
|
||||
{[]Msg{},
|
||||
"Invalid input length == 0"},
|
||||
{[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}},
|
||||
"Invalid input length == 3"},
|
||||
{[]Msg{&insertMsg{}},
|
||||
"Invalid input length == 1 but input message is not msgStreamMsg"},
|
||||
}
|
||||
|
||||
for _, test := range invalidInTests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
ddn := ddNode{}
|
||||
rt := ddn.Operate(test.in)
|
||||
assert.Empty(t, rt)
|
||||
})
|
||||
}
|
||||
|
||||
// valid inputs
|
||||
tests := []struct {
|
||||
ddnClearSignal chan UniqueID
|
||||
ddnCollID UniqueID
|
||||
|
||||
msgCollID UniqueID
|
||||
expectedChlen int
|
||||
|
||||
description string
|
||||
}{
|
||||
{make(chan UniqueID, 1), 1, 1, 1,
|
||||
"DropCollectionMsg collID == ddNode collID"},
|
||||
{make(chan UniqueID, 1), 1, 2, 0,
|
||||
"DropCollectionMsg collID != ddNode collID"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
ddn := ddNode{
|
||||
clearSignal: test.ddnClearSignal,
|
||||
collectionID: test.ddnCollID,
|
||||
}
|
||||
|
||||
var createCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{
|
||||
DropCollectionRequest: internalpb.DropCollectionRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
|
||||
CollectionID: test.msgCollID,
|
||||
},
|
||||
}
|
||||
tsMessages := []msgstream.TsMsg{createCollMsg}
|
||||
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
|
||||
|
||||
rt := ddn.Operate([]Msg{msgStreamMsg})
|
||||
assert.Equal(t, test.expectedChlen, len(test.ddnClearSignal))
|
||||
|
||||
if test.ddnCollID == test.msgCollID {
|
||||
assert.Empty(t, rt)
|
||||
} else {
|
||||
assert.NotEmpty(t, rt)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
to.Run("Test DDNode Operate Insert Msg", func(te *testing.T) {
|
||||
tests := []struct {
|
||||
ddnCollID UniqueID
|
||||
inMsgCollID UniqueID
|
||||
|
||||
MsgEndTs Timestamp
|
||||
threshold Timestamp
|
||||
|
||||
ddnFlushedSegment UniqueID
|
||||
inMsgSegID UniqueID
|
||||
|
||||
expectedRtLen int
|
||||
description string
|
||||
}{
|
||||
{1, 1, 2000, 3000, 100, 100, 0,
|
||||
"MsgEndTs(2000) < threshold(3000), inMsgSegID(100) IN ddnFlushedSeg {100}"},
|
||||
{1, 1, 2000, 3000, 100, 200, 1,
|
||||
"MsgEndTs(2000) < threshold(3000), inMsgSegID(200) NOT IN ddnFlushedSeg {100}"},
|
||||
{1, 1, 4000, 3000, 100, 101, 1,
|
||||
"Seg 101, MsgEndTs(4000) > FilterThreshold(3000)"},
|
||||
{1, 1, 4000, 3000, 100, 200, 1,
|
||||
"Seg 200, MsgEndTs(4000) > FilterThreshold(3000)"},
|
||||
{1, 2, 4000, 3000, 100, 100, 0,
|
||||
"inMsgCollID(2) != ddnCollID"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
// Prepare ddNode states
|
||||
ddn := ddNode{
|
||||
flushedSegments: []UniqueID{test.ddnFlushedSegment},
|
||||
collectionID: test.ddnCollID,
|
||||
}
|
||||
FilterThreshold = test.threshold
|
||||
|
||||
// Prepare insert messages
|
||||
var iMsg msgstream.TsMsg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{EndTimestamp: test.MsgEndTs},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert},
|
||||
CollectionID: test.inMsgCollID,
|
||||
SegmentID: test.inMsgSegID,
|
||||
},
|
||||
}
|
||||
tsMessages := []msgstream.TsMsg{iMsg}
|
||||
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
|
||||
|
||||
// Test
|
||||
rt := ddn.Operate([]Msg{msgStreamMsg})
|
||||
assert.Equal(t, test.expectedRtLen, len(rt[0].(*insertMsg).insertMessages))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestFlowGraph_DDNode_filterMessages(te *testing.T) {
|
||||
tests := []struct {
|
||||
ddnFlushedSegments []UniqueID
|
||||
ddnSegID2Ts map[UniqueID]Timestamp
|
||||
|
||||
inMsgSegID UniqueID
|
||||
inMsgSegEntTs Timestamp
|
||||
expectedOut bool
|
||||
|
||||
description string
|
||||
}{
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 1, 1500, true,
|
||||
"Seg 1 in flushedSegs {1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 2, 1500, true,
|
||||
"Seg 2 in flushedSegs {1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 3, 1500, true,
|
||||
"Seg 3 in flushedSegs {1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 1500, false,
|
||||
"Seg 4, inMsgSegEntTs(1500) > SegCheckPoint(1000)"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 500, true,
|
||||
"Seg 4, inMsgSegEntTs(500) <= SegCheckPoint(1000)"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 1000, true,
|
||||
"Seg 4 inMsgSegEntTs(1000) <= SegCheckPoint(1000)"},
|
||||
{[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 5, 1500, true,
|
||||
"Seg 5 inMsgSegEntTs(1500) <= SegCheckPoint(2000)"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
// Prepare ddNode states
|
||||
ddn := ddNode{
|
||||
flushedSegments: test.ddnFlushedSegments,
|
||||
}
|
||||
|
||||
for k, v := range test.ddnSegID2Ts {
|
||||
ddn.segID2SegInfo.Store(k, &datapb.SegmentInfo{DmlPosition: &internalpb.MsgPosition{Timestamp: v}})
|
||||
}
|
||||
|
||||
// Prepare insert messages
|
||||
var iMsg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{EndTimestamp: test.inMsgSegEntTs},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert},
|
||||
SegmentID: test.inMsgSegID,
|
||||
},
|
||||
}
|
||||
|
||||
// Test
|
||||
rt := ddn.filterFlushedSegmentInsertMessages(iMsg)
|
||||
assert.Equal(t, test.expectedOut, rt)
|
||||
|
||||
si, ok := ddn.segID2SegInfo.Load(iMsg.GetSegmentID())
|
||||
if !rt {
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, si)
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlowGraph_DDNode_isFlushed(te *testing.T) {
|
||||
tests := []struct {
|
||||
influshedSegment []UniqueID
|
||||
inSeg UniqueID
|
||||
|
||||
expectedOut bool
|
||||
|
||||
description string
|
||||
}{
|
||||
{[]UniqueID{1, 2, 3}, 1, true,
|
||||
"Input seg 1 in flushedSegs{1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, 2, true,
|
||||
"Input seg 2 in flushedSegs{1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, 3, true,
|
||||
"Input seg 3 in flushedSegs{1, 2, 3}"},
|
||||
{[]UniqueID{1, 2, 3}, 4, false,
|
||||
"Input seg 4 not in flushedSegs{1, 2, 3}"},
|
||||
{[]UniqueID{}, 5, false,
|
||||
"Input seg 5, no flushedSegs {}"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
te.Run(test.description, func(t *testing.T) {
|
||||
ddn := &ddNode{flushedSegments: test.influshedSegment}
|
||||
assert.Equal(t, test.expectedOut, ddn.isFlushed(test.inSeg))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue