Merge pull request #5391 from node-red/add-burst-mode-to-delay-node

add burst mode to delay node
pull/5446/head
Nick O'Leary 2026-01-21 09:49:02 +00:00 committed by GitHub
commit 8faebae4bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 131 additions and 5 deletions

View File

@ -63,6 +63,7 @@
<label></label>
<select id="node-input-rate-type" style="width:270px !important">
<option value="all" data-i18n="delay.limitall"></option>
<option value="burst" data-i18n="delay.limitburst"></option>
<option value="topic" data-i18n="delay.limittopic"></option>
</select>
</div>
@ -185,6 +186,8 @@
return this._("delay.label.limit")+" "+rate;
} else if (this.pauseType == "timed") {
return this._("delay.label.limitTopic")+" "+rate;
} else if (this.pauseType == "burst") {
return this._("delay.label.burst")+" "+rate;
} else {
return this._("delay.label.limitTopic")+" "+rate;
}
@ -245,6 +248,9 @@
$("#node-input-delay-action").val('rate');
$("#node-input-rate-type").val('topic');
$("#node-input-rate-topic-type").val('timed');
} else if (this.pauseType == "burst") {
$("#node-input-delay-action").val('rate');
$("#node-input-rate-type").val('burst');
}
if (!this.timeoutUnits) {
@ -294,12 +300,17 @@
if (this.value === "all") {
$("#rate-details-per-topic").hide();
$("#node-input-drop-select-queue").attr('disabled', false);
$("#rate-override").show();
} else if (this.value === "burst") {
$("#rate-details-per-topic").hide();
$("#node-input-drop-select-queue").attr('disabled', true);
$("#rate-override").hide();
} else if (this.value === "topic") {
if ($("#node-input-drop-select").val() === "queue") {
$("#node-input-drop-select").val("drop");
}
$("#node-input-drop-select-queue").attr('disabled', true);
$("#rate-details-per-topic").show();
$("#rate-override").show();
}
}).trigger("change");
},
@ -312,6 +323,8 @@
action = $("#node-input-rate-type").val();
if (action === "all") {
this.pauseType = "rate";
} else if (action === "burst") {
this.pauseType = "burst";
} else {
this.pauseType = $("#node-input-rate-topic-type").val();
}

View File

