Normalize dmInputNode log (#9652)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/9703/head
congqixia 2021-10-11 21:54:33 +08:00 committed by GitHub
parent f8054a409e
commit 80ac456cf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 5 additions and 3 deletions

View File

@ -13,13 +13,14 @@ package datanode
import (
"context"
"strconv"
"fmt"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/zap"
)
// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
@ -30,7 +31,8 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu
maxParallelism := Params.FlowGraphMaxParallelism
// subName should be unique, since pchannelName is shared among several collections
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
// consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
consumeSubName := fmt.Sprintf("%s-%d", Params.MsgChannelSubName, collID)
insertStream, err := factory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
@ -40,7 +42,7 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
pchannelName := rootcoord.ToPhysicalChannel(chanName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)
log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName))
if seekPos != nil {
seekPos.ChannelName = pchannelName