From cdd2abc4c1396f275777719b3193349b2484c82e Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sat, 1 Nov 2025 22:57:32 +0100 Subject: [PATCH 01/16] Initial implementation of graceful shutdown --- .../@node-red/runtime/lib/api/settings.js | 2 + .../@node-red/runtime/lib/flows/Flow.js | 135 +++++++++++++++--- .../@node-red/runtime/lib/flows/index.js | 28 ++++ .../@node-red/runtime/lib/nodes/Node.js | 27 +++- packages/node_modules/node-red/settings.js | 3 + 5 files changed, 169 insertions(+), 26 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/api/settings.js b/packages/node_modules/@node-red/runtime/lib/api/settings.js index 634f5dbf3..973959201 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/settings.js +++ b/packages/node_modules/@node-red/runtime/lib/api/settings.js @@ -172,6 +172,8 @@ var api = module.exports = { safeSettings.runtimeState.ui = false; // cannot have UI without endpoint } + safeSettings.gracefulShutdownEnabled = runtime.settings.gracefulShutdown?.enabled || false; + runtime.settings.exportNodeSettings(safeSettings); runtime.plugins.exportPluginSettings(safeSettings); } diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 02b976a32..d98940a41 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -26,8 +26,11 @@ let Subflow; let Log; let Group; +let activeFlows; + let nodeCloseTimeout = 15000; let asyncMessageDelivery = true; +let gracefulShutdown = false; /** * This class represents a flow within the runtime. It is responsible for @@ -67,6 +70,16 @@ class Flow { // _env is an object for direct lookup of env name -> value this.env = this.flow.env this._env = {} + + // Graceful Shutdown props + /** @type {Number} */ + this.pendingMsgCount = 0; + /** @type {object[]|null} */ + this.shutdownScope = null; + /** @type {Number} */ + this.shutdownTimeout = 10000; // 10s + /** @type {object[]} */ + this.messageInitiatorNodes = []; } /** @@ -137,6 +150,16 @@ class Flow { this.parent.log(msg); } + decrementPendindMsgCount() { + this.pendingMsgCount--; + this.debug("Decrement pending messages count to: " + this.pendingMsgCount); + } + + incrementPendingMsgCount() { + this.pendingMsgCount++; + this.debug("Increment pending messages count to: " + this.pendingMsgCount); + } + /** * Start this flow. * The `diff` argument helps define what needs to be started in the case @@ -153,6 +176,11 @@ class Flow { this.statusNodes = []; this.completeNodeMap = {}; + // TODO: check values + this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || []; + this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; + this.shutdownTimeout = this.flow.shutdownTimeout; + if (this.isGlobalFlow) { // This is the global flow. It needs to go find the `global-config` @@ -355,7 +383,6 @@ class Flow { */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); - var i; if (!stopList) { stopList = Object.keys(this.activeNodes); } @@ -378,28 +405,84 @@ class Flow { }); stopList = nodesToStop.concat(configsToStop); - var promises = []; - for (i=0;i{})); - } catch(err) { - node.error(err); - } - if (removedMap[stopList[i]]) { - events.emit("node-status",{ - id: node.id - }); + /** @type {Promise[]} */ + const initialPromises = []; + /** @type {Promise} */ + let shutdownPromise = Promise.resolve(); + if (gracefulShutdown && this.messageInitiatorNodes.length) { + for (const node of this.messageInitiatorNodes) { + if (node) { + delete this.activeNodes[node.id]; + try { + const removed = removedMap[node.id]; + if (removed) { + events.emit("node-status", { id: node.id }); + } + initialPromises.push(stopNode(node, removed).catch(() => { })); + } catch (error) { + node.error(error); + } } } + + shutdownPromise = new Promise((resolve, _reject) => { + let remainingPeriod = this.shutdownTimeout; + const loop = () => { + this.debug(`Pending message count: ${this.pendingMsgCount}`); + if (this.shutdownScope === null) { + if (this.pendingMsgCount === 0) { + this.pendingMsgCount = 0; + return resolve(); + } + } else if (this.shutdownScope.length) { + let count = 0; + this.shutdownScope.forEach((flow) => { + if (flow && "pendingMsgCount" in flow) { + count += flow.pendingMsgCount; + } + }); + if (count === 0) { + this.pendingMsgCount = 0; + return resolve(); + } + } else if (remainingPeriod < 0) { + this.pendingMsgCount = 0; + return resolve(); + } + remainingPeriod -= 1000; + setTimeout(loop, 1000); // polling every 1000ms + }; + setImmediate(loop); + }); } - return Promise.all(promises); + + /** @type {() => Promise[]} */ + const finalClose = () => { + const finalPromises = []; + for (var i=0;i { })); + } catch(err) { + node.error(err); + } + } + } + return finalPromises; + }; + + return Promise.all(initialPromises) + .then(() => shutdownPromise) + .then(() => Promise.all(finalClose())); } /** @@ -695,6 +778,7 @@ class Flow { } handleComplete(node,msg) { + // Trigger Complete nodes if (this.completeNodeMap[node.id]) { let toSend = msg; this.completeNodeMap[node.id].forEach((completeNode,index) => { @@ -814,7 +898,7 @@ function handlePreRoute(flow, sendEvent, reportError) { function deliverMessageToDestination(sendEvent) { if (sendEvent?.destination?.node) { try { - sendEvent.destination.node.receive(sendEvent.msg); + sendEvent.destination.node._receive(sendEvent.msg); } catch(err) { Log.error(`Error delivering message to node:${sendEvent.destination.node._path} [${sendEvent.destination.node.type}]`) Log.error(err.stack) @@ -828,6 +912,9 @@ function handlePreDeliver(flow,sendEvent, reportError) { reportError(err,sendEvent); return; } else if (err !== false) { + // node.send(msg) called + // Do it now to avoid next loop and an offbeat count + flow.incrementPendingMsgCount(); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) @@ -848,7 +935,8 @@ function handlePreDeliver(flow,sendEvent, reportError) { module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; - asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery + asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery; + gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false; Log = runtime.log; Subflow = require("./Subflow"); Group = require("./Group").Group @@ -856,5 +944,8 @@ module.exports = { create: function(parent,global,conf) { return new Flow(parent,global,conf) }, + setActiveFlows: function(flows) { + activeFlows = flows; + }, Flow: Flow } diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index f21bd56f9..6aac0fb5d 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -70,6 +70,7 @@ function init(runtime) { typeEventRegistered = true; } Flow.init(runtime); + Flow.setActiveFlows(activeFlows); flowUtil.init(runtime); } @@ -579,6 +580,15 @@ async function addFlow(flow, user) { if (flow.hasOwnProperty('env')) { tabNode.env = flow.env; } + if (flow.hasOwnProperty('shutdownScope')) { + tabNode.shutdownScope = flow.shutdownScope; + } + if (flow.hasOwnProperty('shutdownTimeout')) { + tabNode.shutdownTimeout = flow.shutdownTimeout; + } + if (flow.hasOwnProperty('messageInitiatorNodes')) { + tabNode.messageInitiatorNodes = flow.messageInitiatorNodes; + } var nodes = [tabNode]; @@ -642,6 +652,15 @@ function getFlow(id) { if (flow.hasOwnProperty('env')) { result.env = flow.env; } + if (flow.hasOwnProperty('shutdownScope')) { + result.shutdownScope = flow.shutdownScope; + } + if (flow.hasOwnProperty('shutdownTimeout')) { + result.shutdownTimeout = flow.shutdownTimeout; + } + if (flow.hasOwnProperty('messageInitiatorNodes')) { + result.messageInitiatorNodes = flow.messageInitiatorNodes; + } if (id !== 'global') { result.nodes = []; } @@ -770,6 +789,15 @@ async function updateFlow(id,newFlow, user) { if (newFlow.hasOwnProperty('env')) { tabNode.env = newFlow.env; } + if (flow.hasOwnProperty('shutdownScope')) { + tabNode.shutdownScope = flow.shutdownScope; + } + if (flow.hasOwnProperty('shutdownTimeout')) { + tabNode.shutdownTimeout = flow.shutdownTimeout; + } + if (flow.hasOwnProperty('messageInitiatorNodes')) { + tabNode.messageInitiatorNodes = flow.messageInitiatorNodes; + } if (newFlow.hasOwnProperty('credentials')) { tabNode.credentials = newFlow.credentials; } diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 0b1ed349b..5ccfaa8e5 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -131,6 +131,10 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); + if (this._flow) { + // done() or done(error) called + this._flow.decrementPendindMsgCount(); + } hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { if (err) { this.error(err); @@ -216,6 +220,10 @@ Node.prototype._emitInput = function(arg) { function() { node.send.apply(node,arguments) }, function(err) { node._complete(arg,err); } ); + if (node._expectedDoneCount === 0) { + // Ensure done() is called + node._complete(arg); + } } catch(err) { node.error(err,arg); } @@ -485,11 +493,9 @@ Node.prototype.send = function(msg) { }; /** - * Receive a message. - * - * This will emit the `input` event with the provided message. + * Internal method to receive a message */ -Node.prototype.receive = function(msg) { +Node.prototype._receive = function (msg) { if (!msg) { msg = {}; } @@ -499,6 +505,19 @@ Node.prototype.receive = function(msg) { this.emit("input",msg); }; +/** + * Receive a message. + * + * This will emit the `input` event with the provided message. + */ +Node.prototype.receive = function (msg) { + if (this._flow) { + // Equivalent to node.send(msg) without handleOnSend + this._flow.incrementPendingMsgCount(); + } + this._receive(msg); +}; + function log_helper(self, level, msg) { var o = { level: level, diff --git a/packages/node_modules/node-red/settings.js b/packages/node_modules/node-red/settings.js index 269cac160..8a2b6406d 100644 --- a/packages/node_modules/node-red/settings.js +++ b/packages/node_modules/node-red/settings.js @@ -312,6 +312,9 @@ module.exports = { /** show or hide runtime stop/start options in the node-red editor. Must be set to `false` to hide */ ui: false, }, + gracefulShutdown: { + enabled: true, + }, telemetry: { /** * By default, telemetry is disabled until the user provides consent the first From 09e177bbbf36688c226a1c0284ca17c81ffdded9 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Mon, 3 Nov 2025 15:46:30 +0100 Subject: [PATCH 02/16] Allow graceful shutdown for node/flow deployment --- .../@node-red/runtime/lib/flows/Flow.js | 117 ++++++++++++++---- .../@node-red/runtime/lib/nodes/Node.js | 4 +- 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index d98940a41..2a9b82c2a 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -74,12 +74,16 @@ class Flow { // Graceful Shutdown props /** @type {Number} */ this.pendingMsgCount = 0; + /** @type {Record} */ + this.msgInProgressNodeMap = {}; /** @type {object[]|null} */ this.shutdownScope = null; /** @type {Number} */ this.shutdownTimeout = 10000; // 10s /** @type {object[]} */ this.messageInitiatorNodes = []; + /** @type {Record} */ + this.wiredNodeMap = {}; } /** @@ -150,14 +154,37 @@ class Flow { this.parent.log(msg); } - decrementPendindMsgCount() { + /** + * This method is called when a node finishes processing a message. + * e.g. `node.done()` or `node.done(error)`. + * @param {object} node + */ + decrementPendingMsgCount(node) { this.pendingMsgCount--; - this.debug("Decrement pending messages count to: " + this.pendingMsgCount); + if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) { + this.msgInProgressNodeMap[node.id]--; + if (this.msgInProgressNodeMap[node.id] <= 0) { + delete this.msgInProgressNodeMap[node.id]; + } + } + this.trace("decrement pending messages count to: " + this.pendingMsgCount); + this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`); } - incrementPendingMsgCount() { + /** + * This method is called when a node sends a message. + * e.g. `node.send(msg)` or `node.receive(msg)`. + * @param {object} node + */ + incrementPendingMsgCount(node) { this.pendingMsgCount++; - this.debug("Increment pending messages count to: " + this.pendingMsgCount); + if (this.msgInProgressNodeMap[node.id]) { + this.msgInProgressNodeMap[node.id]++; + } else { + this.msgInProgressNodeMap[node.id] = 1; + } + this.trace("increment pending messages count to: " + this.pendingMsgCount); + this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`); } /** @@ -280,6 +307,7 @@ class Flow { } } + this.wiredNodeMap = {}; for (id in this.flow.nodes) { if (this.flow.nodes.hasOwnProperty(id)) { node = this.flow.nodes[id]; @@ -321,6 +349,15 @@ class Flow { } } } + if (node.wires) { + // Create a map with incoming wires for each node + node.wires.forEach((output) => { + output.forEach((wire) => { + this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || []; + this.wiredNodeMap[wire].push(node.id); + }); + }); + } } else { this.debug("not starting disabled node : "+id); } @@ -383,7 +420,9 @@ class Flow { */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); + let fullStop = false; if (!stopList) { + fullStop = true; stopList = Object.keys(this.activeNodes); } // this.trace(" stopList: "+stopList.join(",")) @@ -425,29 +464,63 @@ class Flow { } } + /** @type {Set} */ + const lookupNodes = new Set(); + + if (!fullStop) { + // Create a list of nodes wired upstream of nodes to be stopped. + const list = stopList.slice(); + while (list.length) { + const id = list.pop(); + if (lookupNodes.has(id)) { + continue; + } + lookupNodes.add(id); + if (this.wiredNodeMap[id]) { + list.push(...this.wiredNodeMap[id]); + } + } + } + shutdownPromise = new Promise((resolve, _reject) => { let remainingPeriod = this.shutdownTimeout; const loop = () => { - this.debug(`Pending message count: ${this.pendingMsgCount}`); - if (this.shutdownScope === null) { - if (this.pendingMsgCount === 0) { - this.pendingMsgCount = 0; - return resolve(); - } - } else if (this.shutdownScope.length) { - let count = 0; - this.shutdownScope.forEach((flow) => { - if (flow && "pendingMsgCount" in flow) { - count += flow.pendingMsgCount; + if (fullStop) { + this.debug(`Pending message count: ${this.pendingMsgCount}`); + if (this.shutdownScope === null) { + if (this.pendingMsgCount === 0) { + return resolve(); } - }); - if (count === 0) { - this.pendingMsgCount = 0; + } else if (this.shutdownScope.length) { + let count = 0; + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow) { + count += flow.pendingMsgCount; + } + }); + if (count === 0) { + return resolve(); + } + } + if (remainingPeriod < 0) { + this.error("Timeout for graceful shutdown has expired"); + return resolve(); + } + } else { + let inProgress = false; + for (const id of Object.keys(this.msgInProgressNodeMap)) { + if (lookupNodes.has(id)) { + inProgress = true; + break; + } + } + if (!inProgress) { + return resolve(); + } + if (remainingPeriod < 0) { + this.error("Timeout for graceful shutdown has expired"); return resolve(); } - } else if (remainingPeriod < 0) { - this.pendingMsgCount = 0; - return resolve(); } remainingPeriod -= 1000; setTimeout(loop, 1000); // polling every 1000ms @@ -914,7 +987,7 @@ function handlePreDeliver(flow,sendEvent, reportError) { } else if (err !== false) { // node.send(msg) called // Do it now to avoid next loop and an offbeat count - flow.incrementPendingMsgCount(); + flow.incrementPendingMsgCount(sendEvent.destination.node); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 5ccfaa8e5..db3e90fe3 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -133,7 +133,7 @@ Node.prototype._complete = function(msg,error) { this.metric("done",msg); if (this._flow) { // done() or done(error) called - this._flow.decrementPendindMsgCount(); + this._flow.decrementPendingMsgCount(this); } hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { if (err) { @@ -513,7 +513,7 @@ Node.prototype._receive = function (msg) { Node.prototype.receive = function (msg) { if (this._flow) { // Equivalent to node.send(msg) without handleOnSend - this._flow.incrementPendingMsgCount(); + this._flow.incrementPendingMsgCount(this); } this._receive(msg); }; From d52520250d93dc789a6301f27c6c7117ec77fbc6 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Mon, 3 Nov 2025 18:47:36 +0100 Subject: [PATCH 03/16] Add type guards and fix undefined node --- .../@node-red/runtime/lib/flows/Flow.js | 14 ++++++++------ .../@node-red/runtime/lib/nodes/Node.js | 6 +++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 2a9b82c2a..22d721eb2 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -203,11 +203,6 @@ class Flow { this.statusNodes = []; this.completeNodeMap = {}; - // TODO: check values - this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || []; - this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; - this.shutdownTimeout = this.flow.shutdownTimeout; - if (this.isGlobalFlow) { // This is the global flow. It needs to go find the `global-config` @@ -364,6 +359,13 @@ class Flow { } } + this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || []; + this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; + if (typeof this.flow.shutdownTimeout === "number") { + // TODO: Math.max + this.shutdownTimeout = this.flow.shutdownTimeout; + } + var activeCount = Object.keys(this.activeNodes).length; if (activeCount > 0) { this.trace("------------------|--------------|-----------------"); @@ -987,7 +989,7 @@ function handlePreDeliver(flow,sendEvent, reportError) { } else if (err !== false) { // node.send(msg) called // Do it now to avoid next loop and an offbeat count - flow.incrementPendingMsgCount(sendEvent.destination.node); + flow.incrementPendingMsgCount(sendEvent.destination.node || { id: sendEvent.destination.id }); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index db3e90fe3..657c90e19 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -131,7 +131,7 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); - if (this._flow) { + if (this._flow && typeof this._flow.decrementPendingMsgCount === "function") { // done() or done(error) called this._flow.decrementPendingMsgCount(this); } @@ -220,7 +220,7 @@ Node.prototype._emitInput = function(arg) { function() { node.send.apply(node,arguments) }, function(err) { node._complete(arg,err); } ); - if (node._expectedDoneCount === 0) { + if (node._expectedDoneCount === 0 && node._flow) { // Ensure done() is called node._complete(arg); } @@ -511,7 +511,7 @@ Node.prototype._receive = function (msg) { * This will emit the `input` event with the provided message. */ Node.prototype.receive = function (msg) { - if (this._flow) { + if (this._flow && typeof this._flow.incrementPendingMsgCount === "function") { // Equivalent to node.send(msg) without handleOnSend this._flow.incrementPendingMsgCount(this); } From e17256934d4e8ae834a3d47dbed2c0a5dee76396 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:15:55 +0100 Subject: [PATCH 04/16] Remove the polling loop + improvements --- .../@node-red/runtime/lib/flows/Flow.js | 144 ++++++++++-------- .../@node-red/runtime/lib/nodes/Node.js | 7 + 2 files changed, 87 insertions(+), 64 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 22d721eb2..7558ad946 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -80,7 +80,9 @@ class Flow { this.shutdownScope = null; /** @type {Number} */ this.shutdownTimeout = 10000; // 10s - /** @type {object[]} */ + /** @type {null|(() => void)} */ + this.shutdown = null; + /** @type {string[]} */ this.messageInitiatorNodes = []; /** @type {Record} */ this.wiredNodeMap = {}; @@ -169,6 +171,10 @@ class Flow { } this.trace("decrement pending messages count to: " + this.pendingMsgCount); this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`); + + if (this.shutdown) { + this.shutdown(); + } } /** @@ -347,6 +353,7 @@ class Flow { if (node.wires) { // Create a map with incoming wires for each node node.wires.forEach((output) => { + // TODO: Still an array? output.forEach((wire) => { this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || []; this.wiredNodeMap[wire].push(node.id); @@ -359,7 +366,7 @@ class Flow { } } - this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || []; + this.messageInitiatorNodes = this.flow.messageInitiatorNodes || []; this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; if (typeof this.flow.shutdownTimeout === "number") { // TODO: Math.max @@ -416,9 +423,9 @@ class Flow { * Stop this flow. * The `stopList` argument helps define what needs to be stopped in the case * of a modified-nodes/flows type deploy. - * @param {[type]} stopList [description] - * @param {[type]} removedList [description] - * @return {[type]} [description] + * @param {string[]} [stopList] [description] + * @param {string[]} [removedList] [description] + * @return {Promise} [description] */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); @@ -451,20 +458,8 @@ class Flow { /** @type {Promise} */ let shutdownPromise = Promise.resolve(); if (gracefulShutdown && this.messageInitiatorNodes.length) { - for (const node of this.messageInitiatorNodes) { - if (node) { - delete this.activeNodes[node.id]; - try { - const removed = removedMap[node.id]; - if (removed) { - events.emit("node-status", { id: node.id }); - } - initialPromises.push(stopNode(node, removed).catch(() => { })); - } catch (error) { - node.error(error); - } - } - } + // Start by closing initiator nodes + initialPromises.push(...this.stopNodes(this.messageInitiatorNodes, removedMap)); /** @type {Set} */ const lookupNodes = new Set(); @@ -484,11 +479,11 @@ class Flow { } } - shutdownPromise = new Promise((resolve, _reject) => { - let remainingPeriod = this.shutdownTimeout; - const loop = () => { + /** @type {Promise} */ + const closePromise = new Promise((resolve) => { + this.shutdown = () => { if (fullStop) { - this.debug(`Pending message count: ${this.pendingMsgCount}`); + this.trace(`Pending message count: ${this.pendingMsgCount}`); if (this.shutdownScope === null) { if (this.pendingMsgCount === 0) { return resolve(); @@ -501,13 +496,15 @@ class Flow { } }); if (count === 0) { + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) { + // Retrigger validation in each flow + flow.shutdown(); + } + }); return resolve(); } } - if (remainingPeriod < 0) { - this.error("Timeout for graceful shutdown has expired"); - return resolve(); - } } else { let inProgress = false; for (const id of Object.keys(this.msgInProgressNodeMap)) { @@ -519,45 +516,64 @@ class Flow { if (!inProgress) { return resolve(); } - if (remainingPeriod < 0) { - this.error("Timeout for graceful shutdown has expired"); - return resolve(); - } - } - remainingPeriod -= 1000; - setTimeout(loop, 1000); // polling every 1000ms - }; - setImmediate(loop); - }); - } - - /** @type {() => Promise[]} */ - const finalClose = () => { - const finalPromises = []; - for (var i=0;i { })); - } catch(err) { - node.error(err); } } + }); + + /** @type {NodeJS.Timeout|null} */ + let timer = null; + const timeoutPromise = new Promise((_resolve, reject) => { + timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout); + }); + const start = Date.now(); + shutdownPromise = Promise.race([closePromise, timeoutPromise]) + .then(() => { + clearTimeout(timer); + const delta = Date.now() - start; + this.trace(`Stopped gracefuly in ${delta}ms`); + }).catch((error) => { + clearTimeout(timer); + this.error(Log._("nodes.flows.stopping-error", { message: error })); + }); + + if (this.shutdown) { + setImmediate(this.shutdown); } - return finalPromises; - }; + } return Promise.all(initialPromises) .then(() => shutdownPromise) - .then(() => Promise.all(finalClose())); + .then(() => Promise.all(this.stopNodes(stopList, removedMap))); + } + + /** + * Stop every node of stopList + * @param {string[]} stopList + * @param {Record} removedMap + * @returns {Promise[]} + */ + stopNodes(stopList, removedMap) { + const promises = []; + for (const nodeId of stopList) { + const node = this.activeNodes[nodeId]; + if (node) { + delete this.activeNodes[nodeId]; + if (this.subflowInstanceNodes[nodeId]) { + delete this.subflowInstanceNodes[nodeId]; + } + try { + const removed = removedMap[nodeId]; + if (removed) { + // Clears the node status + events.emit("node-status", { id: node.id }); + } + promises.push(stopNode(node, removed).catch(() => { })); + } catch(error) { + node.error(error); + } + } + } + return promises; } /** @@ -912,9 +928,9 @@ class Flow { /** * Stop an individual node within this flow. * - * @param {[type]} node [description] - * @param {[type]} removed [description] - * @return {[type]} [description] + * @param {object} node [description] + * @param {boolean} removed [description] + * @return {Promise} [description] */ function stopNode(node,removed) { Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":"")); @@ -989,7 +1005,7 @@ function handlePreDeliver(flow,sendEvent, reportError) { } else if (err !== false) { // node.send(msg) called // Do it now to avoid next loop and an offbeat count - flow.incrementPendingMsgCount(sendEvent.destination.node || { id: sendEvent.destination.id }); + flow.incrementPendingMsgCount(sendEvent.destination.node); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 657c90e19..224c37de1 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -203,6 +203,7 @@ Node.prototype.emit = function(event, ...args) { * This will call all registered handlers for the 'input' event. */ Node.prototype._emitInput = function(arg) { + // TODO: inject node directly used the event to trigger a message, is this allowed? var node = this; this.metric("receive", arg); let receiveEvent = { msg:arg, destination: { id: this.id, node: this } } @@ -221,6 +222,7 @@ Node.prototype._emitInput = function(arg) { function(err) { node._complete(arg,err); } ); if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? // Ensure done() is called node._complete(arg); } @@ -245,6 +247,11 @@ Node.prototype._emitInput = function(arg) { } } ); + if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? + // Ensure done() is called + node._complete(arg); + } } catch(err) { node.error(err,arg); } From ff5c2a264ddc7b4fb2adfbd7eebea3af58079e43 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:39:41 +0100 Subject: [PATCH 05/16] Allow scoped flows to have their timeout --- packages/node_modules/@node-red/runtime/lib/flows/Flow.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 7558ad946..a0898a506 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -489,6 +489,9 @@ class Flow { return resolve(); } } else if (this.shutdownScope.length) { + if (this.pendingMsgCount === 0) { + return resolve(); + } let count = 0; this.shutdownScope.forEach((flow) => { if (flow instanceof Flow) { From 0cd00d8410a46df6bfa0ed293edfe353ff42cff3 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:44:48 +0100 Subject: [PATCH 06/16] Allow scoped flow to cancel shutdown of other flows --- .../@node-red/runtime/lib/flows/Flow.js | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index a0898a506..63d65b3ef 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -81,7 +81,11 @@ class Flow { /** @type {Number} */ this.shutdownTimeout = 10000; // 10s /** @type {null|(() => void)} */ + this.cancelShutdown = null; + /** @type {null|(() => void)} */ this.shutdown = null; + /** @type {boolean} */ + this.failFast = false; /** @type {string[]} */ this.messageInitiatorNodes = []; /** @type {Record} */ @@ -366,6 +370,9 @@ class Flow { } } + this.shutdown = null; + this.cancelShutdown = null; + this.failFast = this.flow.failFast || false; this.messageInitiatorNodes = this.flow.messageInitiatorNodes || []; this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; if (typeof this.flow.shutdownTimeout === "number") { @@ -526,6 +533,9 @@ class Flow { /** @type {NodeJS.Timeout|null} */ let timer = null; const timeoutPromise = new Promise((_resolve, reject) => { + if (this.failFast && this.shutdownScope) { + this.cancelShutdown = () => reject("Graceful shutdown cancelled"); + } timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout); }); const start = Date.now(); @@ -537,6 +547,14 @@ class Flow { }).catch((error) => { clearTimeout(timer); this.error(Log._("nodes.flows.stopping-error", { message: error })); + if (this.failFast && this.shutdownScope) { + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow && flow.cancelShutdown && flow.id !== this.id) { + // Cancel other flow shutdown if this one fails + flow.cancelShutdown(); + } + }); + } }); if (this.shutdown) { From 0d076f2b65d6c1cdb1e35605d711c1d159e1b851 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Wed, 5 Nov 2025 12:42:53 +0100 Subject: [PATCH 07/16] Ensure the global flow is the last one closed --- .../@node-red/runtime/lib/flows/index.js | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index 6aac0fb5d..f97b15b5e 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -36,7 +36,9 @@ var activeFlowConfig = null; var activeFlows = {}; var started = false; -var state = 'stop' +var state = 'stop'; +/** @type {boolean} */ +let gracefulShutdown; var credentialsPendingReset = false; @@ -53,6 +55,7 @@ function init(runtime) { log = runtime.log; started = false; state = 'stop'; + gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false; if (!typeEventRegistered) { events.on('type-registered',function(type) { if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) { @@ -472,7 +475,9 @@ function stop(type,diff,muteLog,isDeploy) { let globalIndex = activeFlowIds.indexOf("global"); if (globalIndex !== -1) { activeFlowIds.splice(globalIndex,1); - activeFlowIds.push("global"); + if (!gracefulShutdown) { + activeFlowIds.push("global"); + } } activeFlowIds.forEach(id => { @@ -486,7 +491,26 @@ function stop(type,diff,muteLog,isDeploy) { } }); - return Promise.all(promises).then(function() { + /** @type {() => Promise} */ + const globalFlowPromise = () => { + if (gracefulShutdown) { + // Ensure the global flow is the last one closed + return new Promise((resolve) => { + if (activeFlows.hasOwnProperty("global")) { + const flowStateChanged = diff && (diff.flowChanged.indexOf("global") !== -1 || diff.added.indexOf("global") !== -1 || diff.removed.indexOf("global") !== -1); + log.debug("red/nodes/flows.stop : stopping flow : global"); + activeFlows["global"].stop(flowStateChanged ? null : stopList, removedList).then(resolve); + if (type === "full" || flowStateChanged || diff.removed.indexOf("global")!==-1) { + delete activeFlows["global"]; + } + } + }); + } else { + return Promise.resolve(); + } + }; + + return Promise.all(promises).then(globalFlowPromise).then(function () { for (let id in activeNodesToFlow) { if (activeNodesToFlow.hasOwnProperty(id)) { if (!activeFlows[activeNodesToFlow[id]]) { From af6d7c2b9b066b5f1d51bfcd6ac0fda244e94cf6 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Thu, 6 Nov 2025 20:11:02 +0100 Subject: [PATCH 08/16] Calculate shutdown scope within the runtime --- .../@node-red/runtime/lib/flows/Flow.js | 49 ++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 63d65b3ef..00008bcdf 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -60,6 +60,8 @@ class Flow { this.groupOrder = [] this.activeNodes = {}; this.subflowInstanceNodes = {}; + this.completeNodeMap = {}; + this.linkNodes = []; this.catchNodes = []; this.statusNodes = []; this.path = this.id; @@ -209,6 +211,7 @@ class Flow { var node; var newNode; var id; + this.linkNodes = []; this.catchNodes = []; this.statusNodes = []; this.completeNodeMap = {}; @@ -374,12 +377,52 @@ class Flow { this.cancelShutdown = null; this.failFast = this.flow.failFast || false; this.messageInitiatorNodes = this.flow.messageInitiatorNodes || []; - this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null; if (typeof this.flow.shutdownTimeout === "number") { // TODO: Math.max this.shutdownTimeout = this.flow.shutdownTimeout; } + // Determine the shutdown scope; flows needed to calculate pending msgs + setImmediate(() => { + const seen = new Set(); + const scope = { [this.id]: this }; + const nodes = this.linkNodes.slice(); + seen.add(this.id); + while (nodes.length) { + const node = nodes.pop(); + if (seen.has(node.id)) { + continue; + } + seen.add(node.id); + const flow = activeFlows[node.z]; + if (flow && flow.flow) { + const config = flow.flow.nodes[node.id]; + if (config && config.links) { + config.links.forEach((id) => { + const link = this.getNode(id, false); + if (link && !scope[link._flow.id]) { + scope[link._flow.id] = link._flow; + nodes.push(...link._flow.linkNodes); + } + }); + } + } + } + + this.shutdownScope = []; + const flows = Object.values(scope); + while (flows.length) { + const flow = flows.pop(); + if (flow) { + this.shutdownScope.push(flow); + if (flow.subflowInstanceNodes) { + flows.push(...Object.values(flow.subflowInstanceNodes)); + } + } + + } + }); + var activeCount = Object.keys(this.activeNodes).length; if (activeCount > 0) { this.trace("------------------|--------------|-----------------"); @@ -391,7 +434,9 @@ class Flow { if (this.activeNodes.hasOwnProperty(id)) { node = this.activeNodes[id]; this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":"")); - if (node.type === "catch") { + if (node.type.startsWith("link ")) { + this.linkNodes.push(node); + } else if (node.type === "catch") { this.catchNodes.push(node); } else if (node.type === "status") { this.statusNodes.push(node); From 5113851189f26be9348d8ca81c4a653ee2f07050 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Thu, 6 Nov 2025 21:38:28 +0100 Subject: [PATCH 09/16] Add the Graceful Shutdown UI --- .../editor-client/locales/en-US/editor.json | 5 +- .../@node-red/editor-client/src/js/nodes.js | 13 +- .../editor-client/src/js/ui/editor.js | 9 + .../src/js/ui/editors/panes/shutdown.js | 269 ++++++++++++++++++ .../editor-client/src/js/ui/subflow.js | 10 +- .../editor-client/src/js/ui/workspaces.js | 3 + .../@node-red/registry/lib/subflow.js | 5 +- .../@node-red/runtime/lib/flows/index.js | 36 +-- 8 files changed, 326 insertions(+), 24 deletions(-) create mode 100644 packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js diff --git a/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json b/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json index c1c9316d8..aa123b824 100644 --- a/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json +++ b/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json @@ -476,6 +476,8 @@ "inputType": "Input type", "selectType": "select types...", "loadCredentials": "Loading node credentials", + "initiatorNodes": "During shutdown, start by stopping message initiator nodes:", + "failFast": "Cancel shutdown of listed flows if this flow fails to stop", "inputs": { "input": "input", "select": "select", @@ -1238,7 +1240,8 @@ "description": "Description", "appearance": "Appearance", "preview": "UI Preview", - "defaultValue": "Default value" + "defaultValue": "Default value", + "shutdown": "Graceful Shutdown" }, "tourGuide": { "takeATour": "Take a tour", diff --git a/packages/node_modules/@node-red/editor-client/src/js/nodes.js b/packages/node_modules/@node-red/editor-client/src/js/nodes.js index 4a114560b..1db933834 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/nodes.js +++ b/packages/node_modules/@node-red/editor-client/src/js/nodes.js @@ -112,7 +112,10 @@ RED.nodes = (function() { disabled: {value: false}, locked: {value: false}, info: {value: ""}, - env: {value: []} + env: {value: []}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, } }; @@ -1141,7 +1144,10 @@ RED.nodes = (function() { } else { return errors } - }} + }}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }, icon: function() { return sf.icon||"subflow.svg" }, category: sf.category || "subflows", @@ -1457,6 +1463,9 @@ RED.nodes = (function() { node.in = []; node.out = []; node.env = n.env; + node.failFast = n.failFast; + node.timeout = n.timeout; + node.initiatorNodes = n.initiatorNodes; node.meta = n.meta; if (exportCreds) { diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js index f804e6de2..f6f52ccf2 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js @@ -1963,6 +1963,11 @@ RED.editor = (function() { 'editor-tab-description', 'editor-tab-appearance' ]; + + if (RED.settings.gracefulShutdownEnabled) { + nodeEditPanes.splice(1, 0, 'editor-tab-shutdown'); + } + prepareEditDialog(trayBody, nodeEditPanes, subflow, subflow._def, "subflow-input", defaultTab, function (_activeEditPanes) { activeEditPanes = _activeEditPanes; trayBody.i18n(); @@ -2215,6 +2220,10 @@ RED.editor = (function() { 'editor-tab-envProperties' ]; + if (RED.settings.gracefulShutdownEnabled) { + nodeEditPanes.splice(1, 0, 'editor-tab-shutdown'); + } + if (!workspace.hasOwnProperty("disabled")) { workspace.disabled = false; } diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js new file mode 100644 index 000000000..1335d965b --- /dev/null +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js @@ -0,0 +1,269 @@ +; (function () { + const tabcontent = ` +
+ + +
+
+ + +
+
+
+
+
+ +
+
+ +
+
+
+
+
`; + + RED.editor.registerEditPane("editor-tab-shutdown", function (node) { + return { + label: RED._("editor-tab.shutdown"), + name: RED._("editor-tab.shutdown"), + iconClass: "fa fa-power-off", + + create: function (container) { + const dialogForm = $('
').appendTo(container); + + if (RED.settings.gracefulShutdownEnabled) { + $(tabcontent).appendTo(dialogForm); + buildScopeForm(dialogForm, node); + buildInitiatorForm(dialogForm, node); + $("#node-input-timeout").spinner({ min: 1 }).val((node.timeout || 10000) / 1000); + $("#node-input-failFast").prop("checked", node.failFast || false); + this._resize = function () { + const rows = $(dialogForm).find(">div:not(.node-input-initiator-list-row)"); + let height = $(dialogForm).height(); + for (let i = 0; i < rows.length; i++) { + height -= $(rows[i]).outerHeight(true); + } + $(dialogForm).find(">div.node-input-initiator-list-row").css("height", height + "px"); + }; + } else { + $("

Graceful shutdown disabled

").appendTo(dialogForm); + } + }, + close: function () { }, + show: function () { }, + resize: function (_size) { + if (this._resize) { + this._resize(); + } + }, + /** @type {(editState: { changes?: Record, changed?: boolean }) => void} */ + apply: function (editState) { + const failFast = $("#node-input-failFast").prop("checked"); + if (node.failFast != failFast) { + editState.changes = editState.changes || {}; + editState.changes.failFast = node.failFast; + editState.changed = true; + node.failFast = failFast; + } + + let timeout = parseFloat($("#node-input-timeout").val() || "10") * 1000; + if (Number.isNaN(timeout)) { + timeout = 10000; + } + if (node.timeout !== timeout) { + editState.changes = editState.changes || {}; + editState.changes.timeout = node.timeout; + editState.changed = true; + node.timeout = timeout; + } + + const initiatorNodes = $("#node-input-initiator-target-container-div").treeList("selected").map((i) => i.node.id); + if (JSON.stringify(initiatorNodes) !== JSON.stringify(node.initialNodes || [])) { + editState.changes = editState.changes || {}; + editState.changes.initiatorNodes = node.initiatorNodes; + editState.changed = true; + node.initiatorNodes = initiatorNodes; + } + } + } + }); + + /** @type {(node: object) => JQuery} */ + function getNodeLabel(node) { + var div = $('
',{class:"red-ui-node-list-item red-ui-info-outline-item"}); + RED.utils.createNodeIcon(node, true).appendTo(div); + div.find(".red-ui-node-label").addClass("red-ui-info-outline-item-label") + return div; + } + + /** @type {(container: JQuery, node: object) => void} */ + function buildScopeForm(container, node) { + const scope = getScope(node); + + if (!scope.length) { + $(container).find(".node-input-scope-row").hide(); + return; + } + + const items = []; + const flowItemMap = {}; + for (const id of scope) { + if (id === node.id) { + continue; + } + + const isSuflow = id.startsWith("subflow:"); + const workspace = isSuflow ? RED.nodes.subflow(id.substring(8)) : RED.nodes.workspace(id); + if (workspace) { + flowItemMap[workspace.id] = { + element: getNodeLabel(workspace), + selected: true, + }; + + items.push(flowItemMap[workspace.id]); + } + } + + const dirList = $(container).find("#node-input-scope-target-container-div"); + dirList.css({ width: "100%", height: "100%" }).treeList({ multi: true, data: items }); + } + + /** @type {(container: JQuery, node: object) => void} */ + function buildInitiatorForm(container, node) { + const dirList = $(container).find("#node-input-initiator-target-container-div"); + + // We assume that a message initiator node must have at least one output + const nodeFilter = function (n) { + if (n.type.startsWith("link ")) { + // Link nodes transmits messages, but does not generate them. + return false; + } + if (n.hasOwnProperty("outputs")) { + return n.outputs > 0; + } + return true; + }; + const candidateNodes = RED.nodes.filterNodes({ z: node.id }).filter(nodeFilter); + const search = $(container).find("#node-input-initiator-target-filter").searchBox({ + style: "compact", + delay: 300, + change: function () { + const val = $(this).val().trim().toLowerCase(); + if (val === "") { + dirList.treeList("filter", null); + search.searchBox("count", ""); + } else { + const count = dirList.treeList("filter", function (item) { + return item.label.toLowerCase().indexOf(val) > -1 || item.node.type.toLowerCase().indexOf(val) > -1 + }); + search.searchBox("count", count + " / " + candidateNodes.length); + } + } + }); + + dirList.css({ width: "100%", height: "100%" }) + .treeList({ multi: true }).on("treelistitemmouseover", function (e, item) { + item.node.highlighted = true; + item.node.dirty = true; + RED.view.redraw(); + }).on("treelistitemmouseout", function (e, item) { + item.node.highlighted = false; + item.node.dirty = true; + RED.view.redraw(); + }); + + const items = []; + const nodeItemMap = {}; + const scope = node.initiatorNodes || []; + candidateNodes.forEach(function (n) { + const isChecked = scope.indexOf(n.id) !== -1; + const nodeDef = RED.nodes.getType(n.type); + let label, sublabel; + if (nodeDef) { + const l = nodeDef.label; + label = (typeof l === "function" ? l.call(n) : l) || ""; + sublabel = n.type; + if (sublabel.indexOf("subflow:") === 0) { + const subflowId = sublabel.substring(8); + const subflow = RED.nodes.subflow(subflowId); + sublabel = "subflow : " + subflow.name; + } + } + if (!nodeDef || !label) { + label = n.type; + } + nodeItemMap[n.id] = { + node: n, + label: label, + sublabel: sublabel, + selected: isChecked, + checkbox: true + }; + + items.push(nodeItemMap[n.id]); + }); + + dirList.treeList("data", items); + + $(container).find("#node-input-initiator-target-select").on("click", function (event) { + event.preventDefault(); + const preselected = dirList.treeList("selected").map((item) => item.node.id); + RED.tray.hide(); + RED.view.selectNodes({ + selected: preselected, + onselect: function (selection) { + RED.tray.show(); + const newlySelected = {}; + selection.forEach(function (n) { + newlySelected[n.id] = true; + if (nodeItemMap[n.id]) { + nodeItemMap[n.id].treeList.select(true); + } + }); + preselected.forEach(function (id) { + if (!newlySelected[id]) { + nodeItemMap[id].treeList.select(false); + } + }); + }, + oncancel: function () { + RED.tray.show(); + }, + filter: nodeFilter, + }); + }); + } + + /** @type {(flow: object) => string[]} */ + function getScope(flow) { + const activeWorkspace = flow.id; + /** @type {(node: object) => boolean} */ + const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); + const nodes = RED.nodes.filterNodes({ z: activeWorkspace }).filter(nodeFilter); + const seen = new Set(); + const scope = { [activeWorkspace]: true }; + seen.add(activeWorkspace); + while (nodes.length) { + const node = nodes.pop(); + if (seen.has(node.id)) { + continue; + } + seen.add(node.id); + if (node.links) { + node.links.forEach((id) => { + const link = RED.nodes.node(id); + if (link && !scope[link.z]) { + scope[link.z] = true; + nodes.push(...RED.nodes.filterNodes({ z: link.z }).filter(nodeFilter)); + } + }) + } else if (node.type.startsWith("subflow:") && !scope[node.type]) { + scope[node.type] = true; + nodes.push(...RED.nodes.filterNodes({ z: node.type.substring(8) }).filter(nodeFilter)); + } + } + delete scope[activeWorkspace]; + return Object.keys(scope); + } + +})(); diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js b/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js index 3e1b9a410..ae4243a09 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js @@ -608,7 +608,10 @@ RED.subflow = (function() { name:name, info:"", in: [], - out: [] + out: [], + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }; RED.nodes.addSubflow(subflow); RED.history.push({ @@ -777,7 +780,10 @@ RED.subflow = (function() { i:index, id:RED.nodes.id(), wires:[{id:v.source.id,port:v.sourcePort}] - }}) + }}), + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }; RED.nodes.addSubflow(subflow); diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js b/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js index 78e1399cd..da720a418 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js @@ -84,6 +84,9 @@ RED.workspaces = (function() { info: "", label: RED._('workspace.defaultName',{number:workspaceIndex}), env: [], + failFast: false, + timeout: 10000, + initiatorNodes: [], hideable: true, }; if (!skipHistoryEntry) { diff --git a/packages/node_modules/@node-red/registry/lib/subflow.js b/packages/node_modules/@node-red/registry/lib/subflow.js index 39fe083ab..29b786592 100644 --- a/packages/node_modules/@node-red/registry/lib/subflow.js +++ b/packages/node_modules/@node-red/registry/lib/subflow.js @@ -16,7 +16,10 @@ function generateSubflowConfig(subflow) { const icon = subflow.icon || "arrow-in.svg"; const defaults = { - name: {value: ""} + name: {value: ""}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, } const credentials = {} diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index f97b15b5e..e7c332bb3 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -604,14 +604,14 @@ async function addFlow(flow, user) { if (flow.hasOwnProperty('env')) { tabNode.env = flow.env; } - if (flow.hasOwnProperty('shutdownScope')) { - tabNode.shutdownScope = flow.shutdownScope; + if (flow.hasOwnProperty('failFast')) { + tabNode.failFast = flow.failFast; } - if (flow.hasOwnProperty('shutdownTimeout')) { - tabNode.shutdownTimeout = flow.shutdownTimeout; + if (flow.hasOwnProperty('timeout')) { + tabNode.timeout = flow.timeout; } - if (flow.hasOwnProperty('messageInitiatorNodes')) { - tabNode.messageInitiatorNodes = flow.messageInitiatorNodes; + if (flow.hasOwnProperty('initiatorNodes')) { + tabNode.initiatorNodes = flow.initiatorNodes; } var nodes = [tabNode]; @@ -676,14 +676,14 @@ function getFlow(id) { if (flow.hasOwnProperty('env')) { result.env = flow.env; } - if (flow.hasOwnProperty('shutdownScope')) { - result.shutdownScope = flow.shutdownScope; + if (flow.hasOwnProperty('failFast')) { + result.failFast = flow.failFast; } - if (flow.hasOwnProperty('shutdownTimeout')) { - result.shutdownTimeout = flow.shutdownTimeout; + if (flow.hasOwnProperty('timeout')) { + result.timeout = flow.timeout; } - if (flow.hasOwnProperty('messageInitiatorNodes')) { - result.messageInitiatorNodes = flow.messageInitiatorNodes; + if (flow.hasOwnProperty('initiatorNodes')) { + result.initiatorNodes = flow.initiatorNodes; } if (id !== 'global') { result.nodes = []; @@ -813,14 +813,14 @@ async function updateFlow(id,newFlow, user) { if (newFlow.hasOwnProperty('env')) { tabNode.env = newFlow.env; } - if (flow.hasOwnProperty('shutdownScope')) { - tabNode.shutdownScope = flow.shutdownScope; + if (flow.hasOwnProperty('failFast')) { + tabNode.failFast = flow.failFast; } - if (flow.hasOwnProperty('shutdownTimeout')) { - tabNode.shutdownTimeout = flow.shutdownTimeout; + if (flow.hasOwnProperty('timeout')) { + tabNode.timeout = flow.timeout; } - if (flow.hasOwnProperty('messageInitiatorNodes')) { - tabNode.messageInitiatorNodes = flow.messageInitiatorNodes; + if (flow.hasOwnProperty('initiatorNodes')) { + tabNode.initiatorNodes = flow.initiatorNodes; } if (newFlow.hasOwnProperty('credentials')) { tabNode.credentials = newFlow.credentials; From d49a2c9e3d8472ab6a9308e34f9f8cc5863ea2b9 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Fri, 7 Nov 2025 08:34:23 +0100 Subject: [PATCH 10/16] Missed to replace props name in the runtime --- packages/node_modules/@node-red/runtime/lib/flows/Flow.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 00008bcdf..2a308ef40 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -376,10 +376,10 @@ class Flow { this.shutdown = null; this.cancelShutdown = null; this.failFast = this.flow.failFast || false; - this.messageInitiatorNodes = this.flow.messageInitiatorNodes || []; - if (typeof this.flow.shutdownTimeout === "number") { + this.messageInitiatorNodes = this.flow.initiatorNodes || []; + if (typeof this.flow.timeout === "number") { // TODO: Math.max - this.shutdownTimeout = this.flow.shutdownTimeout; + this.shutdownTimeout = this.flow.timeout; } // Determine the shutdown scope; flows needed to calculate pending msgs From 6d005533e87d7f6a9de0e40dd59d2e57680bb260 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sat, 8 Nov 2025 17:13:57 +0100 Subject: [PATCH 11/16] Some fixes (scope, failFast and subflow) --- .../src/js/ui/editors/panes/shutdown.js | 4 ++- .../@node-red/runtime/lib/flows/Flow.js | 33 +++++++++++++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js index 1335d965b..ce466d712 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js @@ -78,7 +78,7 @@ } const initiatorNodes = $("#node-input-initiator-target-container-div").treeList("selected").map((i) => i.node.id); - if (JSON.stringify(initiatorNodes) !== JSON.stringify(node.initialNodes || [])) { + if (JSON.stringify(initiatorNodes) !== JSON.stringify(node.initiatorNodes || [])) { editState.changes = editState.changes || {}; editState.changes.initiatorNodes = node.initiatorNodes; editState.changed = true; @@ -124,6 +124,7 @@ } } + // TODO: filter items const dirList = $(container).find("#node-input-scope-target-container-div"); dirList.css({ width: "100%", height: "100%" }).treeList({ multi: true, data: items }); } @@ -236,6 +237,7 @@ /** @type {(flow: object) => string[]} */ function getScope(flow) { + // TODO: Scope = flows + initiator subflow const activeWorkspace = flow.id; /** @type {(node: object) => boolean} */ const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 2a308ef40..6282d1c54 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -376,7 +376,7 @@ class Flow { this.shutdown = null; this.cancelShutdown = null; this.failFast = this.flow.failFast || false; - this.messageInitiatorNodes = this.flow.initiatorNodes || []; + this.messageInitiatorNodes = this.flow.initiatorNodes || this.subflowDef?.initiatorNodes || []; if (typeof this.flow.timeout === "number") { // TODO: Math.max this.shutdownTimeout = this.flow.timeout; @@ -409,18 +409,34 @@ class Flow { } } + seen.clear(); + const parents = []; this.shutdownScope = []; const flows = Object.values(scope); + this._shutdownScope = Object.values(scope); while (flows.length) { const flow = flows.pop(); - if (flow) { + if (flow && !seen.has(flow.id)) { + seen.add(flow.id); this.shutdownScope.push(flow); + if (flow.parent.id && flow.parent.id !== "global") { + flows.push(flow.parent); + parents.push(flow.parent); + } if (flow.subflowInstanceNodes) { flows.push(...Object.values(flow.subflowInstanceNodes)); } } } + + // TODO: improve this shit + setImmediate(() => { + for (const parent of parents) { + this.shutdownScope = [...this.shutdownScope, ...parent._shutdownScope].filter((flow, index, self) => self.indexOf(flow) === index); + } + console.warn(this.id, this.shutdownScope.map(n=>n.id)); + }) }); var activeCount = Object.keys(this.activeNodes).length; @@ -509,7 +525,7 @@ class Flow { const initialPromises = []; /** @type {Promise} */ let shutdownPromise = Promise.resolve(); - if (gracefulShutdown && this.messageInitiatorNodes.length) { + if (gracefulShutdown && this.messageInitiatorNodes) { // Start by closing initiator nodes initialPromises.push(...this.stopNodes(this.messageInitiatorNodes, removedMap)); @@ -541,9 +557,6 @@ class Flow { return resolve(); } } else if (this.shutdownScope.length) { - if (this.pendingMsgCount === 0) { - return resolve(); - } let count = 0; this.shutdownScope.forEach((flow) => { if (flow instanceof Flow) { @@ -551,6 +564,12 @@ class Flow { } }); if (count === 0) { + this.shutdown = null; + if (this.messageInitiatorNodes.includes(this.id)) { + // This is a subflow initiator node + // Other flows from the scope are stil running at this time + return resolve(); + } this.shutdownScope.forEach((flow) => { if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) { // Retrigger validation in each flow @@ -578,7 +597,7 @@ class Flow { /** @type {NodeJS.Timeout|null} */ let timer = null; const timeoutPromise = new Promise((_resolve, reject) => { - if (this.failFast && this.shutdownScope) { + if (this.shutdownScope) { this.cancelShutdown = () => reject("Graceful shutdown cancelled"); } timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout); From 02d1a582b63fa6770527a00cb4514626a01b6e84 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:31:10 +0100 Subject: [PATCH 12/16] Filters message initiator subflows in the UI --- .../editor-client/src/js/ui/editors/panes/shutdown.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js index ce466d712..06f176ade 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js @@ -124,7 +124,7 @@ } } - // TODO: filter items + // TODO: sort items const dirList = $(container).find("#node-input-scope-target-container-div"); dirList.css({ width: "100%", height: "100%" }).treeList({ multi: true, data: items }); } @@ -237,7 +237,6 @@ /** @type {(flow: object) => string[]} */ function getScope(flow) { - // TODO: Scope = flows + initiator subflow const activeWorkspace = flow.id; /** @type {(node: object) => boolean} */ const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); @@ -260,8 +259,11 @@ } }) } else if (node.type.startsWith("subflow:") && !scope[node.type]) { - scope[node.type] = true; - nodes.push(...RED.nodes.filterNodes({ z: node.type.substring(8) }).filter(nodeFilter)); + const workspace = RED.nodes.workspace(node.z) || RED.nodes.subflow(node.z); + if (workspace && workspace.initiatorNodes && workspace.initiatorNodes.includes(node.id)) { + scope[node.type] = true; + nodes.push(...RED.nodes.filterNodes({ z: node.type.substring(8) }).filter(nodeFilter)); + } } } delete scope[activeWorkspace]; From 437282e2b348a57ee4ecbfefa15f06b0068fbb44 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:32:42 +0100 Subject: [PATCH 13/16] Improve the logic to determine the shutdown scope --- .../@node-red/runtime/lib/flows/Flow.js | 88 ++++++++----------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 6282d1c54..f24f1872a 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -383,61 +383,51 @@ class Flow { } // Determine the shutdown scope; flows needed to calculate pending msgs - setImmediate(() => { - const seen = new Set(); - const scope = { [this.id]: this }; - const nodes = this.linkNodes.slice(); - seen.add(this.id); - while (nodes.length) { - const node = nodes.pop(); - if (seen.has(node.id)) { - continue; - } - seen.add(node.id); - const flow = activeFlows[node.z]; - if (flow && flow.flow) { - const config = flow.flow.nodes[node.id]; - if (config && config.links) { - config.links.forEach((id) => { - const link = this.getNode(id, false); - if (link && !scope[link._flow.id]) { - scope[link._flow.id] = link._flow; - nodes.push(...link._flow.linkNodes); - } - }); + /** @type {(node: import("@node-red/registry").Node) => boolean} */ + const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); + const nodes = Object.values(this.flow.nodes || {}).filter(nodeFilter); + const seen = new Set(); + const scope = { [this.id]: true }; + seen.add(this.id); + if (this.subflowInstance && this.subflowInstance.z) { + scope[this.subflowInstance.z] = true; + nodes.push(...Object.values(this.global.flows[this.subflowInstance.z]?.nodes || {}).filter(nodeFilter)); + } + while (nodes.length) { + const node = nodes.pop(); + if (seen.has(node.id)) { + continue; + } + seen.add(node.id); + if (node.links) { + node.links.forEach((id) => { + const link = this.global.allNodes[id]; + if (link && !scope[link.z]) { + scope[link.z] = true; + nodes.push(...Object.values(this.global.flows[link.z]?.nodes || {}).filter(nodeFilter)); } + }) + } else if (node.type.startsWith("subflow:") && !scope[node.id]) { + scope[node.id] = true; + if (!scope[node.type]) { + scope[node.type] = true; + nodes.push(...Object.values(this.global.flows[node.type.substring(8)]?.nodes || {}).filter(nodeFilter)); } } + } - seen.clear(); - const parents = []; - this.shutdownScope = []; - const flows = Object.values(scope); - this._shutdownScope = Object.values(scope); - while (flows.length) { - const flow = flows.pop(); - if (flow && !seen.has(flow.id)) { - seen.add(flow.id); - this.shutdownScope.push(flow); - if (flow.parent.id && flow.parent.id !== "global") { - flows.push(flow.parent); - parents.push(flow.parent); - } - if (flow.subflowInstanceNodes) { - flows.push(...Object.values(flow.subflowInstanceNodes)); - } - } + const scopeList = Object.keys(scope); + this.trace("---------------------------------------------------"); + this.trace(" shutdown scope"); + this.trace("---------------------------------------------------"); + this.trace(scopeList.join(", ")); - } - - // TODO: improve this shit + if (scopeList.length > 1) { setImmediate(() => { - for (const parent of parents) { - this.shutdownScope = [...this.shutdownScope, ...parent._shutdownScope].filter((flow, index, self) => self.indexOf(flow) === index); - } - console.warn(this.id, this.shutdownScope.map(n=>n.id)); - }) - }); + // Lookup for the flow instance + this.shutdownScope = scopeList.map((id) => this.getNode(id, false)?._flow || activeFlows[id]); + }); + } var activeCount = Object.keys(this.activeNodes).length; if (activeCount > 0) { From 0bfcbce4700f02bb4a95861a43041a712617190c Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:35:26 +0100 Subject: [PATCH 14/16] Filter initiator nodes with nodes to be closed --- packages/node_modules/@node-red/runtime/lib/flows/Flow.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index f24f1872a..eb8b8fa6d 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -502,7 +502,9 @@ class Flow { let nodesToStop = []; let configsToStop = []; + const stopMap = {}; stopList.forEach(id => { + stopMap[id] = true; if (this.flow.configs[id]) { configsToStop.push(id); } else { @@ -517,7 +519,8 @@ class Flow { let shutdownPromise = Promise.resolve(); if (gracefulShutdown && this.messageInitiatorNodes) { // Start by closing initiator nodes - initialPromises.push(...this.stopNodes(this.messageInitiatorNodes, removedMap)); + const filteredNodes = this.messageInitiatorNodes.filter((id) => stopMap[id]); + initialPromises.push(...this.stopNodes(filteredNodes, removedMap)); /** @type {Set} */ const lookupNodes = new Set(); From 1d006de7c6170b12493a2d1e3bead3c7667038c0 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:37:22 +0100 Subject: [PATCH 15/16] Allow scoped flows to close itself if `failFast` enabled --- .../node_modules/@node-red/runtime/lib/flows/Flow.js | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index eb8b8fa6d..6c356a5d5 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -550,6 +550,16 @@ class Flow { return resolve(); } } else if (this.shutdownScope.length) { + if (this.pendingMsgCount === 0 && this.failFast) { + // Need to double check due to flow crossing (z) + if (Object.keys(this.msgInProgressNodeMap).length === 0) { + // Allow a scoped flow to stop itself + // Not capable to detect a loop or a callback; it is up + // to the user to choose whether or not to enable flow closure. + this.shutdown = null; + return resolve(); + } + } let count = 0; this.shutdownScope.forEach((flow) => { if (flow instanceof Flow) { @@ -560,7 +570,7 @@ class Flow { this.shutdown = null; if (this.messageInitiatorNodes.includes(this.id)) { // This is a subflow initiator node - // Other flows from the scope are stil running at this time + // Other flows from the scope are still running at that time return resolve(); } this.shutdownScope.forEach((flow) => { From 93a00c1cc270f475ce80964aa4878b64cbbe5a47 Mon Sep 17 00:00:00 2001 From: GogoVega <92022724+GogoVega@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:39:54 +0100 Subject: [PATCH 16/16] Update unit tests --- .../@node-red/nodes/core/common/20-inject.js | 6 +- .../@node-red/runtime/lib/flows/Flow.js | 2 +- test/nodes/core/common/60-link_spec.js | 2 +- .../@node-red/runtime/lib/flows/Flow_spec.js | 114 +++++++++--------- .../runtime/lib/flows/Subflow_spec.js | 68 +++++------ 5 files changed, 96 insertions(+), 96 deletions(-) diff --git a/packages/node_modules/@node-red/nodes/core/common/20-inject.js b/packages/node_modules/@node-red/nodes/core/common/20-inject.js index da1469494..90b168f91 100644 --- a/packages/node_modules/@node-red/nodes/core/common/20-inject.js +++ b/packages/node_modules/@node-red/nodes/core/common/20-inject.js @@ -77,17 +77,17 @@ module.exports = function(RED) { this.repeat = this.repeat * 1000; this.debug(RED._("inject.repeat", this)); this.interval_id = setInterval(function() { - node.emit("input", {}); + node.receive(); }, this.repeat); } else if (this.crontab) { this.debug(RED._("inject.crontab", this)); - this.cronjob = scheduleTask(this.crontab,() => { node.emit("input", {})}); + this.cronjob = scheduleTask(this.crontab,() => { node.receive()}); } }; if (this.once) { this.onceTimeout = setTimeout( function() { - node.emit("input",{}); + node.receive(); node.repeaterSetup(); }, this.onceDelay); } else { diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index 6c356a5d5..e22d942c0 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -710,7 +710,7 @@ class Flow { } } } - } else { + } else if (typeof this.parent.getNode === "function") { // Node not found inside this flow - ask the parent return this.parent.getNode(id); } diff --git a/test/nodes/core/common/60-link_spec.js b/test/nodes/core/common/60-link_spec.js index be7ffc39f..743ad8540 100644 --- a/test/nodes/core/common/60-link_spec.js +++ b/test/nodes/core/common/60-link_spec.js @@ -221,7 +221,7 @@ describe('link Node', function() { const flow = [ { id: "tab-flow-1", type: "tab", label: "Flow 1" }, { id: "link-in-1", z: "tab-flow-1", type: "link in", name: "double payload", wires: [["func"]] }, - { id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]] }, + { id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]], x: 1, y: 1 }, { id: "link-out-1", z: "tab-flow-1", type: "link out", mode: "" }, //not return mode, cause link-call timeout { id: "link-call", z: "tab-flow-1", type: "link call", linkType: "static", "timeout": "0.5", links: ["link-in-1"], wires: [["n4"]] }, { id: "catch-all", z: "tab-flow-1", type: "catch", scope: ["link-call"], uncaught: true, wires: [["n4"]] }, diff --git a/test/unit/@node-red/runtime/lib/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js index 8406a01b7..e3320f1d7 100644 --- a/test/unit/@node-red/runtime/lib/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js @@ -204,8 +204,8 @@ describe('Flow', function() { it("instantiates an initial configuration and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -256,8 +256,8 @@ describe('Flow', function() { it("instantiates config nodes in the right order",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"5"}, // This node depends on #5 {id:"5",z:"t1",type:"test"} @@ -308,8 +308,8 @@ describe('Flow', function() { it("rewires nodes specified by diff", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); @@ -318,7 +318,7 @@ describe('Flow', function() { await flow.start(); //TODO: use update to pass in new wiring and verify the change createCount.should.equal(3); - flow.start({rewired:["2"]}); + flow.start({rewired:[["2"]]}); createCount.should.equal(3); rewiredNodes.should.have.a.property("2"); }); @@ -355,8 +355,8 @@ describe('Flow', function() { it("ignores disabled nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"}, {id:"5",z:"t1",type:"test",d:true,foo:"a"} @@ -406,8 +406,8 @@ describe('Flow', function() { it("stops all nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"asyncTest",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -428,8 +428,8 @@ describe('Flow', function() { it("stops specified nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -450,9 +450,9 @@ describe('Flow', function() { it("stops config nodes last", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, {id:"c1",z:"t1",type:"test"}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"c2",z:"t1",type:"test"}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"c3",z:"t1",type:"test"} @@ -485,8 +485,8 @@ describe('Flow', function() { }}); var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -514,8 +514,8 @@ describe('Flow', function() { it("gets a node known to the flow", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -529,8 +529,8 @@ describe('Flow', function() { it("passes to parent if node not known locally", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -547,8 +547,8 @@ describe('Flow', function() { it("does not pass to parent if cancelBubble set", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -567,8 +567,8 @@ describe('Flow', function() { it("passes a status event to the adjacent status node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]} @@ -605,10 +605,10 @@ describe('Flow', function() { it("passes a status event to the adjacent scoped status node ", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, - {id:"sn",x:10,y:10,z:"t1",type:"status",scope:["2"],foo:"a",wires:[]}, + {id:"sn",x:10,y:10,z:"t1",type:"status",scope:[["2"]],foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"status",scope:["1"],foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -639,7 +639,7 @@ describe('Flow', function() { {id: "g1", type: "group", g: "g3", z:"t1" }, {id: "g2", type: "group", z:"t1" }, {id: "g3", type: "group", z:"t1" }, - {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]}, // sn - in the same group as source node {id:"sn",x:10,y:10,z:"t1",g:"g1", type:"status",scope:"group",wires:[]}, // sn2 - in a different group hierarchy to the source node @@ -669,8 +669,8 @@ describe('Flow', function() { it("passes an error event to the adjacent catch node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]}, @@ -712,10 +712,10 @@ describe('Flow', function() { it("passes an error event to the adjacent scoped catch node ", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, - {id:"sn",x:10,y:10,z:"t1",type:"catch",scope:["2"],foo:"a",wires:[]}, + {id:"sn",x:10,y:10,z:"t1",type:"catch",scope:[["2"]],foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"catch",scope:["1"],foo:"a",wires:[]}, {id:"sn3",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]}, {id:"sn4",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]} @@ -766,7 +766,7 @@ describe('Flow', function() { {id: "g1", type: "group", g: "g3", z:"t1" }, {id: "g2", type: "group", z:"t1" }, {id: "g3", type: "group", z:"t1" }, - {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]}, // sn - in the same group as source node {id:"sn",x:10,y:10,z:"t1",g:"g1", type:"catch",scope:"group",wires:[]}, // sn2 - in a different group hierarchy to the source node @@ -795,8 +795,8 @@ describe('Flow', function() { it("moves any existing error object sideways", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]} ]); @@ -828,8 +828,8 @@ describe('Flow', function() { it("passes a complete event to the adjacent Complete node",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"testDone",foo:"a",wires:[]}, {id:"cn",x:10,y:10,z:"t1",type:"complete",scope:["1","3"],foo:"a",wires:[]} ]); @@ -856,8 +856,8 @@ describe('Flow', function() { it("sends a message - no cloning", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -875,7 +875,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; try { msg.should.be.exactly(message); @@ -898,8 +898,8 @@ describe('Flow', function() { it("sends a message - cloning", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -916,7 +916,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { try { // Message should be cloned msg.should.be.eql(message); @@ -939,8 +939,8 @@ describe('Flow', function() { it("sends multiple messages", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -957,7 +957,7 @@ describe('Flow', function() { else { resolve() } } var messageCount = 0; - n2.receive = function(msg) { + n2._receive = function(msg) { try { msg.should.be.exactly(messages[messageCount++]); if (messageCount === 2) { @@ -1032,8 +1032,8 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -1049,7 +1049,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; try { msg.should.be.eql(message); @@ -1103,14 +1103,14 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); flow = Flow.create({},config,config.flows["t1"]); await flow.start(); n1 = flow.getNode('1'); n2 = flow.getNode('2'); - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; } n1.error = function(err) { @@ -1180,14 +1180,14 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); flow = Flow.create({},config,config.flows["t1"]); await flow.start(); n1 = flow.getNode('1'); n2 = flow.getNode('2'); - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; } n1.error = function(err) { diff --git a/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js index 618205f6d..d8d3ab7b4 100644 --- a/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js @@ -295,8 +295,8 @@ describe('Subflow', function() { it("instantiates a subflow and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", @@ -359,8 +359,8 @@ describe('Subflow', function() { it("instantiates a subflow inside a subflow and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 1","info":"", @@ -389,8 +389,8 @@ describe('Subflow', function() { it("rewires a subflow node on update/start", async function(){ var rawConfig = [ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", @@ -443,8 +443,8 @@ describe('Subflow', function() { it("stops subflow instance nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -466,8 +466,8 @@ describe('Subflow', function() { it("passes a status event to the subflow's parent tab status node - all scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -496,8 +496,8 @@ describe('Subflow', function() { it("passes a status event to the subflow's parent tab status node - targetted scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -534,8 +534,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.payload as string", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -571,8 +571,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.payload as status obj", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -610,8 +610,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.status", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -649,8 +649,8 @@ describe('Subflow', function() { it("does not emit a regular status event if it contains a subflow-status node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -682,8 +682,8 @@ describe('Subflow', function() { it("passes an error event to the subflow's parent tab catch node - all scope",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -714,8 +714,8 @@ describe('Subflow', function() { it("passes an error event to the subflow's parent tab catch node - targetted scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -752,8 +752,8 @@ describe('Subflow', function() { it("can access process env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ], @@ -779,8 +779,8 @@ describe('Subflow', function() { it("can access subflow env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"",env: [{name: '__KEY__', value: '__VAL1__', type: 'str'}], "in":[ {wires:[{id:"sf1-1"}]} ], @@ -815,8 +815,8 @@ describe('Subflow', function() { it("can access nested subflow env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab", env: [{name: '__KEY1__', value: 't1', type: 'str'}]}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 1",info:"", env: [{name: '__KEY2__', value: 'sf1', type: 'str'}], @@ -854,8 +854,8 @@ describe('Subflow', function() { it("can access name of subflow as env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ], @@ -879,8 +879,8 @@ describe('Subflow', function() { it("can access id of subflow as env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ],