2021-10-15 10:07:09 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 07:16:33 +00:00
// with the License. You may obtain a copy of the License at
//
2021-10-15 10:07:09 +00:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 07:16:33 +00:00
//
2021-10-15 10:07:09 +00:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 07:16:33 +00:00
2021-01-19 03:37:16 +00:00
package datanode
import (
"context"
2021-10-11 13:54:33 +00:00
"fmt"
2021-10-15 12:31:16 +00:00
"time"
2021-01-19 03:37:16 +00:00
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/log"
2022-02-28 11:11:55 +00:00
"github.com/milvus-io/milvus/internal/metrics"
2022-10-25 05:23:30 +00:00
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
2021-06-07 05:58:37 +00:00
"github.com/milvus-io/milvus/internal/proto/internalpb"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/util/flowgraph"
2022-03-15 13:51:21 +00:00
"github.com/milvus-io/milvus/internal/util/funcutil"
2021-10-11 13:54:33 +00:00
"go.uber.org/zap"
2021-01-19 03:37:16 +00:00
)
2021-10-06 15:02:08 +00:00
// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
// messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is
// flowgraph ddNode.
2021-10-13 03:16:32 +00:00
func newDmInputNode ( ctx context . Context , seekPos * internalpb . MsgPosition , dmNodeConfig * nodeConfig ) ( * flowgraph . InputNode , error ) {
2021-09-18 01:13:50 +00:00
// subName should be unique, since pchannelName is shared among several collections
2022-08-03 09:10:36 +00:00
// use vchannel in case of reuse pchannel for same collection
consumeSubName := fmt . Sprintf ( "%s-%d-%s" , Params . CommonCfg . DataNodeSubName , Params . DataNodeCfg . GetNodeID ( ) , dmNodeConfig . vChannelName )
2021-10-13 03:16:32 +00:00
insertStream , err := dmNodeConfig . msFactory . NewTtMsgStream ( ctx )
2021-09-23 10:31:55 +00:00
if err != nil {
return nil , err
}
2021-06-07 05:58:37 +00:00
2021-10-06 15:02:08 +00:00
// MsgStream needs a physical channel name, but the channel name in seek position from DataCoord
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
2022-03-15 13:51:21 +00:00
pchannelName := funcutil . ToPhysicalChannel ( dmNodeConfig . vChannelName )
2021-06-08 11:25:37 +00:00
if seekPos != nil {
2022-10-25 05:23:30 +00:00
insertStream . AsConsumer ( [ ] string { pchannelName } , consumeSubName , mqwrapper . SubscriptionPositionUnknown )
2021-06-23 04:26:10 +00:00
seekPos . ChannelName = pchannelName
2021-10-15 12:31:16 +00:00
start := time . Now ( )
2022-07-08 02:18:28 +00:00
log . Info ( "datanode begin to seek" , zap . ByteString ( "seek msgID" , seekPos . GetMsgID ( ) ) , zap . String ( "physical channel" , seekPos . GetChannelName ( ) ) , zap . Int64 ( "collection ID" , dmNodeConfig . collectionID ) )
2021-10-08 11:25:00 +00:00
err = insertStream . Seek ( [ ] * internalpb . MsgPosition { seekPos } )
if err != nil {
return nil , err
}
2022-07-08 02:18:28 +00:00
log . Info ( "datanode seek successfully" , zap . ByteString ( "seek msgID" , seekPos . GetMsgID ( ) ) , zap . String ( "physical channel" , seekPos . GetChannelName ( ) ) , zap . Int64 ( "collection ID" , dmNodeConfig . collectionID ) , zap . Duration ( "elapse" , time . Since ( start ) ) )
2022-10-25 05:23:30 +00:00
} else {
insertStream . AsConsumer ( [ ] string { pchannelName } , consumeSubName , mqwrapper . SubscriptionPositionEarliest )
2021-06-08 11:25:37 +00:00
}
2022-10-25 05:23:30 +00:00
metrics . DataNodeNumConsumers . WithLabelValues ( fmt . Sprint ( Params . DataNodeCfg . GetNodeID ( ) ) ) . Inc ( )
log . Info ( "datanode AsConsumer" , zap . String ( "physical channel" , pchannelName ) , zap . String ( "subName" , consumeSubName ) , zap . Int64 ( "collection ID" , dmNodeConfig . collectionID ) )
2021-01-19 03:37:16 +00:00
2022-04-27 02:01:47 +00:00
name := fmt . Sprintf ( "dmInputNode-data-%d-%s" , dmNodeConfig . collectionID , dmNodeConfig . vChannelName )
2021-12-30 02:33:46 +00:00
node := flowgraph . NewInputNode ( insertStream , name , dmNodeConfig . maxQueueLength , dmNodeConfig . maxParallelism )
2021-09-23 10:31:55 +00:00
return node , nil
2021-01-19 03:37:16 +00:00
}