diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.html b/packages/node_modules/@node-red/nodes/core/function/89-delay.html index 957bf7f12..6c939240d 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.html @@ -63,6 +63,7 @@ @@ -185,6 +186,8 @@ return this._("delay.label.limit")+" "+rate; } else if (this.pauseType == "timed") { return this._("delay.label.limitTopic")+" "+rate; + } else if (this.pauseType == "burst") { + return this._("delay.label.burst")+" "+rate; } else { return this._("delay.label.limitTopic")+" "+rate; } @@ -245,6 +248,9 @@ $("#node-input-delay-action").val('rate'); $("#node-input-rate-type").val('topic'); $("#node-input-rate-topic-type").val('timed'); + } else if (this.pauseType == "burst") { + $("#node-input-delay-action").val('rate'); + $("#node-input-rate-type").val('burst'); } if (!this.timeoutUnits) { @@ -294,12 +300,17 @@ if (this.value === "all") { $("#rate-details-per-topic").hide(); $("#node-input-drop-select-queue").attr('disabled', false); + $("#rate-override").show(); + } else if (this.value === "burst") { + $("#rate-details-per-topic").hide(); + $("#node-input-drop-select-queue").attr('disabled', true); + $("#rate-override").hide(); } else if (this.value === "topic") { if ($("#node-input-drop-select").val() === "queue") { - $("#node-input-drop-select").val("drop"); } $("#node-input-drop-select-queue").attr('disabled', true); $("#rate-details-per-topic").show(); + $("#rate-override").show(); } }).trigger("change"); }, @@ -312,6 +323,8 @@ action = $("#node-input-rate-type").val(); if (action === "all") { this.pauseType = "rate"; + } else if (action === "burst") { + this.pauseType = "burst"; } else { this.pauseType = $("#node-input-rate-topic-type").val(); } diff --git a/packages/node_modules/@node-red/nodes/core/function/89-delay.js b/packages/node_modules/@node-red/nodes/core/function/89-delay.js index c44fa06c5..b42061f4f 100644 --- a/packages/node_modules/@node-red/nodes/core/function/89-delay.js +++ b/packages/node_modules/@node-red/nodes/core/function/89-delay.js @@ -42,6 +42,7 @@ module.exports = function(RED) { this.timeoutUnits = n.timeoutUnits; this.randomUnits = n.randomUnits; this.rateUnits = n.rateUnits; + this.burst = n.rate; if (n.timeoutUnits === "milliseconds") { this.timeout = n.timeout; @@ -57,12 +58,16 @@ module.exports = function(RED) { if (n.rateUnits === "minute") { this.rate = (60 * 1000)/n.rate; + this.timer = n.nbRateUnits * (60 * 1000); } else if (n.rateUnits === "hour") { this.rate = (60 * 60 * 1000)/n.rate; + this.timer = n.nbRateUnits * (60 * 60 * 1000); } else if (n.rateUnits === "day") { this.rate = (24 * 60 * 60 * 1000)/n.rate; + this.timer = n.nbRateUnits * (24 * 60 * 60 * 1000); } else { // Default to seconds this.rate = 1000/n.rate; + this.timer = n.nbRateUnits * 1000; } this.rate *= (n.nbRateUnits > 0 ? n.nbRateUnits : 1); @@ -337,6 +342,42 @@ module.exports = function(RED) { }); } + // Handle the burst mode + else if (node.pauseType === "burst") { + node.timers = []; + node.inflight = 0; + node.status({ fill: "green", shape: "ring", text: "" }) + node.on("input", function(msg, send, done) { + if (msg.hasOwnProperty("reset")) { + node.timers.forEach((t) => clearTimeout(t)); + node.timers = []; + node.inflight = 0; + node.status({ fill: "green", shape: "ring", text: "" }); + done(); + return; + } + if (node.inflight < node.burst) { + node.inflight += 1; + send(msg); + node.timers[node.inflight-1] = setTimeout(() => { + if (node.inflight == node.burst) { + node.status({ fill: "green", shape: "ring", text: "" }); + } + node.inflight -= 1; + }, node.timer); + } + else { + if (node.outputs === 2) { send([null,msg]); } + node.status({ fill: "red", shape: "dot", text: "" }); + } + done(); + }); + node.on("close", function() { + node.timers.forEach((t) => clearTimeout(t)); + node.status({}); + }); + } + // The topic based fair queue and last arrived on all topics queue else if ((node.pauseType === "queue") || (node.pauseType === "timed")) { node.intervalID = setInterval(function() { diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html index ceb71ea0a..90ae9fb03 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/function/89-delay.html @@ -16,6 +16,7 @@ diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json index 0e5f4ae62..dd96a90e0 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/messages.json +++ b/packages/node_modules/@node-red/nodes/locales/en-US/messages.json @@ -309,7 +309,8 @@ "delayvarmsg": "Override delay with msg.delay", "randomdelay": "Random delay", "limitrate": "Rate Limit", - "limitall": "All messages", + "limitall": "All messages - even spacing", + "limitburst": "All messages - burst mode", "limittopic": "For each msg.topic", "fairqueue": "Send each topic in turn", "timedqueue": "Send all topics", @@ -333,6 +334,7 @@ "label": { "delay": "delay", "variable": "variable", + "burst": "burst", "limit": "limit", "limitTopic": "limit topic", "random": "random", diff --git a/test/nodes/core/function/89-delay_spec.js b/test/nodes/core/function/89-delay_spec.js index 4fbf3df54..15dbe340d 100644 --- a/test/nodes/core/function/89-delay_spec.js +++ b/test/nodes/core/function/89-delay_spec.js @@ -1009,6 +1009,67 @@ describe('delay Node', function() { }); }); + it('can handle a burst in burst mode', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"burst","timeout":1,"timeoutUnits":"seconds","rate":3,"nbRateUnits":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + c += 1; + if (c > 3) { done(err)} + done(); + }); + + setTimeout( function() { + if (c === 3) { done(); } + }, 700); + + // send test messages + delayNode1.receive({payload:1}); + setImmediate( function() { delayNode1.receive({payload:2}); } ); + setImmediate( function() { delayNode1.receive({payload:3}); } ); + setImmediate( function() { delayNode1.receive({payload:4}); } ); + setImmediate( function() { delayNode1.receive({payload:5}); } ); + }); + }); + + it('can reset a burst in burst mode', function(done) { + this.timeout(2000); + var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"burst","timeout":1,"timeoutUnits":"seconds","rate":3,"nbRateUnits":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]}, + {id:"helperNode1", type:"helper", wires:[]}]; + helper.load(delayNode, flow, function() { + var delayNode1 = helper.getNode("delayNode1"); + var helperNode1 = helper.getNode("helperNode1"); + var t = Date.now(); + var c = 0; + helperNode1.on("input", function(msg) { + msg.should.have.a.property('payload'); + c += 1; + if (c > 4) { done(err)} + done(); + }); + + setTimeout( function() { + if (c === 4) { done(); } + }, 700); + + // send test messages + delayNode1.receive({payload:1}); + setImmediate( function() { delayNode1.receive({payload:2}); } ); + setImmediate( function() { delayNode1.receive({payload:3}); } ); + setImmediate( function() { delayNode1.receive({payload:4}); } ); + setImmediate( function() { delayNode1.receive({payload:5}); } ); + setImmediate( function() { delayNode1.receive({reset:true}); } ); + setImmediate( function() { delayNode1.receive({payload:6}); } ); + }); + }); + + it('sending a msg with reset to empty queue doesnt send anything', function(done) { this.timeout(2000); var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},