Stop listen to dd channel in query node

Signed-off-by: ophunter233 <chengming.li@zilliz.com>
pull/4973/head^2
ophunter233 2021-03-03 16:06:02 +08:00 committed by yefu.chen
parent 07339ad2b8
commit 4a921ffd7e
4 changed files with 6 additions and 94 deletions

View File

@ -13,7 +13,6 @@ type dataSyncService struct {
fg *flowgraph.TimeTickedFlowGraph
dmStream msgstream.MsgStream
ddStream msgstream.MsgStream
msFactory msgstream.Factory
replica collectionReplica
@ -47,24 +46,18 @@ func (dsService *dataSyncService) initNodes() {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode node = dsService.newDmInputNode(dsService.ctx)
var ddStreamNode node = dsService.newDDInputNode(dsService.ctx)
var filterDmNode node = newFilteredDmNode(dsService.replica)
var ddNode node = newDDNode(dsService.replica)
var insertNode node = newInsertNode(dsService.replica)
var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica, dsService.msFactory)
var gcNode node = newGCNode(dsService.replica)
dsService.fg.AddNode(dmStreamNode)
dsService.fg.AddNode(ddStreamNode)
dsService.fg.AddNode(filterDmNode)
dsService.fg.AddNode(ddNode)
dsService.fg.AddNode(insertNode)
dsService.fg.AddNode(serviceTimeNode)
dsService.fg.AddNode(gcNode)
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
@ -75,33 +68,15 @@ func (dsService *dataSyncService) initNodes() {
log.Fatal("set edges failed in node:", dmStreamNode.Name())
}
// ddStreamNode
err = dsService.fg.SetEdges(ddStreamNode.Name(),
[]string{},
[]string{ddNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", ddStreamNode.Name())
}
// filterDmNode
err = dsService.fg.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name(), ddNode.Name()},
[]string{dmStreamNode.Name()},
[]string{insertNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", filterDmNode.Name())
}
// ddNode
err = dsService.fg.SetEdges(ddNode.Name(),
[]string{ddStreamNode.Name()},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", ddNode.Name())
}
// insertNode
err = dsService.fg.SetEdges(insertNode.Name(),
[]string{filterDmNode.Name()},
@ -114,17 +89,9 @@ func (dsService *dataSyncService) initNodes() {
// serviceTimeNode
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{gcNode.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
// gcNode
err = dsService.fg.SetEdges(gcNode.Name(),
[]string{serviceTimeNode.Name()},
[]string{})
if err != nil {
log.Fatal("set edges failed in node:", gcNode.Name())
}
}

View File

@ -109,7 +109,6 @@ func TestDataSyncService_Start(t *testing.T) {
// pulsar produce
const receiveBufSize = 1024
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames
pulsarURL := Params.PulsarAddress
msFactory := pulsarms.NewFactory()
@ -123,22 +122,14 @@ func TestDataSyncService_Start(t *testing.T) {
insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
insertStream.AsProducer(insertChannels)
ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
ddStream.AsProducer(ddChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(ctx, &msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory)

View File

@ -3,7 +3,6 @@ package querynode
import (
"context"
"log"
"math"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -11,7 +10,6 @@ import (
type filterDmNode struct {
baseNode
ddMsg *ddMsg
replica collectionReplica
}
@ -22,7 +20,7 @@ func (fdmNode *filterDmNode) Name() string {
func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Context) {
//fmt.Println("Do filterDmNode operation")
if len(in) != 2 {
if len(in) != 1 {
log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
// TODO: add error handling
}
@ -33,13 +31,6 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, cont
// TODO: add error handling
}
ddMsg, ok := in[1].(*ddMsg)
if !ok {
log.Println("type assertion failed for ddMsg")
// TODO: add error handling
}
fdmNode.ddMsg = ddMsg
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
@ -61,7 +52,6 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, cont
}
}
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
return []Msg{res}, ctx
@ -76,12 +66,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
return nil
}
// No dd record, do all insert requests.
records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionID]
if !ok {
return msg
}
// TODO: If the last record is drop type, all insert requests are invalid.
//if !records[len(records)-1].createOrDrop {
// return nil
@ -98,24 +82,10 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
tmpRowIDs := make([]int64, 0)
tmpRowData := make([]*commonpb.Blob, 0)
// calculate valid time range
timeBegin := Timestamp(0)
timeEnd := Timestamp(math.MaxUint64)
for _, record := range records {
if record.createOrDrop && timeBegin < record.timestamp {
timeBegin = record.timestamp
}
if !record.createOrDrop && timeEnd > record.timestamp {
timeEnd = record.timestamp
}
}
for i, t := range msg.Timestamps {
if t >= timeBegin && t <= timeEnd {
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
}
tmpTimestamps = append(tmpTimestamps, t)
tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i])
tmpRowData = append(tmpRowData, msg.RowData[i])
}
if len(tmpRowIDs) <= 0 {

View File

@ -17,19 +17,3 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph
node := flowgraph.NewInputNode(&insertStream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}
func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := Params.DDChannelNames
consumeSubName := Params.MsgChannelSubName
ddStream, _ := dsService.msFactory.NewTtMsgStream(ctx)
ddStream.AsConsumer(consumeChannels, consumeSubName)
dsService.ddStream = ddStream
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&ddStream, "ddInputNode", maxQueueLength, maxParallelism)
return node
}