// node-red/packages/node_modules/@node-red/runtime/lib/flows/Flow.js
// (1131 lines, 44 KiB, JavaScript)

/**
* Copyright JS Foundation and other contributors, http://js.foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
const clone = require("clone");
const redUtil = require("@node-red/util").util;
const events = require("@node-red/util").events;
const flowUtil = require("./util");
const context = require('../nodes/context');
const hooks = require("@node-red/util").hooks;
const credentials = require("../nodes/credentials");
// Subflow module (./Subflow); assigned by init()
let Subflow;
// Runtime logger (runtime.log); assigned by init()
let Log;
// Group class exported by ./Group; assigned by init()
let Group;
// Lookup of flow instances by id, used when resolving the shutdown scope;
// provided through setActiveFlows()
let activeFlows;
// Time in ms stopNode() waits for node.close() before reporting a timeout;
// overridden by settings.nodeCloseTimeout in init()
let nodeCloseTimeout = 15000;
// When true, messages are handed to the destination node via setImmediate;
// init() sets this to the inverse of settings.runtimeSyncDelivery
let asyncMessageDelivery = true;
// When true, Flow.stop() waits for in-flight messages before closing nodes;
// init() reads this from settings.gracefulShutdown.enabled
let gracefulShutdown = false;
/**
* This class represents a flow within the runtime. It is responsible for
* creating, starting and stopping all nodes within the flow.
*/
class Flow {
/**
* Create a Flow object.
* @param {[type]} parent The parent flow
* @param {[type]} globalFlow The global flow definition
* @param {[type]} flow This flow's definition
*/
constructor(parent,globalFlow,flow) {
this.TYPE = 'flow';
this.parent = parent;
this.global = globalFlow;
if (typeof flow === 'undefined') {
this.flow = globalFlow;
this.isGlobalFlow = true;
} else {
this.flow = flow;
this.isGlobalFlow = false;
}
this.id = this.flow.id || "global";
this.groups = {}
this.groupOrder = []
this.activeNodes = {};
this.subflowInstanceNodes = {};
this.completeNodeMap = {};
this.linkNodes = [];
this.catchNodes = [];
this.statusNodes = [];
this.path = this.id;
// Ensure a context exists for this flow
this.context = context.getFlowContext(this.id,this.parent.id);
// env is an array of env definitions
// _env is an object for direct lookup of env name -> value
this.env = this.flow.env
this._env = {}
// Graceful Shutdown props
/** @type {Number} */
this.pendingMsgCount = 0;
/** @type {Record<string, number>} */
this.msgInProgressNodeMap = {};
/** @type {object[]|null} */
this.shutdownScope = null;
/** @type {Number} */
this.shutdownTimeout = 10000; // 10s
/** @type {null|(() => void)} */
this.cancelShutdown = null;
/** @type {null|(() => void)} */
this.shutdown = null;
/** @type {boolean} */
this.failFast = false;
/** @type {string[]} */
this.messageInitiatorNodes = [];
/** @type {Record<string, string[]>} */
this.wiredNodeMap = {};
}
/**
* Log a debug-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
debug(msg) {
Log.log({
id: this.id||"global",
level: Log.DEBUG,
type:this.TYPE,
msg:msg
})
}
/**
 * Emit an error-level log entry attributed to this flow.
 * @param {*} msg The message to log
 */
error(msg) {
    const entry = {
        id: this.id||"global",
        level: Log.ERROR,
        type: this.TYPE,
        msg
    };
    Log.log(entry);
}
/**
* Log a info-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
info(msg) {
Log.log({
id: this.id||"global",
level: Log.INFO,
type:this.TYPE,
msg:msg
})
}
/**
* Log a trace-level message from this flow
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
trace(msg) {
Log.log({
id: this.id||"global",
level: Log.TRACE,
type:this.TYPE,
msg:msg
})
}
/**
* [log description]
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
log(msg) {
if (!msg.path) {
msg.path = this.path;
}
this.parent.log(msg);
}
/**
* This method is called when a node finishes processing a message.
* e.g. `node.done()` or `node.done(error)`.
* @param {object} node
*/
decrementPendingMsgCount(node) {
this.pendingMsgCount--;
if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) {
this.msgInProgressNodeMap[node.id]--;
if (this.msgInProgressNodeMap[node.id] <= 0) {
delete this.msgInProgressNodeMap[node.id];
}
}
this.trace("decrement pending messages count to: " + this.pendingMsgCount);
this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`);
if (this.shutdown) {
this.shutdown();
}
}
/**
* This method is called when a node sends a message.
* e.g. `node.send(msg)` or `node.receive(msg)`.
* @param {object} node
*/
incrementPendingMsgCount(node) {
this.pendingMsgCount++;
if (this.msgInProgressNodeMap[node.id]) {
this.msgInProgressNodeMap[node.id]++;
} else {
this.msgInProgressNodeMap[node.id] = 1;
}
this.trace("increment pending messages count to: " + this.pendingMsgCount);
this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`);
}
/**
* Start this flow.
* The `diff` argument helps define what needs to be started in the case
* of a modified-nodes/flows type deploy.
*
* In order, this method: evaluates env vars, creates and starts groups,
* creates config nodes, re-applies wires for rewired nodes, creates regular
* nodes and subflow instances, computes the graceful-shutdown scope and
* finally indexes the link/catch/status/complete nodes.
* @param {object} [diff] Deploy diff; only `diff.rewired` (a list of node ids) is read here
* @return {Promise<void>} Resolves once everything has been created
* @throws {Error} If a config node is still waiting on a dependency after 100 attempts
*/
async start(diff) {
this.trace("start "+this.TYPE+" ["+this.path+"]");
var node;
var newNode;
var id;
// Reset the lookups that are rebuilt at the end of this method
this.linkNodes = [];
this.catchNodes = [];
this.statusNodes = [];
this.completeNodeMap = {};
if (this.isGlobalFlow) {
// This is the global flow. It needs to go find the `global-config`
// node and extract any env properties from it
const configNodes = Object.keys(this.flow.configs);
for (let i = 0; i < configNodes.length; i++) {
const node = this.flow.configs[configNodes[i]]
if (node.type === 'global-config' && node.env) {
const globalCreds = credentials.get(node.id)?.map || {}
const nodeEnv = await flowUtil.evaluateEnvProperties(this, node.env, globalCreds)
this._env = { ...this._env, ...nodeEnv }
}
}
}
if (this.env) {
// NOTE(review): the raw credentials object is passed here, whereas the
// global-config branch above passes its `.map` - confirm this is intended
this._env = { ...this._env, ...await flowUtil.evaluateEnvProperties(this, this.env, credentials.get(this.id)) }
}
// Initialise the group objects. These must be done in the right order
// starting from outer-most to inner-most so that the parent hierarchy
// is maintained.
this.groups = {}
this.groupOrder = []
const groupIds = Object.keys(this.flow.groups || {})
while (groupIds.length > 0) {
const id = groupIds.shift()
const groupDef = this.flow.groups[id]
if (!groupDef.g || this.groups[groupDef.g]) {
// The parent of this group is available - either another group
// or the top-level flow (this)
const parent = this.groups[groupDef.g] || this
this.groups[groupDef.id] = new Group(parent, groupDef)
this.groupOrder.push(groupDef.id)
} else {
// Try again once we've processed the other groups
groupIds.push(id)
}
}
for (let i = 0; i < this.groupOrder.length; i++) {
// Start the groups in the right order so they
// can setup their env vars knowing their parent
// will have been started
await this.groups[this.groupOrder[i]].start()
}
// Create the config nodes. A config node that references another config
// node which is not yet active is pushed to the back of the queue.
var configNodes = Object.keys(this.flow.configs);
var configNodeAttempts = {};
while (configNodes.length > 0) {
id = configNodes.shift();
node = this.flow.configs[id];
if (!this.activeNodes[id]) {
if (node.d !== true) {
var readyToCreate = true;
// This node doesn't exist.
// Check it doesn't reference another non-existent config node
for (var prop in node) {
if (node.hasOwnProperty(prop) &&
prop !== 'id' &&
prop !== 'wires' &&
prop !== '_users' &&
this.flow.configs[node[prop]] &&
this.flow.configs[node[prop]].d !== true
) {
if (!this.activeNodes[node[prop]]) {
// References a non-existent config node
// Add it to the back of the list to try again later
configNodes.push(id);
configNodeAttempts[id] = (configNodeAttempts[id]||0)+1;
// Give up after 100 retries rather than loop forever
if (configNodeAttempts[id] === 100) {
throw new Error("Circular config node dependency detected: "+id);
}
readyToCreate = false;
break;
}
}
}
if (readyToCreate) {
newNode = await flowUtil.createNode(this,node);
if (newNode) {
this.activeNodes[id] = newNode;
}
}
} else {
this.debug("not starting disabled config node : "+id);
}
}
}
// Nodes that are still running but whose wiring changed only need
// their wires refreshed from the new definition
if (diff && diff.rewired) {
for (var j=0;j<diff.rewired.length;j++) {
var rewireNode = this.activeNodes[diff.rewired[j]];
if (rewireNode) {
rewireNode.updateWires(this.flow.nodes[rewireNode.id].wires);
}
}
}
// Create the regular nodes and subflow instances that are not already
// active, and rebuild the incoming-wire map as we go
this.wiredNodeMap = {};
for (id in this.flow.nodes) {
if (this.flow.nodes.hasOwnProperty(id)) {
node = this.flow.nodes[id];
if (node.d !== true) {
if (!node.subflow) {
if (!this.activeNodes[id]) {
newNode = await flowUtil.createNode(this,node);
if (newNode) {
this.activeNodes[id] = newNode;
}
}
} else {
if (!this.subflowInstanceNodes[id]) {
try {
// Prefer a subflow defined on this flow, falling back to the global definitions
var subflowDefinition = this.flow.subflows[node.subflow]||this.global.subflows[node.subflow]
// console.log("NEED TO CREATE A SUBFLOW",id,node.subflow);
// Ensure the path property is set on the instance node so NR_SUBFLOW_PATH env is evaluated properly
Object.defineProperty(node,'_path', {value: `${this.path}/${node._alias||node.id}`, enumerable: false, writable: true })
// Placeholder while the instance is being created. Note that it
// is left as `true` if Subflow.create throws (see catch below).
this.subflowInstanceNodes[id] = true;
var subflow = Subflow.create(
this,
this.global,
subflowDefinition,
node
);
this.subflowInstanceNodes[id] = subflow;
await subflow.start();
this.activeNodes[id] = subflow.node;
// this.subflowInstanceNodes[id] = nodes.map(function(n) { return n.id});
// for (var i=0;i<nodes.length;i++) {
// if (nodes[i]) {
// this.activeNodes[nodes[i].id] = nodes[i];
// }
// }
} catch(err) {
// A failing subflow instance is reported on the console and
// does not prevent the remaining nodes from starting
console.log(err.stack)
}
}
}
if (node.wires) {
// Create a map with incoming wires for each node
node.wires.forEach((output) => {
// TODO: Still an array?
output.forEach((wire) => {
this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || [];
this.wiredNodeMap[wire].push(node.id);
});
});
}
} else {
this.debug("not starting disabled node : "+id);
}
}
}
// Reset the graceful shutdown state and pick up the flow-level options
this.shutdown = null;
this.cancelShutdown = null;
this.failFast = this.flow.failFast || false;
this.messageInitiatorNodes = this.flow.initiatorNodes || this.subflowDef?.initiatorNodes || [];
if (typeof this.flow.timeout === "number") {
// TODO: Math.max
this.shutdownTimeout = this.flow.timeout;
}
// Determine the shutdown scope; flows needed to calculate pending msgs
// Only link nodes and subflow instances can carry a message into another flow
/** @type {(node: import("@node-red/registry").Node) => boolean} */
const nodeFilter = (n) => n.type.startsWith("link ") || n.type.startsWith("subflow:");
// `nodes` is a work list, `seen` guards against revisiting a node and
// `scope` collects the ids (flows, subflow instances, subflow types) found
const nodes = Object.values(this.flow.nodes || {}).filter(nodeFilter);
const seen = new Set();
const scope = { [this.id]: true };
seen.add(this.id);
if (this.subflowInstance && this.subflowInstance.z) {
// This is a subflow instance: include the flow that contains it
scope[this.subflowInstance.z] = true;
nodes.push(...Object.values(this.global.flows[this.subflowInstance.z]?.nodes || {}).filter(nodeFilter));
}
while (nodes.length) {
const node = nodes.pop();
if (seen.has(node.id)) {
continue;
}
seen.add(node.id);
if (node.links) {
// Follow each link to the flow (z) of the node on the other end
node.links.forEach((id) => {
const link = this.global.allNodes[id];
if (link && !scope[link.z]) {
scope[link.z] = true;
nodes.push(...Object.values(this.global.flows[link.z]?.nodes || {}).filter(nodeFilter));
}
})
} else if (node.type.startsWith("subflow:") && !scope[node.id]) {
scope[node.id] = true;
if (!scope[node.type]) {
scope[node.type] = true;
// substring(8) strips the "subflow:" prefix to get the subflow id
nodes.push(...Object.values(this.global.flows[node.type.substring(8)]?.nodes || {}).filter(nodeFilter));
}
}
}
const scopeList = Object.keys(scope);
this.trace("---------------------------------------------------");
this.trace(" shutdown scope");
this.trace("---------------------------------------------------");
this.trace(scopeList.join(", "));
if (scopeList.length > 1) {
// Deferred with setImmediate; presumably so that the other flows have
// been created before they are looked up - TODO confirm
setImmediate(() => {
// Lookup for the flow instance
this.shutdownScope = scopeList.map((id) => this.getNode(id, false)?._flow || activeFlows[id]);
});
}
var activeCount = Object.keys(this.activeNodes).length;
if (activeCount > 0) {
this.trace("------------------|--------------|-----------------");
this.trace(" id | type | alias");
this.trace("------------------|--------------|-----------------");
}
// Build the map of catch/status/complete nodes.
for (id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
node = this.activeNodes[id];
this.trace(" "+id.padEnd(16)+" | "+node.type.padEnd(12)+" | "+(node._alias||"")+(node._zAlias?" [zAlias:"+node._zAlias+"]":""));
if (node.type.startsWith("link ")) {
this.linkNodes.push(node);
} else if (node.type === "catch") {
this.catchNodes.push(node);
} else if (node.type === "status") {
this.statusNodes.push(node);
} else if (node.type === "complete") {
if (node.scope) {
// Index complete nodes by the id of each node they watch
node.scope.forEach(id => {
this.completeNodeMap[id] = this.completeNodeMap[id] || [];
this.completeNodeMap[id].push(node);
})
}
}
}
}
// Order catch nodes: those with a scope first, then unscoped ones,
// with unscoped `uncaught` nodes last
this.catchNodes.sort(function(A,B) {
if (A.scope && !B.scope) {
return -1;
} else if (!A.scope && B.scope) {
return 1;
} else if (A.scope && B.scope) {
return 0;
} else if (A.uncaught && !B.uncaught) {
return 1;
} else if (!A.uncaught && B.uncaught) {
return -1;
}
return 0;
});
if (activeCount > 0) {
this.trace("------------------|--------------|-----------------");
}
// this.dump();
}
/**
* Stop this flow.
* The `stopList` argument helps define what needs to be stopped in the case
* of a modified-nodes/flows type deploy.
*
* When graceful shutdown is enabled, initiator nodes are closed first and the
* remaining nodes are only closed once no relevant message is in flight, or
* once `shutdownTimeout` expires / the shutdown is cancelled.
* @param {string[]} [stopList] Ids of the nodes to stop; when omitted every active node is stopped
* @param {string[]} [removedList] Ids of nodes that have been removed from the flow
* @return {Promise<void>} Resolves once the nodes in `stopList` have been closed
*/
stop(stopList, removedList) {
this.trace("stop "+this.TYPE);
let fullStop = false;
if (!stopList) {
fullStop = true;
stopList = Object.keys(this.activeNodes);
}
// this.trace(" stopList: "+stopList.join(","))
// Convert the list to a map to avoid multiple scans of the list
var removedMap = {};
removedList = removedList || [];
removedList.forEach(function(id) {
removedMap[id] = true;
});
// Split the ids so that regular nodes are stopped before config nodes
let nodesToStop = [];
let configsToStop = [];
const stopMap = {};
stopList.forEach(id => {
stopMap[id] = true;
if (this.flow.configs[id]) {
configsToStop.push(id);
} else {
nodesToStop.push(id);
}
});
stopList = nodesToStop.concat(configsToStop);
/** @type {Promise<void>[]} */
const initialPromises = [];
/** @type {Promise<void>} */
let shutdownPromise = Promise.resolve();
if (gracefulShutdown && this.messageInitiatorNodes) {
// Start by closing initiator nodes
const filteredNodes = this.messageInitiatorNodes.filter((id) => stopMap[id]);
initialPromises.push(...this.stopNodes(filteredNodes, removedMap));
/** @type {Set<string>} */
const lookupNodes = new Set();
if (!fullStop) {
// Create a list of nodes wired upstream of nodes to be stopped.
// (wiredNodeMap maps a node id to the ids of the nodes wired into it)
const list = stopList.slice();
while (list.length) {
const id = list.pop();
if (lookupNodes.has(id)) {
continue;
}
lookupNodes.add(id);
if (this.wiredNodeMap[id]) {
list.push(...this.wiredNodeMap[id]);
}
}
}
// closePromise resolves when this.shutdown() finds nothing left in flight.
// this.shutdown is invoked once below and again from
// decrementPendingMsgCount() each time a node completes a message.
/** @type {Promise<void>} */
const closePromise = new Promise((resolve) => {
this.shutdown = () => {
if (fullStop) {
this.trace(`Pending message count: ${this.pendingMsgCount}`);
if (this.shutdownScope === null) {
// No other flow in scope: only this flow's counter matters
if (this.pendingMsgCount === 0) {
return resolve();
}
} else if (this.shutdownScope.length) {
if (this.pendingMsgCount === 0 && this.failFast) {
// Need to double check due to flow crossing (z)
if (Object.keys(this.msgInProgressNodeMap).length === 0) {
// Allow a scoped flow to stop itself
// Not capable to detect a loop or a callback; it is up
// to the user to choose whether or not to enable flow closure.
this.shutdown = null;
return resolve();
}
}
// Sum the pending messages of every flow in the scope
let count = 0;
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow) {
count += flow.pendingMsgCount;
}
});
if (count === 0) {
this.shutdown = null;
if (this.messageInitiatorNodes.includes(this.id)) {
// This is a subflow initiator node
// Other flows from the scope are still running at that time
return resolve();
}
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) {
// Retrigger validation in each flow
flow.shutdown();
}
});
return resolve();
}
}
} else {
// Partial stop: only wait for messages held by the nodes being
// stopped or by nodes upstream of them
let inProgress = false;
for (const id of Object.keys(this.msgInProgressNodeMap)) {
if (lookupNodes.has(id)) {
inProgress = true;
break;
}
}
if (!inProgress) {
return resolve();
}
}
}
});
/** @type {NodeJS.Timeout|null} */
let timer = null;
// timeoutPromise never resolves; it rejects on timeout or cancellation
const timeoutPromise = new Promise((_resolve, reject) => {
if (this.shutdownScope) {
this.cancelShutdown = () => reject("Graceful shutdown cancelled");
}
timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout);
});
const start = Date.now();
shutdownPromise = Promise.race([closePromise, timeoutPromise])
.then(() => {
clearTimeout(timer);
const delta = Date.now() - start;
this.trace(`Stopped gracefuly in ${delta}ms`);
}).catch((error) => {
// The failure is logged but not rethrown, so the nodes are still
// closed by the final step of the chain returned below
clearTimeout(timer);
this.error(Log._("nodes.flows.stopping-error", { message: error }));
if (this.failFast && this.shutdownScope) {
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.cancelShutdown && flow.id !== this.id) {
// Cancel other flow shutdown if this one fails
flow.cancelShutdown();
}
});
}
});
if (this.shutdown) {
// Run a first check in case nothing is pending
setImmediate(this.shutdown);
}
}
// 1. wait for the initiator nodes to close, 2. wait for the graceful
// shutdown to settle, 3. close everything in stopList
return Promise.all(initialPromises)
.then(() => shutdownPromise)
.then(() => Promise.all(this.stopNodes(stopList, removedMap)));
}
/**
* Stop every node of stopList
* @param {string[]} stopList
* @param {Record<string, true>} removedMap
* @returns {Promise<void>[]}
*/
stopNodes(stopList, removedMap) {
const promises = [];
for (const nodeId of stopList) {
const node = this.activeNodes[nodeId];
if (node) {
delete this.activeNodes[nodeId];
if (this.subflowInstanceNodes[nodeId]) {
delete this.subflowInstanceNodes[nodeId];
}
try {
const removed = removedMap[nodeId];
if (removed) {
// Clears the node status
events.emit("node-status", { id: node.id });
}
promises.push(stopNode(node, removed).catch(() => { }));
} catch(error) {
node.error(error);
}
}
}
return promises;
}
/**
* Update the flow definition. This doesn't change anything that is running.
* This should be called after `stop` and before `start`.
* @param {[type]} _global [description]
* @param {[type]} _flow [description]
* @return {[type]} [description]
*/
update(_global,_flow) {
this.global = _global;
this.flow = _flow;
}
/**
* Get a node instance from this flow. If the node is not known to this
* flow, pass the request up to the parent.
* @param {String} id [description]
* @param {Boolean} cancelBubble if true, prevents the flow from passing the request to the parent
* This stops infinite loops when the parent asked this Flow for the
* node to begin with.
* @return {[type]} [description]
*/
getNode(id, cancelBubble) {
if (!id) {
return undefined;
}
// console.log((new Error().stack).toString().split("\n").slice(1,3).join("\n"))
if ((this.flow.configs && this.flow.configs[id]) || (this.flow.nodes && this.flow.nodes[id] && this.flow.nodes[id].type.substring(0,8) != "subflow:")) {
// This is a node owned by this flow, so return whatever we have got
// During a stop/restart, activeNodes could be null for this id
return this.activeNodes[id];
} else if (this.activeNodes[id]) {
// TEMP: this is a subflow internal node within this flow or subflow instance node
return this.activeNodes[id];
} else if (this.subflowInstanceNodes[id]) {
return this.subflowInstanceNodes[id];
} else if (cancelBubble) {
// The node could be inside one of this flow's subflows
var node;
for (var sfId in this.subflowInstanceNodes) {
if (this.subflowInstanceNodes.hasOwnProperty(sfId)) {
node = this.subflowInstanceNodes[sfId].getNode(id,cancelBubble);
if (node) {
return node;
}
}
}
} else if (typeof this.parent.getNode === "function") {
// Node not found inside this flow - ask the parent
return this.parent.getNode(id);
}
return undefined;
}
/**
* Get a group node instance
* @param {String} id
* @return {Node} group node
*/
getGroupNode(id) {
return this.groups[id];
}
/**
* Get all of the nodes instantiated within this flow
* @return {[type]} [description]
*/
getActiveNodes() {
return this.activeNodes;
}
/**
* Get a flow setting value.
* @param {[type]} key [description]
* @return {[type]} [description]
*/
getSetting(key) {
const flow = this.flow;
if (key === "NR_FLOW_NAME") {
return flow.label;
}
if (key === "NR_FLOW_ID") {
return flow.id;
}
if (!key.startsWith("$parent.")) {
if (this._env.hasOwnProperty(key)) {
return (this._env[key] && Object.hasOwn(this._env[key], 'value') && this._env[key].__clone__) ? clone(this._env[key].value) : this._env[key]
}
} else {
key = key.substring(8);
}
// Delegate to the parent flow.
return this.parent.getSetting(key);
}
/**
* Handle a status event from a node within this flow.
* @param {Node} node The original node that triggered the event
* @param {Object} statusMessage The status object
* @param {Node} reportingNode The node emitting the status event.
* This could be a subflow instance node when the status
* is being delegated up.
* @param {boolean} muteStatusEvent Whether to emit the status event
* @return {[type]} [description]
*/
handleStatus(node,statusMessage,reportingNode,muteStatusEvent) {
if (!reportingNode) {
reportingNode = node;
}
if (!muteStatusEvent) {
if (statusMessage.hasOwnProperty("text") && typeof statusMessage.text !== "string") {
try {
statusMessage.text = statusMessage.text.toString();
}
catch(e) {}
}
events.emit("node-status",{
id: node.id,
status:statusMessage
});
}
let handled = false;
if (this.id === 'global' && node.users) {
// This is a global config node
// Delegate status to any nodes using this config node
for (let userNode in node.users) {
if (node.users.hasOwnProperty(userNode)) {
handled = node.users[userNode]._flow.handleStatus(node,statusMessage,node.users[userNode],true) || handled;
}
}
} else {
const candidateNodes = [];
this.statusNodes.forEach(targetStatusNode => {
if (targetStatusNode.g && targetStatusNode.scope === 'group' && !reportingNode.g) {
// Status node inside a group, reporting node not in a group - skip it
return
}
if (Array.isArray(targetStatusNode.scope) && targetStatusNode.scope.indexOf(reportingNode.id) === -1) {
return;
}
let distance = 0
if (reportingNode.g) {
// Reporting node inside a group. Calculate the distance between it and the status node
let containingGroup = this.groups[reportingNode.g]
while (containingGroup && containingGroup.id !== targetStatusNode.g) {
distance++
containingGroup = this.groups[containingGroup.g]
}
if (!containingGroup && targetStatusNode.g && targetStatusNode.scope === 'group') {
// This status node is in a group, but not in the same hierachy
// the reporting node is in
return
}
}
candidateNodes.push({ d: distance, n: targetStatusNode })
})
candidateNodes.sort((A,B) => {
return A.d - B.d
})
candidateNodes.forEach(candidate => {
const targetStatusNode = candidate.n
var message = {
status: clone(statusMessage)
}
if (statusMessage.hasOwnProperty("text")) {
message.status.text = statusMessage.text.toString();
}
message.status.source = {
id: node.id,
type: node.type,
name: node.name
}
targetStatusNode.receive(message);
handled = true;
});
}
return handled;
}
/**
* Handle an error event from a node within this flow.
*
* The error is delivered to the Catch nodes of this flow that accept the
* reporting node, nearest group first. For a config node on the global flow
* the event is instead delegated to the flow of each node using it.
* This method does not itself call the parent flow; it returns whether the
* error was handled.
* @param {Node} node The original node that triggered the error
* @param {*} logMessage The error being reported; its `toString()` becomes
*                       `msg.error.message`, and own `code`, `stack` and `cause`
*                       properties are copied across when present
* @param {Object} [msg] The message being processed when the error occurred
* @param {Node} [reportingNode] The node reporting the error. Defaults to `node`.
* @return {boolean} true if at least one Catch node received the error
*/
handleError(node,logMessage,msg,reportingNode) {
if (!reportingNode) {
reportingNode = node;
}
// console.log("HE",logMessage);
// Loop guard: if the incoming message already carries an error raised by
// this same node, bump its counter and give up on the 10th occurrence
var count = 1;
if (msg && msg.hasOwnProperty("error") && msg.error) {
if (msg.error.hasOwnProperty("source") && msg.error.source) {
if (msg.error.source.id === node.id) {
count = msg.error.source.count+1;
if (count === 10) {
node.warn(Log._("nodes.flow.error-loop"));
return false;
}
}
}
}
let handled = false;
if (this.id === 'global' && node.users) {
// This is a global config node
// Delegate the error to any nodes using this config node
for (let userNode in node.users) {
if (node.users.hasOwnProperty(userNode)) {
handled = node.users[userNode]._flow.handleError(node,logMessage,msg,node.users[userNode]) || handled;
}
}
} else {
const candidateNodes = [];
this.catchNodes.forEach(targetCatchNode => {
if (targetCatchNode.g && targetCatchNode.scope === 'group' && !reportingNode.g) {
// Catch node inside a group, reporting node not in a group - skip it
return
}
if (Array.isArray(targetCatchNode.scope) && targetCatchNode.scope.indexOf(reportingNode.id) === -1) {
// Catch node has a scope set and it doesn't include the reporting node
return;
}
let distance = 0
if (reportingNode.g) {
// Reporting node inside a group. Calculate the distance between it and the catch node
let containingGroup = this.groups[reportingNode.g]
while (containingGroup && containingGroup.id !== targetCatchNode.g) {
distance++
containingGroup = this.groups[containingGroup.g]
}
if (!containingGroup && targetCatchNode.g && targetCatchNode.scope === 'group') {
// This catch node is in a group, but not in the same hierarchy
// the reporting node is in
return
}
}
candidateNodes.push({ d: distance, n: targetCatchNode })
})
// Deliver to the closest Catch nodes first
candidateNodes.sort((A,B) => {
return A.d - B.d
})
let handledByUncaught = false
candidateNodes.forEach(candidate => {
const targetCatchNode = candidate.n
if (targetCatchNode.uncaught && !handledByUncaught) {
// This node only wants errors that haven't already been handled
if (handled) {
return
}
// Once one `uncaught` node has accepted the error, later `uncaught`
// candidates bypass this check and receive it as well
handledByUncaught = true
}
// Every Catch node gets its own copy of the message
let errorMessage;
if (msg) {
errorMessage = redUtil.cloneMessage(msg);
} else {
errorMessage = {};
}
if (errorMessage.hasOwnProperty("error")) {
// Preserve any existing error property before it is overwritten
errorMessage._error = errorMessage.error;
}
errorMessage.error = {
message: logMessage.toString(),
source: {
id: node.id,
type: node.type,
name: node.name,
count: count
}
};
if (logMessage.hasOwnProperty('code')) {
errorMessage.error.code = logMessage.code;
}
if (logMessage.hasOwnProperty('stack')) {
errorMessage.error.stack = logMessage.stack;
}
if (logMessage.hasOwnProperty('cause')) {
errorMessage.error.cause = logMessage.cause;
}
targetCatchNode.receive(errorMessage);
handled = true;
});
}
return handled;
}
handleComplete(node,msg) {
// Trigger Complete nodes
if (this.completeNodeMap[node.id]) {
let toSend = msg;
this.completeNodeMap[node.id].forEach((completeNode,index) => {
toSend = redUtil.cloneMessage(msg);
completeNode.receive(toSend);
})
}
}
send(sendEvents) {
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
// preRoute - called once for each SendEvent object in turn
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
// onReceive - a node is about to receive a message
// postReceive - the message has been passed to the node's input handler
// onDone, onError - the node has completed with a message or logged an error
handleOnSend(this,sendEvents, (err, eventData) => {
if (err) {
let srcNode;
if (Array.isArray(eventData)) {
srcNode = eventData[0].source.node;
} else {
srcNode = eventData.source.node;
}
srcNode.error(err);
}
});
}
getContext(scope) {
if (scope === 'flow') {
return this.context
} else if (scope === 'global') {
return context.get('global')
}
}
dump() {
console.log("==================")
console.log(this.TYPE, this.id);
for (var id in this.activeNodes) {
if (this.activeNodes.hasOwnProperty(id)) {
var node = this.activeNodes[id];
console.log(" ",id.padEnd(16),node.type)
if (node.wires) {
console.log(" -> ",node.wires)
}
}
}
console.log("==================")
}
}
/**
 * Stop an individual node within this flow.
 *
 * Calls `node.close(removed)` and waits for it to settle, giving up after
 * `nodeCloseTimeout` ms. A failure or timeout is reported through
 * `node.error` and never rejects the returned promise.
 *
 * @param {object} node The node to close
 * @param {boolean} removed Whether the node has been removed from the flow
 * @return {Promise<void>} Resolves once the close has completed, failed or timed out
 */
