diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index d98940a41..2a9b82c2a 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -74,12 +74,16 @@ class Flow { // Graceful Shutdown props /** @type {Number} */ this.pendingMsgCount = 0; + /** @type {Record} */ + this.msgInProgressNodeMap = {}; /** @type {object[]|null} */ this.shutdownScope = null; /** @type {Number} */ this.shutdownTimeout = 10000; // 10s /** @type {object[]} */ this.messageInitiatorNodes = []; + /** @type {Record} */ + this.wiredNodeMap = {}; } /** @@ -150,14 +154,37 @@ class Flow { this.parent.log(msg); } - decrementPendindMsgCount() { + /** + * This method is called when a node finishes processing a message. + * e.g. `node.done()` or `node.done(error)`. + * @param {object} node + */ + decrementPendingMsgCount(node) { this.pendingMsgCount--; - this.debug("Decrement pending messages count to: " + this.pendingMsgCount); + if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) { + this.msgInProgressNodeMap[node.id]--; + if (this.msgInProgressNodeMap[node.id] <= 0) { + delete this.msgInProgressNodeMap[node.id]; + } + } + this.trace("decrement pending messages count to: " + this.pendingMsgCount); + this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`); } - incrementPendingMsgCount() { + /** + * This method is called when a node sends a message. + * e.g. `node.send(msg)` or `node.receive(msg)`. + * @param {object} node + */ + incrementPendingMsgCount(node) { this.pendingMsgCount++; - this.debug("Increment pending messages count to: " + this.pendingMsgCount); + if (this.msgInProgressNodeMap[node.id]) { + this.msgInProgressNodeMap[node.id]++; + } else { + this.msgInProgressNodeMap[node.id] = 1; + } + this.trace("increment pending messages count to: " + this.pendingMsgCount); + this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`); } /** @@ -280,6 +307,7 @@ class Flow { } } + this.wiredNodeMap = {}; for (id in this.flow.nodes) { if (this.flow.nodes.hasOwnProperty(id)) { node = this.flow.nodes[id]; @@ -321,6 +349,15 @@ class Flow { } } } + if (node.wires) { + // Create a map with incoming wires for each node + node.wires.forEach((output) => { + output.forEach((wire) => { + this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || []; + this.wiredNodeMap[wire].push(node.id); + }); + }); + } } else { this.debug("not starting disabled node : "+id); } @@ -383,7 +420,9 @@ class Flow { */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); + let fullStop = false; if (!stopList) { + fullStop = true; stopList = Object.keys(this.activeNodes); } // this.trace(" stopList: "+stopList.join(",")) @@ -425,29 +464,63 @@ class Flow { } } + /** @type {Set} */ + const lookupNodes = new Set(); + + if (!fullStop) { + // Create a list of nodes wired upstream of nodes to be stopped. + const list = stopList.slice(); + while (list.length) { + const id = list.pop(); + if (lookupNodes.has(id)) { + continue; + } + lookupNodes.add(id); + if (this.wiredNodeMap[id]) { + list.push(...this.wiredNodeMap[id]); + } + } + } + shutdownPromise = new Promise((resolve, _reject) => { let remainingPeriod = this.shutdownTimeout; const loop = () => { - this.debug(`Pending message count: ${this.pendingMsgCount}`); - if (this.shutdownScope === null) { - if (this.pendingMsgCount === 0) { - this.pendingMsgCount = 0; - return resolve(); - } - } else if (this.shutdownScope.length) { - let count = 0; - this.shutdownScope.forEach((flow) => { - if (flow && "pendingMsgCount" in flow) { - count += flow.pendingMsgCount; + if (fullStop) { + this.debug(`Pending message count: ${this.pendingMsgCount}`); + if (this.shutdownScope === null) { + if (this.pendingMsgCount === 0) { + return resolve(); } - }); - if (count === 0) { - this.pendingMsgCount = 0; + } else if (this.shutdownScope.length) { + let count = 0; + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow) { + count += flow.pendingMsgCount; + } + }); + if (count === 0) { + return resolve(); + } + } + if (remainingPeriod < 0) { + this.error("Timeout for graceful shutdown has expired"); + return resolve(); + } + } else { + let inProgress = false; + for (const id of Object.keys(this.msgInProgressNodeMap)) { + if (lookupNodes.has(id)) { + inProgress = true; + break; + } + } + if (!inProgress) { + return resolve(); + } + if (remainingPeriod < 0) { + this.error("Timeout for graceful shutdown has expired"); return resolve(); } - } else if (remainingPeriod < 0) { - this.pendingMsgCount = 0; - return resolve(); } remainingPeriod -= 1000; setTimeout(loop, 1000); // polling every 1000ms @@ -914,7 +987,7 @@ function handlePreDeliver(flow,sendEvent, reportError) { } else if (err !== false) { // node.send(msg) called // Do it now to avoid next loop and an offbeat count - flow.incrementPendingMsgCount(); + flow.incrementPendingMsgCount(sendEvent.destination.node); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 5ccfaa8e5..db3e90fe3 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -133,7 +133,7 @@ Node.prototype._complete = function(msg,error) { this.metric("done",msg); if (this._flow) { // done() or done(error) called - this._flow.decrementPendindMsgCount(); + this._flow.decrementPendingMsgCount(this); } hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { if (err) { @@ -513,7 +513,7 @@ Node.prototype._receive = function (msg) { Node.prototype.receive = function (msg) { if (this._flow) { // Equivalent to node.send(msg) without handleOnSend - this._flow.incrementPendingMsgCount(); + this._flow.incrementPendingMsgCount(this); } this._receive(msg); };