diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html
index c1b02e836..d67c6b2c6 100644
--- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html
+++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.html
@@ -10,6 +10,55 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
+
@@ -84,20 +176,66 @@
-
-
-
@@ -279,6 +457,7 @@
usetls: {value: false},
verifyservercert: { value: false},
compatmode: { value: false},
+ protocolVersion: { value: 4},
keepalive: {value:60,validate:RED.validators.number()},
cleansession: {value: true},
birthTopic: {value:""},
@@ -292,7 +471,11 @@
willTopic: {value:""},
willQos: {value:"0"},
willRetain: {value:false},
- willPayload: {value:""}
+ willPayload: {value:""},
+ sessionExpiryInterval: {value:0},
+ topicAliasMaximum: {value:0},
+ maximumPacketSize: {value:0},
+ receiveMaximum: {value:0}
},
credentials: {
user: {type:"text"},
@@ -370,10 +553,15 @@
this.usetls = false;
$("#node-config-input-usetls").prop("checked",false);
}
- if (typeof this.compatmode === 'undefined') {
- this.compatmode = false;
- $("#node-config-input-compatmode").prop('checked', false);
+ if (this.compatmode === 'true' || this.compatmode === true) {
+ delete this.compatmode;
+ this.protocolVersion = 4;
}
+ if (typeof this.protocolVersion === 'undefined') {
+ this.protocolVersion = 4;
+ }
+ console.log("setting protocolVersion", this.protocolVersion)
+ $("#node-config-input-protocolVersion").val(this.protocolVersion);
if (typeof this.keepalive === 'undefined') {
this.keepalive = 15;
$("#node-config-input-keepalive").val(this.keepalive);
@@ -444,4 +632,4 @@
}
}
});
-
+
\ No newline at end of file
diff --git a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js
index 7ab624222..4462f20cc 100644
--- a/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js
+++ b/packages/node_modules/@node-red/nodes/core/network/10-mqtt.js
@@ -14,6 +14,8 @@
* limitations under the License.
**/
+const { debug } = require("console");
+
module.exports = function(RED) {
"use strict";
var mqtt = require("mqtt");
@@ -43,6 +45,101 @@ module.exports = function(RED) {
return re.test(t);
}
+ /**
+ * Helper function for setting integer property values in the MQTT V5 properties object
+ * @param {object} src Source object containing properties
+ * @param {object} dst Destination object to set/add properties
+ * @param {string} propName The property name to set in the Destination object
+ * @param {integer} [minVal] The minimum value. If the src value is less than minVal, it will NOT be set in the destination
+ * @param {integer} [maxVal] The maximum value. If the src value is greater than maxVal, it will NOT be set in the destination
+ * @param {integer} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
+ */
+ function setIntProp(src, dst, propName, minVal, maxVal, def) {
+ if (src.hasOwnProperty(propName)) {
+ var v = parseInt(src[propName]);
+ if(isNaN(v)) return;
+ if(minVal != null) {
+ if(v < minVal) return;
+ }
+ if(maxVal != null) {
+ if(v > maxVal) return;
+ }
+ dst[propName] = v;
+ } else {
+ if(def != undefined) dst[propName] = def;
+ }
+ }
+
+ /**
+ * Helper function for setting string property values in the MQTT V5 properties object
+ * @param {object} src Source object containing properties
+ * @param {object} dst Destination object to set/add properties
+ * @param {string} propName The property name to set in the Destination object
+ * @param {string} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
+ */
+ function setStrProp(src, dst, propName, def) {
+ if (src[propName] && typeof src[propName] == "string") {
+ dst[propName] = src[propName];
+ } else {
+ if(def != undefined) dst[propName] = def;
+ }
+ }
+
+ /**
+ * Helper function for setting boolean property values in the MQTT V5 properties object
+ * @param {object} src Source object containing properties
+ * @param {object} dst Destination object to set/add properties
+ * @param {string} propName The property name to set in the Destination object
+ * @param {boolean} [def] An optional default to set in the destination object if prop is NOT present in the soruce object
+ */
+ function setBoolProp(src, dst, propName, def) {
+ if (src[propName] != null) {
+ if(src[propName] === "true" || src[propName] === true) {
+ dst[propName] = true;
+ } else if(src[propName] === "false" || src[propName] === false) {
+ dst[propName] = true;
+ }
+ } else {
+ if(def != undefined) dst[propName] = def;
+ }
+ }
+
+ /**
+ * Helper function for copying the MQTT v5 srcUserProperties object (parameter1) to the properties object (parameter2).
+ * Any property in srcUserProperties that is NOT a key/string pair will be silently discarded.
+ * NOTE: if no sutable properties are present, the userProperties object will NOT be added to the properties object
+ * @param {object} srcUserProperties An object with key/value string pairs
+ * @param {object} properties A properties object in which userProperties will be copied to
+ */
+ function setUserProperties(srcUserProperties, properties) {
+ if (srcUserProperties && typeof srcUserProperties == "object") {
+ let _clone = {};
+ let count = 0;
+ let keys = Object.keys(srcUserProperties);
+ if(!keys || !keys.length) return null;
+ keys.forEach(key => {
+ let val = srcUserProperties[key];
+ if(typeof val == "string") {
+ count++;
+ _clone[key] = val;
+ }
+ });
+ if(count) properties.userProperties = _clone;
+ }
+ }
+
+ /**
+ * Helper function for copying the MQTT v5 correlationData object (parameter1) to the properties object (parameter2).
+ * NOTE: if srcCorrelationData is not a buffer, correlationData will NOT be added to the properties object
+ * @param {object} srcCorrelationData An buffer containing correlationData
+ * @param {object} properties A properties object in which correlationData will be copied to
+ */
+ function setCorrelationData(srcCorrelationData, properties) {
+ if (srcCorrelationData && typeof Buffer.isBuffer(srcCorrelationData)) {
+ properties.correlationData = Buffer.from(srcCorrelationData);
+ }
+ }
+
function MQTTBrokerNode(n) {
RED.nodes.createNode(this,n);
@@ -54,9 +151,13 @@ module.exports = function(RED) {
this.usews = n.usews;
this.verifyservercert = n.verifyservercert;
this.compatmode = n.compatmode;
+ this.protocolVersion = n.protocolVersion;
this.keepalive = n.keepalive;
this.cleansession = n.cleansession;
-
+ this.sessionExpiryInterval = n.sessionExpiryInterval;
+ this.topicAliasMaximum = n.topicAliasMaximum;
+ this.maximumPacketSize = n.maximumPacketSize;
+ this.receiveMaximum = n.receiveMaximum;
// Config node state
this.brokerurl = "";
this.connected = false;
@@ -183,9 +284,19 @@ module.exports = function(RED) {
this.options.keepalive = this.keepalive;
this.options.clean = this.cleansession;
this.options.reconnectPeriod = RED.settings.mqttReconnectTime||5000;
- if (this.compatmode == "true" || this.compatmode === true) {
+ if (this.compatmode == "true" || this.compatmode === true || this.protocolVersion == 3) {
this.options.protocolId = 'MQIsdp';
this.options.protocolVersion = 3;
+ } else if ( this.protocolVersion == 5 ) {
+ this.options.protocolVersion = 5;
+ this.options.properties = {};
+ this.options.properties.requestResponseInformation = true;
+ this.options.properties.requestProblemInformation = true;
+ setIntProp(this,this.options.properties,"sessionExpiryInterval", 0);
+ setIntProp(this,this.options.properties,"topicAliasMaximum", 0);
+ setIntProp(this,this.options.properties,"maximumPacketSize", 0);
+ setIntProp(this,this.options.properties,"receiveMaximum", 0);
+ this.options.properties.userProperties = {"node-red":"v1.3"};//test
}
if (this.usetls && n.tls) {
var tlsNode = RED.nodes.getNode(n.tls);
@@ -242,6 +353,8 @@ module.exports = function(RED) {
if (!node.connected && !node.connecting) {
node.connecting = true;
try {
+ node.serverProperties = {};
+ debug("MQTT: ⬆️ mqtt.connect(node.brokerurl ,node.options)", node.brokerurl, node.options);
node.client = mqtt.connect(node.brokerurl ,node.options);
node.client.setMaxListeners(0);
// Register successful connect or reconnect handler
@@ -249,6 +362,19 @@ module.exports = function(RED) {
node.connecting = false;
node.connected = true;
node.log(RED._("mqtt.state.connected",{broker:(node.clientid?node.clientid+"@":"")+node.brokerurl}));
+ if(node.options.protocolVersion == 5 && arguments && arguments.length && arguments[0].hasOwnProperty("properties")) {
+ let mqttServerV5Properties = arguments[0].properties;
+ if(mqttServerV5Properties) {
+ setIntProp(mqttServerV5Properties, node.serverProperties, "topicAliasMaximum", 1);
+ setIntProp(mqttServerV5Properties, node.serverProperties, "receiveMaximum", 0);
+ setIntProp(mqttServerV5Properties, node.serverProperties, "sessionExpiryInterval", 0, 0xFFFFFFFF);
+ setIntProp(mqttServerV5Properties, node.serverProperties, "maximumPacketSize", 22);
+ setUserProperties(mqttServerV5Properties, node.serverProperties);
+ node.serverProperties.requestResponseInformation = mqttServerV5Properties.requestResponseInformation;
+ node.serverProperties.requestProblemInformation = mqttServerV5Properties.requestProblemInformation;
+ debug("MQTT: ⬆️ node.client.on('connect', cb) --> node.serverProperties", node.serverProperties );
+ }
+ }
for (var id in node.users) {
if (node.users.hasOwnProperty(id)) {
node.users[id].status({fill:"green",shape:"dot",text:"node-red:common.status.connected"});
@@ -260,16 +386,18 @@ module.exports = function(RED) {
// Re-subscribe to stored topics
for (var s in node.subscriptions) {
if (node.subscriptions.hasOwnProperty(s)) {
- var topic = s;
- var qos = 0;
+ let topic = s;
+ let qos = 0;
+ let _options = {};
for (var r in node.subscriptions[s]) {
if (node.subscriptions[s].hasOwnProperty(r)) {
qos = Math.max(qos,node.subscriptions[s][r].qos);
+ _options = node.subscriptions[s][r].options;
node.client.on('message',node.subscriptions[s][r].handler);
}
}
- var options = {qos: qos};
- node.client.subscribe(topic, options);
+ _options.qos = _options.qos || qos;
+ node.client.subscribe(topic, _options);
}
}
@@ -309,12 +437,21 @@ module.exports = function(RED) {
}
};
- this.subscribe = function (topic,qos,callback,ref) {
+ this.subscribe = function (topic,options,callback,ref) {
ref = ref||0;
+ var qos;
+ if(typeof options == "object") {
+ qos = options.qos;
+ } else {
+ qos = options;
+ options = {};
+ }
+ options.qos = qos;
node.subscriptions[topic] = node.subscriptions[topic]||{};
var sub = {
topic:topic,
qos:qos,
+ options:options,
handler:function(mtopic,mpayload, mpacket) {
if (matchTopic(topic,mtopic)) {
callback(mtopic,mpayload, mpacket);
@@ -325,8 +462,8 @@ module.exports = function(RED) {
node.subscriptions[topic][ref] = sub;
if (node.connected) {
node.client.on('message',sub.handler);
- var options = {};
- options.qos = qos;
+ // var options = {};
+ // options.qos = qos;
node.client.subscribe(topic, options);
}
};
@@ -361,11 +498,23 @@ module.exports = function(RED) {
msg.payload = "" + msg.payload;
}
}
-
var options = {
qos: msg.qos || 0,
retain: msg.retain || false
};
+ //https://github.com/mqttjs/MQTT.js/blob/master/README.md#mqttclientpublishtopic-message-options-callback
+ if(node.options.protocolVersion == 5) {
+ options.properties = options.properties || {};
+ setStrProp(msg, options.properties, "responseTopic");
+ setStrProp(msg, options.properties, "contentType");
+ setIntProp(msg, options.properties, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
+ setIntProp(msg, options.properties, "subscriptionIdentifier", 1, 268435455);
+ setIntProp(msg, options.properties, "messageExpiryInterval", 0);
+ setBoolProp(msg, options.properties, "payloadFormatIndicator");
+ setUserProperties(msg.userProperties, options.properties);
+ setCorrelationData(msg.correlationData, options.properties);
+ }
+ debug("MQTT: ➡️➡️ node.client.publish",msg,options);
node.client.publish(msg.topic, msg.payload, options, function(err) {
done && done();
return
@@ -405,6 +554,14 @@ module.exports = function(RED) {
RED.nodes.createNode(this,n);
this.topic = n.topic;
this.qos = parseInt(n.qos);
+ this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
+ this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
+ this.userPropertiesType = n.userPropertiesType;
+ this.nl = n.nl;
+ this.rap = n.rap;
+ this.rh = n.rh;
+
+
if (isNaN(this.qos) || this.qos < 0 || this.qos > 2) {
this.qos = 2;
}
@@ -416,10 +573,33 @@ module.exports = function(RED) {
this.datatype = n.datatype || "utf8";
var node = this;
if (this.brokerConn) {
+ let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5;
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
if (this.topic) {
node.brokerConn.register(this);
- this.brokerConn.subscribe(this.topic,this.qos,function(topic,payload,packet) {
+ let options = {
+ qos: this.qos
+ }
+ if(v5) {
+ options.properties = {};
+ if(node.userProperties && node.userPropertiesType !== "none") {
+ let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, {});
+ if(userProperties) options.properties.userProperties = userProperties;
+ }
+ if(node.subscriptionIdentifier) {
+ let sid = parseInt(node.subscriptionIdentifier);
+ if(sid >= 0 && !isNaN(sid) ) options.properties.subscriptionIdentifier = sid;//must not be sent if zero
+ }
+ if(node.nl === "true" || node.nl === true) options.nl = true;
+ if(node.nl === "false" || node.nl === false) options.nl = false;
+ if(node.rap === "true" || node.rap === true) options.rap = true;
+ if(node.rap === "false" || node.rap === false) options.rap = false;
+ if(node.rh === "true" || node.rh === true || node.rh == 1) options.rh = 1;
+ if(node.rh === "false" || node.rh === false || node.rh == 0) options.rh = 0;
+ }
+ debug("MQTT: ⬅️⬅️ this.brokerConn.subscribe",this.topic,options);
+ this.brokerConn.subscribe(this.topic,options,function(topic,payload,packet) {
+ debug("MQTT: ⬅️⬅️ ON this.brokerConn.subscribe",topic,payload,packet);
if (node.datatype === "buffer") {
// payload = payload;
} else if (node.datatype === "base64") {
@@ -437,9 +617,21 @@ module.exports = function(RED) {
if (isUtf8(payload)) { payload = payload.toString(); }
}
var msg = {topic:topic, payload:payload, qos:packet.qos, retain:packet.retain};
+ if(v5 && packet.properties) {
+ //msg.properties = packet.properties;
+ setStrProp(packet.properties, msg, "responseTopic");
+ setStrProp(packet.properties, msg, "contentType");
+ setIntProp(packet.properties, msg, "topicAlias", 1, node.serverProperties.topicAliasMaximum || 0);
+ setIntProp(packet.properties, msg, "subscriptionIdentifier", 1, 268435455);
+ setIntProp(packet.properties, msg, "messageExpiryInterval", 0);
+ setBoolProp(packet.properties, msg, "payloadFormatIndicator");
+ setUserProperties(packet.properties.userProperties, options.properties);
+ setCorrelationData(packet.properties.correlationData, options.properties);
+ }
if ((node.brokerConn.broker === "localhost")||(node.brokerConn.broker === "127.0.0.1")) {
msg._topic = topic;
}
+ debug("MQTT: ⬅️⬅️ node.send",msg);
node.send(msg);
}, this.id);
if (this.brokerConn.connected) {
@@ -467,11 +659,23 @@ module.exports = function(RED) {
this.qos = n.qos || null;
this.retain = n.retain;
this.broker = n.broker;
+ this.responseTopic = n.responseTopic;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901114
+ this.correlationData = n.correlationData;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901115
+ this.correlationDataType = n.correlationDataType;
+ this.contentType = n.contentType;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901118
+ this.topicAlias = n.topicAlias; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
+ this.messageExpiryInterval = n.messageExpiryInterval; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112
+ this.payloadFormatIndicator = n.payloadFormatIndicator; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111
+ this.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
+ this.userProperties = n.userProperties;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901116
+ this.userPropertiesType = n.userPropertiesType;
+
this.brokerConn = RED.nodes.getNode(this.broker);
var node = this;
var chk = /[\+#]/;
if (this.brokerConn) {
+ let v5 = this.brokerConn.options && this.brokerConn.options.protocolVersion == 5;
this.status({fill:"red",shape:"ring",text:"node-red:common.status.disconnected"});
this.on("input",function(msg,send,done) {
if (msg.qos) {
@@ -483,12 +687,32 @@ module.exports = function(RED) {
msg.qos = Number(node.qos || msg.qos || 0);
msg.retain = node.retain || msg.retain || false;
msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
- if (node.topic) {
- msg.topic = node.topic;
+ if(v5) {
+ //TODO: contain all v5 props in an object?
+ if(node.userProperties && node.userPropertiesType !== "none") {
+ let userProperties = RED.util.evaluateNodeProperty(node.userProperties, node.userPropertiesType, node, msg);
+ if(userProperties) msg.userProperties = userProperties;
+ }
+ if(node.correlationData && node.correlationDataType !== "none") {
+ let correlationData = RED.util.evaluateNodeProperty(node.correlationData, node.correlationDataType, node, msg);
+ if(correlationData) msg.correlationData = correlationData;
+ }
+ var msgPropOverride = function(propName) { if(node[propName]) { msg[propName] = node[propName]; } }
+ msgPropOverride("responseTopic");
+ msgPropOverride("contentType");
+ msgPropOverride("topicAlias");
+ msgPropOverride("messageExpiryInterval");
+ msgPropOverride("payloadFormatIndicator");
+ msgPropOverride("subscriptionIdentifier");
+ msgPropOverride("topic");
}
if ( msg.hasOwnProperty("payload")) {
- if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
- if (chk.test(msg.topic)) { node.warn(RED._("mqtt.errors.invalid-topic")); }
+ let topicOK = msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "");
+ if(!topicOK && v5 && msg.topicAlias && node.brokerConn.serverProperties.topicAliasMaximum) {
+ topicOK = typeof msg.topicAlias === "number" && msg.topicAlias >= 0 && node.brokerConn.serverProperties.topicAliasMaximum >= msg.topicAlias
+ msg.topic = "";//must be empty string?
+ }
+ if (topicOK) { // topic must exist
this.brokerConn.publish(msg, done); // send the message
} else {
node.warn(RED._("mqtt.errors.invalid-topic"));
@@ -510,4 +734,4 @@ module.exports = function(RED) {
}
}
RED.nodes.registerType("mqtt out",MQTTOutNode);
-};
+};
\ No newline at end of file
diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json
index eb623ce20..794d424b7 100755
--- a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json
+++ b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json
@@ -355,7 +355,11 @@
"use-tls": "Enable secure (SSL/TLS) connection",
"tls-config":"TLS Configuration",
"verify-server-cert":"Verify server certificate",
- "compatmode": "Use legacy MQTT 3.1 support"
+ "compatmode": "Use legacy MQTT 3.1 support",
+ "protocolVersion": "Version",
+ "protocolVersion3": "MQTT Protocol v3.1 (legacy)",
+ "protocolVersion4": "MQTT Protocol v3.1.1",
+ "protocolVersion5": "MQTT Protocol v5"
},
"sections-label":{
"birth-message": "Message sent on connection (birth message)",