enhance:[cherry-pick] Merge flowgraph goroutines into 1 (#28728)

see also: #28654 #28233

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/28776/head
wayblink 2023-11-27 21:04:27 +08:00 committed by GitHub
parent 5a962a631a
commit 1ed92da414
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 70 deletions

View File

@ -58,18 +58,15 @@ type nodeCtxManager struct {
inputNodeCtx *nodeCtx
closeWg *sync.WaitGroup
closeOnce sync.Once
inputNodeCloseCh chan struct{} // notify input node work to exit
workNodeCh chan struct{} // notify ddnode and downstream node work to exit
closeCh chan struct{} // notify nodes to exit
}
// NewNodeCtxManager init with the inputNode and fg.closeWg
func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager {
return &nodeCtxManager{
inputNodeCtx: nodeCtx,
closeWg: closeWg,
inputNodeCloseCh: make(chan struct{}),
workNodeCh: make(chan struct{}),
inputNodeCtx: nodeCtx,
closeWg: closeWg,
closeCh: make(chan struct{}),
}
}
@ -77,67 +74,14 @@ func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManage
func (nodeCtxManager *nodeCtxManager) Start() {
// in dmInputNode, message from mq to channel, alloc goroutines
// limit the goroutines in other node to prevent huge goroutines numbers
nodeCtxManager.closeWg.Add(2)
go nodeCtxManager.inputNodeStart()
nodeCtxManager.closeWg.Add(1)
go nodeCtxManager.workNodeStart()
}
func (nodeCtxManager *nodeCtxManager) inputNodeStart() {
defer nodeCtxManager.closeWg.Done()
inputNode := nodeCtxManager.inputNodeCtx
name := fmt.Sprintf("nodeCtxTtChecker-%s", inputNode.node.Name())
// tt checker start
var checker *timerecord.GroupChecker
if enableTtChecker {
checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
})
checker.Check(name)
defer checker.Remove(name)
}
for {
select {
case <-nodeCtxManager.inputNodeCloseCh:
return
// handles node work spinning
// 1. collectMessage from upstream or just produce Msg from InputNode
// 2. invoke node.Operate
// 3. deliver the Operate result to downstream nodes
default:
// inputs from inputsMessages for Operate
var input, output []Msg
// inputNode.input not from nodeCtx.inputChannel
// the input message decides whether the operate method is executed
n := inputNode.node
inputNode.blockMutex.RLock()
if !n.IsValidInMsg(input) {
inputNode.blockMutex.RUnlock()
continue
}
output = n.Operate(input)
inputNode.blockMutex.RUnlock()
// the output decide whether the node should be closed.
if isCloseMsg(output) {
close(nodeCtxManager.inputNodeCloseCh)
// inputNode.Close()
if inputNode.inputChannel != nil {
close(inputNode.inputChannel)
}
}
// deliver to all following flow graph node.
inputNode.downstream.inputChannel <- output
if enableTtChecker {
checker.Check(name)
}
}
}
}
func (nodeCtxManager *nodeCtxManager) workNodeStart() {
defer nodeCtxManager.closeWg.Done()
ddNode := nodeCtxManager.inputNodeCtx.downstream
curNode := ddNode
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
// tt checker start
var checker *timerecord.GroupChecker
if enableTtChecker {
@ -154,26 +98,27 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
for {
select {
case <-nodeCtxManager.workNodeCh:
case <-nodeCtxManager.closeCh:
return
// handles node work spinning
// 1. collectMessage from upstream or just produce Msg from InputNode
// 2. invoke node.Operate
// 3. deliver the Operate result to downstream nodes
default:
// goroutine will work loop for all node(expect inpuNode) even when closeCh notify to exit
// input node will close all node
curNode = ddNode
curNode = inputNode
for curNode != nil {
// inputs from inputsMessages for Operate
var input, output []Msg
input = <-curNode.inputChannel
if curNode != inputNode {
// inputNode.input not from nodeCtx.inputChannel
input = <-curNode.inputChannel
}
// the input message decides whether the operate method is executed
n := curNode.node
curNode.blockMutex.RLock()
if !n.IsValidInMsg(input) {
curNode.blockMutex.RUnlock()
curNode = ddNode
curNode = inputNode
continue
}
@ -182,7 +127,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
// the output decide whether the node should be closed.
if isCloseMsg(output) {
nodeCtxManager.closeOnce.Do(func() {
close(nodeCtxManager.workNodeCh)
close(nodeCtxManager.closeCh)
})
if curNode.inputChannel != nil {
close(curNode.inputChannel)