Fix datanode restart bug (#5976) (#6004)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/6013/head^2
XuanYang-cn 2021-06-23 12:26:10 +08:00 committed by GitHub
parent a529410b38
commit b640627a38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 4 deletions

View File

@ -20,18 +20,20 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func newDmInputNode(ctx context.Context, factory msgstream.Factory, vchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
consumeSubName := Params.MsgChannelSubName
insertStream, _ := factory.NewTtMsgStream(ctx)
insertStream.AsConsumer([]string{vchannelName}, consumeSubName)
log.Debug("datanode AsConsumer: " + vchannelName + " : " + consumeSubName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)
if seekPos != nil {
// ChannelName in seek position is virtual channel name.
seekPos.ChannelName = pchannelName
log.Debug("datanode Seek: " + seekPos.GetChannelName())
insertStream.Seek([]*internalpb.MsgPosition{seekPos})
log.Debug("datanode Seek: " + vchannelName)
}
var stream msgstream.MsgStream = insertStream