Rename insertMsg to flowGraphMsg in datanode flow graph (#8546)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/8561/head
Cai Yudong 2021-09-26 10:43:57 +08:00 committed by GitHub
parent c5b71e8d04
commit b2e5627061
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 41 additions and 43 deletions

View File

@ -60,7 +60,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
msg.SetTraceCtx(ctx)
}
var iMsg = insertMsg{
var fgMsg = flowGraphMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
@ -96,14 +96,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
continue
}
}
iMsg.insertMessages = append(iMsg.insertMessages, imsg)
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
}
}
iMsg.startPositions = append(iMsg.startPositions, msMsg.StartPositions()...)
iMsg.endPositions = append(iMsg.endPositions, msMsg.EndPositions()...)
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...)
var res Msg = &iMsg
var res Msg = &fgMsg
for _, sp := range spans {
sp.Finish()

View File

@ -83,9 +83,9 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
}{
{[]Msg{},
"Invalid input length == 0"},
{[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}},
{[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}},
"Invalid input length == 3"},
{[]Msg{&insertMsg{}},
{[]Msg{&flowGraphMsg{}},
"Invalid input length == 1 but input message is not msgStreamMsg"},
}
@ -190,7 +190,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
// Test
rt := ddn.Operate([]Msg{msgStreamMsg})
assert.Equal(t, test.expectedRtLen, len(rt[0].(*insertMsg).insertMessages))
assert.Equal(t, test.expectedRtLen, len(rt[0].(*flowGraphMsg).insertMessages))
})
}
})

View File

@ -49,9 +49,9 @@ func TestFlowGraphDeleteNode_Operate(te *testing.T) {
}{
{[]Msg{}, nil,
"Invalid input length == 0"},
{[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}}, nil,
{[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}}, nil,
"Invalid input length == 3"},
{[]Msg{&insertMsg{}}, nil,
{[]Msg{&flowGraphMsg{}}, nil,
"Invalid input length == 1 but input message is not msgStreamMsg"},
{nil, []Msg{&MsgStreamMsg{}},
"valid input"},

View File

@ -165,7 +165,6 @@ func (ibNode *insertBufferNode) Close() {
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// log.Debug("InsertBufferNode Operating")
if len(in) != 1 {
@ -173,36 +172,36 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
return []Msg{}
}
iMsg, ok := in[0].(*insertMsg)
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Error("type assertion failed for insertMsg")
log.Error("type assertion failed for flowGraphMsg")
ibNode.Close()
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range iMsg.insertMessages {
for _, msg := range fgMsg.insertMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.startPositions))
for idx := range iMsg.startPositions {
pos := proto.Clone(iMsg.startPositions[idx]).(*internalpb.MsgPosition)
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
startPositions = append(startPositions, pos)
}
endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions))
for idx := range iMsg.endPositions {
pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition)
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
for idx := range fgMsg.endPositions {
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
// Updating segment statistics in replica
seg2Upload, err := ibNode.updateSegStatesInReplica(iMsg.insertMessages, startPositions[0], endPositions[0])
seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0])
if err != nil {
log.Warn("update segment states in Replica wrong", zap.Error(err))
return []Msg{}
@ -216,7 +215,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
// insert messages -> buffer
for _, msg := range iMsg.insertMessages {
for _, msg := range fgMsg.insertMessages {
err := ibNode.bufferInsertMsg(msg, endPositions[0])
if err != nil {
log.Warn("msg to buffer failed", zap.Error(err))
@ -224,7 +223,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
// TODO GOOSE: log updated segments' states
if len(iMsg.insertMessages) > 0 {
if len(fgMsg.insertMessages) > 0 {
log.Debug("---insert buffer status---")
var stopSign int = 0
for k := range ibNode.insertBuffer.insertData {
@ -246,7 +245,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
log.Debug(". Insert Buffer full, auto flushing ",
zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush)))
collMeta, err := ibNode.getCollMetabySegID(segToFlush, iMsg.timeRange.timestampMax)
collMeta, err := ibNode.getCollMetabySegID(segToFlush, fgMsg.timeRange.timestampMax)
if err != nil {
log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
continue
@ -320,7 +319,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// TODO add error handling
}
collMeta, err := ibNode.getCollMetabySegID(currentSegID, iMsg.timeRange.timestampMax)
collMeta, err := ibNode.getCollMetabySegID(currentSegID, fgMsg.timeRange.timestampMax)
if err != nil {
log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
clearFn()
@ -347,7 +346,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
default:
}
if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax); err != nil {
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
}

View File

@ -123,10 +123,10 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
}{
{[]Msg{},
"Invalid input length == 0"},
{[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}},
{[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}},
"Invalid input length == 3"},
{[]Msg{&mockMsg{}},
"Invalid input length == 1 but input message is not insertMsg"},
"Invalid input length == 1 but input message is not flowGraphMsg"},
}
for _, test := range invalidInTests {
@ -181,12 +181,12 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
collectionID: UniqueID(1),
}
inMsg := genInsertMsg(insertChannelName)
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{iMsg})
inMsg := genFlowGraphMsg(insertChannelName)
var fgMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{fgMsg})
}
func genInsertMsg(insertChannelName string) insertMsg {
func genFlowGraphMsg(insertChannelName string) flowGraphMsg {
timeRange := TimeRange{
timestampMin: 0,
@ -201,7 +201,7 @@ func genInsertMsg(insertChannelName string) insertMsg {
},
}
var iMsg = &insertMsg{
var iMsg = &flowGraphMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: timeRange.timestampMin,
@ -391,7 +391,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
// Auto flush number of rows set to 2
inMsg := genInsertMsg("datanode-03-test-autoflush")
inMsg := genFlowGraphMsg("datanode-03-test-autoflush")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
var iMsg flowgraph.Msg = &inMsg
@ -667,7 +667,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache())
require.NoError(t, err)
inMsg := genInsertMsg(insertChannelName)
inMsg := genFlowGraphMsg(insertChannelName)
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid
err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{})

View File

@ -25,20 +25,20 @@ type (
MsgStreamMsg = flowgraph.MsgStreamMsg
)
type insertMsg struct {
type flowGraphMsg struct {
insertMessages []*msgstream.InsertMsg
timeRange TimeRange
startPositions []*internalpb.MsgPosition
endPositions []*internalpb.MsgPosition
}
func (fgMsg *flowGraphMsg) TimeTick() Timestamp {
return fgMsg.timeRange.timestampMax
}
type flushMsg struct {
msgID UniqueID
timestamp Timestamp
segmentID UniqueID
collectionID UniqueID
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}

View File

@ -29,9 +29,8 @@ func TestInsertMsg_TimeTick(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
im := &insertMsg{timeRange: TimeRange{timestampMax: test.timeTimestanpMax}}
assert.Equal(t, test.timeTimestanpMax, im.TimeTick())
fgMsg := &flowGraphMsg{timeRange: TimeRange{timestampMax: test.timeTimestanpMax}}
assert.Equal(t, test.timeTimestanpMax, fgMsg.TimeTick())
})
}