2021-12-28 13:55:07 +00:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 05:42:47 +00:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-12-28 13:55:07 +00:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 05:42:47 +00:00
|
|
|
//
|
2021-12-28 13:55:07 +00:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 05:42:47 +00:00
|
|
|
|
2020-11-09 08:27:11 +00:00
|
|
|
package flowgraph
|
|
|
|
|
|
|
|
import (
|
2023-01-12 08:09:39 +00:00
|
|
|
"context"
|
2022-11-07 02:15:02 +00:00
|
|
|
"fmt"
|
2023-12-06 17:00:37 +00:00
|
|
|
"time"
|
2022-10-10 14:15:22 +00:00
|
|
|
|
2023-01-12 08:09:39 +00:00
|
|
|
"go.opentelemetry.io/otel"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
2023-09-21 01:45:27 +00:00
|
|
|
"go.uber.org/atomic"
|
|
|
|
"go.uber.org/zap"
|
2022-11-07 02:15:02 +00:00
|
|
|
|
2023-12-06 17:00:37 +00:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
2023-04-06 11:14:32 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
|
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
2023-12-06 17:00:37 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
2023-09-21 01:45:27 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
2020-11-09 08:27:11 +00:00
|
|
|
)
|
|
|
|
|
2023-08-21 03:16:20 +00:00
|
|
|
const (
|
|
|
|
CloseGracefully bool = true
|
|
|
|
CloseImmediately bool = false
|
|
|
|
)
|
|
|
|
|
2021-09-29 12:19:59 +00:00
|
|
|
// InputNode is the entry point of flowgragh
|
2020-11-09 08:27:11 +00:00
|
|
|
type InputNode struct {
|
|
|
|
BaseNode
|
2024-05-24 20:43:41 +00:00
|
|
|
input <-chan *msgstream.MsgPack
|
|
|
|
lastMsg *msgstream.MsgPack
|
|
|
|
name string
|
|
|
|
role string
|
|
|
|
nodeID int64
|
|
|
|
nodeIDStr string
|
|
|
|
collectionID int64
|
|
|
|
collectionIDStr string
|
|
|
|
dataType string
|
2023-08-21 03:16:20 +00:00
|
|
|
|
|
|
|
closeGracefully *atomic.Bool
|
2023-12-06 17:00:37 +00:00
|
|
|
|
|
|
|
skipMode bool
|
|
|
|
skipCount int
|
|
|
|
lastNotTimetickTime time.Time
|
2020-11-09 08:27:11 +00:00
|
|
|
}
|
|
|
|
|
2021-09-29 12:19:59 +00:00
|
|
|
// IsInputNode returns whether Node is InputNode
|
2020-11-09 08:27:11 +00:00
|
|
|
func (inNode *InputNode) IsInputNode() bool {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2023-01-06 06:49:36 +00:00
|
|
|
func (inNode *InputNode) IsValidInMsg(in []Msg) bool {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2021-09-29 12:19:59 +00:00
|
|
|
// Name returns node name
|
2020-11-09 08:27:11 +00:00
|
|
|
func (inNode *InputNode) Name() string {
|
|
|
|
return inNode.name
|
|
|
|
}
|
|
|
|
|
2023-08-21 03:16:20 +00:00
|
|
|
func (inNode *InputNode) SetCloseMethod(gracefully bool) {
|
|
|
|
inNode.closeGracefully.Store(gracefully)
|
|
|
|
log.Info("input node close method set",
|
|
|
|
zap.String("node", inNode.Name()),
|
|
|
|
zap.Int64("collection", inNode.collectionID),
|
2024-01-05 08:12:48 +00:00
|
|
|
zap.Bool("gracefully", gracefully))
|
2023-08-21 03:16:20 +00:00
|
|
|
}
|
|
|
|
|
2021-10-04 14:38:02 +00:00
|
|
|
// Operate consume a message pack from msgstream and return
|
2021-03-25 06:41:46 +00:00
|
|
|
func (inNode *InputNode) Operate(in []Msg) []Msg {
|
2023-02-13 08:38:33 +00:00
|
|
|
msgPack, ok := <-inNode.input
|
2022-10-10 14:15:22 +00:00
|
|
|
if !ok {
|
2023-08-21 03:16:20 +00:00
|
|
|
log := log.With(
|
|
|
|
zap.String("node", inNode.Name()),
|
|
|
|
zap.Int64("collection", inNode.collectionID),
|
|
|
|
)
|
|
|
|
log.Info("input node message stream closed",
|
|
|
|
zap.Bool("closeGracefully", inNode.closeGracefully.Load()),
|
|
|
|
)
|
|
|
|
if inNode.lastMsg != nil && inNode.closeGracefully.Load() {
|
|
|
|
log.Info("input node trigger force sync",
|
2023-07-28 02:11:08 +00:00
|
|
|
zap.Any("position", inNode.lastMsg.EndPositions))
|
2023-01-06 06:49:36 +00:00
|
|
|
return []Msg{&MsgStreamMsg{
|
|
|
|
BaseMsg: NewBaseMsg(true),
|
|
|
|
tsMessages: []msgstream.TsMsg{},
|
|
|
|
timestampMin: inNode.lastMsg.BeginTs,
|
|
|
|
timestampMax: inNode.lastMsg.EndTs,
|
|
|
|
startPositions: inNode.lastMsg.StartPositions,
|
|
|
|
endPositions: inNode.lastMsg.EndPositions,
|
|
|
|
}}
|
|
|
|
}
|
2022-09-09 02:00:36 +00:00
|
|
|
return []Msg{&MsgStreamMsg{
|
2023-01-06 06:49:36 +00:00
|
|
|
BaseMsg: NewBaseMsg(true),
|
2022-09-09 02:00:36 +00:00
|
|
|
}}
|
|
|
|
}
|
2022-10-10 14:15:22 +00:00
|
|
|
|
|
|
|
// TODO: add status
|
|
|
|
if msgPack == nil {
|
|
|
|
return []Msg{}
|
|
|
|
}
|
2022-11-07 02:15:02 +00:00
|
|
|
|
2023-01-06 06:49:36 +00:00
|
|
|
inNode.lastMsg = msgPack
|
2022-11-07 02:15:02 +00:00
|
|
|
sub := tsoutil.SubByNow(msgPack.EndTs)
|
|
|
|
if inNode.role == typeutil.DataNodeRole {
|
|
|
|
metrics.DataNodeConsumeMsgCount.
|
2024-05-24 20:43:41 +00:00
|
|
|
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
|
2022-11-07 02:15:02 +00:00
|
|
|
Inc()
|
|
|
|
|
|
|
|
metrics.DataNodeConsumeTimeTickLag.
|
2024-05-24 20:43:41 +00:00
|
|
|
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
|
2022-11-07 02:15:02 +00:00
|
|
|
Set(float64(sub))
|
|
|
|
}
|
|
|
|
|
2023-01-12 08:09:39 +00:00
|
|
|
var spans []trace.Span
|
2023-12-06 17:00:37 +00:00
|
|
|
defer func() {
|
|
|
|
for _, span := range spans {
|
|
|
|
span.End()
|
|
|
|
}
|
|
|
|
}()
|
2022-10-10 14:15:22 +00:00
|
|
|
for _, msg := range msgPack.Msgs {
|
2023-01-12 08:09:39 +00:00
|
|
|
ctx := msg.TraceCtx()
|
|
|
|
if ctx == nil {
|
|
|
|
ctx = context.Background()
|
|
|
|
}
|
|
|
|
ctx, sp := otel.Tracer(inNode.role).Start(ctx, "Operate")
|
|
|
|
sp.AddEvent("input_node name" + inNode.Name())
|
2022-10-10 14:15:22 +00:00
|
|
|
spans = append(spans, sp)
|
|
|
|
msg.SetTraceCtx(ctx)
|
|
|
|
}
|
|
|
|
|
2023-12-06 17:00:37 +00:00
|
|
|
// skip timetick message feature
|
|
|
|
if inNode.role == typeutil.DataNodeRole &&
|
|
|
|
len(msgPack.Msgs) > 0 &&
|
|
|
|
paramtable.Get().DataNodeCfg.FlowGraphSkipModeEnable.GetAsBool() {
|
|
|
|
if msgPack.Msgs[0].Type() == commonpb.MsgType_TimeTick {
|
|
|
|
if inNode.skipMode {
|
|
|
|
// if empty timetick message and in skipMode, will skip some of the timetick messages to reduce downstream work
|
|
|
|
if inNode.skipCount == paramtable.Get().DataNodeCfg.FlowGraphSkipModeSkipNum.GetAsInt() {
|
|
|
|
inNode.skipCount = 0
|
|
|
|
} else {
|
|
|
|
inNode.skipCount = inNode.skipCount + 1
|
|
|
|
return []Msg{}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
cd := paramtable.Get().DataNodeCfg.FlowGraphSkipModeColdTime.GetAsInt()
|
|
|
|
if time.Since(inNode.lastNotTimetickTime) > time.Second*time.Duration(cd) {
|
|
|
|
inNode.skipMode = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// if non empty message, refresh the lastNotTimetickTime and close skip mode
|
|
|
|
inNode.skipMode = false
|
|
|
|
inNode.skipCount = 0
|
|
|
|
inNode.lastNotTimetickTime = time.Now()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-10 14:15:22 +00:00
|
|
|
var msgStreamMsg Msg = &MsgStreamMsg{
|
|
|
|
tsMessages: msgPack.Msgs,
|
|
|
|
timestampMin: msgPack.BeginTs,
|
|
|
|
timestampMax: msgPack.EndTs,
|
|
|
|
startPositions: msgPack.StartPositions,
|
|
|
|
endPositions: msgPack.EndPositions,
|
|
|
|
}
|
|
|
|
|
|
|
|
return []Msg{msgStreamMsg}
|
2020-11-09 08:27:11 +00:00
|
|
|
}
|
|
|
|
|
2023-02-13 08:38:33 +00:00
|
|
|
// NewInputNode composes an InputNode with provided input channel, name and parameters
|
|
|
|
func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLength int32, maxParallelism int32, role string, nodeID int64, collectionID int64, dataType string) *InputNode {
|
2020-11-09 08:27:11 +00:00
|
|
|
baseNode := BaseNode{}
|
2020-11-18 09:32:52 +00:00
|
|
|
baseNode.SetMaxQueueLength(maxQueueLength)
|
|
|
|
baseNode.SetMaxParallelism(maxParallelism)
|
2020-11-09 08:27:11 +00:00
|
|
|
|
|
|
|
return &InputNode{
|
2023-12-06 17:00:37 +00:00
|
|
|
BaseNode: baseNode,
|
|
|
|
input: input,
|
|
|
|
name: nodeName,
|
|
|
|
role: role,
|
|
|
|
nodeID: nodeID,
|
2024-05-24 20:43:41 +00:00
|
|
|
nodeIDStr: fmt.Sprint(nodeID),
|
2023-12-06 17:00:37 +00:00
|
|
|
collectionID: collectionID,
|
2024-05-24 20:43:41 +00:00
|
|
|
collectionIDStr: fmt.Sprint(collectionID),
|
2023-12-06 17:00:37 +00:00
|
|
|
dataType: dataType,
|
|
|
|
closeGracefully: atomic.NewBool(CloseImmediately),
|
|
|
|
skipCount: 0,
|
|
|
|
lastNotTimetickTime: time.Now(),
|
2020-11-09 08:27:11 +00:00
|
|
|
}
|
|
|
|
}
|