diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index f9e3325386..0ae56f955e 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -58,18 +58,15 @@ type nodeCtxManager struct { inputNodeCtx *nodeCtx closeWg *sync.WaitGroup closeOnce sync.Once - - inputNodeCloseCh chan struct{} // notify input node work to exit - workNodeCh chan struct{} // notify ddnode and downstream node work to exit + closeCh chan struct{} // notify nodes to exit } // NewNodeCtxManager init with the inputNode and fg.closeWg func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager { return &nodeCtxManager{ - inputNodeCtx: nodeCtx, - closeWg: closeWg, - inputNodeCloseCh: make(chan struct{}), - workNodeCh: make(chan struct{}), + inputNodeCtx: nodeCtx, + closeWg: closeWg, + closeCh: make(chan struct{}), } } @@ -77,67 +74,14 @@ func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManage func (nodeCtxManager *nodeCtxManager) Start() { // in dmInputNode, message from mq to channel, alloc goroutines // limit the goroutines in other node to prevent huge goroutines numbers - nodeCtxManager.closeWg.Add(2) - go nodeCtxManager.inputNodeStart() + nodeCtxManager.closeWg.Add(1) go nodeCtxManager.workNodeStart() } -func (nodeCtxManager *nodeCtxManager) inputNodeStart() { - defer nodeCtxManager.closeWg.Done() - inputNode := nodeCtxManager.inputNodeCtx - name := fmt.Sprintf("nodeCtxTtChecker-%s", inputNode.node.Name()) - // tt checker start - var checker *timerecord.GroupChecker - if enableTtChecker { - checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) { - log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval)) - }) - checker.Check(name) - defer checker.Remove(name) - } - - for { - select { - case <-nodeCtxManager.inputNodeCloseCh: - return - // handles node work spinning - // 1. collectMessage from upstream or just produce Msg from InputNode - // 2. invoke node.Operate - // 3. deliver the Operate result to downstream nodes - default: - // inputs from inputsMessages for Operate - var input, output []Msg - // inputNode.input not from nodeCtx.inputChannel - // the input message decides whether the operate method is executed - n := inputNode.node - inputNode.blockMutex.RLock() - if !n.IsValidInMsg(input) { - inputNode.blockMutex.RUnlock() - continue - } - output = n.Operate(input) - inputNode.blockMutex.RUnlock() - // the output decide whether the node should be closed. - if isCloseMsg(output) { - close(nodeCtxManager.inputNodeCloseCh) - // inputNode.Close() - if inputNode.inputChannel != nil { - close(inputNode.inputChannel) - } - } - // deliver to all following flow graph node. - inputNode.downstream.inputChannel <- output - if enableTtChecker { - checker.Check(name) - } - } - } -} - func (nodeCtxManager *nodeCtxManager) workNodeStart() { defer nodeCtxManager.closeWg.Done() - ddNode := nodeCtxManager.inputNodeCtx.downstream - curNode := ddNode + inputNode := nodeCtxManager.inputNodeCtx + curNode := inputNode // tt checker start var checker *timerecord.GroupChecker if enableTtChecker { @@ -154,26 +98,27 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() { for { select { - case <-nodeCtxManager.workNodeCh: + case <-nodeCtxManager.closeCh: return // handles node work spinning // 1. collectMessage from upstream or just produce Msg from InputNode // 2. invoke node.Operate // 3. deliver the Operate result to downstream nodes default: - // goroutine will work loop for all node(expect inpuNode) even when closeCh notify to exit - // input node will close all node - curNode = ddNode + curNode = inputNode for curNode != nil { // inputs from inputsMessages for Operate var input, output []Msg - input = <-curNode.inputChannel + if curNode != inputNode { + // inputNode.input not from nodeCtx.inputChannel + input = <-curNode.inputChannel + } // the input message decides whether the operate method is executed n := curNode.node curNode.blockMutex.RLock() if !n.IsValidInMsg(input) { curNode.blockMutex.RUnlock() - curNode = ddNode + curNode = inputNode continue } @@ -182,7 +127,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() { // the output decide whether the node should be closed. if isCloseMsg(output) { nodeCtxManager.closeOnce.Do(func() { - close(nodeCtxManager.workNodeCh) + close(nodeCtxManager.closeCh) }) if curNode.inputChannel != nil { close(curNode.inputChannel)