mirror of https://github.com/milvus-io/milvus.git
feat: skip some empty ttMsg in Datanode flowgraph (#28756)
/kind feature
Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/29029/head^2
parent
fb089cda8b
commit
6736f65345
|
@ -433,6 +433,12 @@ dataNode:
|
|||
maxQueueLength: 16 # Maximum length of task queue in flowgraph
|
||||
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
|
||||
maxParallelSyncTaskNum: 6 # Maximum number of sync tasks executed in parallel in each flush manager
|
||||
skipMode:
|
||||
# When the flowgraph has received only timetick messages for a while (longer than coldTime, in seconds),
|
||||
# the flowgraph turns on skip mode and skips most timeticks to reduce cost, which matters most when there are many channels
|
||||
enable: true
|
||||
skipNum: 4
|
||||
coldTime: 60
|
||||
segment:
|
||||
insertBufSize: 16777216 # Max buffer size to flush for a single segment.
|
||||
deleteBufBytes: 67108864 # Max buffer size to flush del for a single channel
|
||||
|
|
|
@ -19,15 +19,18 @@ package flowgraph
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
@ -49,6 +52,10 @@ type InputNode struct {
|
|||
dataType string
|
||||
|
||||
closeGracefully *atomic.Bool
|
||||
|
||||
skipMode bool
|
||||
skipCount int
|
||||
lastNotTimetickTime time.Time
|
||||
}
|
||||
|
||||
// IsInputNode returns whether Node is InputNode
|
||||
|
@ -129,6 +136,11 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
|
|||
}
|
||||
|
||||
var spans []trace.Span
|
||||
defer func() {
|
||||
for _, span := range spans {
|
||||
span.End()
|
||||
}
|
||||
}()
|
||||
for _, msg := range msgPack.Msgs {
|
||||
ctx := msg.TraceCtx()
|
||||
if ctx == nil {
|
||||
|
@ -140,6 +152,33 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
|
|||
msg.SetTraceCtx(ctx)
|
||||
}
|
||||
|
||||
// skip timetick message feature
|
||||
if inNode.role == typeutil.DataNodeRole &&
|
||||
len(msgPack.Msgs) > 0 &&
|
||||
paramtable.Get().DataNodeCfg.FlowGraphSkipModeEnable.GetAsBool() {
|
||||
if msgPack.Msgs[0].Type() == commonpb.MsgType_TimeTick {
|
||||
if inNode.skipMode {
|
||||
// in skip mode, drop timetick packs and forward only one after every skipNum skipped packs, to reduce downstream work
|
||||
if inNode.skipCount == paramtable.Get().DataNodeCfg.FlowGraphSkipModeSkipNum.GetAsInt() {
|
||||
inNode.skipCount = 0
|
||||
} else {
|
||||
inNode.skipCount = inNode.skipCount + 1
|
||||
return []Msg{}
|
||||
}
|
||||
} else {
|
||||
cd := paramtable.Get().DataNodeCfg.FlowGraphSkipModeColdTime.GetAsInt()
|
||||
if time.Since(inNode.lastNotTimetickTime) > time.Second*time.Duration(cd) {
|
||||
inNode.skipMode = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// the first message is not a timetick: refresh lastNotTimetickTime, reset the skip counter and turn off skip mode
|
||||
inNode.skipMode = false
|
||||
inNode.skipCount = 0
|
||||
inNode.lastNotTimetickTime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
var msgStreamMsg Msg = &MsgStreamMsg{
|
||||
tsMessages: msgPack.Msgs,
|
||||
timestampMin: msgPack.BeginTs,
|
||||
|
@ -148,10 +187,6 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
|
|||
endPositions: msgPack.EndPositions,
|
||||
}
|
||||
|
||||
for _, span := range spans {
|
||||
span.End()
|
||||
}
|
||||
|
||||
return []Msg{msgStreamMsg}
|
||||
}
|
||||
|
||||
|
@ -162,13 +197,15 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
|
|||
baseNode.SetMaxParallelism(maxParallelism)
|
||||
|
||||
return &InputNode{
|
||||
BaseNode: baseNode,
|
||||
input: input,
|
||||
name: nodeName,
|
||||
role: role,
|
||||
nodeID: nodeID,
|
||||
collectionID: collectionID,
|
||||
dataType: dataType,
|
||||
closeGracefully: atomic.NewBool(CloseImmediately),
|
||||
BaseNode: baseNode,
|
||||
input: input,
|
||||
name: nodeName,
|
||||
role: role,
|
||||
nodeID: nodeID,
|
||||
collectionID: collectionID,
|
||||
dataType: dataType,
|
||||
closeGracefully: atomic.NewBool(CloseImmediately),
|
||||
skipCount: 0,
|
||||
lastNotTimetickTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,12 +18,18 @@ package flowgraph
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
func TestInputNode(t *testing.T) {
|
||||
|
@ -66,3 +72,82 @@ func Test_NewInputNode(t *testing.T) {
|
|||
assert.Equal(t, node.maxQueueLength, maxQueueLength)
|
||||
assert.Equal(t, node.maxParallelism, maxParallelism)
|
||||
}
|
||||
|
||||
// Test_InputNodeSkipMode checks the skip-mode state machine of a DataNode-role
// InputNode: the node stays out of skip mode at first, turns it on once the
// configured cold time has elapsed with no other traffic, and afterwards
// forwards only some of the packs it receives.
// NOTE(review): generateMsgPack is assumed to build a timetick-only pack (the
// log lines call it an "empty ttmsg") - confirm against its definition.
func Test_InputNodeSkipMode(t *testing.T) {
|
||||
t.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/Test_InputNodeSkipMode")
|
||||
factory := dependency.NewDefaultFactory(true)
|
||||
// Override coldTime to 3 and skipNum to 1 for this test; the sleep below matches the 3-second cold time.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlowGraphSkipModeColdTime.Key, "3")
|
||||
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlowGraphSkipModeSkipNum.Key, "1")
|
||||
|
||||
msgStream, _ := factory.NewMsgStream(context.TODO())
|
||||
channels := []string{"cc" + fmt.Sprint(rand.Int())}
|
||||
msgStream.AsConsumer(context.Background(), channels, "sub", mqwrapper.SubscriptionPositionEarliest)
|
||||
|
||||
produceStream, _ := factory.NewMsgStream(context.TODO())
|
||||
produceStream.AsProducer(channels)
|
||||
closeCh := make(chan struct{})
|
||||
outputCh := make(chan bool)
|
||||
|
||||
nodeName := "input_node"
|
||||
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
|
||||
defer inputNode.Close()
|
||||
|
||||
outputCount := 0
|
||||
// Driver goroutine: calls Operate in a loop, counts the calls that returned
// output in outputCount, and reports every finished call on the unbuffered
// outputCh so the test body reads the counters only after a call completes.
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-closeCh:
|
||||
return
|
||||
default:
|
||||
output := inputNode.Operate(nil)
|
||||
if len(output) > 0 {
|
||||
outputCount = outputCount + 1
|
||||
}
|
||||
outputCh <- true
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(closeCh)
|
||||
|
||||
msgPack := generateMsgPack()
|
||||
produceStream.Produce(&msgPack)
|
||||
log.Info("produce empty ttmsg")
|
||||
<-outputCh
|
||||
assert.Equal(t, 1, outputCount)
|
||||
assert.Equal(t, false, inputNode.skipMode)
|
||||
|
||||
// Let the 3-second cold time pass; skipMode only flips when the next pack is processed.
time.Sleep(3 * time.Second)
|
||||
assert.Equal(t, false, inputNode.skipMode)
|
||||
produceStream.Produce(&msgPack)
|
||||
log.Info("after 3 seconds with no active msg receive, input node will turn on skip mode")
|
||||
<-outputCh
|
||||
assert.Equal(t, 2, outputCount)
|
||||
assert.Equal(t, true, inputNode.skipMode)
|
||||
|
||||
log.Info("some ttmsg will be skipped in skip mode")
|
||||
// this msg will be skipped
|
||||
produceStream.Produce(&msgPack)
|
||||
<-outputCh
|
||||
assert.Equal(t, 2, outputCount)
|
||||
assert.Equal(t, true, inputNode.skipMode)
|
||||
|
||||
// this msg will be consumed
|
||||
produceStream.Produce(&msgPack)
|
||||
<-outputCh
|
||||
assert.Equal(t, 3, outputCount)
|
||||
assert.Equal(t, true, inputNode.skipMode)
|
||||
|
||||
//log.Info("non empty msg will awake input node, turn off skip mode")
|
||||
//insertMsgPack := generateInsertMsgPack()
|
||||
//produceStream.Produce(&insertMsgPack)
|
||||
//<-outputCh
|
||||
//assert.Equal(t, 3, outputCount)
|
||||
//assert.Equal(t, false, inputNode.skipMode)
|
||||
//
|
||||
//log.Info("empty msg will be consumed in not-skip mode")
|
||||
//produceStream.Produce(&msgPack)
|
||||
//<-outputCh
|
||||
//assert.Equal(t, 4, outputCount)
|
||||
//assert.Equal(t, false, inputNode.skipMode)
|
||||
//close(closeCh)
|
||||
}
|
||||
|
|
|
@ -56,6 +56,22 @@ func generateMsgPack() msgstream.MsgPack {
|
|||
return msgPack
|
||||
}
|
||||
|
||||
// generateInsertMsgPack returns a MsgPack containing a single InsertMsg whose
// base message type is MsgType_Insert, with one hash value (0).
func generateInsertMsgPack() msgstream.MsgPack {
|
||||
msgPack := msgstream.MsgPack{}
|
||||
insertMsg := &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
// Begin/end timestamps are the current Unix time in seconds and one second later.
BeginTimestamp: uint64(time.Now().Unix()),
|
||||
EndTimestamp: uint64(time.Now().Unix() + 1),
|
||||
HashValues: []uint32{0},
|
||||
},
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert},
|
||||
},
|
||||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
|
||||
return msgPack
|
||||
}
|
||||
|
||||
func TestNodeManager_Start(t *testing.T) {
|
||||
t.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart")
|
||||
factory := dependency.NewDefaultFactory(true)
|
||||
|
|
|
@ -2559,6 +2559,11 @@ type dataNodeConfig struct {
|
|||
FlowGraphMaxParallelism ParamItem `refreshable:"false"`
|
||||
MaxParallelSyncTaskNum ParamItem `refreshable:"false"`
|
||||
|
||||
// skip mode
|
||||
FlowGraphSkipModeEnable ParamItem `refreshable:"true"`
|
||||
FlowGraphSkipModeSkipNum ParamItem `refreshable:"true"`
|
||||
FlowGraphSkipModeColdTime ParamItem `refreshable:"true"`
|
||||
|
||||
// segment
|
||||
FlushInsertBufferSize ParamItem `refreshable:"true"`
|
||||
FlushDeleteBufferBytes ParamItem `refreshable:"true"`
|
||||
|
@ -2612,6 +2617,36 @@ func (p *dataNodeConfig) init(base *BaseTable) {
|
|||
}
|
||||
p.FlowGraphMaxParallelism.Init(base.mgr)
|
||||
|
||||
p.FlowGraphSkipModeEnable = ParamItem{
|
||||
Key: "datanode.dataSync.skipMode.enable",
|
||||
Version: "2.3.4",
|
||||
DefaultValue: "true",
|
||||
PanicIfEmpty: false,
|
||||
Doc: "Support skip some timetick message to reduce CPU usage",
|
||||
Export: true,
|
||||
}
|
||||
p.FlowGraphSkipModeEnable.Init(base.mgr)
|
||||
|
||||
p.FlowGraphSkipModeSkipNum = ParamItem{
|
||||
Key: "datanode.dataSync.skipMode.skipNum",
|
||||
Version: "2.3.4",
|
||||
DefaultValue: "5",
|
||||
PanicIfEmpty: false,
|
||||
Doc: "Consume one for every n records skipped",
|
||||
Export: true,
|
||||
}
|
||||
p.FlowGraphSkipModeSkipNum.Init(base.mgr)
|
||||
|
||||
p.FlowGraphSkipModeColdTime = ParamItem{
|
||||
Key: "datanode.dataSync.skipMode.coldTime",
|
||||
Version: "2.3.4",
|
||||
DefaultValue: "60",
|
||||
PanicIfEmpty: false,
|
||||
Doc: "Turn on skip mode after there are only timetick msg for x seconds",
|
||||
Export: true,
|
||||
}
|
||||
p.FlowGraphSkipModeColdTime.Init(base.mgr)
|
||||
|
||||
p.MaxParallelSyncTaskNum = ParamItem{
|
||||
Key: "dataNode.dataSync.maxParallelSyncTaskNum",
|
||||
Version: "2.3.0",
|
||||
|
|
|
@ -372,6 +372,15 @@ func TestComponentParam(t *testing.T) {
|
|||
maxParallelism := Params.FlowGraphMaxParallelism.GetAsInt()
|
||||
t.Logf("flowGraphMaxParallelism: %d", maxParallelism)
|
||||
|
||||
flowGraphSkipModeEnable := Params.FlowGraphSkipModeEnable.GetAsBool()
|
||||
t.Logf("flowGraphSkipModeEnable: %t", flowGraphSkipModeEnable)
|
||||
|
||||
flowGraphSkipModeSkipNum := Params.FlowGraphSkipModeSkipNum.GetAsInt()
|
||||
t.Logf("flowGraphSkipModeSkipNum: %d", flowGraphSkipModeSkipNum)
|
||||
|
||||
flowGraphSkipModeColdTime := Params.FlowGraphSkipModeColdTime.GetAsInt()
|
||||
t.Logf("flowGraphSkipModeColdTime: %d", flowGraphSkipModeColdTime)
|
||||
|
||||
maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt()
|
||||
t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum)
|
||||
|
||||
|
|
Loading…
Reference in New Issue