diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index c44fa06c5..c4adb1f07 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -159,7 +159,8 @@ module.exports = function(RED) { if (node.pauseType === "delay") { node.on("input", function(msg, send, done) { var id = ourTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); + var idx = node.idList.indexOf(id); + if (idx !== -1) { node.idList.splice(idx, 1); } if (node.timeout > 1000) { node.status({fill:"blue",shape:"dot",text:node.idList.length}); } @@ -184,7 +185,8 @@ module.exports = function(RED) { } if (delayvar < 0) { delayvar = 0; } var id = ourTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); + var idx = node.idList.indexOf(id); + if (idx !== -1) { node.idList.splice(idx, 1); } if (node.idList.length === 0) { node.status({}); } send(msg); if (delayvar >= 0) { @@ -207,7 +209,8 @@ module.exports = function(RED) { node.on("input", function(msg, send, done) { var wait = node.randomFirst + (node.diff * Math.random()); var id = ourTimeout(function() { - node.idList.splice(node.idList.indexOf(id),1); + var idx = node.idList.indexOf(id); + if (idx !== -1) { node.idList.splice(idx, 1); } send(msg); if (node.timeout >= 1000) { node.status({fill:"blue",shape:"dot",text:node.idList.length}); diff --git a/packages/node_modules/@node-red/nodes/core/function/90-exec.js b/packages/node_modules/@node-red/nodes/core/function/90-exec.js index 02a908d4b..beb4d11d4 100644 --- a/packages/node_modules/@node-red/nodes/core/function/90-exec.js +++ b/packages/node_modules/@node-red/nodes/core/function/90-exec.js @@ -105,18 +105,26 @@ module.exports = function(RED) { } node.activeProcesses[child.pid] = child; child.stdout.on('data', function (data) { - if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) { - // console.log('[exec] stdout: ' + data,child.pid); - if (isUtf8(data)) { msg.payload = data.toString(); } - else { msg.payload = data; } - nodeSend([RED.util.cloneMessage(msg),null,null]); + try { + if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) { + // console.log('[exec] stdout: ' + data,child.pid); + if (isUtf8(data)) { msg.payload = data.toString(); } + else { msg.payload = data; } + nodeSend([RED.util.cloneMessage(msg),null,null]); + } + } catch (err) { + node.error(err.toString()); } }); child.stderr.on('data', function (data) { - if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) { - if (isUtf8(data)) { msg.payload = data.toString(); } - else { msg.payload = data; } - nodeSend([null,RED.util.cloneMessage(msg),null]); + try { + if (node.activeProcesses.hasOwnProperty(child.pid) && node.activeProcesses[child.pid] !== null) { + if (isUtf8(data)) { msg.payload = data.toString(); } + else { msg.payload = data; } + nodeSend([null,RED.util.cloneMessage(msg),null]); + } + } catch (err) { + node.error(err.toString()); } }); child.on('close', function (code,signal) { diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js index 451035a74..3600b016b 100644 --- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js +++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js @@ -227,6 +227,7 @@ module.exports = function(RED) { * Handle the payload / packet recieved in MQTT In and MQTT Sub nodes */ function subscriptionHandler(node, datatype ,topic, payload, packet) { + if (!packet) { packet = {}; } const msg = {topic:topic, payload:null, qos:packet.qos, retain:packet.retain}; const v5 = (node && node.brokerConn) ? node.brokerConn.v5() @@ -1074,12 +1075,16 @@ module.exports = function(RED) { if (!subscription.handler) { subscription.handler = function (mtopic, mpayload, mpacket) { - const sops = subscription.options ? subscription.options.properties : {} - const pops = mpacket.properties || {} - if (subIdsAvailable && pops.subscriptionIdentifier && sops.subscriptionIdentifier && (pops.subscriptionIdentifier !== sops.subscriptionIdentifier)) { - //do nothing as subscriptionIdentifier does not match - } else if (matchTopic(topic, mtopic)) { - subscription.callback && subscription.callback(mtopic, mpayload, mpacket) + try { + const sops = subscription.options ? subscription.options.properties : {} + const pops = (mpacket && mpacket.properties) || {} + if (subIdsAvailable && pops.subscriptionIdentifier && sops.subscriptionIdentifier && (pops.subscriptionIdentifier !== sops.subscriptionIdentifier)) { + //do nothing as subscriptionIdentifier does not match + } else if (matchTopic(topic, mtopic)) { + subscription.callback && subscription.callback(mtopic, mpayload, mpacket) + } + } catch (err) { + node.error("MQTT subscription handler error: " + err.toString()); } } } diff --git a/packages/node_modules/@node-red/nodes/core/network/22-websocket.js b/packages/node_modules/@node-red/nodes/core/network/22-websocket.js index f30fd91ce..3c877dc79 100644 --- a/packages/node_modules/@node-red/nodes/core/network/22-websocket.js +++ b/packages/node_modules/@node-red/nodes/core/network/22-websocket.js @@ -297,7 +297,11 @@ module.exports = function(RED) { } msg._session = {type:"websocket",id:id}; for (var i = 0; i < this._inputNodes.length; i++) { - this._inputNodes[i].send(msg); + try { + this._inputNodes[i].send(msg); + } catch (err) { + this.error(RED._("websocket.errors.send-error") + " " + err.toString()); + } } } diff --git a/packages/node_modules/@node-red/nodes/core/network/31-tcpin.js b/packages/node_modules/@node-red/nodes/core/network/31-tcpin.js index 500bbe2c2..e91a9b7c3 100644 --- a/packages/node_modules/@node-red/nodes/core/network/31-tcpin.js +++ b/packages/node_modules/@node-red/nodes/core/network/31-tcpin.js @@ -127,32 +127,36 @@ module.exports = function(RED) { connectionPool[id] = client; client.on('data', function (data) { - if (node.datatype != 'buffer') { - data = data.toString(node.datatype); - } - if (node.stream) { - var msg; - if ((node.datatype) === "utf8" && node.newline !== "") { - buffer = buffer+data; - var parts = buffer.split(node.newline); - for (var i = 0; i= node.splitc) { - if (clients[connection_id]) { - const msg = clients[connection_id].lastMsg || {}; - msg.payload = Buffer.alloc(i); - buf.copy(msg.payload,0,0,i); - if (node.ret === "string") { - try { msg.payload = msg.payload.toString(); } - catch(e) { node.error("Failed to create string", msg); } + // count bytes into a buffer... + else if (node.out == "count") { + buf[i] = data[j]; + i += 1; + if ( i >= node.splitc) { + if (clients[connection_id]) { + const msg = clients[connection_id].lastMsg || {}; + msg.payload = Buffer.alloc(i); + buf.copy(msg.payload,0,0,i); + if (node.ret === "string") { + try { msg.payload = msg.payload.toString(); } + catch(e) { node.error("Failed to create string", msg); } + } + nodeSend(msg); + if (clients[connection_id].client) { + node.status({}); + clients[connection_id].client.destroy(); + delete clients[connection_id]; + } + i = 0; } - nodeSend(msg); - if (clients[connection_id].client) { - node.status({}); - clients[connection_id].client.destroy(); - delete clients[connection_id]; - } - i = 0; } } - } - // look for a char - else { - buf[i] = data[j]; - i += 1; - if (data[j] == node.splitc) { - if (clients[connection_id]) { - const msg = clients[connection_id].lastMsg || {}; - msg.payload = Buffer.alloc(i); - buf.copy(msg.payload,0,0,i); - if (node.ret === "string") { - try { msg.payload = msg.payload.toString(); } - catch(e) { node.error("Failed to create string", msg); } + // look for a char + else { + buf[i] = data[j]; + i += 1; + if (data[j] == node.splitc) { + if (clients[connection_id]) { + const msg = clients[connection_id].lastMsg || {}; + msg.payload = Buffer.alloc(i); + buf.copy(msg.payload,0,0,i); + if (node.ret === "string") { + try { msg.payload = msg.payload.toString(); } + catch(e) { node.error("Failed to create string", msg); } + } + nodeSend(msg); + if (clients[connection_id].client) { + node.status({}); + clients[connection_id].client.destroy(); + delete clients[connection_id]; + } + i = 0; } - nodeSend(msg); - if (clients[connection_id].client) { - node.status({}); - clients[connection_id].client.destroy(); - delete clients[connection_id]; - } - i = 0; } } } } + } catch (err) { + node.error(RED._("tcpin.errors.error",{error:err.toString()})); } }); diff --git a/packages/node_modules/@node-red/nodes/core/network/32-udp.js b/packages/node_modules/@node-red/nodes/core/network/32-udp.js index d42d3a7c3..27709c2bd 100644 --- a/packages/node_modules/@node-red/nodes/core/network/32-udp.js +++ b/packages/node_modules/@node-red/nodes/core/network/32-udp.js @@ -98,15 +98,19 @@ module.exports = function(RED) { }); server.on('message', function (message, remote) { - var msg; - if (node.datatype =="base64") { - msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; - } else if (node.datatype =="utf8") { - msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; - } else { - msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; + try { + var msg; + if (node.datatype =="base64") { + msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; + } else if (node.datatype =="utf8") { + msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; + } else { + msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; + } + node.send(msg); + } catch (err) { + node.error(RED._("udp.errors.error",{error:err.toString()})); } - node.send(msg); }); server.on('listening', function () { diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js index 0b1ed349b..f31504214 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/Node.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/Node.js @@ -322,6 +322,7 @@ Node.prototype.close = function(removed) { // The callback takes a 'done' callback and (maybe) the removed flag promises.push( new Promise((resolve) => { + var resolved = false; try { var args = []; if (callback.length === 2) { @@ -329,13 +330,19 @@ Node.prototype.close = function(removed) { args.push(!!removed); } args.push(() => { - resolve(); + if (!resolved) { + resolved = true; + resolve(); + } }); callback.apply(node, args); } catch(err) { // TODO: error thrown in node async close callback // We've never logged this properly. - resolve(); + if (!resolved) { + resolved = true; + resolve(); + } } }) ); diff --git a/packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js b/packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js index ef499f0f9..5697f329d 100644 --- a/packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js +++ b/packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js @@ -155,6 +155,7 @@ function LocalFileSystem(config){ } this.pendingWrites = {}; this.knownCircularRefs = {}; + this.closing = false; if (config.hasOwnProperty('flushInterval')) { this.flushInterval = Math.max(0,config.flushInterval) * 1000; @@ -233,16 +234,19 @@ LocalFileSystem.prototype.open = function(){ LocalFileSystem.prototype.close = function(){ var self = this; - if (this.cache && this._pendingWriteTimeout) { - clearTimeout(this._pendingWriteTimeout); - delete this._pendingWriteTimeout; + this.closing = true; + if (this.cache) { + if (this._pendingWriteTimeout) { + clearTimeout(this._pendingWriteTimeout); + delete this._pendingWriteTimeout; + } this.flushInterval = 0; + // Always flush pending writes on close, even if no timeout was pending self.writePromise = self.writePromise.then(function(){ return self._flushPendingWrites.call(self).catch(function(err) { log.error(log._("context.localfilesystem.error-write",{message:err.toString()})); }); }); - } return this.writePromise; } @@ -298,8 +302,9 @@ LocalFileSystem.prototype.set = function(scope, key, value, callback) { if (this.cache) { this.cache.set(scope,key,value,callback); this.pendingWrites[scope] = true; - if (this._pendingWriteTimeout) { - // there's a pending write which will handle this + if (this._pendingWriteTimeout || this.closing) { + // there's a pending write which will handle this, + // or we're closing and the close() flush will handle it return; } else { this._pendingWriteTimeout = setTimeout(function() {