function stopNode(node,removed) {
    Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
    const start = Date.now();
    const closePromise = node.close(removed);
    let closeTimer = null;
    const closeTimeout = new Promise((resolve,reject) => {
        closeTimer = setTimeout(() => {
            reject("Close timed out");
        }, nodeCloseTimeout);
    });
    return Promise.race([closePromise,closeTimeout]).then(() => {
        clearTimeout(closeTimer);
        var delta = Date.now() - start;
        Log.trace("Stopped node "+node.type+":"+node.id+" ("+delta+"ms)" );
    }).catch(err => {
        clearTimeout(closeTimer);
        node.error(Log._("nodes.flows.stopping-error",{message:err}));
        // The rejection value is not always an Error: the timeout above
        // rejects with a string and a node may reject with nothing at all.
        // Only log a stack when there is one, so the handler cannot throw.
        if (err && err.stack) {
            Log.debug(err.stack);
        }
    })
}
/**
 * Run the `onSend` hook for a batch of send events, then route each event.
 *
 * onSend - passed an array of SendEvent objects. The messages inside these
 * objects are exactly what the node has passed to node.send - meaning there
 * could be duplicate references to the same message object.
 *
 * @param {object} flow The flow the messages were sent from
 * @param {object[]} sendEvents The SendEvent objects to process
 * @param {function} reportError Called with (err, sendEvents) if the hook fails
 */
function handleOnSend(flow,sendEvents, reportError) {
    const afterOnSend = (err) => {
        if (err) {
            reportError(err,sendEvents);
            return;
        }
        if (err === false) {
            // A hook halted the send; nothing gets routed
            return;
        }
        for (const sendEvent of sendEvents) {
            handlePreRoute(flow,sendEvent,reportError);
        }
    };
    hooks.trigger("onSend",sendEvents,afterOnSend);
}
/**
 * Run the `preRoute` hook for a single send event, resolve its destination
 * node and pass it on for delivery.
 *
 * preRoute - called once for each SendEvent object in turn
 *
 * @param {object} flow The flow used to look up the destination node
 * @param {object} sendEvent The SendEvent; `destination.node` and `msg` may be updated in place
 * @param {function} reportError Called with (err, sendEvent) if the hook fails
 */
function handlePreRoute(flow, sendEvent, reportError) {
    hooks.trigger("preRoute",sendEvent,(err) => {
        if (err) {
            reportError(err,sendEvent);
            return;
        }
        if (err === false) {
            // A hook halted the send
            return;
        }
        sendEvent.destination.node = flow.getNode(sendEvent.destination.id);
        const target = sendEvent.destination.node;
        // Only a real node object can receive a message
        if (!target || typeof target !== 'object') {
            return;
        }
        if (sendEvent.cloneMessage) {
            sendEvent.msg = redUtil.cloneMessage(sendEvent.msg);
        }
        handlePreDeliver(flow,sendEvent,reportError);
    })
}
/**
 * Hand a message to the destination node of a send event.
 * Does nothing when the event has no destination node. An exception thrown
 * by the node's `_receive` is logged rather than propagated.
 * @param {object} sendEvent The SendEvent to deliver
 */
function deliverMessageToDestination(sendEvent) {
    if (!sendEvent?.destination?.node) {
        return;
    }
    try {
        sendEvent.destination.node._receive(sendEvent.msg);
    } catch(err) {
        const target = sendEvent.destination.node;
        Log.error(`Error delivering message to node:${target._path} [${target.type}]`)
        Log.error(err.stack)
    }
}
/**
 * Run the `preDeliver` hook, dispatch the message to its destination node
 * and then run the `postDeliver` hook.
 *
 * preDeliver - the local router has identified the node it is going to send
 * to. At this point, the message has been cloned if needed.
 *
 * @param {object} flow The flow whose pending message count is incremented
 * @param {object} sendEvent The SendEvent being delivered
 * @param {function} reportError Called with (err, sendEvent) if a hook fails
 */
function handlePreDeliver(flow,sendEvent, reportError) {
    const dispatch = () => {
        deliverMessageToDestination(sendEvent)
    };
    hooks.trigger("preDeliver",sendEvent,(err) => {
        if (err) {
            reportError(err,sendEvent);
            return;
        }
        if (err === false) {
            // A hook halted the delivery
            return;
        }
        // node.send(msg) called
        // Do it now to avoid next loop and an offbeat count
        flow.incrementPendingMsgCount(sendEvent.destination.node);
        if (asyncMessageDelivery) {
            setImmediate(dispatch);
        } else {
            dispatch();
        }
        // postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
        hooks.trigger("postDeliver", sendEvent, (postErr) => {
            if (postErr) {
                reportError(postErr,sendEvent);
            }
        })
    })
}
module.exports = {
init: function(runtime) {
nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000;
asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery;
gracefulShutdown = runtime.settings.gracefulShutdown?.enabled || false;
Log = runtime.log;
Subflow = require("./Subflow");
Group = require("./Group").Group
},
create: function(parent,global,conf) {
return new Flow(parent,global,conf)
},
setActiveFlows: function(flows) {
activeFlows = flows;
},
Flow: Flow
}