pull/5346/merge
Gauthier Dandele 2026-03-24 10:15:09 -04:00 committed by GitHub
commit 8a4a067c99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 794 additions and 135 deletions

View File

@ -477,6 +477,8 @@
"inputType": "Input type",
"selectType": "select types...",
"loadCredentials": "Loading node credentials",
"initiatorNodes": "During shutdown, start by stopping message initiator nodes:",
"failFast": "Cancel shutdown of listed flows if this flow fails to stop",
"inputs": {
"input": "input",
"select": "select",
@ -1239,7 +1241,8 @@
"description": "Description",
"appearance": "Appearance",
"preview": "UI Preview",
"defaultValue": "Default value"
"defaultValue": "Default value",
"shutdown": "Graceful Shutdown"
},
"tourGuide": {
"takeATour": "Take a tour",

View File

@ -112,7 +112,10 @@ RED.nodes = (function() {
disabled: {value: false},
locked: {value: false},
info: {value: ""},
env: {value: []}
env: {value: []},
failFast: { value: false },
timeout: { value: 10000 },
initiatorNodes: { value: [] },
}
};
@ -1141,7 +1144,10 @@ RED.nodes = (function() {
} else {
return errors
}
}}
}},
failFast: { value: false },
timeout: { value: 10000 },
initiatorNodes: { value: [] },
},
icon: function() { return sf.icon||"subflow.svg" },
category: sf.category || "subflows",
@ -1457,6 +1463,9 @@ RED.nodes = (function() {
node.in = [];
node.out = [];
node.env = n.env;
node.failFast = n.failFast;
node.timeout = n.timeout;
node.initiatorNodes = n.initiatorNodes;
node.meta = n.meta;
if (exportCreds) {

View File

@ -1963,6 +1963,11 @@ RED.editor = (function() {
'editor-tab-description',
'editor-tab-appearance'
];
if (RED.settings.gracefulShutdownEnabled) {
nodeEditPanes.splice(1, 0, 'editor-tab-shutdown');
}
prepareEditDialog(trayBody, nodeEditPanes, subflow, subflow._def, "subflow-input", defaultTab, function (_activeEditPanes) {
activeEditPanes = _activeEditPanes;
trayBody.i18n();
@ -2215,6 +2220,10 @@ RED.editor = (function() {
'editor-tab-envProperties'
];
if (RED.settings.gracefulShutdownEnabled) {
nodeEditPanes.splice(1, 0, 'editor-tab-shutdown');
}
if (!workspace.hasOwnProperty("disabled")) {
workspace.disabled = false;
}

View File

@ -0,0 +1,273 @@
; (function () {
const tabcontent = `
<div class="form-row">
<label for="node-input-timeout"><i class="fa fa-clock-o"></i> <span data-i18n="node-red:exec.label.timeout"></span></label>
<input type="text" id="node-input-timeout" placeholder="10" style="width: 70px; margin-right: 5px;"> <span data-i18n="node-red:inject.seconds"></span>
</div>
<div class="form-row node-input-scope-row" style="margin-bottom: 0px;">
<input type="checkbox" id="node-input-failFast" style="display: inline-block; width: auto; vertical-align: top; margin-left: 30px; margin-right: 5px;">
<label for="node-input-failFast" style="width: auto;"><span data-i18n="editor.failFast"></span></label>
</div>
<div class="form-row node-input-scope-row" style="min-height: 100px; height: 10%;">
<div id="node-input-scope-target-container-div"></div>
</div>
<div class="form-row">
<label style="width: auto" data-i18n="editor:editor.initiatorNodes""></label>
</div>
<div class="form-row node-input-target-row">
<button type="button" id="node-input-initiator-target-select" class="red-ui-button" data-i18n="node-red:common.label.selectNodes"></button>
</div>
<div class="form-row node-input-target-row node-input-initiator-list-row" style="position: relative; min-height: 200px;">
<div style="position: absolute; top: -30px; right: 0;"><input type="text" id="node-input-initiator-target-filter"></div>
<div id="node-input-initiator-target-container-div"></div>
</div>`;
RED.editor.registerEditPane("editor-tab-shutdown", function (node) {
return {
label: RED._("editor-tab.shutdown"),
name: RED._("editor-tab.shutdown"),
iconClass: "fa fa-power-off",
create: function (container) {
const dialogForm = $('<form class="dialog-form form-horizontal" autocomplete="off"></form>').appendTo(container);
if (RED.settings.gracefulShutdownEnabled) {
$(tabcontent).appendTo(dialogForm);
buildScopeForm(dialogForm, node);
buildInitiatorForm(dialogForm, node);
$("#node-input-timeout").spinner({ min: 1 }).val((node.timeout || 10000) / 1000);
$("#node-input-failFast").prop("checked", node.failFast || false);
this._resize = function () {
const rows = $(dialogForm).find(">div:not(.node-input-initiator-list-row)");
let height = $(dialogForm).height();
for (let i = 0; i < rows.length; i++) {
height -= $(rows[i]).outerHeight(true);
}
$(dialogForm).find(">div.node-input-initiator-list-row").css("height", height + "px");
};
} else {
$("<p>Graceful shutdown disabled</p>").appendTo(dialogForm);
}
},
close: function () { },
show: function () { },
resize: function (_size) {
if (this._resize) {
this._resize();
}
},
/** @type {(editState: { changes?: Record<string, unknown>, changed?: boolean }) => void} */
apply: function (editState) {
const failFast = $("#node-input-failFast").prop("checked");
if (node.failFast != failFast) {
editState.changes = editState.changes || {};
editState.changes.failFast = node.failFast;
editState.changed = true;
node.failFast = failFast;
}
let timeout = parseFloat($("#node-input-timeout").val() || "10") * 1000;
if (Number.isNaN(timeout)) {
timeout = 10000;
}
if (node.timeout !== timeout) {
editState.changes = editState.changes || {};
editState.changes.timeout = node.timeout;
editState.changed = true;
node.timeout = timeout;
}
const initiatorNodes = $("#node-input-initiator-target-container-div").treeList("selected").map((i) => i.node.id);
if (JSON.stringify(initiatorNodes) !== JSON.stringify(node.initiatorNodes || [])) {
editState.changes = editState.changes || {};
editState.changes.initiatorNodes = node.initiatorNodes;
editState.changed = true;
node.initiatorNodes = initiatorNodes;
}
}
}
});
/** @type {(node: object) => JQuery} */
function getNodeLabel(node) {
var div = $('<div>',{class:"red-ui-node-list-item red-ui-info-outline-item"});
RED.utils.createNodeIcon(node, true).appendTo(div);
div.find(".red-ui-node-label").addClass("red-ui-info-outline-item-label")
return div;
}
/** @type {(container: JQuery, node: object) => void} */
function buildScopeForm(container, node) {
const scope = getScope(node);
if (!scope.length) {
$(container).find(".node-input-scope-row").hide();
return;
}
const items = [];
const flowItemMap = {};
for (const id of scope) {
if (id === node.id) {
continue;
}
const isSuflow = id.startsWith("subflow:");
const workspace = isSuflow ? RED.nodes.subflow(id.substring(8)) : RED.nodes.workspace(id);
if (workspace) {
flowItemMap[workspace.id] = {
element: getNodeLabel(workspace),
selected: true,
};
items.push(flowItemMap[workspace.id]);
}
}
// TODO: sort items
const dirList = $(container).find("#node-input-scope-target-container-div");
dirList.css({ width: "100%", height: "100%" }).treeList({ multi: true, data: items });
}
/** @type {(container: JQuery, node: object) => void} */
function buildInitiatorForm(container, node) {
const dirList = $(container).find("#node-input-initiator-target-container-div");
// We assume that a message initiator node must have at least one output
const nodeFilter = function (n) {
if (n.type.startsWith("link ")) {
// Link nodes transmits messages, but does not generate them.
return false;
}
if (n.hasOwnProperty("outputs")) {
return n.outputs > 0;
}
return true;
};
const candidateNodes = RED.nodes.filterNodes({ z: node.id }).filter(nodeFilter);
const search = $(container).find("#node-input-initiator-target-filter").searchBox({
style: "compact",
delay: 300,
change: function () {
const val = $(this).val().trim().toLowerCase();
if (val === "") {
dirList.treeList("filter", null);
search.searchBox("count", "");
} else {
const count = dirList.treeList("filter", function (item) {
return item.label.toLowerCase().indexOf(val) > -1 || item.node.type.toLowerCase().indexOf(val) > -1
});
search.searchBox("count", count + " / " + candidateNodes.length);
}
}
});
dirList.css({ width: "100%", height: "100%" })
.treeList({ multi: true }).on("treelistitemmouseover", function (e, item) {
item.node.highlighted = true;
item.node.dirty = true;
RED.view.redraw();
}).on("treelistitemmouseout", function (e, item) {
item.node.highlighted = false;
item.node.dirty = true;
RED.view.redraw();
});
const items = [];
const nodeItemMap = {};
const scope = node.initiatorNodes || [];
candidateNodes.forEach(function (n) {
const isChecked = scope.indexOf(n.id) !== -1;
const nodeDef = RED.nodes.getType(n.type);
let label, sublabel;
if (nodeDef) {
const l = nodeDef.label;
label = (typeof l === "function" ? l.call(n) : l) || "";
sublabel = n.type;
if (sublabel.indexOf("subflow:") === 0) {
const subflowId = sublabel.substring(8);
const subflow = RED.nodes.subflow(subflowId);
sublabel = "subflow : " + subflow.name;
}
}
if (!nodeDef || !label) {
label = n.type;
}
nodeItemMap[n.id] = {
node: n,
label: label,
sublabel: sublabel,
selected: isChecked,
checkbox: true
};
items.push(nodeItemMap[n.id]);
});
dirList.treeList("data", items);
$(container).find("#node-input-initiator-target-select").on("click", function (event) {
event.preventDefault();
const preselected = dirList.treeList("selected").map((item) => item.node.id);
RED.tray.hide();
RED.view.selectNodes({
selected: preselected,
onselect: function (selection) {
RED.tray.show();
const newlySelected = {};
selection.forEach(function (n) {
newlySelected[n.id] = true;
if (nodeItemMap[n.id]) {
nodeItemMap[n.id].treeList.select(true);
}
});
preselected.forEach(function (id) {
if (!newlySelected[id]) {
nodeItemMap[id].treeList.select(false);
}
});
},
oncancel: function () {
RED.tray.show();
},
filter: nodeFilter,
});
});
}
/** @type {(flow: object) => string[]} */
function getScope(flow) {
const activeWorkspace = flow.id;
/** @type {(node: object) => boolean} */
const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:");
const nodes = RED.nodes.filterNodes({ z: activeWorkspace }).filter(nodeFilter);
const seen = new Set();
const scope = { [activeWorkspace]: true };
seen.add(activeWorkspace);
while (nodes.length) {
const node = nodes.pop();
if (seen.has(node.id)) {
continue;
}
seen.add(node.id);
if (node.links) {
node.links.forEach((id) => {
const link = RED.nodes.node(id);
if (link && !scope[link.z]) {
scope[link.z] = true;
nodes.push(...RED.nodes.filterNodes({ z: link.z }).filter(nodeFilter));
}
})
} else if (node.type.startsWith("subflow:") && !scope[node.type]) {
const workspace = RED.nodes.workspace(node.z) || RED.nodes.subflow(node.z);
if (workspace && workspace.initiatorNodes && workspace.initiatorNodes.includes(node.id)) {
scope[node.type] = true;
nodes.push(...RED.nodes.filterNodes({ z: node.type.substring(8) }).filter(nodeFilter));
}
}
}
delete scope[activeWorkspace];
return Object.keys(scope);
}
})();

View File

@ -608,7 +608,10 @@ RED.subflow = (function() {
name:name,
info:"",
in: [],
out: []
out: [],
failFast: { value: false },
timeout: { value: 10000 },
initiatorNodes: { value: [] },
};
RED.nodes.addSubflow(subflow);
RED.history.push({
@ -777,7 +780,10 @@ RED.subflow = (function() {
i:index,
id:RED.nodes.id(),
wires:[{id:v.source.id,port:v.sourcePort}]
}})
}}),
failFast: { value: false },
timeout: { value: 10000 },
initiatorNodes: { value: [] },
};
RED.nodes.addSubflow(subflow);

View File

@ -84,6 +84,9 @@ RED.workspaces = (function() {
info: "",
label: RED._('workspace.defaultName',{number:workspaceIndex}),
env: [],
failFast: false,
timeout: 10000,
initiatorNodes: [],
hideable: true,
};
if (!skipHistoryEntry) {

View File

@ -77,17 +77,17 @@ module.exports = function(RED) {
this.repeat = this.repeat * 1000;
this.debug(RED._("inject.repeat", this));
this.interval_id = setInterval(function() {
node.emit("input", {});
node.receive();
}, this.repeat);
} else if (this.crontab) {
this.debug(RED._("inject.crontab", this));
this.cronjob = scheduleTask(this.crontab,() => { node.emit("input", {})});
this.cronjob = scheduleTask(this.crontab,() => { node.receive()});
}
};
if (this.once) {
this.onceTimeout = setTimeout( function() {
node.emit("input",{});
node.receive();
node.repeaterSetup();
}, this.onceDelay);
} else {

View File

@ -16,7 +16,10 @@ function generateSubflowConfig(subflow) {
const icon = subflow.icon || "arrow-in.svg";
const defaults = {
name: {value: ""}
name: {value: ""},
failFast: { value: false },
timeout: { value: 10000 },
initiatorNodes: { value: [] },
}
const credentials = {}

View File

@ -172,6 +172,8 @@ var api = module.exports = {
safeSettings.runtimeState.ui = false; // cannot have UI without endpoint
}
safeSettings.gracefulShutdownEnabled = runtime.settings.gracefulShutdown?.enabled || false;
runtime.settings.exportNodeSettings(safeSettings);
runtime.plugins.exportPluginSettings(safeSettings);
}

View File

@ -26,8 +26,11 @@ let Subflow;
let Log;
let Group;
let activeFlows;
let nodeCloseTimeout = 15000;
let asyncMessageDelivery = true;
let gracefulShutdown = false;
/**
* This class represents a flow within the runtime. It is responsible for
@ -57,6 +60,8 @@ class Flow {
this.groupOrder = []
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.completeNodeMap = {};
this.linkNodes = [];
this.catchNodes = [];
this.statusNodes = [];
this.path = this.id;
@ -67,6 +72,26 @@ class Flow {
// _env is an object for direct lookup of env name -> value
this.env = this.flow.env
this._env = {}
// Graceful Shutdown props
/** @type {Number} */
this.pendingMsgCount = 0;
/** @type {Record<string, number>} */
this.msgInProgressNodeMap = {};
/** @type {object[]|null} */
this.shutdownScope = null;
/** @type {Number} */
this.shutdownTimeout = 10000; // 10s
/** @type {null|(() => void)} */
this.cancelShutdown = null;
/** @type {null|(() => void)} */
this.shutdown = null;
/** @type {boolean} */
this.failFast = false;
/** @type {string[]} */
this.messageInitiatorNodes = [];
/** @type {Record<string, string[]>} */
this.wiredNodeMap = {};
}
/**
@ -137,6 +162,43 @@ class Flow {
this.parent.log(msg);
}
/**
* This method is called when a node finishes processing a message.
* e.g. `node.done()` or `node.done(error)`.
* @param {object} node
*/
decrementPendingMsgCount(node) {
this.pendingMsgCount--;
if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) {
this.msgInProgressNodeMap[node.id]--;
if (this.msgInProgressNodeMap[node.id] <= 0) {
delete this.msgInProgressNodeMap[node.id];
}
}
this.trace("decrement pending messages count to: " + this.pendingMsgCount);
this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`);
if (this.shutdown) {
this.shutdown();
}
}
/**
* This method is called when a node sends a message.
* e.g. `node.send(msg)` or `node.receive(msg)`.
* @param {object} node
*/
incrementPendingMsgCount(node) {
this.pendingMsgCount++;
if (this.msgInProgressNodeMap[node.id]) {
this.msgInProgressNodeMap[node.id]++;
} else {
this.msgInProgressNodeMap[node.id] = 1;
}
this.trace("increment pending messages count to: " + this.pendingMsgCount);
this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`);
}
/**
* Start this flow.
* The `diff` argument helps define what needs to be started in the case
@ -149,6 +211,7 @@ class Flow {
var node;
var newNode;
var id;
this.linkNodes = [];
this.catchNodes = [];
this.statusNodes = [];
this.completeNodeMap = {};
@ -252,6 +315,7 @@ class Flow {
}
}
this.wiredNodeMap = {};
for (id in this.flow.nodes) {
if (this.flow.nodes.hasOwnProperty(id)) {
node = this.flow.nodes[id];
@ -293,12 +357,78 @@ class Flow {
}
}
}
if (node.wires) {
// Create a map with incoming wires for each node
node.wires.forEach((output) => {
// TODO: Still an array?
output.forEach((wire) => {
this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || [];
this.wiredNodeMap[wire].push(node.id);
});
});
}
} else {
this.debug("not starting disabled node : "+id);
}
}
}
this.shutdown = null;
this.cancelShutdown = null;
this.failFast = this.flow.failFast || false;
this.messageInitiatorNodes = this.flow.initiatorNodes || this.subflowDef?.initiatorNodes || [];
if (typeof this.flow.timeout === "number") {
// TODO: Math.max
this.shutdownTimeout = this.flow.timeout;
}
// Determine the shutdown scope; flows needed to calculate pending msgs
/** @type {(node: import("@node-red/registry").Node) => boolean} */
const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:");
const nodes = Object.values(this.flow.nodes || {}).filter(nodeFilter);
const seen = new Set();
const scope = { [this.id]: true };
seen.add(this.id);
if (this.subflowInstance && this.subflowInstance.z) {
scope[this.subflowInstance.z] = true;
nodes.push(...Object.values(this.global.flows[this.subflowInstance.z]?.nodes || {}).filter(nodeFilter));
}
while (nodes.length) {
const node = nodes.pop();
if (seen.has(node.id)) {
continue;
}
seen.add(node.id);
if (node.links) {
node.links.forEach((id) => {
const link = this.global.allNodes[id];
if (link && !scope[link.z]) {
scope[link.z] = true;
nodes.push(...Object.values(this.global.flows[link.z]?.nodes || {}).filter(nodeFilter));
}
})
} else if (node.type.startsWith("subflow:") && !scope[node.id]) {
scope[node.id] = true;
if (!scope[node.type]) {
scope[node.type] = true;
nodes.push(...Object.values(this.global.flows[node.type.substring(8)]?.nodes || {}).filter(nodeFilter));
}
}
}
const scopeList = Object.keys(scope);
this.trace("---------------------------------------------------");
this.trace(" shutdown scope");
this.trace("---------------------------------------------------");
this.trace(scopeList.join(", "));
if (scopeList.length > 1) {
setImmediate(() => {
// Lookup for the flow instance
this.shutdownScope = scopeList.map((id) => this.getNode(id, false)?._flow || activeFlows[id]);
});
}
var activeCount = Object.keys(this.activeNodes).length;
if (activeCount > 0) {
this.trace("------------------|--------------|-----------------");
@ -310,7 +440,9 @@ class Flow {
if (this.activeNodes.hasOwnProperty(id)) {
node = this.activeNodes[id];
this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":""));
if (node.type === "catch") {
if (node.type.startsWith("link ")) {
this.linkNodes.push(node);
} else if (node.type === "catch") {
this.catchNodes.push(node);
} else if (node.type === "status") {
this.statusNodes.push(node);
@ -349,14 +481,15 @@ class Flow {
* Stop this flow.
* The `stopList` argument helps define what needs to be stopped in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} stopList [description]
* @param {[type]} removedList [description]
* @return {[type]} [description]
* @param {string[]} [stopList] [description]
* @param {string[]} [removedList] [description]
* @return {Promise<void>} [description]
*/
stop(stopList, removedList) {
this.trace("stop "+this.TYPE);
var i;
let fullStop = false;
if (!stopList) {
fullStop = true;
stopList = Object.keys(this.activeNodes);
}
// this.trace(" stopList: "+stopList.join(","))
@ -369,7 +502,9 @@ class Flow {
let nodesToStop = [];
let configsToStop = [];
const stopMap = {};
stopList.forEach(id => {
stopMap[id] = true;
if (this.flow.configs[id]) {
configsToStop.push(id);
} else {
@ -378,28 +513,155 @@ class Flow {
});
stopList = nodesToStop.concat(configsToStop);
var promises = [];
for (i=0;i<stopList.length;i++) {
var node = this.activeNodes[stopList[i]];
/** @type {Promise<void>[]} */
const initialPromises = [];
/** @type {Promise<void>} */
let shutdownPromise = Promise.resolve();
if (gracefulShutdown && this.messageInitiatorNodes) {
// Start by closing initiator nodes
const filteredNodes = this.messageInitiatorNodes.filter((id) => stopMap[id]);
initialPromises.push(...this.stopNodes(filteredNodes, removedMap));
/** @type {Set<string>} */
const lookupNodes = new Set();
if (!fullStop) {
// Create a list of nodes wired upstream of nodes to be stopped.
const list = stopList.slice();
while (list.length) {
const id = list.pop();
if (lookupNodes.has(id)) {
continue;
}
lookupNodes.add(id);
if (this.wiredNodeMap[id]) {
list.push(...this.wiredNodeMap[id]);
}
}
}
/** @type {Promise<void>} */
const closePromise = new Promise((resolve) => {
this.shutdown = () => {
if (fullStop) {
this.trace(`Pending message count: ${this.pendingMsgCount}`);
if (this.shutdownScope === null) {
if (this.pendingMsgCount === 0) {
return resolve();
}
} else if (this.shutdownScope.length) {
if (this.pendingMsgCount === 0 && this.failFast) {
// Need to double check due to flow crossing (z)
if (Object.keys(this.msgInProgressNodeMap).length === 0) {
// Allow a scoped flow to stop itself
// Not capable to detect a loop or a callback; it is up
// to the user to choose whether or not to enable flow closure.
this.shutdown = null;
return resolve();
}
}
let count = 0;
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow) {
count += flow.pendingMsgCount;
}
});
if (count === 0) {
this.shutdown = null;
if (this.messageInitiatorNodes.includes(this.id)) {
// This is a subflow initiator node
// Other flows from the scope are still running at that time
return resolve();
}
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) {
// Retrigger validation in each flow
flow.shutdown();
}
});
return resolve();
}
}
} else {
let inProgress = false;
for (const id of Object.keys(this.msgInProgressNodeMap)) {
if (lookupNodes.has(id)) {
inProgress = true;
break;
}
}
if (!inProgress) {
return resolve();
}
}
}
});
/** @type {NodeJS.Timeout|null} */
let timer = null;
const timeoutPromise = new Promise((_resolve, reject) => {
if (this.shutdownScope) {
this.cancelShutdown = () => reject("Graceful shutdown cancelled");
}
timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout);
});
const start = Date.now();
shutdownPromise = Promise.race([closePromise, timeoutPromise])
.then(() => {
clearTimeout(timer);
const delta = Date.now() - start;
this.trace(`Stopped gracefuly in ${delta}ms`);
}).catch((error) => {
clearTimeout(timer);
this.error(Log._("nodes.flows.stopping-error", { message: error }));
if (this.failFast && this.shutdownScope) {
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.cancelShutdown && flow.id !== this.id) {
// Cancel other flow shutdown if this one fails
flow.cancelShutdown();
}
});
}
});
if (this.shutdown) {
setImmediate(this.shutdown);
}
}
return Promise.all(initialPromises)
.then(() => shutdownPromise)
.then(() => Promise.all(this.stopNodes(stopList, removedMap)));
}
/**
* Stop every node of stopList
* @param {string[]} stopList
* @param {Record<string, true>} removedMap
* @returns {Promise<void>[]}
*/
stopNodes(stopList, removedMap) {
const promises = [];
for (const nodeId of stopList) {
const node = this.activeNodes[nodeId];
if (node) {
delete this.activeNodes[stopList[i]];
if (this.subflowInstanceNodes[stopList[i]]) {
delete this.subflowInstanceNodes[stopList[i]];
delete this.activeNodes[nodeId];
if (this.subflowInstanceNodes[nodeId]) {
delete this.subflowInstanceNodes[nodeId];
}
try {
var removed = removedMap[stopList[i]];
promises.push(stopNode(node,removed).catch(()=>{}));
} catch(err) {
node.error(err);
}
if (removedMap[stopList[i]]) {
events.emit("node-status",{
id: node.id
});
const removed = removedMap[nodeId];
if (removed) {
// Clears the node status
events.emit("node-status", { id: node.id });
}
promises.push(stopNode(node, removed).catch(() => { }));
} catch(error) {
node.error(error);
}
}
}
return Promise.all(promises);
return promises;
}
/**
@ -448,7 +710,7 @@ class Flow {
}
}
}
} else {
} else if (typeof this.parent.getNode === "function") {
// Node not found inside this flow - ask the parent
return this.parent.getNode(id);
}
@ -695,6 +957,7 @@ class Flow {
}
handleComplete(node,msg) {
// Trigger Complete nodes
if (this.completeNodeMap[node.id]) {
let toSend = msg;
this.completeNodeMap[node.id].forEach((completeNode,index) => {
@ -760,9 +1023,9 @@ class Flow {
/**
* Stop an individual node within this flow.
*
* @param {[type]} node [description]
* @param {[type]} removed [description]
* @return {[type]} [description]
* @param {object} node [description]
* @param {boolean} removed [description]
* @return {Promise<void>} [description]
*/
function stopNode(node,removed) {
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
@ -821,7 +1084,7 @@ function handlePreRoute(flow, sendEvent, reportError) {
function deliverMessageToDestination(sendEvent) {
if (sendEvent?.destination?.node) {
try {
sendEvent.destination.node.receive(sendEvent.msg);
sendEvent.destination.node._receive(sendEvent.msg);
} catch(err) {
Log.error(`Error delivering message to node:${sendEvent.destination.node._path} [${sendEvent.destination.node.type}]`)
Log.error(err.stack)
@ -835,6 +1098,9 @@ function handlePreDeliver(flow,sendEvent, reportError) {
reportError(err,sendEvent);
return;
} else if (err !== false) {
// node.send(msg) called
// Do it now to avoid next loop and an offbeat count
flow.incrementPendingMsgCount(sendEvent.destination.node);
if (asyncMessageDelivery) {
setImmediate(function() {
deliverMessageToDestination(sendEvent)
@ -855,7 +1121,8 @@ function handlePreDeliver(flow,sendEvent, reportError) {
module.exports = {
init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery
asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery;
gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false;
Log = runtime.log;
Subflow = require("./Subflow");
Group = require("./Group").Group
@ -863,5 +1130,8 @@ module.exports = {
create: function(parent,global,conf) {
return new Flow(parent,global,conf)
},
setActiveFlows: function(flows) {
activeFlows = flows;
},
Flow: Flow
}

View File

@ -36,7 +36,9 @@ var activeFlowConfig = null;
var activeFlows = {};
var started = false;
var state = 'stop'
var state = 'stop';
/** @type {boolean} */
let gracefulShutdown;
var credentialsPendingReset = false;
@ -53,6 +55,7 @@ function init(runtime) {
log = runtime.log;
started = false;
state = 'stop';
gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false;
if (!typeEventRegistered) {
events.on('type-registered',function(type) {
if (activeFlowConfig && activeFlowConfig.missingTypes.length > 0) {
@ -70,6 +73,7 @@ function init(runtime) {
typeEventRegistered = true;
}
Flow.init(runtime);
Flow.setActiveFlows(activeFlows);
flowUtil.init(runtime);
}
@ -471,7 +475,9 @@ function stop(type,diff,muteLog,isDeploy) {
let globalIndex = activeFlowIds.indexOf("global");
if (globalIndex !== -1) {
activeFlowIds.splice(globalIndex,1);
activeFlowIds.push("global");
if (!gracefulShutdown) {
activeFlowIds.push("global");
}
}
activeFlowIds.forEach(id => {
@ -485,7 +491,26 @@ function stop(type,diff,muteLog,isDeploy) {
}
});
return Promise.all(promises).then(function() {
/** @type {() => Promise<void>} */
const globalFlowPromise = () => {
if (gracefulShutdown) {
// Ensure the global flow is the last one closed
return new Promise((resolve) => {
if (activeFlows.hasOwnProperty("global")) {
const flowStateChanged = diff && (diff.flowChanged.indexOf("global") !== -1 || diff.added.indexOf("global") !== -1 || diff.removed.indexOf("global") !== -1);
log.debug("red/nodes/flows.stop : stopping flow : global");
activeFlows["global"].stop(flowStateChanged ? null : stopList, removedList).then(resolve);
if (type === "full" || flowStateChanged || diff.removed.indexOf("global")!==-1) {
delete activeFlows["global"];
}
}
});
} else {
return Promise.resolve();
}
};
return Promise.all(promises).then(globalFlowPromise).then(function () {
for (let id in activeNodesToFlow) {
if (activeNodesToFlow.hasOwnProperty(id)) {
if (!activeFlows[activeNodesToFlow[id]]) {
@ -579,6 +604,15 @@ async function addFlow(flow, user) {
if (flow.hasOwnProperty('env')) {
tabNode.env = flow.env;
}
if (flow.hasOwnProperty('failFast')) {
tabNode.failFast = flow.failFast;
}
if (flow.hasOwnProperty('timeout')) {
tabNode.timeout = flow.timeout;
}
if (flow.hasOwnProperty('initiatorNodes')) {
tabNode.initiatorNodes = flow.initiatorNodes;
}
var nodes = [tabNode];
@ -642,6 +676,15 @@ function getFlow(id) {
if (flow.hasOwnProperty('env')) {
result.env = flow.env;
}
if (flow.hasOwnProperty('failFast')) {
result.failFast = flow.failFast;
}
if (flow.hasOwnProperty('timeout')) {
result.timeout = flow.timeout;
}
if (flow.hasOwnProperty('initiatorNodes')) {
result.initiatorNodes = flow.initiatorNodes;
}
if (id !== 'global') {
result.nodes = [];
}
@ -770,6 +813,15 @@ async function updateFlow(id,newFlow, user) {
if (newFlow.hasOwnProperty('env')) {
tabNode.env = newFlow.env;
}
if (flow.hasOwnProperty('failFast')) {
tabNode.failFast = flow.failFast;
}
if (flow.hasOwnProperty('timeout')) {
tabNode.timeout = flow.timeout;
}
if (flow.hasOwnProperty('initiatorNodes')) {
tabNode.initiatorNodes = flow.initiatorNodes;
}
if (newFlow.hasOwnProperty('credentials')) {
tabNode.credentials = newFlow.credentials;
}

View File

@ -131,6 +131,10 @@ Node.prototype.context = function() {
*/
Node.prototype._complete = function(msg,error) {
this.metric("done",msg);
if (this._flow && typeof this._flow.decrementPendingMsgCount === "function") {
// done() or done(error) called
this._flow.decrementPendingMsgCount(this);
}
hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => {
if (err) {
this.error(err);
@ -199,6 +203,7 @@ Node.prototype.emit = function(event, ...args) {
* This will call all registered handlers for the 'input' event.
*/
Node.prototype._emitInput = function(arg) {
// TODO: inject node directly used the event to trigger a message, is this allowed?
var node = this;
this.metric("receive", arg);
let receiveEvent = { msg:arg, destination: { id: this.id, node: this } }
@ -216,6 +221,11 @@ Node.prototype._emitInput = function(arg) {
function() { node.send.apply(node,arguments) },
function(err) { node._complete(arg,err); }
);
if (node._expectedDoneCount === 0 && node._flow) {
// TODO: Call Complete node is expected?
// Ensure done() is called
node._complete(arg);
}
} catch(err) {
node.error(err,arg);
}
@ -237,6 +247,11 @@ Node.prototype._emitInput = function(arg) {
}
}
);
if (node._expectedDoneCount === 0 && node._flow) {
// TODO: Call Complete node is expected?
// Ensure done() is called
node._complete(arg);
}
} catch(err) {
node.error(err,arg);
}
@ -492,11 +507,9 @@ Node.prototype.send = function(msg) {
};
/**
* Receive a message.
*
* This will emit the `input` event with the provided message.
* Internal method to receive a message
*/
Node.prototype.receive = function(msg) {
Node.prototype._receive = function (msg) {
if (!msg) {
msg = {};
}
@ -506,6 +519,19 @@ Node.prototype.receive = function(msg) {
this.emit("input",msg);
};
/**
* Receive a message.
*
* This will emit the `input` event with the provided message.
*/
Node.prototype.receive = function (msg) {
if (this._flow && typeof this._flow.incrementPendingMsgCount === "function") {
// Equivalent to node.send(msg) without handleOnSend
this._flow.incrementPendingMsgCount(this);
}
this._receive(msg);
};
function log_helper(self, level, msg) {
var o = {
level: level,

View File

@ -312,6 +312,9 @@ module.exports = {
/** show or hide runtime stop/start options in the node-red editor. Must be set to `false` to hide */
ui: false,
},
gracefulShutdown: {
enabled: true,
},
telemetry: {
/**
* By default, telemetry is disabled until the user provides consent the first

View File

@ -221,7 +221,7 @@ describe('link Node', function() {
const flow = [
{ id: "tab-flow-1", type: "tab", label: "Flow 1" },
{ id: "link-in-1", z: "tab-flow-1", type: "link in", name: "double payload", wires: [["func"]] },
{ id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]] },
{ id: "func", z: "tab-flow-1", type: "helper", wires: [["link-out-1"]], x: 1, y: 1 },
{ id: "link-out-1", z: "tab-flow-1", type: "link out", mode: "" }, //not return mode, cause link-call timeout
{ id: "link-call", z: "tab-flow-1", type: "link call", linkType: "static", "timeout": "0.5", links: ["link-in-1"], wires: [["n4"]] },
{ id: "catch-all", z: "tab-flow-1", type: "catch", scope: ["link-call"], uncaught: true, wires: [["n4"]] },

View File

@ -204,8 +204,8 @@ describe('Flow', function() {
it("instantiates an initial configuration and stops it", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"a"}
]);
@ -256,8 +256,8 @@ describe('Flow', function() {
it("instantiates config nodes in the right order",async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"5"}, // This node depends on #5
{id:"5",z:"t1",type:"test"}
@ -308,8 +308,8 @@ describe('Flow', function() {
it("rewires nodes specified by diff", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}
]);
@ -318,7 +318,7 @@ describe('Flow', function() {
await flow.start();
//TODO: use update to pass in new wiring and verify the change
createCount.should.equal(3);
flow.start({rewired:["2"]});
flow.start({rewired:[["2"]]});
createCount.should.equal(3);
rewiredNodes.should.have.a.property("2");
});
@ -355,8 +355,8 @@ describe('Flow', function() {
it("ignores disabled nodes", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",d:true,type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"a"},
{id:"5",z:"t1",type:"test",d:true,foo:"a"}
@ -406,8 +406,8 @@ describe('Flow', function() {
it("stops all nodes", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"asyncTest",foo:"a",wires:[]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
@ -428,8 +428,8 @@ describe('Flow', function() {
it("stops specified nodes", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
@ -450,9 +450,9 @@ describe('Flow', function() {
it("stops config nodes last", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"c1",z:"t1",type:"test"},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"c2",z:"t1",type:"test"},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"c3",z:"t1",type:"test"}
@ -485,8 +485,8 @@ describe('Flow', function() {
}});
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"testAsync",closeDelay: 80, foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
@ -514,8 +514,8 @@ describe('Flow', function() {
it("gets a node known to the flow", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"a"}
]);
@ -529,8 +529,8 @@ describe('Flow', function() {
it("passes to parent if node not known locally", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"a"}
]);
@ -547,8 +547,8 @@ describe('Flow', function() {
it("does not pass to parent if cancelBubble set", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",z:"t1",type:"test",foo:"a"}
]);
@ -567,8 +567,8 @@ describe('Flow', function() {
it("passes a status event to the adjacent status node", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]},
{id:"sn2",x:10,y:10,z:"t1",type:"status",foo:"a",wires:[]}
@ -605,10 +605,10 @@ describe('Flow', function() {
it("passes a status event to the adjacent scoped status node ", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"status",scope:["2"],foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"status",scope:[["2"]],foo:"a",wires:[]},
{id:"sn2",x:10,y:10,z:"t1",type:"status",scope:["1"],foo:"a",wires:[]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
@ -639,7 +639,7 @@ describe('Flow', function() {
{id: "g1", type: "group", g: "g3", z:"t1" },
{id: "g2", type: "group", z:"t1" },
{id: "g3", type: "group", z:"t1" },
{id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]},
{id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]},
// sn - in the same group as source node
{id:"sn",x:10,y:10,z:"t1",g:"g1", type:"status",scope:"group",wires:[]},
// sn2 - in a different group hierarchy to the source node
@ -669,8 +669,8 @@ describe('Flow', function() {
it("passes an error event to the adjacent catch node", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]},
{id:"sn2",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]},
@ -712,10 +712,10 @@ describe('Flow', function() {
it("passes an error event to the adjacent scoped catch node ", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"catch",scope:["2"],foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"catch",scope:[["2"]],foo:"a",wires:[]},
{id:"sn2",x:10,y:10,z:"t1",type:"catch",scope:["1"],foo:"a",wires:[]},
{id:"sn3",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]},
{id:"sn4",x:10,y:10,z:"t1",type:"catch",uncaught:true,wires:[]}
@ -766,7 +766,7 @@ describe('Flow', function() {
{id: "g1", type: "group", g: "g3", z:"t1" },
{id: "g2", type: "group", z:"t1" },
{id: "g3", type: "group", z:"t1" },
{id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:["2"]},
{id:"1",x:10,y:10,z:"t1",g:"g1", type:"test",name:"a",wires:[["2"]]},
// sn - in the same group as source node
{id:"sn",x:10,y:10,z:"t1",g:"g1", type:"catch",scope:"group",wires:[]},
// sn2 - in a different group hierarchy to the source node
@ -795,8 +795,8 @@ describe('Flow', function() {
it("moves any existing error object sideways", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sn",x:10,y:10,z:"t1",type:"catch",foo:"a",wires:[]}
]);
@ -828,8 +828,8 @@ describe('Flow', function() {
it("passes a complete event to the adjacent Complete node",async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"testDone",foo:"a",wires:[]},
{id:"cn",x:10,y:10,z:"t1",type:"complete",scope:["1","3"],foo:"a",wires:[]}
]);
@ -856,8 +856,8 @@ describe('Flow', function() {
it("sends a message - no cloning", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
@ -875,7 +875,7 @@ describe('Flow', function() {
if (err) { reject(err) }
else { resolve() }
}
n2.receive = function(msg) {
n2._receive = function(msg) {
messageReceived = true;
try {
msg.should.be.exactly(message);
@ -898,8 +898,8 @@ describe('Flow', function() {
it("sends a message - cloning", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
@ -916,7 +916,7 @@ describe('Flow', function() {
if (err) { reject(err) }
else { resolve() }
}
n2.receive = function(msg) {
n2._receive = function(msg) {
try {
// Message should be cloned
msg.should.be.eql(message);
@ -939,8 +939,8 @@ describe('Flow', function() {
it("sends multiple messages", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
@ -957,7 +957,7 @@ describe('Flow', function() {
else { resolve() }
}
var messageCount = 0;
n2.receive = function(msg) {
n2._receive = function(msg) {
try {
msg.should.be.exactly(messages[messageCount++]);
if (messageCount === 2) {
@ -1032,8 +1032,8 @@ describe('Flow', function() {
})
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
var flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
@ -1049,7 +1049,7 @@ describe('Flow', function() {
if (err) { reject(err) }
else { resolve() }
}
n2.receive = function(msg) {
n2._receive = function(msg) {
messageReceived = true;
try {
msg.should.be.eql(message);
@ -1103,14 +1103,14 @@ describe('Flow', function() {
})
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
n1 = flow.getNode('1');
n2 = flow.getNode('2');
n2.receive = function(msg) {
n2._receive = function(msg) {
messageReceived = true;
}
n1.error = function(err) {
@ -1180,14 +1180,14 @@ describe('Flow', function() {
})
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["3"]}
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["3"]]}
]);
flow = Flow.create({},config,config.flows["t1"]);
await flow.start();
n1 = flow.getNode('1');
n2 = flow.getNode('2');
n2.receive = function(msg) {
n2._receive = function(msg) {
messageReceived = true;
}
n1.error = function(err) {

View File

@ -295,8 +295,8 @@ describe('Subflow', function() {
it("instantiates a subflow and stops it", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
@ -359,8 +359,8 @@ describe('Subflow', function() {
it("instantiates a subflow inside a subflow and stops it", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3","4"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"],["4"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 1","info":"",
@ -389,8 +389,8 @@ describe('Subflow', function() {
it("rewires a subflow node on update/start", async function(){
var rawConfig = [
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"4",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
@ -443,8 +443,8 @@ describe('Subflow', function() {
it("stops subflow instance nodes", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
"in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]},
@ -466,8 +466,8 @@ describe('Subflow', function() {
it("passes a status event to the subflow's parent tab status node - all scope", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
"in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]},
@ -496,8 +496,8 @@ describe('Subflow', function() {
it("passes a status event to the subflow's parent tab status node - targetted scope", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
"in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]},
@ -534,8 +534,8 @@ describe('Subflow', function() {
it("emits a status event when a message is passed to a subflow-status node - msg.payload as string", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{
id:"sf1",
@ -571,8 +571,8 @@ describe('Subflow', function() {
it("emits a status event when a message is passed to a subflow-status node - msg.payload as status obj", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{
id:"sf1",
@ -610,8 +610,8 @@ describe('Subflow', function() {
it("emits a status event when a message is passed to a subflow-status node - msg.status", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{
id:"sf1",
@ -649,8 +649,8 @@ describe('Subflow', function() {
it("does not emit a regular status event if it contains a subflow-status node", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{
id:"sf1",
@ -682,8 +682,8 @@ describe('Subflow', function() {
it("passes an error event to the subflow's parent tab catch node - all scope",async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
"in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]},
@ -714,8 +714,8 @@ describe('Subflow', function() {
it("passes an error event to the subflow's parent tab catch node - targetted scope", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",name:"a",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"a",wires:[]},
{id:"sf1",type:"subflow","name":"Subflow 2","info":"",
"in":[{"wires":[{"id":"sf1-1"}]}],"out":[{"wires":[{"id":"sf1-1","port":0}]}]},
@ -752,8 +752,8 @@ describe('Subflow', function() {
it("can access process env var", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]},
{id:"sf1",type:"subflow",name:"Subflow 2",info:"",
"in":[ {wires:[{id:"sf1-1"}]} ],
@ -779,8 +779,8 @@ describe('Subflow', function() {
it("can access subflow env var", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]},
{id:"sf1",type:"subflow",name:"Subflow 2",info:"",env: [{name: '__KEY__', value: '__VAL1__', type: 'str'}],
"in":[ {wires:[{id:"sf1-1"}]} ],
@ -815,8 +815,8 @@ describe('Subflow', function() {
it("can access nested subflow env var", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab", env: [{name: '__KEY1__', value: 't1', type: 'str'}]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]},
{id:"sf1",type:"subflow",name:"Subflow 1",info:"",
env: [{name: '__KEY2__', value: 'sf1', type: 'str'}],
@ -854,8 +854,8 @@ describe('Subflow', function() {
it("can access name of subflow as env var", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]},
{id:"sf1",type:"subflow",name:"Subflow 2",info:"",
"in":[ {wires:[{id:"sf1-1"}]} ],
@ -879,8 +879,8 @@ describe('Subflow', function() {
it("can access id of subflow as env var", async function() {
var config = flowUtils.parseConfig([
{id:"t1",type:"tab"},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:["2"]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:["3"]},
{id:"1",x:10,y:10,z:"t1",type:"test",foo:"t1.1",wires:[["2"]]},
{id:"2",x:10,y:10,z:"t1",type:"subflow:sf1",name:"SFN",wires:[["3"]]},
{id:"3",x:10,y:10,z:"t1",type:"test",foo:"t1.3",wires:[]},
{id:"sf1",type:"subflow",name:"Subflow 2",info:"",
"in":[ {wires:[{id:"sf1-1"}]} ],