diff --git a/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json b/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json index 8018b745f..5a9da81fd 100644 --- a/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json +++ b/packages/node_modules/@node-red/editor-client/locales/en-US/editor.json @@ -477,6 +477,8 @@ "inputType": "Input type", "selectType": "select types...", "loadCredentials": "Loading node credentials", + "initiatorNodes": "During shutdown, start by stopping message initiator nodes:", + "failFast": "Cancel shutdown of listed flows if this flow fails to stop", "inputs": { "input": "input", "select": "select", @@ -1239,7 +1241,8 @@ "description": "Description", "appearance": "Appearance", "preview": "UI Preview", - "defaultValue": "Default value" + "defaultValue": "Default value", + "shutdown": "Graceful Shutdown" }, "tourGuide": { "takeATour": "Take a tour", diff --git a/packages/node_modules/@node-red/editor-client/src/js/nodes.js b/packages/node_modules/@node-red/editor-client/src/js/nodes.js index 4a114560b..1db933834 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/nodes.js +++ b/packages/node_modules/@node-red/editor-client/src/js/nodes.js @@ -112,7 +112,10 @@ RED.nodes = (function() { disabled: {value: false}, locked: {value: false}, info: {value: ""}, - env: {value: []} + env: {value: []}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, } }; @@ -1141,7 +1144,10 @@ RED.nodes = (function() { } else { return errors } - }} + }}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }, icon: function() { return sf.icon||"subflow.svg" }, category: sf.category || "subflows", @@ -1457,6 +1463,9 @@ RED.nodes = (function() { node.in = []; node.out = []; node.env = n.env; + node.failFast = n.failFast; + node.timeout = n.timeout; + node.initiatorNodes = n.initiatorNodes; node.meta = n.meta; if (exportCreds) { diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js index 68883d4de..a2e199e5a 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editor.js @@ -1963,6 +1963,11 @@ RED.editor = (function() { 'editor-tab-description', 'editor-tab-appearance' ]; + + if (RED.settings.gracefulShutdownEnabled) { + nodeEditPanes.splice(1, 0, 'editor-tab-shutdown'); + } + prepareEditDialog(trayBody, nodeEditPanes, subflow, subflow._def, "subflow-input", defaultTab, function (_activeEditPanes) { activeEditPanes = _activeEditPanes; trayBody.i18n(); @@ -2215,6 +2220,10 @@ RED.editor = (function() { 'editor-tab-envProperties' ]; + if (RED.settings.gracefulShutdownEnabled) { + nodeEditPanes.splice(1, 0, 'editor-tab-shutdown'); + } + if (!workspace.hasOwnProperty("disabled")) { workspace.disabled = false; } diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js new file mode 100644 index 000000000..06f176ade --- /dev/null +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/editors/panes/shutdown.js @@ -0,0 +1,273 @@ +; (function () { + const tabcontent = ` +
+ + +
+
+ + +
+
+
+
+
+ +
+
+ +
+
+
+
+
`; + + RED.editor.registerEditPane("editor-tab-shutdown", function (node) { + return { + label: RED._("editor-tab.shutdown"), + name: RED._("editor-tab.shutdown"), + iconClass: "fa fa-power-off", + + create: function (container) { + const dialogForm = $('
').appendTo(container); + + if (RED.settings.gracefulShutdownEnabled) { + $(tabcontent).appendTo(dialogForm); + buildScopeForm(dialogForm, node); + buildInitiatorForm(dialogForm, node); + $("#node-input-timeout").spinner({ min: 1 }).val((node.timeout || 10000) / 1000); + $("#node-input-failFast").prop("checked", node.failFast || false); + this._resize = function () { + const rows = $(dialogForm).find(">div:not(.node-input-initiator-list-row)"); + let height = $(dialogForm).height(); + for (let i = 0; i < rows.length; i++) { + height -= $(rows[i]).outerHeight(true); + } + $(dialogForm).find(">div.node-input-initiator-list-row").css("height", height + "px"); + }; + } else { + $("

Graceful shutdown disabled

").appendTo(dialogForm); + } + }, + close: function () { }, + show: function () { }, + resize: function (_size) { + if (this._resize) { + this._resize(); + } + }, + /** @type {(editState: { changes?: Record, changed?: boolean }) => void} */ + apply: function (editState) { + const failFast = $("#node-input-failFast").prop("checked"); + if (node.failFast != failFast) { + editState.changes = editState.changes || {}; + editState.changes.failFast = node.failFast; + editState.changed = true; + node.failFast = failFast; + } + + let timeout = parseFloat($("#node-input-timeout").val() || "10") * 1000; + if (Number.isNaN(timeout)) { + timeout = 10000; + } + if (node.timeout !== timeout) { + editState.changes = editState.changes || {}; + editState.changes.timeout = node.timeout; + editState.changed = true; + node.timeout = timeout; + } + + const initiatorNodes = $("#node-input-initiator-target-container-div").treeList("selected").map((i) => i.node.id); + if (JSON.stringify(initiatorNodes) !== JSON.stringify(node.initiatorNodes || [])) { + editState.changes = editState.changes || {}; + editState.changes.initiatorNodes = node.initiatorNodes; + editState.changed = true; + node.initiatorNodes = initiatorNodes; + } + } + } + }); + + /** @type {(node: object) => JQuery} */ + function getNodeLabel(node) { + var div = $('
',{class:"red-ui-node-list-item red-ui-info-outline-item"}); + RED.utils.createNodeIcon(node, true).appendTo(div); + div.find(".red-ui-node-label").addClass("red-ui-info-outline-item-label") + return div; + } + + /** @type {(container: JQuery, node: object) => void} */ + function buildScopeForm(container, node) { + const scope = getScope(node); + + if (!scope.length) { + $(container).find(".node-input-scope-row").hide(); + return; + } + + const items = []; + const flowItemMap = {}; + for (const id of scope) { + if (id === node.id) { + continue; + } + + const isSuflow = id.startsWith("subflow:"); + const workspace = isSuflow ? RED.nodes.subflow(id.substring(8)) : RED.nodes.workspace(id); + if (workspace) { + flowItemMap[workspace.id] = { + element: getNodeLabel(workspace), + selected: true, + }; + + items.push(flowItemMap[workspace.id]); + } + } + + // TODO: sort items + const dirList = $(container).find("#node-input-scope-target-container-div"); + dirList.css({ width: "100%", height: "100%" }).treeList({ multi: true, data: items }); + } + + /** @type {(container: JQuery, node: object) => void} */ + function buildInitiatorForm(container, node) { + const dirList = $(container).find("#node-input-initiator-target-container-div"); + + // We assume that a message initiator node must have at least one output + const nodeFilter = function (n) { + if (n.type.startsWith("link ")) { + // Link nodes transmits messages, but does not generate them. + return false; + } + if (n.hasOwnProperty("outputs")) { + return n.outputs > 0; + } + return true; + }; + const candidateNodes = RED.nodes.filterNodes({ z: node.id }).filter(nodeFilter); + const search = $(container).find("#node-input-initiator-target-filter").searchBox({ + style: "compact", + delay: 300, + change: function () { + const val = $(this).val().trim().toLowerCase(); + if (val === "") { + dirList.treeList("filter", null); + search.searchBox("count", ""); + } else { + const count = dirList.treeList("filter", function (item) { + return item.label.toLowerCase().indexOf(val) > -1 || item.node.type.toLowerCase().indexOf(val) > -1 + }); + search.searchBox("count", count + " / " + candidateNodes.length); + } + } + }); + + dirList.css({ width: "100%", height: "100%" }) + .treeList({ multi: true }).on("treelistitemmouseover", function (e, item) { + item.node.highlighted = true; + item.node.dirty = true; + RED.view.redraw(); + }).on("treelistitemmouseout", function (e, item) { + item.node.highlighted = false; + item.node.dirty = true; + RED.view.redraw(); + }); + + const items = []; + const nodeItemMap = {}; + const scope = node.initiatorNodes || []; + candidateNodes.forEach(function (n) { + const isChecked = scope.indexOf(n.id) !== -1; + const nodeDef = RED.nodes.getType(n.type); + let label, sublabel; + if (nodeDef) { + const l = nodeDef.label; + label = (typeof l === "function" ? l.call(n) : l) || ""; + sublabel = n.type; + if (sublabel.indexOf("subflow:") === 0) { + const subflowId = sublabel.substring(8); + const subflow = RED.nodes.subflow(subflowId); + sublabel = "subflow : " + subflow.name; + } + } + if (!nodeDef || !label) { + label = n.type; + } + nodeItemMap[n.id] = { + node: n, + label: label, + sublabel: sublabel, + selected: isChecked, + checkbox: true + }; + + items.push(nodeItemMap[n.id]); + }); + + dirList.treeList("data", items); + + $(container).find("#node-input-initiator-target-select").on("click", function (event) { + event.preventDefault(); + const preselected = dirList.treeList("selected").map((item) => item.node.id); + RED.tray.hide(); + RED.view.selectNodes({ + selected: preselected, + onselect: function (selection) { + RED.tray.show(); + const newlySelected = {}; + selection.forEach(function (n) { + newlySelected[n.id] = true; + if (nodeItemMap[n.id]) { + nodeItemMap[n.id].treeList.select(true); + } + }); + preselected.forEach(function (id) { + if (!newlySelected[id]) { + nodeItemMap[id].treeList.select(false); + } + }); + }, + oncancel: function () { + RED.tray.show(); + }, + filter: nodeFilter, + }); + }); + } + + /** @type {(flow: object) => string[]} */ + function getScope(flow) { + const activeWorkspace = flow.id; + /** @type {(node: object) => boolean} */ + const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); + const nodes = RED.nodes.filterNodes({ z: activeWorkspace }).filter(nodeFilter); + const seen = new Set(); + const scope = { [activeWorkspace]: true }; + seen.add(activeWorkspace); + while (nodes.length) { + const node = nodes.pop(); + if (seen.has(node.id)) { + continue; + } + seen.add(node.id); + if (node.links) { + node.links.forEach((id) => { + const link = RED.nodes.node(id); + if (link && !scope[link.z]) { + scope[link.z] = true; + nodes.push(...RED.nodes.filterNodes({ z: link.z }).filter(nodeFilter)); + } + }) + } else if (node.type.startsWith("subflow:") && !scope[node.type]) { + const workspace = RED.nodes.workspace(node.z) || RED.nodes.subflow(node.z); + if (workspace && workspace.initiatorNodes && workspace.initiatorNodes.includes(node.id)) { + scope[node.type] = true; + nodes.push(...RED.nodes.filterNodes({ z: node.type.substring(8) }).filter(nodeFilter)); + } + } + } + delete scope[activeWorkspace]; + return Object.keys(scope); + } + +})(); diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js b/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js index 7d825e6c9..774b3f247 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/subflow.js @@ -608,7 +608,10 @@ RED.subflow = (function() { name:name, info:"", in: [], - out: [] + out: [], + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }; RED.nodes.addSubflow(subflow); RED.history.push({ @@ -777,7 +780,10 @@ RED.subflow = (function() { i:index, id:RED.nodes.id(), wires:[{id:v.source.id,port:v.sourcePort}] - }}) + }}), + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, }; RED.nodes.addSubflow(subflow); diff --git a/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js b/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js index 03a15d767..774590c0f 100644 --- a/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js +++ b/packages/node_modules/@node-red/editor-client/src/js/ui/workspaces.js @@ -84,6 +84,9 @@ RED.workspaces = (function() { info: "", label: RED._('workspace.defaultName',{number:workspaceIndex}), env: [], + failFast: false, + timeout: 10000, + initiatorNodes: [], hideable: true, }; if (!skipHistoryEntry) { diff --git a/packages/node_modules/@node-red/nodes/core/common/20-inject.js b/packages/node_modules/@node-red/nodes/core/common/20-inject.js index da1469494..90b168f91 100644 --- a/packages/node_modules/@node-red/nodes/core/common/20-inject.js +++ b/packages/node_modules/@node-red/nodes/core/common/20-inject.js @@ -77,17 +77,17 @@ module.exports = function(RED) { this.repeat = this.repeat * 1000; this.debug(RED._("inject.repeat", this)); this.interval_id = setInterval(function() { - node.emit("input", {}); + node.receive(); }, this.repeat); } else if (this.crontab) { this.debug(RED._("inject.crontab", this)); - this.cronjob = scheduleTask(this.crontab,() => { node.emit("input", {})}); + this.cronjob = scheduleTask(this.crontab,() => { node.receive()}); } }; if (this.once) { this.onceTimeout = setTimeout( function() { - node.emit("input",{}); + node.receive(); node.repeaterSetup(); }, this.onceDelay); } else { diff --git a/packages/node_modules/@node-red/registry/lib/subflow.js b/packages/node_modules/@node-red/registry/lib/subflow.js index 39fe083ab..29b786592 100644 --- a/packages/node_modules/@node-red/registry/lib/subflow.js +++ b/packages/node_modules/@node-red/registry/lib/subflow.js @@ -16,7 +16,10 @@ function generateSubflowConfig(subflow) { const icon = subflow.icon || "arrow-in.svg"; const defaults = { - name: {value: ""} + name: {value: ""}, + failFast: { value: false }, + timeout: { value: 10000 }, + initiatorNodes: { value: [] }, } const credentials = {} diff --git a/packages/node_modules/@node-red/runtime/lib/api/settings.js b/packages/node_modules/@node-red/runtime/lib/api/settings.js index 634f5dbf3..973959201 100644 --- a/packages/node_modules/@node-red/runtime/lib/api/settings.js +++ b/packages/node_modules/@node-red/runtime/lib/api/settings.js @@ -172,6 +172,8 @@ var api = module.exports = { safeSettings.runtimeState.ui = false; // cannot have UI without endpoint } + safeSettings.gracefulShutdownEnabled = runtime.settings.gracefulShutdown?.enabled || false; + runtime.settings.exportNodeSettings(safeSettings); runtime.plugins.exportPluginSettings(safeSettings); } diff --git a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js index f96a67780..34febcfd3 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/Flow.js @@ -26,8 +26,11 @@ let Subflow; let Log; let Group; +let activeFlows; + let nodeCloseTimeout = 15000; let asyncMessageDelivery = true; +let gracefulShutdown = false; /** * This class represents a flow within the runtime. It is responsible for @@ -57,6 +60,8 @@ class Flow { this.groupOrder = [] this.activeNodes = {}; this.subflowInstanceNodes = {}; + this.completeNodeMap = {}; + this.linkNodes = []; this.catchNodes = []; this.statusNodes = []; this.path = this.id; @@ -67,6 +72,26 @@ class Flow { // _env is an object for direct lookup of env name -> value this.env = this.flow.env this._env = {} + + // Graceful Shutdown props + /** @type {Number} */ + this.pendingMsgCount = 0; + /** @type {Record} */ + this.msgInProgressNodeMap = {}; + /** @type {object[]|null} */ + this.shutdownScope = null; + /** @type {Number} */ + this.shutdownTimeout = 10000; // 10s + /** @type {null|(() => void)} */ + this.cancelShutdown = null; + /** @type {null|(() => void)} */ + this.shutdown = null; + /** @type {boolean} */ + this.failFast = false; + /** @type {string[]} */ + this.messageInitiatorNodes = []; + /** @type {Record} */ + this.wiredNodeMap = {}; } /** @@ -137,6 +162,43 @@ class Flow { this.parent.log(msg); } + /** + * This method is called when a node finishes processing a message. + * e.g. `node.done()` or `node.done(error)`. + * @param {object} node + */ + decrementPendingMsgCount(node) { + this.pendingMsgCount--; + if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) { + this.msgInProgressNodeMap[node.id]--; + if (this.msgInProgressNodeMap[node.id] <= 0) { + delete this.msgInProgressNodeMap[node.id]; + } + } + this.trace("decrement pending messages count to: " + this.pendingMsgCount); + this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`); + + if (this.shutdown) { + this.shutdown(); + } + } + + /** + * This method is called when a node sends a message. + * e.g. `node.send(msg)` or `node.receive(msg)`. + * @param {object} node + */ + incrementPendingMsgCount(node) { + this.pendingMsgCount++; + if (this.msgInProgressNodeMap[node.id]) { + this.msgInProgressNodeMap[node.id]++; + } else { + this.msgInProgressNodeMap[node.id] = 1; + } + this.trace("increment pending messages count to: " + this.pendingMsgCount); + this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`); + } + /** * Start this flow. * The `diff` argument helps define what needs to be started in the case @@ -149,6 +211,7 @@ class Flow { var node; var newNode; var id; + this.linkNodes = []; this.catchNodes = []; this.statusNodes = []; this.completeNodeMap = {}; @@ -252,6 +315,7 @@ class Flow { } } + this.wiredNodeMap = {}; for (id in this.flow.nodes) { if (this.flow.nodes.hasOwnProperty(id)) { node = this.flow.nodes[id]; @@ -293,12 +357,78 @@ class Flow { } } } + if (node.wires) { + // Create a map with incoming wires for each node + node.wires.forEach((output) => { + // TODO: Still an array? + output.forEach((wire) => { + this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || []; + this.wiredNodeMap[wire].push(node.id); + }); + }); + } } else { this.debug("not starting disabled node : "+id); } } } + this.shutdown = null; + this.cancelShutdown = null; + this.failFast = this.flow.failFast || false; + this.messageInitiatorNodes = this.flow.initiatorNodes || this.subflowDef?.initiatorNodes || []; + if (typeof this.flow.timeout === "number") { + // TODO: Math.max + this.shutdownTimeout = this.flow.timeout; + } + + // Determine the shutdown scope; flows needed to calculate pending msgs + /** @type {(node: import("@node-red/registry").Node) => boolean} */ + const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:"); + const nodes = Object.values(this.flow.nodes || {}).filter(nodeFilter); + const seen = new Set(); + const scope = { [this.id]: true }; + seen.add(this.id); + if (this.subflowInstance && this.subflowInstance.z) { + scope[this.subflowInstance.z] = true; + nodes.push(...Object.values(this.global.flows[this.subflowInstance.z]?.nodes || {}).filter(nodeFilter)); + } + while (nodes.length) { + const node = nodes.pop(); + if (seen.has(node.id)) { + continue; + } + seen.add(node.id); + if (node.links) { + node.links.forEach((id) => { + const link = this.global.allNodes[id]; + if (link && !scope[link.z]) { + scope[link.z] = true; + nodes.push(...Object.values(this.global.flows[link.z]?.nodes || {}).filter(nodeFilter)); + } + }) + } else if (node.type.startsWith("subflow:") && !scope[node.id]) { + scope[node.id] = true; + if (!scope[node.type]) { + scope[node.type] = true; + nodes.push(...Object.values(this.global.flows[node.type.substring(8)]?.nodes || {}).filter(nodeFilter)); + } + } + } + + const scopeList = Object.keys(scope); + this.trace("---------------------------------------------------"); + this.trace(" shutdown scope"); + this.trace("---------------------------------------------------"); + this.trace(scopeList.join(", ")); + + if (scopeList.length > 1) { + setImmediate(() => { + // Lookup for the flow instance + this.shutdownScope = scopeList.map((id) => this.getNode(id, false)?._flow || activeFlows[id]); + }); + } + var activeCount = Object.keys(this.activeNodes).length; if (activeCount > 0) { this.trace("------------------|--------------|-----------------"); @@ -310,7 +440,9 @@ class Flow { if (this.activeNodes.hasOwnProperty(id)) { node = this.activeNodes[id]; this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":"")); - if (node.type === "catch") { + if (node.type.startsWith("link ")) { + this.linkNodes.push(node); + } else if (node.type === "catch") { this.catchNodes.push(node); } else if (node.type === "status") { this.statusNodes.push(node); @@ -349,14 +481,15 @@ class Flow { * Stop this flow. * The `stopList` argument helps define what needs to be stopped in the case * of a modified-nodes/flows type deploy. - * @param {[type]} stopList [description] - * @param {[type]} removedList [description] - * @return {[type]} [description] + * @param {string[]} [stopList] [description] + * @param {string[]} [removedList] [description] + * @return {Promise} [description] */ stop(stopList, removedList) { this.trace("stop "+this.TYPE); - var i; + let fullStop = false; if (!stopList) { + fullStop = true; stopList = Object.keys(this.activeNodes); } // this.trace(" stopList: "+stopList.join(",")) @@ -369,7 +502,9 @@ class Flow { let nodesToStop = []; let configsToStop = []; + const stopMap = {}; stopList.forEach(id => { + stopMap[id] = true; if (this.flow.configs[id]) { configsToStop.push(id); } else { @@ -378,28 +513,155 @@ class Flow { }); stopList = nodesToStop.concat(configsToStop); - var promises = []; - for (i=0;i[]} */ + const initialPromises = []; + /** @type {Promise} */ + let shutdownPromise = Promise.resolve(); + if (gracefulShutdown && this.messageInitiatorNodes) { + // Start by closing initiator nodes + const filteredNodes = this.messageInitiatorNodes.filter((id) => stopMap[id]); + initialPromises.push(...this.stopNodes(filteredNodes, removedMap)); + + /** @type {Set} */ + const lookupNodes = new Set(); + + if (!fullStop) { + // Create a list of nodes wired upstream of nodes to be stopped. + const list = stopList.slice(); + while (list.length) { + const id = list.pop(); + if (lookupNodes.has(id)) { + continue; + } + lookupNodes.add(id); + if (this.wiredNodeMap[id]) { + list.push(...this.wiredNodeMap[id]); + } + } + } + + /** @type {Promise} */ + const closePromise = new Promise((resolve) => { + this.shutdown = () => { + if (fullStop) { + this.trace(`Pending message count: ${this.pendingMsgCount}`); + if (this.shutdownScope === null) { + if (this.pendingMsgCount === 0) { + return resolve(); + } + } else if (this.shutdownScope.length) { + if (this.pendingMsgCount === 0 && this.failFast) { + // Need to double check due to flow crossing (z) + if (Object.keys(this.msgInProgressNodeMap).length === 0) { + // Allow a scoped flow to stop itself + // Not capable to detect a loop or a callback; it is up + // to the user to choose whether or not to enable flow closure. + this.shutdown = null; + return resolve(); + } + } + let count = 0; + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow) { + count += flow.pendingMsgCount; + } + }); + if (count === 0) { + this.shutdown = null; + if (this.messageInitiatorNodes.includes(this.id)) { + // This is a subflow initiator node + // Other flows from the scope are still running at that time + return resolve(); + } + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) { + // Retrigger validation in each flow + flow.shutdown(); + } + }); + return resolve(); + } + } + } else { + let inProgress = false; + for (const id of Object.keys(this.msgInProgressNodeMap)) { + if (lookupNodes.has(id)) { + inProgress = true; + break; + } + } + if (!inProgress) { + return resolve(); + } + } + } + }); + + /** @type {NodeJS.Timeout|null} */ + let timer = null; + const timeoutPromise = new Promise((_resolve, reject) => { + if (this.shutdownScope) { + this.cancelShutdown = () => reject("Graceful shutdown cancelled"); + } + timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout); + }); + const start = Date.now(); + shutdownPromise = Promise.race([closePromise, timeoutPromise]) + .then(() => { + clearTimeout(timer); + const delta = Date.now() - start; + this.trace(`Stopped gracefuly in ${delta}ms`); + }).catch((error) => { + clearTimeout(timer); + this.error(Log._("nodes.flows.stopping-error", { message: error })); + if (this.failFast && this.shutdownScope) { + this.shutdownScope.forEach((flow) => { + if (flow instanceof Flow && flow.cancelShutdown && flow.id !== this.id) { + // Cancel other flow shutdown if this one fails + flow.cancelShutdown(); + } + }); + } + }); + + if (this.shutdown) { + setImmediate(this.shutdown); + } + } + + return Promise.all(initialPromises) + .then(() => shutdownPromise) + .then(() => Promise.all(this.stopNodes(stopList, removedMap))); + } + + /** + * Stop every node of stopList + * @param {string[]} stopList + * @param {Record} removedMap + * @returns {Promise[]} + */ + stopNodes(stopList, removedMap) { + const promises = []; + for (const nodeId of stopList) { + const node = this.activeNodes[nodeId]; if (node) { - delete this.activeNodes[stopList[i]]; - if (this.subflowInstanceNodes[stopList[i]]) { - delete this.subflowInstanceNodes[stopList[i]]; + delete this.activeNodes[nodeId]; + if (this.subflowInstanceNodes[nodeId]) { + delete this.subflowInstanceNodes[nodeId]; } try { - var removed = removedMap[stopList[i]]; - promises.push(stopNode(node,removed).catch(()=>{})); - } catch(err) { - node.error(err); - } - if (removedMap[stopList[i]]) { - events.emit("node-status",{ - id: node.id - }); + const removed = removedMap[nodeId]; + if (removed) { + // Clears the node status + events.emit("node-status", { id: node.id }); + } + promises.push(stopNode(node, removed).catch(() => { })); + } catch(error) { + node.error(error); } } } - return Promise.all(promises); + return promises; } /** @@ -448,7 +710,7 @@ class Flow { } } } - } else { + } else if (typeof this.parent.getNode === "function") { // Node not found inside this flow - ask the parent return this.parent.getNode(id); } @@ -695,6 +957,7 @@ class Flow { } handleComplete(node,msg) { + // Trigger Complete nodes if (this.completeNodeMap[node.id]) { let toSend = msg; this.completeNodeMap[node.id].forEach((completeNode,index) => { @@ -760,9 +1023,9 @@ class Flow { /** * Stop an individual node within this flow. * - * @param {[type]} node [description] - * @param {[type]} removed [description] - * @return {[type]} [description] + * @param {object} node [description] + * @param {boolean} removed [description] + * @return {Promise} [description] */ function stopNode(node,removed) { Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":"")); @@ -821,7 +1084,7 @@ function handlePreRoute(flow, sendEvent, reportError) { function deliverMessageToDestination(sendEvent) { if (sendEvent?.destination?.node) { try { - sendEvent.destination.node.receive(sendEvent.msg); + sendEvent.destination.node._receive(sendEvent.msg); } catch(err) { Log.error(`Error delivering message to node:${sendEvent.destination.node._path} [${sendEvent.destination.node.type}]`) Log.error(err.stack) @@ -835,6 +1098,9 @@ function handlePreDeliver(flow,sendEvent, reportError) { reportError(err,sendEvent); return; } else if (err !== false) { + // node.send(msg) called + // Do it now to avoid next loop and an offbeat count + flow.incrementPendingMsgCount(sendEvent.destination.node); if (asyncMessageDelivery) { setImmediate(function() { deliverMessageToDestination(sendEvent) @@ -855,7 +1121,8 @@ function handlePreDeliver(flow,sendEvent, reportError) { module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; - asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery + asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery; + gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false; Log = runtime.log; Subflow = require("./Subflow"); Group = require("./Group").Group @@ -863,5 +1130,8 @@ module.exports = { create: function(parent,global,conf) { return new Flow(parent,global,conf) }, + setActiveFlows: function(flows) { + activeFlows = flows; + }, Flow: Flow } diff --git a/packages/node_modules/@node-red/runtime/lib/flows/index.js b/packages/node_modules/@node-red/runtime/lib/flows/index.js index f21bd56f9..e7c332bb3 100644 --- a/packages/node_modules/@node-red/runtime/lib/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/flows/index.js @@ -36,7 +36,9 @@ var activeFlowConfig = null; var activeFlows = {}; var started = false; -var state = 'stop' +var state = 'stop'; +/** @type {boolean} */ +let gracefulShutdown; var credentialsPendingReset = false; @@ -53,6 +55,7 @@ function init(runtime) { log = runtime.log; started = false; state = 'stop'; + gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false; if (!typeEventRegistered) { events.on('type-registered',function(type) { if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) { @@ -70,6 +73,7 @@ function init(runtime) { typeEventRegistered = true; } Flow.init(runtime); + Flow.setActiveFlows(activeFlows); flowUtil.init(runtime); } @@ -471,7 +475,9 @@ function stop(type,diff,muteLog,isDeploy) { let globalIndex = activeFlowIds.indexOf("global"); if (globalIndex !== -1) { activeFlowIds.splice(globalIndex,1); - activeFlowIds.push("global"); + if (!gracefulShutdown) { + activeFlowIds.push("global"); + } } activeFlowIds.forEach(id => { @@ -485,7 +491,26 @@ function stop(type,diff,muteLog,isDeploy) { } }); - return Promise.all(promises).then(function() { + /** @type {() => Promise} */ + const globalFlowPromise = () => { + if (gracefulShutdown) { + // Ensure the global flow is the last one closed + return new Promise((resolve) => { + if (activeFlows.hasOwnProperty("global")) { + const flowStateChanged = diff && (diff.flowChanged.indexOf("global") !== -1 || diff.added.indexOf("global") !== -1 || diff.removed.indexOf("global") !== -1); + log.debug("red/nodes/flows.stop : stopping flow : global"); + activeFlows["global"].stop(flowStateChanged ? null : stopList, removedList).then(resolve); + if (type === "full" || flowStateChanged || diff.removed.indexOf("global")!==-1) { + delete activeFlows["global"]; + } + } + }); + } else { + return Promise.resolve(); + } + }; + + return Promise.all(promises).then(globalFlowPromise).then(function () { for (let id in activeNodesToFlow) { if (activeNodesToFlow.hasOwnProperty(id)) { if (!activeFlows[activeNodesToFlow[id]]) { @@ -579,6 +604,15 @@ async function addFlow(flow, user) { if (flow.hasOwnProperty('env')) { tabNode.env = flow.env; } + if (flow.hasOwnProperty('failFast')) { + tabNode.failFast = flow.failFast; + } + if (flow.hasOwnProperty('timeout')) { + tabNode.timeout = flow.timeout; + } + if (flow.hasOwnProperty('initiatorNodes')) { + tabNode.initiatorNodes = flow.initiatorNodes; + } var nodes = [tabNode]; @@ -642,6 +676,15 @@ function getFlow(id) { if (flow.hasOwnProperty('env')) { result.env = flow.env; } + if (flow.hasOwnProperty('failFast')) { + result.failFast = flow.failFast; + } + if (flow.hasOwnProperty('timeout')) { + result.timeout = flow.timeout; + } + if (flow.hasOwnProperty('initiatorNodes')) { + result.initiatorNodes = flow.initiatorNodes; + } if (id !== 'global') { result.nodes = []; } @@ -770,6 +813,15 @@ async function updateFlow(id,newFlow, user) { if (newFlow.hasOwnProperty('env')) { tabNode.env = newFlow.env; } + if (flow.hasOwnProperty('failFast')) { + tabNode.failFast = flow.failFast; + } + if (flow.hasOwnProperty('timeout')) { + tabNode.timeout = flow.timeout; + } + if (flow.hasOwnProperty('initiatorNodes')) { + tabNode.initiatorNodes = flow.initiatorNodes; + } if (newFlow.hasOwnProperty('credentials')) { tabNode.credentials = newFlow.credentials; } diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index f31504214..582231f07 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -131,6 +131,10 @@ Node.prototype.context = function() { */ Node.prototype._complete = function(msg,error) { this.metric("done",msg); + if (this._flow && typeof this._flow.decrementPendingMsgCount === "function") { + // done() or done(error) called + this._flow.decrementPendingMsgCount(this); + } hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => { if (err) { this.error(err); @@ -199,6 +203,7 @@ Node.prototype.emit = function(event, ...args) { * This will call all registered handlers for the 'input' event. */ Node.prototype._emitInput = function(arg) { + // TODO: inject node directly used the event to trigger a message, is this allowed? var node = this; this.metric("receive", arg); let receiveEvent = { msg:arg, destination: { id: this.id, node: this } } @@ -216,6 +221,11 @@ Node.prototype._emitInput = function(arg) { function() { node.send.apply(node,arguments) }, function(err) { node._complete(arg,err); } ); + if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? + // Ensure done() is called + node._complete(arg); + } } catch(err) { node.error(err,arg); } @@ -237,6 +247,11 @@ Node.prototype._emitInput = function(arg) { } } ); + if (node._expectedDoneCount === 0 && node._flow) { + // TODO: Call Complete node is expected? + // Ensure done() is called + node._complete(arg); + } } catch(err) { node.error(err,arg); } @@ -492,11 +507,9 @@ Node.prototype.send = function(msg) { }; /** - * Receive a message. - * - * This will emit the `input` event with the provided message. + * Internal method to receive a message */ -Node.prototype.receive = function(msg) { +Node.prototype._receive = function (msg) { if (!msg) { msg = {}; } @@ -506,6 +519,19 @@ Node.prototype.receive = function(msg) { this.emit("input",msg); }; +/** + * Receive a message. + * + * This will emit the `input` event with the provided message. + */ +Node.prototype.receive = function (msg) { + if (this._flow && typeof this._flow.incrementPendingMsgCount === "function") { + // Equivalent to node.send(msg) without handleOnSend + this._flow.incrementPendingMsgCount(this); + } + this._receive(msg); +}; + function log_helper(self, level, msg) { var o = { level: level, diff --git a/packages/node_modules/node-red/settings.js b/packages/node_modules/node-red/settings.js index 39e6a6b47..89a811794 100644 --- a/packages/node_modules/node-red/settings.js +++ b/packages/node_modules/node-red/settings.js @@ -312,6 +312,9 @@ module.exports = { /** show or hide runtime stop/start options in the node-red editor. Must be set to `false` to hide */ ui: false, }, + gracefulShutdown: { + enabled: true, + }, telemetry: { /** * By default, telemetry is disabled until the user provides consent the first diff --git a/test/nodes/core/common/60-link_spec.js b/test/nodes/core/common/60-link_spec.js index be7ffc39f..743ad8540 100644 --- a/test/nodes/core/common/60-link_spec.js +++ b/test/nodes/core/common/60-link_spec.js @@ -221,7 +221,7 @@ describe('link Node', function() { const flow = [ { id: "tab-flow-1", type: "tab", label: "Flow 1" }, { id: "link-in-1", z: "tab-flow-1", type: "link in", name: "double payload", wires: [["func"]] }, - { id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]] }, + { id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]], x: 1, y: 1 }, { id: "link-out-1", z: "tab-flow-1", type: "link out", mode: "" }, //not return mode, cause link-call timeout { id: "link-call", z: "tab-flow-1", type: "link call", linkType: "static", "timeout": "0.5", links: ["link-in-1"], wires: [["n4"]] }, { id: "catch-all", z: "tab-flow-1", type: "catch", scope: ["link-call"], uncaught: true, wires: [["n4"]] }, diff --git a/test/unit/@node-red/runtime/lib/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js index 8406a01b7..e3320f1d7 100644 --- a/test/unit/@node-red/runtime/lib/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Flow_spec.js @@ -204,8 +204,8 @@ describe('Flow', function() { it("instantiates an initial configuration and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -256,8 +256,8 @@ describe('Flow', function() { it("instantiates config nodes in the right order",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"5"}, // This node depends on #5 {id:"5",z:"t1",type:"test"} @@ -308,8 +308,8 @@ describe('Flow', function() { it("rewires nodes specified by diff", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); @@ -318,7 +318,7 @@ describe('Flow', function() { await flow.start(); //TODO: use update to pass in new wiring and verify the change createCount.should.equal(3); - flow.start({rewired:["2"]}); + flow.start({rewired:[["2"]]}); createCount.should.equal(3); rewiredNodes.should.have.a.property("2"); }); @@ -355,8 +355,8 @@ describe('Flow', function() { it("ignores disabled nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"}, {id:"5",z:"t1",type:"test",d:true,foo:"a"} @@ -406,8 +406,8 @@ describe('Flow', function() { it("stops all nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"asyncTest",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -428,8 +428,8 @@ describe('Flow', function() { it("stops specified nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -450,9 +450,9 @@ describe('Flow', function() { it("stops config nodes last", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, {id:"c1",z:"t1",type:"test"}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"c2",z:"t1",type:"test"}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"c3",z:"t1",type:"test"} @@ -485,8 +485,8 @@ describe('Flow', function() { }}); var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -514,8 +514,8 @@ describe('Flow', function() { it("gets a node known to the flow", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -529,8 +529,8 @@ describe('Flow', function() { it("passes to parent if node not known locally", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -547,8 +547,8 @@ describe('Flow', function() { it("does not pass to parent if cancelBubble set", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",z:"t1",type:"test",foo:"a"} ]); @@ -567,8 +567,8 @@ describe('Flow', function() { it("passes a status event to the adjacent status node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]} @@ -605,10 +605,10 @@ describe('Flow', function() { it("passes a status event to the adjacent scoped status node ", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, - {id:"sn",x:10,y:10,z:"t1",type:"status",scope:["2"],foo:"a",wires:[]}, + {id:"sn",x:10,y:10,z:"t1",type:"status",scope:[["2"]],foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"status",scope:["1"],foo:"a",wires:[]} ]); var flow = Flow.create({},config,config.flows["t1"]); @@ -639,7 +639,7 @@ describe('Flow', function() { {id: "g1", type: "group", g: "g3", z:"t1" }, {id: "g2", type: "group", z:"t1" }, {id: "g3", type: "group", z:"t1" }, - {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]}, // sn - in the same group as source node {id:"sn",x:10,y:10,z:"t1",g:"g1", type:"status",scope:"group",wires:[]}, // sn2 - in a different group hierarchy to the source node @@ -669,8 +669,8 @@ describe('Flow', function() { it("passes an error event to the adjacent catch node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]}, @@ -712,10 +712,10 @@ describe('Flow', function() { it("passes an error event to the adjacent scoped catch node ", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, - {id:"sn",x:10,y:10,z:"t1",type:"catch",scope:["2"],foo:"a",wires:[]}, + {id:"sn",x:10,y:10,z:"t1",type:"catch",scope:[["2"]],foo:"a",wires:[]}, {id:"sn2",x:10,y:10,z:"t1",type:"catch",scope:["1"],foo:"a",wires:[]}, {id:"sn3",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]}, {id:"sn4",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]} @@ -766,7 +766,7 @@ describe('Flow', function() { {id: "g1", type: "group", g: "g3", z:"t1" }, {id: "g2", type: "group", z:"t1" }, {id: "g3", type: "group", z:"t1" }, - {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]}, + {id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]}, // sn - in the same group as source node {id:"sn",x:10,y:10,z:"t1",g:"g1", type:"catch",scope:"group",wires:[]}, // sn2 - in a different group hierarchy to the source node @@ -795,8 +795,8 @@ describe('Flow', function() { it("moves any existing error object sideways", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]} ]); @@ -828,8 +828,8 @@ describe('Flow', function() { it("passes a complete event to the adjacent Complete node",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"testDone",foo:"a",wires:[]}, {id:"cn",x:10,y:10,z:"t1",type:"complete",scope:["1","3"],foo:"a",wires:[]} ]); @@ -856,8 +856,8 @@ describe('Flow', function() { it("sends a message - no cloning", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -875,7 +875,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; try { msg.should.be.exactly(message); @@ -898,8 +898,8 @@ describe('Flow', function() { it("sends a message - cloning", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -916,7 +916,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { try { // Message should be cloned msg.should.be.eql(message); @@ -939,8 +939,8 @@ describe('Flow', function() { it("sends multiple messages", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -957,7 +957,7 @@ describe('Flow', function() { else { resolve() } } var messageCount = 0; - n2.receive = function(msg) { + n2._receive = function(msg) { try { msg.should.be.exactly(messages[messageCount++]); if (messageCount === 2) { @@ -1032,8 +1032,8 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); var flow = Flow.create({},config,config.flows["t1"]); await flow.start(); @@ -1049,7 +1049,7 @@ describe('Flow', function() { if (err) { reject(err) } else { resolve() } } - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; try { msg.should.be.eql(message); @@ -1103,14 +1103,14 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); flow = Flow.create({},config,config.flows["t1"]); await flow.start(); n1 = flow.getNode('1'); n2 = flow.getNode('2'); - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; } n1.error = function(err) { @@ -1180,14 +1180,14 @@ describe('Flow', function() { }) var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]} + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]} ]); flow = Flow.create({},config,config.flows["t1"]); await flow.start(); n1 = flow.getNode('1'); n2 = flow.getNode('2'); - n2.receive = function(msg) { + n2._receive = function(msg) { messageReceived = true; } n1.error = function(err) { diff --git a/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js index 618205f6d..d8d3ab7b4 100644 --- a/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/flows/Subflow_spec.js @@ -295,8 +295,8 @@ describe('Subflow', function() { it("instantiates a subflow and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", @@ -359,8 +359,8 @@ describe('Subflow', function() { it("instantiates a subflow inside a subflow and stops it", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 1","info":"", @@ -389,8 +389,8 @@ describe('Subflow', function() { it("rewires a subflow node on update/start", async function(){ var rawConfig = [ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", @@ -443,8 +443,8 @@ describe('Subflow', function() { it("stops subflow instance nodes", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -466,8 +466,8 @@ describe('Subflow', function() { it("passes a status event to the subflow's parent tab status node - all scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -496,8 +496,8 @@ describe('Subflow', function() { it("passes a status event to the subflow's parent tab status node - targetted scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -534,8 +534,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.payload as string", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -571,8 +571,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.payload as status obj", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -610,8 +610,8 @@ describe('Subflow', function() { it("emits a status event when a message is passed to a subflow-status node - msg.status", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -649,8 +649,8 @@ describe('Subflow', function() { it("does not emit a regular status event if it contains a subflow-status node", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, { id:"sf1", @@ -682,8 +682,8 @@ describe('Subflow', function() { it("passes an error event to the subflow's parent tab catch node - all scope",async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -714,8 +714,8 @@ describe('Subflow', function() { it("passes an error event to the subflow's parent tab catch node - targetted scope", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}, {id:"sf1",type:"subflow","name":"Subflow 2","info":"", "in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]}, @@ -752,8 +752,8 @@ describe('Subflow', function() { it("can access process env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ], @@ -779,8 +779,8 @@ describe('Subflow', function() { it("can access subflow env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"",env: [{name: '__KEY__', value: '__VAL1__', type: 'str'}], "in":[ {wires:[{id:"sf1-1"}]} ], @@ -815,8 +815,8 @@ describe('Subflow', function() { it("can access nested subflow env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab", env: [{name: '__KEY1__', value: 't1', type: 'str'}]}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 1",info:"", env: [{name: '__KEY2__', value: 'sf1', type: 'str'}], @@ -854,8 +854,8 @@ describe('Subflow', function() { it("can access name of subflow as env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ], @@ -879,8 +879,8 @@ describe('Subflow', function() { it("can access id of subflow as env var", async function() { var config = flowUtils.parseConfig([ {id:"t1",type:"tab"}, - {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]}, - {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]}, + {id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]}, + {id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]}, {id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]}, {id:"sf1",type:"subflow",name:"Subflow 2",info:"", "in":[ {wires:[{id:"sf1-1"}]} ],