diff --git a/packages/node_modules/@node-red/nodes/core/core/25-complete.html b/packages/node_modules/@node-red/nodes/core/core/25-complete.html new file mode 100644 index 000000000..f82ed02f8 --- /dev/null +++ b/packages/node_modules/@node-red/nodes/core/core/25-complete.html @@ -0,0 +1,136 @@ + + diff --git a/packages/node_modules/@node-red/nodes/core/core/25-complete.js b/packages/node_modules/@node-red/nodes/core/core/25-complete.js new file mode 100644 index 000000000..78008f81d --- /dev/null +++ b/packages/node_modules/@node-red/nodes/core/core/25-complete.js @@ -0,0 +1,30 @@ +/** + * Copyright JS Foundation and other contributors, http://js.foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + + function CompleteNode(n) { + RED.nodes.createNode(this,n); + var node = this; + this.scope = n.scope; + this.on("input",function(msg) { + this.send(msg); + }); + } + + RED.nodes.registerType("complete",CompleteNode); +} diff --git a/packages/node_modules/@node-red/nodes/core/core/58-debug.js b/packages/node_modules/@node-red/nodes/core/core/58-debug.js index 1571605da..9beff7809 100644 --- a/packages/node_modules/@node-red/nodes/core/core/58-debug.js +++ b/packages/node_modules/@node-red/nodes/core/core/58-debug.js @@ -81,7 +81,7 @@ module.exports = function(RED) { } } - this.on("input", function(msg) { + this.on("input", function(msg, send, done) { if (this.complete === "true") { // debug complete msg object if (this.console === "true") { @@ -90,13 +90,14 @@ module.exports = function(RED) { if (this.active && this.tosidebar) { sendDebug({id:node.id, name:node.name, topic:msg.topic, msg:msg, _path:msg._path}); } + done(); } else { - prepareValue(msg,function(err,msg) { + prepareValue(msg,function(err,debugMsg) { if (err) { node.error(err); return; } - var output = msg.msg; + var output = debugMsg.msg; if (node.console === "true") { if (typeof output === "string") { node.log((output.indexOf("\n") !== -1 ? "\n" : "") + output); @@ -114,9 +115,10 @@ module.exports = function(RED) { } if (node.active) { if (node.tosidebar == true) { - sendDebug(msg); + sendDebug(debugMsg); } } + done(); }); } }) diff --git a/packages/node_modules/@node-red/nodes/core/core/75-exec.js b/packages/node_modules/@node-red/nodes/core/core/75-exec.js index 09e54decc..a5132230e 100644 --- a/packages/node_modules/@node-red/nodes/core/core/75-exec.js +++ b/packages/node_modules/@node-red/nodes/core/core/75-exec.js @@ -38,7 +38,7 @@ module.exports = function(RED) { //node.error("Exec node timeout"); } - this.on("input", function(msg) { + this.on("input", function(msg, nodeSend, nodeDone) { if (msg.hasOwnProperty("kill")) { if (typeof msg.kill !== "string" || msg.kill.length === 0 || !msg.kill.toUpperCase().startsWith("SIG") ) { msg.kill = "SIGTERM"; } if (msg.hasOwnProperty("pid")) { @@ -53,6 +53,7 @@ module.exports = function(RED) { node.status({fill:"red",shape:"dot",text:"killed"}); } } + nodeDone(); } else { var child; @@ -85,14 +86,14 @@ module.exports = function(RED) { // console.log('[exec] stdout: ' + data,child.pid); if (isUtf8(data)) { msg.payload = data.toString(); } else { msg.payload = data; } - node.send([RED.util.cloneMessage(msg),null,null]); + nodeSend([RED.util.cloneMessage(msg),null,null]); } }); child.stderr.on('data', function (data) { if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) { if (isUtf8(data)) { msg.payload = data.toString(); } else { msg.payload = Buffer.from(data); } - node.send([null,RED.util.cloneMessage(msg),null]); + nodeSend([null,RED.util.cloneMessage(msg),null]); } }); child.on('close', function (code,signal) { @@ -108,8 +109,9 @@ module.exports = function(RED) { if (code === null) { node.status({fill:"red",shape:"dot",text:"killed"}); } else if (code < 0) { node.status({fill:"red",shape:"dot",text:"rc:"+code}); } else { node.status({fill:"yellow",shape:"dot",text:"rc:"+code}); } - node.send([null,null,RED.util.cloneMessage(msg)]); + nodeSend([null,null,RED.util.cloneMessage(msg)]); } + nodeDone(); }); child.on('error', function (code) { if (child.tout) { clearTimeout(child.tout); } @@ -154,9 +156,10 @@ module.exports = function(RED) { msg.rc = msg3.payload; if (msg2) { msg2.rc = msg3.payload; } } - node.send([msg,msg2,msg3]); + nodeSend([msg,msg2,msg3]); if (child.tout) { clearTimeout(child.tout); } delete node.activeProcesses[child.pid]; + nodeDone(); }); node.status({fill:"blue",shape:"dot",text:"pid:"+child.pid}); child.on('error',function() {}); diff --git a/packages/node_modules/@node-red/nodes/core/core/80-function.js b/packages/node_modules/@node-red/nodes/core/core/80-function.js index a5d47ebc8..e6732e594 100644 --- a/packages/node_modules/@node-red/nodes/core/core/80-function.js +++ b/packages/node_modules/@node-red/nodes/core/core/80-function.js @@ -19,7 +19,7 @@ module.exports = function(RED) { var util = require("util"); var vm = require("vm"); - function sendResults(node,_msgid,msgs) { + function sendResults(node,send,_msgid,msgs) { if (msgs == null) { return; } else if (!util.isArray(msgs)) { @@ -49,7 +49,7 @@ module.exports = function(RED) { } } if (msgCount>0) { - node.send(msgs); + send(msgs); } } @@ -58,8 +58,17 @@ module.exports = function(RED) { var node = this; this.name = n.name; this.func = n.func; + + var handleNodeDoneCall = true; + // Check to see if the Function appears to call `node.done()`. If so, + // we will assume it is well written and does actually call node.done(). + // Otherwise, we will call node.done() after the function returns regardless. + if (/node\.done\s*\(\s*\)/.test(this.func)) { + handleNodeDoneCall = false; + } + var functionText = "var results = null;"+ - "results = (function(msg){ "+ + "results = (function(msg,__send__,__done__){ "+ "var __msgid__ = msg._msgid;"+ "var node = {"+ "id:__node__.id,"+ @@ -71,10 +80,11 @@ module.exports = function(RED) { "trace:__node__.trace,"+ "on:__node__.on,"+ "status:__node__.status,"+ - "send:function(msgs){ __node__.send(__msgid__,msgs);}"+ + "send:function(msgs){ __node__.send(__send__,__msgid__,msgs);},"+ + "done:__done__"+ "};\n"+ this.func+"\n"+ - "})(msg);"; + "})(msg,send,done);"; this.topic = n.topic; this.outstandingTimers = []; this.outstandingIntervals = []; @@ -104,8 +114,8 @@ module.exports = function(RED) { trace: function() { node.trace.apply(node, arguments); }, - send: function(id, msgs) { - sendResults(node, id, msgs); + send: function(send, id, msgs) { + sendResults(node, send, id, msgs); }, on: function() { if (arguments[0] === "input") { @@ -223,12 +233,18 @@ module.exports = function(RED) { // lineOffset: -11, // line number offset to be used for stack traces // columnOffset: 0, // column number offset to be used for stack traces }); - this.on("input", function(msg) { + this.on("input", function(msg,send,done) { try { var start = process.hrtime(); context.msg = msg; + context.send = send; + context.done = done; + this.script.runInContext(context); - sendResults(this,msg._msgid,context.results); + sendResults(this,send,msg._msgid,context.results); + if (handleNodeDoneCall) { + done(); + } var duration = process.hrtime(start); var converted = Math.floor((duration[0] * 1e9 + duration[1])/10000)/100; diff --git a/packages/node_modules/@node-red/nodes/core/io/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/io/10-mqtt.js index f6702e902..379cf8a96 100644 --- a/packages/node_modules/@node-red/nodes/core/io/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/io/10-mqtt.js @@ -334,7 +334,7 @@ module.exports = function(RED) { } }; - this.publish = function (msg) { + this.publish = function (msg,done) { if (node.connected) { if (msg.payload === null || msg.payload === undefined) { msg.payload = ""; @@ -350,7 +350,10 @@ module.exports = function(RED) { qos: msg.qos || 0, retain: msg.retain || false }; - node.client.publish(msg.topic, msg.payload, options, function(err) {return}); + node.client.publish(msg.topic, msg.payload, options, function(err) { + done && done(); + return + }); } }; @@ -453,7 +456,7 @@ module.exports = function(RED) { if (this.brokerConn) { this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"}); - this.on("input",function(msg) { + this.on("input",function(msg,send,done) { if (msg.qos) { msg.qos = parseInt(msg.qos); if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) { @@ -468,9 +471,13 @@ module.exports = function(RED) { } if ( msg.hasOwnProperty("payload")) { if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist - this.brokerConn.publish(msg); // send the message + this.brokerConn.publish(msg, done); // send the message + } else { + node.warn(RED._("mqtt.errors.invalid-topic")); + done(); } - else { node.warn(RED._("mqtt.errors.invalid-topic")); } + } else { + done(); } }); if (this.brokerConn.connected) { diff --git a/packages/node_modules/@node-red/nodes/core/io/21-httprequest.js b/packages/node_modules/@node-red/nodes/core/io/21-httprequest.js index ec5290835..d290ae244 100644 --- a/packages/node_modules/@node-red/nodes/core/io/21-httprequest.js +++ b/packages/node_modules/@node-red/nodes/core/io/21-httprequest.js @@ -50,7 +50,7 @@ module.exports = function(RED) { noprox = proxyConfig.noproxy; } - this.on("input",function(msg) { + this.on("input",function(msg,nodeSend,nodeDone) { var preRequestTimestamp = process.hrtime(); node.status({fill:"blue",shape:"dot",text:"httpin.status.requesting"}); var url = nodeUrl || msg.url; @@ -62,12 +62,14 @@ module.exports = function(RED) { } if (!url) { node.error(RED._("httpin.errors.no-url"),msg); + nodeDone(); return; } // url must start http:// or https:// so assume http:// if not set if (url.indexOf("://") !== -1 && url.indexOf("http") !== 0) { node.warn(RED._("httpin.errors.invalid-transport")); node.status({fill:"red",shape:"ring",text:"httpin.errors.invalid-transport"}); + nodeDone(); return; } if (!((url.indexOf("http://") === 0) || (url.indexOf("https://") === 0))) { @@ -261,10 +263,12 @@ module.exports = function(RED) { } } catch(err) { node.error(RED._("httpin.errors.invalid-payload"),msg); + nodeDone(); return; } } else { node.error(RED._("httpin.errors.invalid-payload"),msg); + nodeDone(); return; } } @@ -320,7 +324,8 @@ module.exports = function(RED) { } msg.payload = err.toString() + " : " + url; msg.statusCode = err.code; - node.send(msg); + nodeSend(msg); + nodeDone(); }else{ msg.statusCode = res.statusCode; msg.headers = res.headers; @@ -354,7 +359,8 @@ module.exports = function(RED) { } } node.status({}); - node.send(msg); + nodeSend(msg); + nodeDone(); } }); }); diff --git a/packages/node_modules/@node-red/nodes/core/io/22-websocket.js b/packages/node_modules/@node-red/nodes/core/io/22-websocket.js index 5240b544a..957b7d8ed 100644 --- a/packages/node_modules/@node-red/nodes/core/io/22-websocket.js +++ b/packages/node_modules/@node-red/nodes/core/io/22-websocket.js @@ -309,7 +309,7 @@ module.exports = function(RED) { node.status(status); }); } - this.on("input", function(msg) { + this.on("input", function(msg, nodeSend, nodeDone) { var payload; if (this.serverConfig.wholemsg) { var sess; @@ -337,6 +337,7 @@ module.exports = function(RED) { }); } } + nodeDone(); }); this.on('close', function() { node.status({}); diff --git a/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js b/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js index aa72f0fe9..eb365a54c 100644 --- a/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js +++ b/packages/node_modules/@node-red/nodes/core/io/31-tcpin.js @@ -132,7 +132,7 @@ module.exports = function(RED) { reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); } } else { - if (node.done) { node.done(); } + if (node.doneClose) { node.doneClose(); } } }); client.on('error', function(err) { @@ -142,7 +142,7 @@ module.exports = function(RED) { setupTcpClient(); this.on('close', function(done) { - node.done = done; + node.doneClose = done; this.closing = true; if (client) { client.destroy(); } clearTimeout(reconnectTimeout); @@ -305,13 +305,13 @@ module.exports = function(RED) { reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); } } else { - if (node.done) { node.done(); } + if (node.doneClose) { node.doneClose(); } } }); } setupTcpClient(); - node.on("input", function(msg) { + node.on("input", function(msg,nodeSend,nodeDone) { if (node.connected && msg.payload != null) { if (Buffer.isBuffer(msg.payload)) { client.write(msg.payload); @@ -325,10 +325,11 @@ module.exports = function(RED) { if (client) { node.status({}); client.destroy(); } } } + nodeDone(); }); node.on("close", function(done) { - node.done = done; + node.doneClose = done; this.closing = true; if (client) { client.destroy(); } clearTimeout(reconnectTimeout); @@ -337,7 +338,7 @@ module.exports = function(RED) { } else if (node.beserver == "reply") { - node.on("input",function(msg) { + node.on("input",function(msg, nodeSend, nodeDone) { if (msg._session && msg._session.type == "tcp") { var client = connectionPool[msg._session.id]; if (client) { @@ -361,6 +362,7 @@ module.exports = function(RED) { } } } + nodeDone(); }); } else { @@ -389,7 +391,7 @@ module.exports = function(RED) { }); }); - node.on("input", function(msg) { + node.on("input", function(msg, nodeSend, nodeDone) { if (msg.payload != null) { var buffer; if (Buffer.isBuffer(msg.payload)) { @@ -404,6 +406,7 @@ module.exports = function(RED) { else { connectedSockets[i].write(buffer); } } } + nodeDone(); }); server.on('error', function(err) { @@ -461,7 +464,7 @@ module.exports = function(RED) { var clients = {}; - this.on("input", function(msg) { + this.on("input", function(msg, nodeSend, nodeDone) { var i = 0; if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) { msg.payload = msg.payload.toString(); @@ -483,7 +486,7 @@ module.exports = function(RED) { connected: false, connecting: false }; - enqueue(clients[connection_id].msgQueue, msg); + enqueue(clients[connection_id].msgQueue, {msg:msg,nodeSend:nodeSend, nodeDone: nodeDone}); clients[connection_id].lastMsg = msg; if (!clients[connection_id].connecting && !clients[connection_id].connected) { @@ -505,9 +508,10 @@ module.exports = function(RED) { if (clients[connection_id] && clients[connection_id].client) { clients[connection_id].connected = true; clients[connection_id].connecting = false; - let msg; - while (msg = dequeue(clients[connection_id].msgQueue)) { - clients[connection_id].client.write(msg.payload); + let event; + while (event = dequeue(clients[connection_id].msgQueue)) { + clients[connection_id].client.write(event.msg.payload); + event.nodeDone(); } if (node.out === "time" && node.splitc < 0) { clients[connection_id].connected = clients[connection_id].connecting = false; @@ -527,7 +531,7 @@ module.exports = function(RED) { if (clients[connection_id]) { const msg = clients[connection_id].lastMsg || {}; msg.payload = data; - node.send(RED.util.cloneMessage(msg)); + nodeSend(RED.util.cloneMessage(msg)); } } // else if (node.splitc === 0) { @@ -550,7 +554,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i+1); buf.copy(msg.payload,0,0,i+1); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -572,7 +576,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -591,7 +595,7 @@ module.exports = function(RED) { const msg = clients[connection_id].lastMsg || {}; msg.payload = Buffer.alloc(i); buf.copy(msg.payload,0,0,i); - node.send(msg); + nodeSend(msg); if (clients[connection_id].client) { node.status({}); clients[connection_id].client.destroy(); @@ -628,9 +632,9 @@ module.exports = function(RED) { break; } } - if (node.done && !anyConnected) { + if (node.doneClose && !anyConnected) { clients = {}; - node.done(); + node.doneClose(); } }); @@ -663,13 +667,15 @@ module.exports = function(RED) { } else if (!clients[connection_id].connecting && clients[connection_id].connected) { if (clients[connection_id] && clients[connection_id].client) { - clients[connection_id].client.write(dequeue(clients[connection_id].msgQueue).payload); + let event = dequeue(clients[connection_id].msgQueue) + clients[connection_id].client.write(event.msg.payload); + event.nodeDone(); } } }); this.on("close", function(done) { - node.done = done; + node.doneClose = done; for (var cl in clients) { if (clients[cl].hasOwnProperty("client")) { clients[cl].client.destroy(); diff --git a/packages/node_modules/@node-red/nodes/core/io/32-udp.js b/packages/node_modules/@node-red/nodes/core/io/32-udp.js index cf4751760..60e9bec08 100644 --- a/packages/node_modules/@node-red/nodes/core/io/32-udp.js +++ b/packages/node_modules/@node-red/nodes/core/io/32-udp.js @@ -223,16 +223,19 @@ module.exports = function(RED) { udpInputPortsInUse[p] = sock; } - node.on("input", function(msg) { + node.on("input", function(msg, nodeSend, nodeDone) { if (msg.hasOwnProperty("payload")) { var add = node.addr || msg.ip || ""; var por = node.port || msg.port || 0; if (add === "") { node.warn(RED._("udp.errors.ip-notset")); + nodeDone(); } else if (por === 0) { node.warn(RED._("udp.errors.port-notset")); + nodeDone(); } else if (isNaN(por) || (por < 1) || (por > 65535)) { node.warn(RED._("udp.errors.port-invalid")); + nodeDone(); } else { var message; if (node.base64) { @@ -247,6 +250,7 @@ module.exports = function(RED) { node.error("udp : "+err,msg); } message = null; + nodeDone(); }); } } diff --git a/packages/node_modules/@node-red/nodes/core/logic/15-change.js b/packages/node_modules/@node-red/nodes/core/logic/15-change.js index 8d100233e..ae377ed1e 100644 --- a/packages/node_modules/@node-red/nodes/core/logic/15-change.js +++ b/packages/node_modules/@node-red/nodes/core/logic/15-change.js @@ -328,13 +328,14 @@ module.exports = function(RED) { } if (valid) { - this.on('input', function(msg) { + this.on('input', function(msg, send, done) { applyRules(msg, 0, (err,msg) => { if (err) { node.error(err,msg); } else if (msg) { - node.send(msg); + send(msg); } + done(); }) }); } diff --git a/packages/node_modules/@node-red/nodes/core/storage/50-file.js b/packages/node_modules/@node-red/nodes/core/storage/50-file.js index 547ef3132..c27c3ed5f 100644 --- a/packages/node_modules/@node-red/nodes/core/storage/50-file.js +++ b/packages/node_modules/@node-red/nodes/core/storage/50-file.js @@ -34,8 +34,9 @@ module.exports = function(RED) { } return data.toString(); } - + function FileNode(n) { + // Write/delete a file RED.nodes.createNode(this,n); this.filename = n.filename; this.appendNewline = n.appendNewline; @@ -48,7 +49,7 @@ module.exports = function(RED) { node.closing = false; node.closeCallback = null; - function processMsg(msg, done) { + function processMsg(msg,nodeSend, done) { var filename = node.filename || msg.filename || ""; if ((!node.filename) && (!node.tout)) { node.tout = setTimeout(function() { @@ -68,7 +69,7 @@ module.exports = function(RED) { if (RED.settings.verbose) { node.log(RED._("file.status.deletedfile",{file:filename})); } - node.send(msg); + nodeSend(msg); } done(); }); @@ -101,7 +102,7 @@ module.exports = function(RED) { }); wstream.on("open", function() { wstream.end(buf, function() { - node.send(msg); + nodeSend(msg); done(); }); }) @@ -150,13 +151,13 @@ module.exports = function(RED) { if (node.filename) { // Static filename - write and reuse the stream next time node.wstream.write(buf, function() { - node.send(msg); + nodeSend(msg); done(); }); } else { // Dynamic filename - write and close the stream node.wstream.end(buf, function() { - node.send(msg); + nodeSend(msg); delete node.wstream; delete node.wstreamIno; done(); @@ -169,12 +170,13 @@ module.exports = function(RED) { } } - function processQ(queue) { - var msg = queue[0]; - processMsg(msg, function() { + function processQueue(queue) { + var event = queue[0]; + processMsg(event.msg, event.send, function() { + event.done(); queue.shift(); if (queue.length > 0) { - processQ(queue); + processQueue(queue); } else if (node.closing) { closeNode(); @@ -182,14 +184,19 @@ module.exports = function(RED) { }); } - this.on("input", function(msg) { + this.on("input", function(msg,nodeSend,nodeDone) { var msgQueue = node.msgQueue; - if (msgQueue.push(msg) > 1) { + msgQueue.push({ + msg: msg, + send: nodeSend, + done: nodeDone + }) + if (msgQueue.length > 1) { // pending write exists return; } try { - processQ(msgQueue); + processQueue(msgQueue); } catch (e) { node.msgQueue = []; @@ -234,6 +241,7 @@ module.exports = function(RED) { function FileInNode(n) { + // Read a file RED.nodes.createNode(this,n); this.filename = n.filename; this.format = n.format; @@ -248,13 +256,14 @@ module.exports = function(RED) { if (this.format === "stream") { this.chunk = true; } var node = this; - this.on("input",function(msg) { + this.on("input",function(msg, nodeSend, nodeDone) { var filename = (node.filename || msg.filename || "").replace(/\t|\r|\n/g,''); if (!node.filename) { node.status({fill:"grey",shape:"dot",text:filename}); } if (filename === "") { node.warn(RED._("file.errors.nofilename")); + nodeDone(); } else { msg.filename = filename; @@ -288,7 +297,7 @@ module.exports = function(RED) { parts:{index:count, ch:ch, type:type, id:msg._msgid} } count += 1; - node.send(m); + nodeSend(m); } spare = bits[i]; } @@ -304,7 +313,7 @@ module.exports = function(RED) { getout = false; m.parts.count = count; } - node.send(m); + nodeSend(m); } } else { @@ -318,8 +327,9 @@ module.exports = function(RED) { var sendMessage = RED.util.cloneMessage(msg); delete sendMessage.payload; sendMessage.error = err; - node.send(sendMessage); + nodeSend(sendMessage); } + nodeDone(); }) .on('end', function() { if (node.chunk === false) { @@ -327,7 +337,7 @@ module.exports = function(RED) { msg.payload = decode(lines, node.encoding); } else { msg.payload = lines; } - node.send(msg); + nodeSend(msg); } else if (node.format === "lines") { var m = { payload: spare, @@ -339,12 +349,13 @@ module.exports = function(RED) { id: msg._msgid } }; - node.send(m); + nodeSend(m); } else if (getout) { // last chunk same size as high water mark - have to send empty extra packet. var m = { parts:{index:count, count:count, ch:ch, type:type, id:msg._msgid} }; - node.send(m); + nodeSend(m); } + nodeDone(); }); } }); diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/core/25-complete.html b/packages/node_modules/@node-red/nodes/locales/en-US/core/25-complete.html new file mode 100644 index 000000000..d73f8e13d --- /dev/null +++ b/packages/node_modules/@node-red/nodes/locales/en-US/core/25-complete.html @@ -0,0 +1,29 @@ + + + diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json index c279af387..40a26a4b3 100755 --- a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json +++ b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json @@ -112,6 +112,9 @@ "selected": "selected nodes" } }, + "complete": { + "completeNodes": "complete: __number__" + }, "debug": { "output": "Output", "none": "None", diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index d4e1ae00f..7d2424e18 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -18,15 +18,27 @@ var util = require("util"); var EventEmitter = require("events").EventEmitter; var redUtil = require("@node-red/util").util; -var Log = require("@node-red/util").log; // TODO: separate module +var Log = require("@node-red/util").log; var context = require("./context"); var flows = require("./flows"); +const NOOP_SEND = function() {} + +/** + * The Node object is the heart of a Node-RED flow. It is the object that all + * nodes extend. + * + * The Node object itself inherits from EventEmitter, although it provides + * custom implementations of some of the EE functions in order to handle + * `input` and `close` events properly. + */ function Node(n) { this.id = n.id; this.type = n.type; this.z = n.z; this._closeCallbacks = []; + this._inputCallback = null; + this._inputCallbacks = null; if (n.name) { this.name = n.name; @@ -43,12 +55,31 @@ function Node(n) { // as part of its constructure - config._flow will overwrite this._flow // which we can tolerate as they are the same object. Object.defineProperty(this,'_flow', {value: n._flow, enumerable: false, writable: true }) + this._asyncDelivery = n._flow.asyncMessageDelivery; + } + if (this._asyncDelivery === undefined) { + this._asyncDelivery = true; } this.updateWires(n.wires); } util.inherits(Node, EventEmitter); +/** + * Update the wiring configuration for this node. + * + * We try to optimise the message handling path. To do this there are three + * cases to consider: + * 1. this node is wired to nothing. In this case we replace node.send with a + * NO-OP function. + * 2. this node is wired to one other node. In this case we set `this._wire` + * as a reference to the node it is wired to. This means we avoid unnecessary + * iterations over what would otherwise be a 1-element array. + * 3. this node is wired to multiple things. The normal node.send processing of + * this.wires applies. + * + * @param {array} wires the new wiring configuration + */ Node.prototype.updateWires = function(wires) { //console.log("UPDATE",this.id); this.wires = wires || []; @@ -61,7 +92,7 @@ Node.prototype.updateWires = function(wires) { this._wireCount = wc; if (wc === 0) { // With nothing wired to the node, no-op send - this.send = function(msg) {} + this.send = NOOP_SEND } else { this.send = Node.prototype.send; if (this.wires.length === 1 && this.wires[0].length === 1) { @@ -72,6 +103,13 @@ Node.prototype.updateWires = function(wires) { } } +/** + * Get the context object for this node. + * + * As most nodes do not use context, this is a lazy function that will only + * create a context instance for the node if it is needed. + * @return {object} the context object + */ Node.prototype.context = function() { if (!this._context) { this._context = context.get(this._alias||this.id,this.z); @@ -79,29 +117,194 @@ Node.prototype.context = function() { return this._context; } +/** + * Handle the complete event for a message + * + * @param {object} msg The message that has completed + * @param {error} error (optional) an error hit whilst handling the message + */ +Node.prototype._complete = function(msg,error) { + if (error) { + // For now, delegate this to this.error + // But at some point, the timeout handling will need to know about + // this as well. + this.error(error,msg); + } else { + this._flow.handleComplete(this,msg); + } +} + +/** + * An internal reference to the original EventEmitter.on() function + */ Node.prototype._on = Node.prototype.on; +/** + * Register a callback function for a named event. + * 'close' and 'input' events are handled locally, other events defer to EventEmitter.on() + */ Node.prototype.on = function(event, callback) { var node = this; if (event == "close") { this._closeCallbacks.push(callback); + } else if (event === "input") { + if (this._inputCallback) { + this._inputCallbacks = [this._inputCallback, callback]; + this._inputCallback = null; + } else if (this._inputCallbacks) { + this._inputCallbacks.push(callback); + } else { + this._inputCallback = callback; + } } else { this._on(event, callback); } }; +/** + * An internal reference to the original EventEmitter.emit() function + */ +Node.prototype._emit = Node.prototype.emit; + +/** + * Emit an event to all registered listeners. + */ +Node.prototype.emit = function(event,arg) { + var node = this; + if (event === "input") { + // When Pluggable Message Routing arrives, this will be called from + // that and will already be sync/async depending on the router. + if (this._asyncDelivery) { + setImmediate(function() { + node._emitInput(arg); + }); + } else { + this._emitInput(arg); + } + } else { + this._emit(event,arg); + } +} + +/** + * Handle the 'input' event. + * + * This will call all registered handlers for the 'input' event. + */ +Node.prototype._emitInput = function(arg) { + var node = this; + if (node._inputCallback) { + // Just one callback registered. + try { + node._inputCallback( + arg, + function() { node.send.apply(node,arguments) }, + function(err) { node._complete(arg,err); } + ); + } catch(err) { + node.error(err,arg); + } + } else if (node._inputCallbacks) { + // Multiple callbacks registered. Call each one, tracking eventual completion + var c = node._inputCallbacks.length; + for (var i=0;i -1) { + this._inputCallbacks.splice(index,1); + } + // Check if we can optimise back to a single callback + if (this._inputCallbacks.length === 1) { + this._inputCallback = this._inputCallbacks[0]; + this._inputCallbacks = null; + } + } + } else if (name === "close") { + index = this._closeCallbacks.indexOf(listener); + if (index > -1) { + this._closeCallbacks.splice(index,1); + } + } else { + this._removeListener(name, listener); + } +} + +/** + * An internal reference to the original EventEmitter.removeAllListeners() function + */ +Node.prototype._removeAllListeners = Node.prototype.removeAllListeners; + +/** + * Remove all listeners for an event + */ +Node.prototype.removeAllListeners = function(name) { + if (name === "input") { + this._inputCallback = null; + this._inputCallbacks = null; + } else if (name === "close") { + this._closeCallbacks = []; + } else { + this._removeAllListeners(name); + } +} + +/** + * Called when the node is being stopped + * @param {boolean} removed Whether the node has been removed, or just being stopped + * @return {Promise} resolves when the node has closed + */ Node.prototype.close = function(removed) { //console.log(this.type,this.id,removed); var promises = []; var node = this; + // Call all registered close callbacks. for (var i=0;i 0) { + // The callback takes a 'done' callback and (maybe) the removed flag promises.push( new Promise((resolve) => { try { var args = []; if (callback.length === 2) { + // The listener expects the removed flag args.push(!!removed); } args.push(() => { @@ -116,6 +319,7 @@ Node.prototype.close = function(removed) { }) ); } else { + // No done callback so handle synchronously try { callback.call(node); } catch(err) { @@ -138,6 +342,12 @@ Node.prototype.close = function(removed) { } }; +/** + * Send a message to the nodes wired. + * + * + * @param {object} msg A message or array of messages to send + */ Node.prototype.send = function(msg) { var msgSent = false; var node; @@ -225,6 +435,12 @@ Node.prototype.send = function(msg) { } }; +/** + * Receive a message. + * + * This will emit the `input` event with the provided message. + * As of 1.0, this will return *before* any 'input' callback handler is invoked. + */ Node.prototype.receive = function(msg) { if (!msg) { msg = {}; @@ -233,11 +449,7 @@ Node.prototype.receive = function(msg) { msg._msgid = redUtil.generateId(); } this.metric("receive",msg); - try { - this.emit("input", msg); - } catch(err) { - this.error(err,msg); - } + this.emit("input",msg); }; function log_helper(self, level, msg) { @@ -258,15 +470,23 @@ function log_helper(self, level, msg) { } Log.log(o); } - +/** + * Log an INFO level message + */ Node.prototype.log = function(msg) { log_helper(this, Log.INFO, msg); }; +/** + * Log a WARN level message + */ Node.prototype.warn = function(msg) { log_helper(this, Log.WARN, msg); }; +/** + * Log an ERROR level message + */ Node.prototype.error = function(logMessage,msg) { if (typeof logMessage != 'boolean') { logMessage = logMessage || ""; @@ -280,15 +500,22 @@ Node.prototype.error = function(logMessage,msg) { } }; +/** + * Log an DEBUG level message + */ Node.prototype.debug = function(msg) { log_helper(this, Log.DEBUG, msg); } +/** + * Log an TRACE level message + */ Node.prototype.trace = function(msg) { log_helper(this, Log.TRACE, msg); } /** + * Log a metric event. * If called with no args, returns whether metric collection is enabled */ Node.prototype.metric = function(eventname, msg, metricValue) { @@ -305,6 +532,8 @@ Node.prototype.metric = function(eventname, msg, metricValue) { } /** + * Set the node's status object + * * status: { fill:"red|green", shape:"dot|ring", text:"blah" } * or * status: "simple text status" diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js b/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js index 4fb516f9d..40855793f 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/flows/Flow.js @@ -23,6 +23,7 @@ var Subflow; var Log; var nodeCloseTimeout = 15000; +var asyncMessageDelivery = true; /** * This class represents a flow within the runtime. It is responsible for @@ -125,6 +126,7 @@ class Flow { var id; this.catchNodes = []; this.statusNodes = []; + this.completeNodeMap = {}; var configNodes = Object.keys(this.flow.configs); var configNodeAttempts = {}; @@ -228,7 +230,7 @@ class Flow { this.trace(" id | type | alias"); this.trace("------------------|--------------|-----------------"); } - // Build the map of catch/status nodes. + // Build the map of catch/status/complete nodes. for (id in this.activeNodes) { if (this.activeNodes.hasOwnProperty(id)) { node = this.activeNodes[id]; @@ -237,6 +239,13 @@ class Flow { this.catchNodes.push(node); } else if (node.type === "status") { this.statusNodes.push(node); + } else if (node.type === "complete") { + if (node.scope) { + node.scope.forEach(id => { + this.completeNodeMap[id] = this.completeNodeMap[id] || []; + this.completeNodeMap[id].push(node); + }) + } } } } @@ -516,6 +525,20 @@ class Flow { return handled; } + handleComplete(node,msg) { + if (this.completeNodeMap[node.id]) { + let toSend = msg; + this.completeNodeMap[node.id].forEach((completeNode,index) => { + toSend = redUtil.cloneMessage(msg); + completeNode.receive(toSend); + }) + } + } + + get asyncMessageDelivery() { + return asyncMessageDelivery + } + dump() { console.log("==================") console.log(this.TYPE, this.id); @@ -562,6 +585,7 @@ function stopNode(node,removed) { module.exports = { init: function(runtime) { nodeCloseTimeout = runtime.settings.nodeCloseTimeout || 15000; + asyncMessageDelivery = !runtime.settings.runtimeSyncDelivery Log = runtime.log; Subflow = require("./Subflow"); Subflow.init(runtime); diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js b/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js index ef9fbdab2..981ed7173 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/flows/index.js @@ -729,6 +729,10 @@ module.exports = { updateFlow: updateFlow, removeFlow: removeFlow, disableFlow:null, - enableFlow:null + enableFlow:null, + isDeliveryModeAsync: function() { + // If settings is null, this is likely being run by unit tests + return !settings || !settings.runtimeSyncDelivery + } }; diff --git a/test/nodes/core/core/80-function_spec.js b/test/nodes/core/core/80-function_spec.js index 6f884f0ce..01ec551cf 100644 --- a/test/nodes/core/core/80-function_spec.js +++ b/test/nodes/core/core/80-function_spec.js @@ -268,45 +268,50 @@ describe('function node', function() { helper.load(functionNode, flow, function() { var n1 = helper.getNode("n1"); n1.receive({payload:"foo",topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().ERROR); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'ReferenceError: retunr is not defined (line 2, col 1)'); - done(); - } catch(err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().ERROR); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'ReferenceError: retunr is not defined (line 2, col 1)'); + done(); + } catch(err) { + done(err); + } + },50); }); }); it('should handle node.on()', function(done) { - var flow = [{id:"n1",type:"function",wires:[["n2"]],func:"node.on('close',function(){node.log('closed')});"}]; + var flow = [{id:"n1",type:"function",wires:[["n2"]],func:"node.on('close',function(){ node.log('closed')});"}]; helper.load(functionNode, flow, function() { var n1 = helper.getNode("n1"); n1.receive({payload:"foo",topic: "bar"}); - helper.getNode("n1").close(); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "function"; + setTimeout(function() { + n1.close().then(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().INFO); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'closed'); + done(); + } catch(err) { + done(err); + } }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().INFO); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'closed'); - done(); - } catch(err) { - done(err); - } + },1500); }); }); @@ -532,22 +537,24 @@ describe('function node', function() { }); function checkCallbackError(name, done) { - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().ERROR); - msg.should.have.property('id', name); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'Error: Callback must be a function'); - done(); - } - catch (e) { - done(e); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().ERROR); + msg.should.have.property('id', name); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'Error: Callback must be a function'); + done(); + } + catch (e) { + done(e); + } + },50); } it('should get persistable node context (w/o callback)', function(done) { @@ -1267,12 +1274,12 @@ describe('function node', function() { n1.receive({payload:"foo",topic: "bar"}); } else { msg.should.have.property('payload', "hello"); + delete process.env._TEST_FOO_; done(); } } catch(err) { - done(err); - } finally { delete process.env._TEST_FOO_; + done(err); } }); n1.receive({payload:"foo",topic: "bar"}); @@ -1285,21 +1292,23 @@ describe('function node', function() { helper.load(functionNode, flow, function () { var n1 = helper.getNode("n1"); n1.receive({payload: "foo", topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().INFO); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'test'); - done(); - } catch (err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().INFO); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'test'); + done(); + } catch (err) { + done(err); + } + },50); }); }); it('should log a Debug Message', function (done) { @@ -1307,21 +1316,23 @@ describe('function node', function() { helper.load(functionNode, flow, function () { var n1 = helper.getNode("n1"); n1.receive({payload: "foo", topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().DEBUG); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'test'); - done(); - } catch (err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().DEBUG); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'test'); + done(); + } catch (err) { + done(err); + } + },50); }); }); it('should log a Trace Message', function (done) { @@ -1329,21 +1340,23 @@ describe('function node', function() { helper.load(functionNode, flow, function () { var n1 = helper.getNode("n1"); n1.receive({payload: "foo", topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().TRACE); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'test'); - done(); - } catch (err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().TRACE); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'test'); + done(); + } catch (err) { + done(err); + } + },50); }); }); it('should log a Warning Message', function (done) { @@ -1351,21 +1364,23 @@ describe('function node', function() { helper.load(functionNode, flow, function () { var n1 = helper.getNode("n1"); n1.receive({payload: "foo", topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().WARN); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'test'); - done(); - } catch (err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().WARN); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'test'); + done(); + } catch (err) { + done(err); + } + },50); }); }); it('should log an Error Message', function (done) { @@ -1373,21 +1388,23 @@ describe('function node', function() { helper.load(functionNode, flow, function () { var n1 = helper.getNode("n1"); n1.receive({payload: "foo", topic: "bar"}); - try { - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "function"; - }); - logEvents.should.have.length(1); - var msg = logEvents[0][0]; - msg.should.have.property('level', helper.log().ERROR); - msg.should.have.property('id', 'n1'); - msg.should.have.property('type', 'function'); - msg.should.have.property('msg', 'test'); - done(); - } catch (err) { - done(err); - } + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "function"; + }); + logEvents.should.have.length(1); + var msg = logEvents[0][0]; + msg.should.have.property('level', helper.log().ERROR); + msg.should.have.property('id', 'n1'); + msg.should.have.property('type', 'function'); + msg.should.have.property('msg', 'test'); + done(); + } catch (err) { + done(err); + } + },50); }); }); it('should catch thrown string', function (done) { diff --git a/test/nodes/core/logic/18-sort_spec.js b/test/nodes/core/logic/18-sort_spec.js index ac8d7d11c..960a6d666 100644 --- a/test/nodes/core/logic/18-sort_spec.js +++ b/test/nodes/core/logic/18-sort_spec.js @@ -475,20 +475,21 @@ describe('SORT node', function() { {id:"n2", type:"helper"}]; helper.load(sortNode, flow, function() { var n1 = helper.getNode("n1"); - setTimeout(function() { - var logEvents = helper.log().args.filter(function (evt) { - return evt[0].type == "sort"; - }); - var evt = logEvents[0][0]; - evt.should.have.property('id', "n1"); - evt.should.have.property('type', "sort"); - evt.should.have.property('msg', "sort.clear"); - done(); - }, 150); var msg = { payload: 0, parts: { id: "X", index: 0, count: 2} }; n1.receive(msg); - n1.close(); + setTimeout(function() { + n1.close().then(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "sort"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "sort"); + evt.should.have.property('msg', "sort.clear"); + done(); + }); + }, 150); }); }); diff --git a/test/nodes/core/parsers/70-HTML_spec.js b/test/nodes/core/parsers/70-HTML_spec.js index e7e7ffa8b..3183a0539 100644 --- a/test/nodes/core/parsers/70-HTML_spec.js +++ b/test/nodes/core/parsers/70-HTML_spec.js @@ -21,7 +21,7 @@ var fs = require('fs-extra'); var htmlNode = require("nr-test-utils").require("@node-red/nodes/core/parsers/70-HTML.js"); var helper = require("node-red-node-test-helper"); -describe('html node', function() { +describe('HTML node', function() { var resourcesDir = __dirname+ path.sep + ".." + path.sep + ".." + path.sep + ".." + path.sep + "resources" + path.sep; var file = path.join(resourcesDir, "70-HTML-test-file.html"); @@ -228,16 +228,20 @@ describe('html node', function() { var n1 = helper.getNode("n1"); var n2 = helper.getNode("n2"); n1.receive({payload:null,topic: "bar"}); - helper.log().called.should.be.true(); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "html"; - }); - logEvents.should.have.length(1); - // Each logEvent is the array of args passed to the function. - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + setTimeout(function() { + try { + helper.log().called.should.be.true(); + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "html"; + }); + logEvents.should.have.length(1); + // Each logEvent is the array of args passed to the function. + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } diff --git a/test/nodes/core/parsers/70-JSON_spec.js b/test/nodes/core/parsers/70-JSON_spec.js index 7effdfab7..2ec3304bb 100644 --- a/test/nodes/core/parsers/70-JSON_spec.js +++ b/test/nodes/core/parsers/70-JSON_spec.js @@ -148,14 +148,18 @@ describe('JSON node', function() { var jn1 = helper.getNode("jn1"); var jn2 = helper.getNode("jn2"); jn1.receive({payload:'foo',topic: "bar"}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.startWith("Unexpected token o"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.startWith("Unexpected token o"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },20); } catch(err) { done(err); } @@ -378,14 +382,18 @@ describe('JSON node', function() { var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}}; var obj = {"number": "foo", "string": 3}; jn1.receive({payload:obj, schema:schema}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } @@ -402,14 +410,18 @@ describe('JSON node', function() { var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}}; var obj = {"number": "foo", "string": 3}; jn1.receive({payload:obj, schema:schema}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } @@ -426,14 +438,18 @@ describe('JSON node', function() { var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}}; var jsonString = '{"number":"Hello","string":3}'; jn1.receive({payload:jsonString, schema:schema}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } @@ -450,14 +466,18 @@ describe('JSON node', function() { var schema = {title: "testSchema", type: "object", properties: {number: {type: "number"}, string: {type: "string" }}}; var jsonString = '{"number":"Hello","string":3}'; jn1.receive({payload:jsonString, schema:schema}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.equal("json.errors.schema-error: data.number should be number, data.string should be string"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } @@ -474,14 +494,18 @@ describe('JSON node', function() { var schema = "garbage"; var obj = {"number": "foo", "string": 3}; jn1.receive({payload:obj, schema:schema}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "json"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.equal("json.errors.schema-error-compile"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "json"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.equal("json.errors.schema-error-compile"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } diff --git a/test/nodes/core/parsers/70-YAML_spec.js b/test/nodes/core/parsers/70-YAML_spec.js index 925a52922..3441e0946 100644 --- a/test/nodes/core/parsers/70-YAML_spec.js +++ b/test/nodes/core/parsers/70-YAML_spec.js @@ -130,14 +130,18 @@ describe('YAML node', function() { var yn1 = helper.getNode("yn1"); var yn2 = helper.getNode("yn2"); yn1.receive({payload:'employees:\n-firstName: John\n- lastName: Smith\n',topic: "bar"}); - var logEvents = helper.log().args.filter(function(evt) { - return evt[0].type == "yaml"; - }); - logEvents.should.have.length(1); - logEvents[0][0].should.have.a.property('msg'); - logEvents[0][0].msg.should.startWith("end of the stream"); - logEvents[0][0].should.have.a.property('level',helper.log().ERROR); - done(); + setTimeout(function() { + try { + var logEvents = helper.log().args.filter(function(evt) { + return evt[0].type == "yaml"; + }); + logEvents.should.have.length(1); + logEvents[0][0].should.have.a.property('msg'); + logEvents[0][0].msg.should.startWith("end of the stream"); + logEvents[0][0].should.have.a.property('level',helper.log().ERROR); + done(); + } catch(err) { done(err) } + },50); } catch(err) { done(err); } diff --git a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js index 7e5aed1b0..5ad52211e 100644 --- a/test/unit/@node-red/runtime/lib/nodes/Node_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/Node_spec.js @@ -156,10 +156,50 @@ describe('Node', function() { throw new Error("test error"); }); n.receive(message); - n.error.called.should.be.true(); - n.error.firstCall.args[1].should.equal(message); - done(); + setTimeout(function() { + n.error.called.should.be.true(); + n.error.firstCall.args[1].should.equal(message); + done(); + },50); + }); + it('calls parent flow handleComplete when callback provided', function(done) { + var n = new RedNode({id:'123',type:'abc', _flow: { + handleComplete: function(node,msg) { + try { + msg.should.deepEqual(message); + done(); + } catch(err) { + done(err); + } + } + }}); + + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(); + }); + n.receive(message); + }); + it('logs error if callback provides error', function(done) { + var n = new RedNode({id:'123',type:'abc'}); + sinon.stub(n,"error",function(err,msg) {}); + + var message = {payload:"hello world"}; + n.on('input',function(msg, nodeSend, nodeDone) { + nodeDone(new Error("test error")); + setTimeout(function() { + try { + n.error.called.should.be.true(); + n.error.firstCall.args[0].toString().should.equal("Error: test error"); + n.error.firstCall.args[1].should.equal(message); + done(); + } catch(err) { + done(err); + } + },50); + }); + n.receive(message); }); }); @@ -172,15 +212,69 @@ describe('Node', function() { var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var message = {payload:"hello world"}; - + var messageReceived = false; n2.on('input',function(msg) { // msg equals message, and is not a new copy + messageReceived = true; should.deepEqual(msg,message); should.strictEqual(msg,message); done(); }); - n1.send(message); + messageReceived.should.be.false(); + }); + + it('emits a single message - synchronous mode', function(done) { + var flow = { + getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + asyncMessageDelivery: false + }; + var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); + var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + var message = {payload:"hello world"}; + var messageReceived = false; + var notSyncErr; + n2.on('input',function(msg) { + try { + // msg equals message, and is not a new copy + messageReceived = true; + should.deepEqual(msg,message); + should.strictEqual(msg,message); + done(notSyncErr); + } catch(err) { + done(err); + } + }); + n1.send(message); + try { + messageReceived.should.be.true(); + } catch(err) { + notSyncErr = err; + } + }); + + it('emits a message with callback provided send', function(done) { + var flow = { + getNode: (id) => { return {'n1':n1,'n2':n2}[id]}, + handleComplete: (node,msg) => {} + }; + var n1 = new RedNode({_flow:flow,id:'n1',type:'abc',wires:[['n2']]}); + var n2 = new RedNode({_flow:flow,id:'n2',type:'abc'}); + var message = {payload:"hello world"}; + var messageReceived = false; + n1.on('input',function(msg,nodeSend,nodeDone) { + nodeSend(msg); + nodeDone(); + }); + n2.on('input',function(msg) { + // msg equals message, and is not a new copy + messageReceived = true; + should.deepEqual(msg,message); + should.strictEqual(msg,message); + done(); + }); + n1.receive(message); + messageReceived.should.be.false(); }); it('emits multiple messages on a single output', function(done) { @@ -356,12 +450,13 @@ describe('Node', function() { it("logs the uuid for all messages sent", function(done) { var logHandler = { + msgIds:[], messagesSent: 0, emit: function(event, msg) { if (msg.event == "node.abc.send" && msg.level == Log.METRIC) { this.messagesSent++; + this.msgIds.push(msg.msgid); (typeof msg.msgid).should.not.be.equal("undefined"); - done(); } } }; @@ -375,6 +470,17 @@ describe('Node', function() { var receiver1 = new RedNode({_flow:flow,id:'n2',type:'abc'}); var receiver2 = new RedNode({_flow:flow,id:'n3',type:'abc'}); sender.send({"some": "message"}); + setTimeout(function() { + try { + logHandler.messagesSent.should.equal(1); + should.exist(logHandler.msgIds[0]) + Log.removeHandler(logHandler); + done(); + } catch(err) { + Log.removeHandler(logHandler); + done(err); + } + },50) }) }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js b/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js index e7430a6c2..e1315dc50 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/flows/Flow_spec.js @@ -122,6 +122,7 @@ describe('Flow', function() { currentNodes[node.id] = node; this.on('input',function(msg) { node.handled++; + msg.handled = node.handled; node.messages.push(msg); node.send(msg); }); @@ -136,12 +137,42 @@ describe('Flow', function() { } util.inherits(TestAsyncNode,Node); + var TestDoneNode = function(n) { + Node.call(this,n); + var node = this; + this.scope = n.scope; + this.uncaught = n.uncaught; + this.foo = n.foo; + this.handled = 0; + this.messages = []; + this.stopped = false; + this.closeDelay = n.closeDelay || 50; + currentNodes[node.id] = node; + this.on('input',function(msg, send, done) { + node.handled++; + node.messages.push(msg); + send(msg); + done(); + }); + this.on('close',function(done) { + setTimeout(function() { + node.stopped = true; + stoppedNodes[node.id] = node; + delete currentNodes[node.id]; + done(); + },node.closeDelay); + }); + } + util.inherits(TestDoneNode,Node); + before(function() { getType = sinon.stub(typeRegistry,"get",function(type) { if (type=="test") { return TestNode; } else if (type=="testError"){ return TestErrorNode; + } else if (type=="testDone"){ + return TestDoneNode; } else { return TestAsyncNode; } @@ -189,28 +220,28 @@ describe('Flow', function() { currentNodes["2"].should.have.a.property("handled",0); currentNodes["3"].should.have.a.property("handled",0); + currentNodes["3"].on("input", function() { + currentNodes["1"].should.have.a.property("handled",1); + currentNodes["2"].should.have.a.property("handled",1); + currentNodes["3"].should.have.a.property("handled",1); - currentNodes["1"].receive({payload:"test"}); - - currentNodes["1"].should.have.a.property("handled",1); - currentNodes["2"].should.have.a.property("handled",1); - currentNodes["3"].should.have.a.property("handled",1); - - flow.stop().then(function() { - try { - currentNodes.should.not.have.a.property("1"); - currentNodes.should.not.have.a.property("2"); - currentNodes.should.not.have.a.property("3"); - currentNodes.should.not.have.a.property("4"); - stoppedNodes.should.have.a.property("1"); - stoppedNodes.should.have.a.property("2"); - stoppedNodes.should.have.a.property("3"); - stoppedNodes.should.have.a.property("4"); - done(); - } catch(err) { - done(err); - } + flow.stop().then(function() { + try { + currentNodes.should.not.have.a.property("1"); + currentNodes.should.not.have.a.property("2"); + currentNodes.should.not.have.a.property("3"); + currentNodes.should.not.have.a.property("4"); + stoppedNodes.should.have.a.property("1"); + stoppedNodes.should.have.a.property("2"); + stoppedNodes.should.have.a.property("3"); + stoppedNodes.should.have.a.property("4"); + done(); + } catch(err) { + done(err); + } + }); }); + currentNodes["1"].receive({payload:"test"}); }); it("instantiates config nodes in the right order",function(done) { @@ -350,25 +381,27 @@ describe('Flow', function() { currentNodes["1"].receive({payload:"test"}); - currentNodes["1"].should.have.a.property("handled",1); - // Message doesn't reach 3 as 2 is disabled - currentNodes["3"].should.have.a.property("handled",0); + setTimeout(function() { + currentNodes["1"].should.have.a.property("handled",1); + // Message doesn't reach 3 as 2 is disabled + currentNodes["3"].should.have.a.property("handled",0); - flow.stop().then(function() { - try { - currentNodes.should.not.have.a.property("1"); - currentNodes.should.not.have.a.property("2"); - currentNodes.should.not.have.a.property("3"); - currentNodes.should.not.have.a.property("4"); - stoppedNodes.should.have.a.property("1"); - stoppedNodes.should.not.have.a.property("2"); - stoppedNodes.should.have.a.property("3"); - stoppedNodes.should.have.a.property("4"); - done(); - } catch(err) { - done(err); - } - }); + flow.stop().then(function() { + try { + currentNodes.should.not.have.a.property("1"); + currentNodes.should.not.have.a.property("2"); + currentNodes.should.not.have.a.property("3"); + currentNodes.should.not.have.a.property("4"); + stoppedNodes.should.have.a.property("1"); + stoppedNodes.should.not.have.a.property("2"); + stoppedNodes.should.have.a.property("3"); + stoppedNodes.should.have.a.property("4"); + done(); + } catch(err) { + done(err); + } + }); + },50); }); }); @@ -551,31 +584,34 @@ describe('Flow', function() { flow.handleStatus(config.flows["t1"].nodes["1"],{text:"my-status",random:"otherProperty"}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","my-status"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","1"); - statusMessage.status.source.should.have.a.property("type","test"); - statusMessage.status.source.should.have.a.property("name","a"); + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - currentNodes["sn2"].should.have.a.property("handled",1); - statusMessage = currentNodes["sn2"].messages[0]; + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","my-status"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","1"); + statusMessage.status.source.should.have.a.property("type","test"); + statusMessage.status.source.should.have.a.property("name","a"); - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","my-status"); - statusMessage.status.should.have.a.property("random","otherProperty"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","1"); - statusMessage.status.source.should.have.a.property("type","test"); - statusMessage.status.source.should.have.a.property("name","a"); + currentNodes["sn2"].should.have.a.property("handled",1); + statusMessage = currentNodes["sn2"].messages[0]; + + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","my-status"); + statusMessage.status.should.have.a.property("random","otherProperty"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","1"); + statusMessage.status.source.should.have.a.property("type","test"); + statusMessage.status.source.should.have.a.property("name","a"); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50) }); it("passes a status event to the adjacent scoped status node ",function(done) { var config = flowUtils.parseConfig([ @@ -596,21 +632,23 @@ describe('Flow', function() { flow.handleStatus(config.flows["t1"].nodes["1"],{text:"my-status"}); - currentNodes["sn"].should.have.a.property("handled",0); - currentNodes["sn2"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn2"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",0); + currentNodes["sn2"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn2"].messages[0]; - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","my-status"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","1"); - statusMessage.status.source.should.have.a.property("type","test"); - statusMessage.status.source.should.have.a.property("name","a"); + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","my-status"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","1"); + statusMessage.status.source.should.have.a.property("type","test"); + statusMessage.status.source.should.have.a.property("name","a"); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); }); @@ -636,33 +674,35 @@ describe('Flow', function() { flow.handleError(config.flows["t1"].nodes["1"],"my-error",{a:"foo"}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","my-error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("id","1"); - statusMessage.error.source.should.have.a.property("type","test"); - statusMessage.error.source.should.have.a.property("name","a"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","my-error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("id","1"); + statusMessage.error.source.should.have.a.property("type","test"); + statusMessage.error.source.should.have.a.property("name","a"); - currentNodes["sn2"].should.have.a.property("handled",1); - statusMessage = currentNodes["sn2"].messages[0]; + currentNodes["sn2"].should.have.a.property("handled",1); + statusMessage = currentNodes["sn2"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","my-error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("id","1"); - statusMessage.error.source.should.have.a.property("type","test"); - statusMessage.error.source.should.have.a.property("name","a"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","my-error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("id","1"); + statusMessage.error.source.should.have.a.property("type","test"); + statusMessage.error.source.should.have.a.property("name","a"); - // Node sn3 has uncaught:true - so should not get called - currentNodes["sn3"].should.have.a.property("handled",0); + // Node sn3 has uncaught:true - so should not get called + currentNodes["sn3"].should.have.a.property("handled",0); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); it("passes an error event to the adjacent scoped catch node ",function(done) { var config = flowUtils.parseConfig([ @@ -684,39 +724,42 @@ describe('Flow', function() { flow.handleError(config.flows["t1"].nodes["1"],"my-error",{a:"foo"}); - currentNodes["sn"].should.have.a.property("handled",0); - currentNodes["sn2"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn2"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",0); + currentNodes["sn2"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn2"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","my-error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("id","1"); - statusMessage.error.source.should.have.a.property("type","test"); - statusMessage.error.source.should.have.a.property("name","a"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","my-error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("id","1"); + statusMessage.error.source.should.have.a.property("type","test"); + statusMessage.error.source.should.have.a.property("name","a"); - // Node sn3/4 have uncaught:true - so should not get called - currentNodes["sn3"].should.have.a.property("handled",0); - currentNodes["sn4"].should.have.a.property("handled",0); + // Node sn3/4 have uncaught:true - so should not get called + currentNodes["sn3"].should.have.a.property("handled",0); + currentNodes["sn4"].should.have.a.property("handled",0); - // Inject error that sn1/2 will ignore - so should get picked up by sn3 - flow.handleError(config.flows["t1"].nodes["3"],"my-error-2",{a:"foo-2"}); + // Inject error that sn1/2 will ignore - so should get picked up by sn3 + flow.handleError(config.flows["t1"].nodes["3"],"my-error-2",{a:"foo-2"}); + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",0); + currentNodes["sn2"].should.have.a.property("handled",1); + currentNodes["sn3"].should.have.a.property("handled",1); + currentNodes["sn4"].should.have.a.property("handled",1); - currentNodes["sn"].should.have.a.property("handled",0); - currentNodes["sn2"].should.have.a.property("handled",1); - currentNodes["sn3"].should.have.a.property("handled",1); - currentNodes["sn4"].should.have.a.property("handled",1); + statusMessage = currentNodes["sn3"].messages[0]; + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","my-error-2"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("id","3"); + statusMessage.error.source.should.have.a.property("type","test"); - statusMessage = currentNodes["sn3"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","my-error-2"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("id","3"); - statusMessage.error.source.should.have.a.property("type","test"); - - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); + },50); }); it("moves any existing error object sideways",function(done){ var config = flowUtils.parseConfig([ @@ -733,22 +776,54 @@ describe('Flow', function() { var activeNodes = flow.getActiveNodes(); flow.handleError(config.flows["t1"].nodes["1"],"my-error",{a:"foo",error:"existing"}); + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + statusMessage.should.have.a.property("_error","existing"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","my-error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("id","1"); + statusMessage.error.source.should.have.a.property("type","test"); + statusMessage.error.source.should.have.a.property("name","a"); - statusMessage.should.have.a.property("_error","existing"); - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","my-error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("id","1"); - statusMessage.error.source.should.have.a.property("type","test"); - statusMessage.error.source.should.have.a.property("name","a"); - - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); it("prevents an error looping more than 10 times",function(){}); }); + + describe("#handleComplete",function() { + it("passes a complete event to the adjacent Complete node",function(done) { + var config = flowUtils.parseConfig([ + {id:"t1",type:"tab"}, + {id:"1",x:10,y:10,z:"t1",type:"testDone",name:"a",wires:["2"]}, + {id:"2",x:10,y:10,z:"t1",type:"test",wires:["3"]}, + {id:"3",x:10,y:10,z:"t1",type:"testDone",foo:"a",wires:[]}, + {id:"cn",x:10,y:10,z:"t1",type:"complete",scope:["1","3"],foo:"a",wires:[]} + ]); + var flow = Flow.create({},config,config.flows["t1"]); + + flow.start(); + + var activeNodes = flow.getActiveNodes(); + Object.keys(activeNodes).should.have.length(4); + + var msg = {payload: "hello world"} + var n1 = currentNodes["1"].receive(msg); + setTimeout(function() { + currentNodes["cn"].should.have.a.property("handled",2); + currentNodes["cn"].messages[0].should.have.a.property("handled",1); + currentNodes["cn"].messages[1].should.have.a.property("handled",2); + flow.stop().then(function() { + done(); + }); + },50); + }); + }); + + }); diff --git a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js b/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js index d6cab9732..f92a01328 100644 --- a/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js +++ b/test/unit/@node-red/runtime/lib/nodes/flows/Subflow_spec.js @@ -271,32 +271,34 @@ describe('Subflow', function() { currentNodes["1"].receive({payload:"test"}); - currentNodes["1"].should.have.a.property("handled",1); - // currentNodes[sfInstanceId].should.have.a.property("handled",1); - // currentNodes[sfInstanceId2].should.have.a.property("handled",1); - currentNodes["3"].should.have.a.property("handled",1); - currentNodes["4"].should.have.a.property("handled",1); + setTimeout(function() { + currentNodes["1"].should.have.a.property("handled",1); + // currentNodes[sfInstanceId].should.have.a.property("handled",1); + // currentNodes[sfInstanceId2].should.have.a.property("handled",1); + currentNodes["3"].should.have.a.property("handled",1); + currentNodes["4"].should.have.a.property("handled",1); - flow.stop().then(function() { - Object.keys(currentNodes).should.have.length(0); - Object.keys(stoppedNodes).should.have.length(6); + flow.stop().then(function() { + Object.keys(currentNodes).should.have.length(0); + Object.keys(stoppedNodes).should.have.length(6); - // currentNodes.should.not.have.a.property("1"); - // currentNodes.should.not.have.a.property("3"); - // currentNodes.should.not.have.a.property("4"); - // // currentNodes.should.not.have.a.property(sfInstanceId); - // // currentNodes.should.not.have.a.property(sfInstanceId2); - // // currentNodes.should.not.have.a.property(sfConfigId); - // stoppedNodes.should.have.a.property("1"); - // stoppedNodes.should.have.a.property("3"); - // stoppedNodes.should.have.a.property("4"); - // // stoppedNodes.should.have.a.property(sfInstanceId); - // // stoppedNodes.should.have.a.property(sfInstanceId2); - // // stoppedNodes.should.have.a.property(sfConfigId); - done(); - }); + // currentNodes.should.not.have.a.property("1"); + // currentNodes.should.not.have.a.property("3"); + // currentNodes.should.not.have.a.property("4"); + // // currentNodes.should.not.have.a.property(sfInstanceId); + // // currentNodes.should.not.have.a.property(sfInstanceId2); + // // currentNodes.should.not.have.a.property(sfConfigId); + // stoppedNodes.should.have.a.property("1"); + // stoppedNodes.should.have.a.property("3"); + // stoppedNodes.should.have.a.property("4"); + // // stoppedNodes.should.have.a.property(sfInstanceId); + // // stoppedNodes.should.have.a.property(sfInstanceId2); + // // stoppedNodes.should.have.a.property(sfConfigId); + done(); + }); + },50); }); it("instantiates a subflow inside a subflow and stops it",function(done) { var config = flowUtils.parseConfig([ @@ -322,17 +324,14 @@ describe('Subflow', function() { currentNodes["1"].receive({payload:"test"}); - currentNodes["1"].should.have.a.property("handled",1); - - - currentNodes["3"].should.have.a.property("handled",1); - - - - flow.stop().then(function() { - Object.keys(currentNodes).should.have.length(0); - done(); - }); + setTimeout(function() { + currentNodes["1"].should.have.a.property("handled",1); + currentNodes["3"].should.have.a.property("handled",1); + flow.stop().then(function() { + Object.keys(currentNodes).should.have.length(0); + done(); + }); + },50); }); it("rewires a subflow node on update/start",function(done){ @@ -369,27 +368,31 @@ describe('Subflow', function() { currentNodes["1"].receive({payload:"test"}); - currentNodes["1"].should.have.a.property("handled",1); - // currentNodes[sfInstanceId].should.have.a.property("handled",1); - // currentNodes[sfInstanceId2].should.have.a.property("handled",1); - currentNodes["3"].should.have.a.property("handled",1); - currentNodes["4"].should.have.a.property("handled",0); + setTimeout(function() { + currentNodes["1"].should.have.a.property("handled",1); + // currentNodes[sfInstanceId].should.have.a.property("handled",1); + // currentNodes[sfInstanceId2].should.have.a.property("handled",1); + currentNodes["3"].should.have.a.property("handled",1); + currentNodes["4"].should.have.a.property("handled",0); - flow.update(newConfig,newConfig.flows["t1"]); - flow.start(diff) + flow.update(newConfig,newConfig.flows["t1"]); + flow.start(diff) - currentNodes["1"].receive({payload:"test2"}); + currentNodes["1"].receive({payload:"test2"}); + setTimeout(function() { - currentNodes["1"].should.have.a.property("handled",2); - // currentNodes[sfInstanceId].should.have.a.property("handled",2); - // currentNodes[sfInstanceId2].should.have.a.property("handled",2); - currentNodes["3"].should.have.a.property("handled",1); - currentNodes["4"].should.have.a.property("handled",1); + currentNodes["1"].should.have.a.property("handled",2); + // currentNodes[sfInstanceId].should.have.a.property("handled",2); + // currentNodes[sfInstanceId2].should.have.a.property("handled",2); + currentNodes["3"].should.have.a.property("handled",1); + currentNodes["4"].should.have.a.property("handled",1); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); + },50); }); }); describe('#stop', function() { @@ -436,20 +439,20 @@ describe('Subflow', function() { var activeNodes = flow.getActiveNodes(); activeNodes["1"].receive({payload:"test"}); + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","test status"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("type","testStatus"); + statusMessage.status.source.should.have.a.property("name","test-status-node"); - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","test status"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("type","testStatus"); - statusMessage.status.source.should.have.a.property("name","test-status-node"); - - flow.stop().then(function() { - - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); it("passes a status event to the subflow's parent tab status node - targetted scope",function(done) { var config = flowUtils.parseConfig([ @@ -472,21 +475,23 @@ describe('Subflow', function() { activeNodes["1"].receive({payload:"test"}); - parentFlowStatusCalled.should.be.false(); + setTimeout(function() { + parentFlowStatusCalled.should.be.false(); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","test status"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("type","testStatus"); - statusMessage.status.source.should.have.a.property("name","test-status-node"); + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","test status"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("type","testStatus"); + statusMessage.status.source.should.have.a.property("name","test-status-node"); - flow.stop().then(function() { + flow.stop().then(function() { - done(); - }); + done(); + }); + },50); }); }); @@ -517,19 +522,21 @@ describe('Subflow', function() { activeNodes["1"].receive({payload:"test-payload"}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","test-payload"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","2"); - statusMessage.status.source.should.have.a.property("type","subflow:sf1"); + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","test-payload"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","2"); + statusMessage.status.source.should.have.a.property("type","subflow:sf1"); - flow.stop().then(function() { + flow.stop().then(function() { - done(); - }); + done(); + }); + },50); }); it("emits a status event when a message is passed to a subflow-status node - msg.payload as status obj", function(done) { var config = flowUtils.parseConfig([ @@ -557,19 +564,21 @@ describe('Subflow', function() { activeNodes["1"].receive({payload:{text:"payload-obj"}}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","payload-obj"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","2"); - statusMessage.status.source.should.have.a.property("type","subflow:sf1"); + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","payload-obj"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","2"); + statusMessage.status.source.should.have.a.property("type","subflow:sf1"); - flow.stop().then(function() { + flow.stop().then(function() { - done(); - }); + done(); + }); + },50); }); it("emits a status event when a message is passed to a subflow-status node - msg.status", function(done) { var config = flowUtils.parseConfig([ @@ -597,19 +606,21 @@ describe('Subflow', function() { activeNodes["1"].receive({status:{text:"status-obj"}}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("status"); - statusMessage.status.should.have.a.property("text","status-obj"); - statusMessage.status.should.have.a.property("source"); - statusMessage.status.source.should.have.a.property("id","2"); - statusMessage.status.source.should.have.a.property("type","subflow:sf1"); + statusMessage.should.have.a.property("status"); + statusMessage.status.should.have.a.property("text","status-obj"); + statusMessage.status.should.have.a.property("source"); + statusMessage.status.source.should.have.a.property("id","2"); + statusMessage.status.source.should.have.a.property("type","subflow:sf1"); - flow.stop().then(function() { + flow.stop().then(function() { - done(); - }); + done(); + }); + },50); }); it("does not emit a regular status event if it contains a subflow-status node", function(done) { var config = flowUtils.parseConfig([ @@ -666,18 +677,20 @@ describe('Subflow', function() { activeNodes["1"].receive({payload:"test"}); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + setTimeout(function() { + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","test error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("type","testError"); - statusMessage.error.source.should.have.a.property("name","test-error-node"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","test error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("type","testError"); + statusMessage.error.source.should.have.a.property("name","test-error-node"); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); it("passes an error event to the subflow's parent tab catch node - targetted scope",function(done) { var config = flowUtils.parseConfig([ @@ -699,20 +712,22 @@ describe('Subflow', function() { activeNodes["1"].receive({payload:"test"}); - parentFlowErrorCalled.should.be.false(); + setTimeout(function() { + parentFlowErrorCalled.should.be.false(); - currentNodes["sn"].should.have.a.property("handled",1); - var statusMessage = currentNodes["sn"].messages[0]; + currentNodes["sn"].should.have.a.property("handled",1); + var statusMessage = currentNodes["sn"].messages[0]; - statusMessage.should.have.a.property("error"); - statusMessage.error.should.have.a.property("message","test error"); - statusMessage.error.should.have.a.property("source"); - statusMessage.error.source.should.have.a.property("type","testError"); - statusMessage.error.source.should.have.a.property("name","test-error-node"); + statusMessage.should.have.a.property("error"); + statusMessage.error.should.have.a.property("message","test error"); + statusMessage.error.should.have.a.property("source"); + statusMessage.error.source.should.have.a.property("type","testError"); + statusMessage.error.source.should.have.a.property("name","test-error-node"); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); }); }); @@ -756,11 +771,13 @@ describe('Subflow', function() { process.env["__KEY__"] = "__VAL__"; currentNodes["1"].receive({payload: "test"}); - currentNodes["3"].should.have.a.property("received", "__VAL__"); - - flow.stop().then(function() { - done(); - }); + setTimeout(function() { + currentNodes["3"].should.have.a.property("received", "__VAL__"); + + flow.stop().then(function() { + done(); + }); + },50); }); it("can access subflow env var", function(done) { @@ -792,13 +809,15 @@ describe('Subflow', function() { } process.env["__KEY__"] = "__VAL0__"; setEnv(testenv_node, "__KEY__", "__VAL1__"); - - currentNodes["1"].receive({payload: "test"}); - currentNodes["3"].should.have.a.property("received", "__VAL1__"); - flow.stop().then(function() { - done(); - }); + currentNodes["1"].receive({payload: "test"}); + setTimeout(function() { + currentNodes["3"].should.have.a.property("received", "__VAL1__"); + + flow.stop().then(function() { + done(); + }); + },50); }); it("can access nested subflow env var", function(done) { @@ -840,21 +859,27 @@ describe('Subflow', function() { process.env["__KEY__"] = "__VAL0__"; currentNodes["1"].receive({payload: "test"}); - currentNodes["3"].should.have.a.property("received", "__VAL0__"); + setTimeout(function() { + currentNodes["3"].should.have.a.property("received", "__VAL0__"); - setEnv(node_sf1_1, "__KEY__", "__VAL1__"); - currentNodes["1"].receive({payload: "test"}); - currentNodes["3"].should.have.a.property("received", "__VAL1__"); + setEnv(node_sf1_1, "__KEY__", "__VAL1__"); + currentNodes["1"].receive({payload: "test"}); + setTimeout(function() { + currentNodes["3"].should.have.a.property("received", "__VAL1__"); - setEnv(node_sf2_1, "__KEY__", "__VAL2__"); - currentNodes["1"].receive({payload: "test"}); - currentNodes["3"].should.have.a.property("received", "__VAL2__"); + setEnv(node_sf2_1, "__KEY__", "__VAL2__"); + currentNodes["1"].receive({payload: "test"}); + setTimeout(function() { + currentNodes["3"].should.have.a.property("received", "__VAL2__"); - flow.stop().then(function() { - done(); - }); + flow.stop().then(function() { + done(); + }); + },50); + },50); + },50); }); - + }); - + });