mirror of https://github.com/milvus-io/milvus.git
parent
a1e243b2d0
commit
57383c9f76
|
@ -60,7 +60,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
|
|||
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
|
||||
var filterDmNode node = newFilteredDmNode(streamingReplica, collectionID)
|
||||
var insertNode node = newInsertNode(streamingReplica)
|
||||
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel, factory)
|
||||
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel)
|
||||
|
||||
q.flowGraph.AddNode(dmStreamNode)
|
||||
q.flowGraph.AddNode(filterDmNode)
|
||||
|
@ -127,7 +127,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
|
|||
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
|
||||
var filterDeleteNode node = newFilteredDeleteNode(historicalReplica, collectionID)
|
||||
var deleteNode node = newDeleteNode(historicalReplica)
|
||||
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel, factory)
|
||||
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel)
|
||||
|
||||
q.flowGraph.AddNode(dmStreamNode)
|
||||
q.flowGraph.AddNode(filterDeleteNode)
|
||||
|
|
|
@ -23,17 +23,15 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
// serviceTimeNode is one of the nodes in delta flow graph
|
||||
type serviceTimeNode struct {
|
||||
baseNode
|
||||
collectionID UniqueID
|
||||
vChannel Channel
|
||||
tSafeReplica TSafeReplicaInterface
|
||||
timeTickMsgStream msgstream.MsgStream
|
||||
collectionID UniqueID
|
||||
vChannel Channel
|
||||
tSafeReplica TSafeReplicaInterface
|
||||
}
|
||||
|
||||
// Name returns the name of serviceTimeNode
|
||||
|
@ -43,8 +41,6 @@ func (stNode *serviceTimeNode) Name() string {
|
|||
|
||||
// Close would close serviceTimeNode
|
||||
func (stNode *serviceTimeNode) Close() {
|
||||
// `Close` needs to be invoked to close producers
|
||||
stNode.timeTickMsgStream.Close()
|
||||
}
|
||||
|
||||
// Operate handles input messages, to execute insert operations
|
||||
|
@ -83,40 +79,14 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
// zap.Any("channel", stNode.vChannel),
|
||||
//)
|
||||
|
||||
//if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil {
|
||||
// log.Warn("Error: send time tick into pulsar channel failed", zap.Error(err))
|
||||
//}
|
||||
|
||||
return []Msg{}
|
||||
}
|
||||
|
||||
//func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
|
||||
// msgPack := msgstream.MsgPack{}
|
||||
// timeTickMsg := msgstream.TimeTickMsg{
|
||||
// BaseMsg: msgstream.BaseMsg{
|
||||
// BeginTimestamp: ts,
|
||||
// EndTimestamp: ts,
|
||||
// HashValues: []uint32{0},
|
||||
// },
|
||||
// TimeTickMsg: internalpb.TimeTickMsg{
|
||||
// Base: &commonpb.MsgBase{
|
||||
// MsgType: commonpb.MsgType_TimeTick,
|
||||
// MsgID: 0,
|
||||
// Timestamp: ts,
|
||||
// SourceID: Params.QueryNodeID,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
// msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
||||
// return stNode.timeTickMsgStream.Produce(&msgPack)
|
||||
//}
|
||||
|
||||
// newServiceTimeNode returns a new serviceTimeNode
|
||||
func newServiceTimeNode(ctx context.Context,
|
||||
tSafeReplica TSafeReplicaInterface,
|
||||
collectionID UniqueID,
|
||||
channel Channel,
|
||||
factory msgstream.Factory) *serviceTimeNode {
|
||||
channel Channel) *serviceTimeNode {
|
||||
|
||||
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
|
||||
|
@ -125,21 +95,10 @@ func newServiceTimeNode(ctx context.Context,
|
|||
baseNode.SetMaxQueueLength(maxQueueLength)
|
||||
baseNode.SetMaxParallelism(maxParallelism)
|
||||
|
||||
timeTimeMsgStream, err := factory.NewMsgStream(ctx)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
} else {
|
||||
// TODO: use param table
|
||||
timeTickChannel := "query-node-time-tick-0"
|
||||
timeTimeMsgStream.AsProducer([]string{timeTickChannel})
|
||||
log.Debug("QueryNode serviceTimeNode AsProducer succeed", zap.String("channel name", timeTickChannel))
|
||||
}
|
||||
|
||||
return &serviceTimeNode{
|
||||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
vChannel: channel,
|
||||
tSafeReplica: tSafeReplica,
|
||||
timeTickMsgStream: timeTimeMsgStream,
|
||||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
vChannel: channel,
|
||||
tSafeReplica: tSafeReplica,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,6 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
|
@ -33,14 +31,10 @@ func TestServiceTimeNode_Operate(t *testing.T) {
|
|||
tSafe := newTSafeReplica()
|
||||
tSafe.addTSafe(defaultDMLChannel)
|
||||
|
||||
fac, err := genFactory()
|
||||
assert.NoError(t, err)
|
||||
|
||||
node := newServiceTimeNode(ctx,
|
||||
tSafe,
|
||||
defaultCollectionID,
|
||||
defaultDMLChannel,
|
||||
fac)
|
||||
defaultDMLChannel)
|
||||
return node
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue