enhance: [cherry-pick] optimize datanode cpu usage and correct the update logic of ttchecker (#34383)

This PR cherry-picks the following commits:
- Improve CPU usage by refactoring the ttchecker logic and caching
formatted strings. https://github.com/milvus-io/milvus/pull/33267
- Correct the update logic of the timerecord checker in the flowgraph to
avoid the false warning "some node(s) haven't received input".
https://github.com/milvus-io/milvus/pull/34339

issue: https://github.com/milvus-io/milvus/issues/33266,
https://github.com/milvus-io/milvus/issues/34337

pr: https://github.com/milvus-io/milvus/pull/33267,
https://github.com/milvus-io/milvus/pull/34339
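
For context, a minimal usage sketch of the checker flow these commits move to: each flowgraph node owns a timerecord.Checker registered with a shared CheckerManager, stamps it after every pass, and closes it on shutdown, so only nodes that are genuinely silent get reported. The API names mirror the diff below; the import path, interval, and loop values are assumptions for illustration.

```go
package main

import (
	"fmt"
	"time"

	// Assumed import path; the package shown in the diff is timerecord.
	"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func main() {
	// One manager per group name; repeated calls return the same instance
	// (see GetCheckerManger in the diff), so all flowgraph nodes share it.
	manager := timerecord.GetCheckerManger("fgNode", 2*time.Second, func(list []string) {
		fmt.Println("some node(s) haven't received input:", list)
	})

	// Each node creates its checker once instead of re-registering a name
	// on every loop iteration.
	checker := timerecord.NewChecker("nodeCtxTtChecker-ddNode-ch1", manager)
	defer checker.Close() // deregisters from the manager, as nodeCtx.Close does

	for i := 0; i < 5; i++ {
		// node.Operate(...) would run here; stamping after each pass keeps
		// this node out of the "silent" list.
		checker.Check()
		time.Sleep(500 * time.Millisecond)
	}
}
```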

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Co-authored-by: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
yihao.dai 2024-07-04 16:34:17 +08:00 committed by GitHub
parent 5831908aa2
commit a57c9e61fc
11 changed files with 129 additions and 99 deletions


@@ -75,7 +75,7 @@ type ddNode struct {
// Name returns node name, implementing flowgraph.Node
func (ddn *ddNode) Name() string {
return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName)
return fmt.Sprintf("ddNode-%s", ddn.vChannelName)
}
func (ddn *ddNode) IsValidInMsg(in []Msg) bool {
@@ -92,10 +92,9 @@ func (ddn *ddNode) IsValidInMsg(in []Msg) bool {
// Operate handles input messages, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
log := log.With(zap.String("channel", ddn.vChannelName))
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for MsgStreamMsg", zap.String("channel", ddn.vChannelName), zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}
@@ -111,12 +110,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
endPositions: msMsg.EndPositions(),
dropCollection: false,
}
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID))
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.String("channel", ddn.vChannelName), zap.Int64("collection", ddn.collectionID))
return []Msg{&fgMsg}
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.RatedInfo(1.0, "ddNode in dropMode")
log.RatedInfo(1.0, "ddNode in dropMode", zap.String("channel", ddn.vChannelName))
return []Msg{}
}
@@ -147,7 +146,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
log.Info("Receiving DropCollection msg")
log.Info("Receiving DropCollection msg", zap.String("channel", ddn.vChannelName))
ddn.dropMode.Store(true)
log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName))
@@ -158,7 +157,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
case commonpb.MsgType_DropPartition:
dpMsg := msg.(*msgstream.DropPartitionMsg)
if dpMsg.GetCollectionID() == ddn.collectionID {
log.Info("drop partition msg received", zap.Int64("partitionID", dpMsg.GetPartitionID()))
log.Info("drop partition msg received", zap.String("channel", ddn.vChannelName), zap.Int64("partitionID", dpMsg.GetPartitionID()))
fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
}
@@ -167,6 +166,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if imsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid insert message, collection mis-match",
zap.Int64("Get collID", imsg.CollectionID),
zap.String("channel", ddn.vChannelName),
zap.Int64("Expected collID", ddn.collectionID))
continue
}
@@ -174,6 +174,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if ddn.tryToFilterSegmentInsertMessages(imsg) {
log.Debug("filter insert messages",
zap.Int64("filter segmentID", imsg.GetSegmentID()),
zap.String("channel", ddn.vChannelName),
zap.Uint64("message timestamp", msg.EndTs()),
)
continue
@@ -195,6 +196,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
log.Debug("DDNode receive insert messages",
zap.Int64("segmentID", imsg.GetSegmentID()),
zap.String("channel", ddn.vChannelName),
zap.Int("numRows", len(imsg.GetRowIDs())))
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
@@ -204,11 +206,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if dmsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid DeleteMsg, collection mis-match",
zap.Int64("Get collID", dmsg.CollectionID),
zap.String("channel", ddn.vChannelName),
zap.Int64("Expected collID", ddn.collectionID))
continue
}
log.Debug("DDNode receive delete messages", zap.Int64("numRows", dmsg.NumRows))
log.Debug("DDNode receive delete messages", zap.String("channel", ddn.vChannelName), zap.Int64("numRows", dmsg.NumRows))
rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
metrics.DataNodeConsumeBytesCount.
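
The ddNode hunks above replace the per-call `log := log.With(...)` at the top of Operate with an explicit channel field at each log site, so no derived logger is allocated on passes that emit nothing. A standalone zap sketch of the two shapes, with illustrative names (Milvus wraps zap in its own log package):

```go
package main

import "go.uber.org/zap"

// Before: a child logger is built on every invocation, even when no
// message ends up being written on this pass.
func operateWithDerivedLogger(logger *zap.Logger, channel string) {
	log := logger.With(zap.String("channel", channel))
	log.Debug("DDNode receive insert messages")
}

// After: the field is attached only at the call sites that actually log.
func operatePerCallField(logger *zap.Logger, channel string) {
	logger.Debug("DDNode receive insert messages", zap.String("channel", channel))
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	operateWithDerivedLogger(logger, "ch-1")
	operatePerCallField(logger, "ch-1")
}
```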


@@ -82,7 +82,7 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, ddNode)
assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vChannelName), ddNode.Name())
assert.Equal(t, fmt.Sprintf("ddNode-%s", ddNode.vChannelName), ddNode.Name())
assert.Equal(t, len(test.inSealedSegs), len(ddNode.sealedSegInfo))
assert.Equal(t, len(test.inGrowingSegs), len(ddNode.growingSegInfo))


@@ -62,7 +62,7 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie
log.Info("datanode consume successfully when register to msgDispatcher")
}
name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
name := fmt.Sprintf("dmInputNode-data-%s", dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(
input,
name,


@@ -2,6 +2,7 @@ package datanode
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
@@ -26,6 +27,11 @@ type writeNode struct {
metacache metacache.MetaCache
}
// Name returns node name, implementing flowgraph.Node
func (wNode *writeNode) Name() string {
return fmt.Sprintf("writeNode-%s", wNode.channelName)
}
func (wNode *writeNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)


@@ -148,7 +148,6 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
m.mu.Lock()
defer m.mu.Unlock()
sizeBeforeClean := len(m.statsCache)
log := log.With(zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean))
for channelName, lastSentTs := range lastSentTss {
_, ok := m.statsCache[channelName]
if ok {
@@ -162,7 +161,7 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
delete(m.statsCache, channelName)
}
}
log.RatedDebug(30, "timeTickSender stats", zap.Int("sizeAfterClean", len(m.statsCache)))
log.RatedDebug(30, "timeTickSender stats", zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean), zap.Int("sizeAfterClean", len(m.statsCache)))
}
func (m *timeTickSender) sendReport(ctx context.Context) error {


@@ -43,13 +43,15 @@ const (
// InputNode is the entry point of flowgragh
type InputNode struct {
BaseNode
input <-chan *msgstream.MsgPack
lastMsg *msgstream.MsgPack
name string
role string
nodeID int64
collectionID int64
dataType string
input <-chan *msgstream.MsgPack
lastMsg *msgstream.MsgPack
name string
role string
nodeID int64
nodeIDStr string
collectionID int64
collectionIDStr string
dataType string
closeGracefully *atomic.Bool
@@ -117,11 +119,11 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
sub := tsoutil.SubByNow(msgPack.EndTs)
if inNode.role == typeutil.DataNodeRole {
metrics.DataNodeConsumeMsgCount.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
Inc()
metrics.DataNodeConsumeTimeTickLag.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
Set(float64(sub))
}
@@ -192,7 +194,9 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
name: nodeName,
role: role,
nodeID: nodeID,
nodeIDStr: fmt.Sprint(nodeID),
collectionID: collectionID,
collectionIDStr: fmt.Sprint(collectionID),
dataType: dataType,
closeGracefully: atomic.NewBool(CloseImmediately),
skipCount: 0,
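
The InputNode hunks above precompute nodeIDStr and collectionIDStr in NewInputNode so the hot Operate path no longer runs fmt.Sprint for every metric update. A standalone sketch of the same caching idea, assuming a plain Prometheus counter; the struct and metric names are illustrative, not the Milvus types:

```go
package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

var consumeMsgCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "consume_msg_count"},
	[]string{"node_id", "collection_id"},
)

type inputNode struct {
	nodeID          int64
	collectionID    int64
	nodeIDStr       string // formatted once at construction...
	collectionIDStr string // ...instead of fmt.Sprint on every message
}

func newInputNode(nodeID, collectionID int64) *inputNode {
	return &inputNode{
		nodeID:          nodeID,
		collectionID:    collectionID,
		nodeIDStr:       strconv.FormatInt(nodeID, 10),
		collectionIDStr: strconv.FormatInt(collectionID, 10),
	}
}

// operate is the hot path: it reuses the cached strings as label values.
func (n *inputNode) operate() {
	consumeMsgCount.WithLabelValues(n.nodeIDStr, n.collectionIDStr).Inc()
}

func main() {
	prometheus.MustRegister(consumeMsgCount)
	newInputNode(7, 451).operate()
}
```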


@@ -75,27 +75,23 @@ func (nodeCtxManager *nodeCtxManager) Start() {
// in dmInputNode, message from mq to channel, alloc goroutines
// limit the goroutines in other node to prevent huge goroutines numbers
nodeCtxManager.closeWg.Add(1)
curNode := nodeCtxManager.inputNodeCtx
// tt checker start
if enableTtChecker {
manager := timerecord.GetCheckerManger("fgNode", nodeCtxTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
})
for curNode != nil {
name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
curNode.checker = timerecord.NewChecker(name, manager)
curNode = curNode.downstream
}
}
go nodeCtxManager.workNodeStart()
}
func (nodeCtxManager *nodeCtxManager) workNodeStart() {
defer nodeCtxManager.closeWg.Done()
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
// tt checker start
var checker *timerecord.GroupChecker
if enableTtChecker {
checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
})
for curNode != nil {
name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
checker.Check(name)
curNode = curNode.downstream
defer checker.Remove(name)
}
}
for {
select {
case <-nodeCtxManager.closeCh:
@@ -105,7 +101,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
// 2. invoke node.Operate
// 3. deliver the Operate result to downstream nodes
default:
curNode = inputNode
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
for curNode != nil {
// inputs from inputsMessages for Operate
var input, output []Msg
@@ -137,8 +134,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
if curNode.downstream != nil {
curNode.downstream.inputChannel <- output
}
if enableTtChecker {
checker.Check(fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name()))
if enableTtChecker && curNode.checker != nil {
curNode.checker.Check()
}
curNode = curNode.downstream
}
@@ -157,6 +154,7 @@ type nodeCtx struct {
node Node
inputChannel chan []Msg
downstream *nodeCtx
checker *timerecord.Checker
blockMutex sync.RWMutex
}
@@ -192,6 +190,9 @@ func (nodeCtx *nodeCtx) Close() {
if nodeCtx.node.IsInputNode() {
for nodeCtx != nil {
nodeCtx.node.Close()
if nodeCtx.checker != nil {
nodeCtx.checker.Close()
}
log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
nodeCtx = nodeCtx.downstream
}


@@ -24,21 +24,20 @@ type Node interface {
Name() string
MaxQueueLength() int32
Operate(in Msg) Msg
Start()
Close()
}
type nodeCtx struct {
node Node
inputChannel chan Msg
next *nodeCtx
checker *timerecord.GroupChecker
InputChannel chan Msg
Next *nodeCtx
Checker *timerecord.Checker
}
func newNodeCtx(node Node) *nodeCtx {
func NewNodeCtx(node Node) *nodeCtx {
return &nodeCtx{
node: node,
inputChannel: make(chan Msg, node.MaxQueueLength()),
InputChannel: make(chan Msg, node.MaxQueueLength()),
}
}
@@ -57,12 +56,6 @@ func (node *BaseNode) MaxQueueLength() int32 {
return node.maxQueueLength
}
// Start implementing Node, base node does nothing when starts
func (node *BaseNode) Start() {}
// Close implementing Node, base node does nothing when stops
func (node *BaseNode) Close() {}
func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
return &BaseNode{
name: name,


@@ -37,8 +37,6 @@ type pipeline struct {
inputChannel chan Msg
nodeTtInterval time.Duration
enableTtChecker bool
checkerNames map[string]string
}
func (p *pipeline) Add(nodes ...Node) {
@@ -48,21 +46,19 @@ func (p *pipeline) Add(nodes ...Node) {
}
func (p *pipeline) addNode(node Node) {
nodeCtx := newNodeCtx(node)
nodeCtx := NewNodeCtx(node)
if p.enableTtChecker {
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
manager := timerecord.GetCheckerManger("fgNode", p.nodeTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
})
if p.checkerNames == nil {
p.checkerNames = make(map[string]string)
}
p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
name := fmt.Sprintf("nodeCtxTtChecker-%s", node.Name())
nodeCtx.Checker = timerecord.NewChecker(name, manager)
}
if len(p.nodes) != 0 {
p.nodes[len(p.nodes)-1].next = nodeCtx
p.nodes[len(p.nodes)-1].Next = nodeCtx
} else {
p.inputChannel = nodeCtx.inputChannel
p.inputChannel = nodeCtx.InputChannel
}
p.nodes = append(p.nodes, nodeCtx)
@@ -74,8 +70,8 @@ func (p *pipeline) Start() error {
func (p *pipeline) Close() {
for _, node := range p.nodes {
if node.checker != nil {
node.checker.Remove(p.checkerNames[node.node.Name()])
if node.Checker != nil {
node.Checker.Close()
}
}
}
@@ -87,18 +83,18 @@ func (p *pipeline) process() {
curNode := p.nodes[0]
for curNode != nil {
if len(curNode.inputChannel) == 0 {
if len(curNode.InputChannel) == 0 {
break
}
input := <-curNode.inputChannel
input := <-curNode.InputChannel
output := curNode.node.Operate(input)
if _, ok := p.checkerNames[curNode.node.Name()]; ok {
curNode.checker.Check(p.checkerNames[curNode.node.Name()])
if curNode.Checker != nil {
curNode.Checker.Check()
}
if curNode.next != nil && output != nil {
curNode.next.inputChannel <- output
if curNode.Next != nil && output != nil {
curNode.Next.InputChannel <- output
}
curNode = curNode.next
curNode = curNode.Next
}
}


@@ -18,23 +18,47 @@ package timerecord
import (
"sync"
"sync/atomic"
"time"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// groups maintains string to GroupChecker
var groups = typeutil.NewConcurrentMap[string, *GroupChecker]()
var groups = typeutil.NewConcurrentMap[string, *CheckerManager]()
// GroupChecker checks members in same group silent for certain period of time
type Checker struct {
name string
manager *CheckerManager
lastChecked atomic.Value
}
func NewChecker(name string, manager *CheckerManager) *Checker {
checker := &Checker{}
checker.name = name
checker.manager = manager
checker.lastChecked.Store(time.Now())
manager.Register(name, checker)
return checker
}
func (checker *Checker) Check() {
checker.lastChecked.Store(time.Now())
}
func (checker *Checker) Close() {
checker.manager.Remove(checker.name)
}
// CheckerManager checks members in same group silent for certain period of time
// print warning msg if there are item(s) that not reported
type GroupChecker struct {
type CheckerManager struct {
groupName string
d time.Duration // check duration
t *time.Ticker // internal ticker
ch chan struct{} // closing signal
lastest *typeutil.ConcurrentMap[string, time.Time] // map member name => lastest report time
d time.Duration // check duration
t *time.Ticker // internal ticker
ch chan struct{} // closing signal
checkers *typeutil.ConcurrentMap[string, *Checker] // map member name => checker
initOnce sync.Once
stopOnce sync.Once
@@ -43,7 +67,7 @@ type GroupChecker struct {
// init start worker goroutine
// protected by initOnce
func (gc *GroupChecker) init() {
func (gc *CheckerManager) init() {
gc.initOnce.Do(func() {
gc.ch = make(chan struct{})
go gc.work()
@@ -51,7 +75,7 @@ func (gc *GroupChecker) init() {
}
// work is the main procedure logic
func (gc *GroupChecker) work() {
func (gc *CheckerManager) work() {
gc.t = time.NewTicker(gc.d)
defer gc.t.Stop()
@@ -63,8 +87,8 @@ func (gc *GroupChecker) work() {
}
var list []string
gc.lastest.Range(func(name string, ts time.Time) bool {
if time.Since(ts) > gc.d {
gc.checkers.Range(func(name string, checker *Checker) bool {
if time.Since(checker.lastChecked.Load().(time.Time)) > gc.d {
list = append(list, name)
}
return true
@@ -75,18 +99,17 @@
}
}
// Check updates the latest timestamp for provided name
func (gc *GroupChecker) Check(name string) {
gc.lastest.Insert(name, time.Now())
func (gc *CheckerManager) Register(name string, checker *Checker) {
gc.checkers.Insert(name, checker)
}
// Remove deletes name from watch list
func (gc *GroupChecker) Remove(name string) {
gc.lastest.GetAndRemove(name)
func (gc *CheckerManager) Remove(name string) {
gc.checkers.GetAndRemove(name)
}
// Stop closes the GroupChecker
func (gc *GroupChecker) Stop() {
func (gc *CheckerManager) Stop() {
gc.stopOnce.Do(func() {
close(gc.ch)
groups.GetAndRemove(gc.groupName)
@@ -96,12 +119,12 @@ func (gc *GroupChecker) Stop() {
// GetGroupChecker returns the GroupChecker with related group name
// if no exist GroupChecker has the provided name, a new instance will be created with provided params
// otherwise the params will be ignored
func GetGroupChecker(groupName string, duration time.Duration, fn func([]string)) *GroupChecker {
gc := &GroupChecker{
func GetCheckerManger(groupName string, duration time.Duration, fn func([]string)) *CheckerManager {
gc := &CheckerManager{
groupName: groupName,
d: duration,
fn: fn,
lastest: typeutil.NewConcurrentMap[string, time.Time](),
checkers: typeutil.NewConcurrentMap[string, *Checker](),
}
gc, loaded := groups.GetOrInsert(groupName, gc)
if !loaded {


@@ -23,20 +23,24 @@ import (
"github.com/stretchr/testify/assert"
)
func TestGroupChecker(t *testing.T) {
func TestChecker(t *testing.T) {
groupName := `test_group`
signal := make(chan []string, 1)
// 10ms period which set before is too short
// change 10ms to 500ms to ensure the group checker schedule after the second value stored
duration := 500 * time.Millisecond
gc1 := GetGroupChecker(groupName, duration, func(list []string) {
gc1 := GetCheckerManger(groupName, duration, func(list []string) {
signal <- list
})
gc1.Check("1")
gc2 := GetGroupChecker(groupName, time.Second, func(list []string) {
checker1 := NewChecker("1", gc1)
checker1.Check()
gc2 := GetCheckerManger(groupName, time.Second, func(list []string) {
t.FailNow()
})
gc2.Check("2")
checker2 := NewChecker("2", gc2)
checker2.Check()
assert.Equal(t, duration, gc2.d)
@@ -45,11 +49,12 @@
return len(list) == 2
}, duration*3, duration)
gc2.Remove("2")
checker2.Close()
list := <-signal
assert.ElementsMatch(t, []string{"1"}, list)
checker1.Close()
assert.NotPanics(t, func() {
gc1.Stop()
gc2.Stop()