From e17256934d4e8ae834a3d47dbed2c0a5dee76396 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:15:55 +0100 Subject: [PATCH] Remove the polling loop + improvements --- .../@node-red/runtime/lib/flows/Flow.js | 144 ++++++++++-------- .../@node-red/runtime/lib/nodes/Node.js | 7 + 2 files changed, 87 insertions(+), 64 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 22d721eb2..7558ad946 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -80,7 +80,9 @@ class Flow { this.shutdownScope = null; /** @type {Number} */ this.shutdownTimeout = 10000; // 10s - /** @type {object[]} */ + /** @type {null|(() => void)} */ + this.shutdown = null; + /** @type {string[]} */ this.messageInitiatorNodes = []; /** @type {Record} */ this.wiredNodeMap = {}; @@ -169,6 +171,10 @@ class Flow { } this.trace("decrement pending messages count to: " + this.pendingMsgCount); this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`); + + if (this.shutdown) { + this.shutdown(); + } } /** @@ -347,6 +353,7 @@ class Flow { if (node.wires) { // Create a map with incoming wires for each node node.wires.forEach((output) => { + // TODO: Still an array? output.forEach((wire) => { this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || []; this.wiredNodeMap[wire].push(node.id); @@ -359,7 +366,7 @@ class Flow { } } - this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || []; + this.messageInitiatorNodes = this.flow.messageInitiatorNodes || []; this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; if (typeof this.flow.shutdownTimeout === "number") { // TODO: Math.max @@ -416,9 +423,9 @@ class Flow { * Stop this flow. * The `stopList` argument helps define what needs to be stopped in the case * of a modified-nodes/flows type deploy. - * @param {[type]} stopList [description] - * @param {[type]} removedList [description] - * @return {[type]} [description] + * @param {string[]} [stopList] [description] + * @param {string[]} [removedList] [description] + * @return {Promise} [description] */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); @@ -451,20 +458,8 @@ class Flow { /** @type {Promise} */ let shutdownPromise = Promise.resolve(); if (gracefulShutdown && this.messageInitiatorNodes.length) { - for (const node of this.messageInitiatorNodes) { - if (node) { - delete this.activeNodes[node.id]; - try { - const removed = removedMap[node.id]; - if (removed) { - events.emit("node-status", { id: node.id }); - } - initialPromises.push(stopNode(node, removed).catch(() => { })); - } catch (error) { - node.error(error); - } - } - } + // Start by closing initiator nodes + initialPromises.push(...this.stopNodes(this.messageInitiatorNodes, removedMap)); /** @type {Set} */ const lookupNodes = new Set(); @@ -484,11 +479,11 @@ class Flow { } } - shutdownPromise = new Promise((resolve, _reject) => { - let remainingPeriod = this.shutdownTimeout; - const loop = () => { + /** @type {Promise} */ + const closePromise = new Promise((resolve) => { + this.shutdown = () => { if (fullStop) { - this.debug(`Pending message count: ${this.pendingMsgCount}`); + this.trace(`Pending message count: ${this.pendingMsgCount}`); if (this.shutdownScope === null) { if (this.pendingMsgCount === 0) { return resolve(); @@ -501,13 +496,15 @@ class Flow { } }); if (count === 0) { + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) { + // Retrigger validation in each flow + flow.shutdown(); + } + }); return resolve(); } } - if (remainingPeriod < 0) { - this.error("Timeout for graceful shutdown has expired"); - return resolve(); - } } else { let inProgress = false; for (const id of Object.keys(this.msgInProgressNodeMap)) { @@ -519,45 +516,64 @@ class Flow { if (!inProgress) { return resolve(); } - if (remainingPeriod < 0) { - this.error("Timeout for graceful shutdown has expired"); - return resolve(); - } - } - remainingPeriod -= 1000; - setTimeout(loop, 1000); // polling every 1000ms - }; - setImmediate(loop); - }); - } - - /** @type {() => Promise[]} */ - const finalClose = () => { - const finalPromises = []; - for (var i=0;i { })); - } catch(err) { - node.error(err); } } + }); + + /** @type {NodeJS.Timeout|null} */ + let timer = null; + const timeoutPromise = new Promise((_resolve, reject) => { + timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout); + }); + const start = Date.now(); + shutdownPromise = Promise.race([closePromise, timeoutPromise]) + .then(() => { + clearTimeout(timer); + const delta = Date.now() - start; + this.trace(`Stopped gracefuly in ${delta}ms`); + }).catch((error) => { + clearTimeout(timer); + this.error(Log._("nodes.flows.stopping-error", { message: error })); + }); + + if (this.shutdown) { + setImmediate(this.shutdown); } - return finalPromises; - }; + } return Promise.all(initialPromises) .then(() => shutdownPromise) - .then(() => Promise.all(finalClose())); + .then(() => Promise.all(this.stopNodes(stopList, removedMap))); + } + + /** + * Stop every node of stopList + * @param {string[]} stopList + * @param {Record} removedMap + * @returns {Promise[]} + */ + stopNodes(stopList, removedMap) { + const promises = []; + for (const nodeId of stopList) { + const node = this.activeNodes[nodeId]; + if (node) { + delete this.activeNodes[nodeId]; + if (this.subflowInstanceNodes[nodeId]) { + delete this.subflowInstanceNodes[nodeId]; + } + try { + const removed = removedMap[nodeId]; + if (removed) { + // Clears the node status + events.emit("node-status", { id: node.id }); + } + promises.push(stopNode(node, removed).catch(() => { })); + } catch(error) { + node.error(error); + } + } + } + return promises; } /** @@ -912,9 +928,9 @@ class Flow { /** * Stop an individual node within this flow. * - * @param {[type]} node [description] - * @param {[type]} removed [description] - * @return {[type]} [description] + * @param {object} node [description] + * @param {boolean} removed [description] + * @return {Promise} [description] */ function stopNode(node,removed) { Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":"")); @@ -989,7 +1005,7 @@ function handlePreDeliver(flow,sendEvent, reportError) { } else if (err !== false) { // node.send(msg) called // Do it now to avoid next loop and an offbeat count - flow.incrementPendingMsgCount(sendEvent.destination.node || { id: sendEvent.destination.id }); + flow.incrementPendingMsgCount(sendEvent.destination.node); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 657c90e19..224c37de1 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -203,6 +203,7 @@ Node.prototype.emit = function(event, ...args) { * This will call all registered handlers for the 'input' event. */ Node.prototype._emitInput = function(arg) { + // TODO: inject node directly used the event to trigger a message, is this allowed? var node = this; this.metric("receive", arg); let receiveEvent = { msg:arg, destination: { id: this.id, node: this } } @@ -221,6 +222,7 @@ Node.prototype._emitInput = function(arg) { function(err) { node._complete(arg,err); } ); if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? // Ensure done() is called node._complete(arg); } @@ -245,6 +247,11 @@ Node.prototype._emitInput = function(arg) { } } ); + if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? + // Ensure done() is called + node._complete(arg); + } } catch(err) { node.error(err,arg); }