@ -42,6 +42,7 @@ module.exports = function(RED) {
this.timeoutUnits = n.timeoutUnits;
this.randomUnits = n.randomUnits;
this.rateUnits = n.rateUnits;
this.burst = n.rate;
if (n.timeoutUnits === "milliseconds") {
this.timeout = n.timeout;
@ -57,12 +58,16 @@ module.exports = function(RED) {
if (n.rateUnits === "minute") {
this.rate = (60 * 1000)/n.rate;
this.timer = n.nbRateUnits * (60 * 1000);
} else if (n.rateUnits === "hour") {
this.rate = (60 * 60 * 1000)/n.rate;
this.timer = n.nbRateUnits * (60 * 60 * 1000);
} else if (n.rateUnits === "day") {
this.rate = (24 * 60 * 60 * 1000)/n.rate;
this.timer = n.nbRateUnits * (24 * 60 * 60 * 1000);
} else { // Default to seconds
this.rate = 1000/n.rate;
this.timer = n.nbRateUnits * 1000;
}
this.rate *= (n.nbRateUnits > 0 ? n.nbRateUnits : 1);
@ -337,6 +342,42 @@ module.exports = function(RED) {
});
}
// Handle the burst mode
else if (node.pauseType === "burst") {
node.timers = [];
node.inflight = 0;
node.status({ fill: "green", shape: "ring", text: "" })
node.on("input", function(msg, send, done) {
if (msg.hasOwnProperty("reset")) {
node.timers.forEach((t) => clearTimeout(t));
node.timers = [];
node.inflight = 0;
node.status({ fill: "green", shape: "ring", text: "" });
done();
return;
}
if (node.inflight < node.burst) {
node.inflight += 1;
send(msg);
node.timers[node.inflight-1] = setTimeout(() => {
if (node.inflight == node.burst) {
node.status({ fill: "green", shape: "ring", text: "" });
}
node.inflight -= 1;
}, node.timer);
}
else {
if (node.outputs === 2) { send([null,msg]); }
node.status({ fill: "red", shape: "dot", text: "" });
}
done();
});
node.on("close", function() {
node.timers.forEach((t) => clearTimeout(t));
node.status({});
});
}
// The topic based fair queue and last arrived on all topics queue
else if ((node.pauseType === "queue") || (node.pauseType === "timed")) {
node.intervalID = setInterval(function() {

View File

@ -16,6 +16,7 @@
<script type="text/html" data-help-name="delay">
<p>Delays each message passing through the node or limits the rate at which they can pass.</p>
<p>Not all input parameters apply to all modes.</p>
<h3>Inputs</h3>
<dl class="message-properties">
<dt class="optional">delay <span class="property-type">number</span></dt>
@ -47,9 +48,11 @@
Each message is delayed independently of any other message, based on
the time of its arrival.
</p>
<p>When configured to rate limit messages, their delivery is spread across
the configured time period. The status shows the number of messages currently in the queue.
It can optionally discard intermediate messages as they arrive.</p>
<p>When configured to rate limit messages, the node can operate in several modes.
By default the messages are delivered evenly spread across
the configured time period. e.g. 3 msgs in 12 seconds means 1 message every 4 seconds.
The status shows the number of messages currently in the queue.
It can optionally discard intermediate messages as they arrive.
</p>
<p>If set to allow override of the rate, the new rate will be applied immediately,
and will remain in effect until changed again, the node is reset, or the flow is restarted.</p>
@ -59,6 +62,12 @@
the most recent message for all topics, or release the most recent message
for the next topic.
</p>
<p>In burst mode, messages are passed through up to the configured limit immediately,
any further messages will be dropped or sent to the second output until the time
period has elapsed. At that point the next burst of messages can be sent.
The status shows green when messages can be sent immediately, and red when
messages are blocked.
</p>
<p><b>Note</b>: In rate limit mode the maximum queue depth can be set by a property in your
<i>settings.js</i> file. For example <code>nodeMessageBufferMaxLength: 1000,</code></p>
</script>

View File

@ -309,7 +309,8 @@
"delayvarmsg": "Override delay with msg.delay",
"randomdelay": "Random delay",
"limitrate": "Rate Limit",
"limitall": "All messages",
"limitall": "All messages - even spacing",
"limitburst": "All messages - burst mode",
"limittopic": "For each msg.topic",
"fairqueue": "Send each topic in turn",
"timedqueue": "Send all topics",
@ -333,6 +334,7 @@
"label": {
"delay": "delay",
"variable": "variable",
"burst": "burst",
"limit": "limit",
"limitTopic": "limit topic",
"random": "random",

View File

@ -1009,6 +1009,67 @@ describe('delay Node', function() {
});
});
it('can handle a burst in burst mode', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"burst","timeout":1,"timeoutUnits":"seconds","rate":3,"nbRateUnits":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
c += 1;
if (c > 3) { done(err)}
done();
});
setTimeout( function() {
if (c === 3) { done(); }
}, 700);
// send test messages
delayNode1.receive({payload:1});
setImmediate( function() { delayNode1.receive({payload:2}); } );
setImmediate( function() { delayNode1.receive({payload:3}); } );
setImmediate( function() { delayNode1.receive({payload:4}); } );
setImmediate( function() { delayNode1.receive({payload:5}); } );
});
});
it('can reset a burst in burst mode', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"burst","timeout":1,"timeoutUnits":"seconds","rate":3,"nbRateUnits":1,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
{id:"helperNode1", type:"helper", wires:[]}];
helper.load(delayNode, flow, function() {
var delayNode1 = helper.getNode("delayNode1");
var helperNode1 = helper.getNode("helperNode1");
var t = Date.now();
var c = 0;
helperNode1.on("input", function(msg) {
msg.should.have.a.property('payload');
c += 1;
if (c > 4) { done(err)}
done();
});
setTimeout( function() {
if (c === 4) { done(); }
}, 700);
// send test messages
delayNode1.receive({payload:1});
setImmediate( function() { delayNode1.receive({payload:2}); } );
setImmediate( function() { delayNode1.receive({payload:3}); } );
setImmediate( function() { delayNode1.receive({payload:4}); } );
setImmediate( function() { delayNode1.receive({payload:5}); } );
setImmediate( function() { delayNode1.receive({reset:true}); } );
setImmediate( function() { delayNode1.receive({payload:6}); } );
});
});
it('sending a msg with reset to empty queue doesnt send anything', function(done) {
this.timeout(2000);
var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":1,"timeoutUnits":"seconds","rate":2,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},