Fix handling of too many pending messages in SORT node (#1514)

* initial support of SORT node

minor fix of sort node

fixed error message of sort node

fixed error handling of SORT node

add test case for SORT node

make limit of messages count computed once in SORT node

* update type in message & info description

* fix handling of pending messages in SORT node
pull/1515/head
Hiroyasu Nishiyama 2017-12-07 04:44:46 +09:00 committed by Dave Conway-Jones
parent b98d1216b1
commit d7c8adfd82
3 changed files with 60 additions and 6 deletions

View File

@ -857,6 +857,7 @@
"descending" : "descending",
"as-number" : "as number",
"invalid-exp" : "invalid JSONata expression in sort node",
"too-many" : "too many pending messages in sort node"
"too-many" : "too many pending messages in sort node",
"clear" : "clear pending message in sort node"
}
}

View File

@ -57,6 +57,7 @@ module.exports = function(RED) {
var node = this;
var pending = get_context_val(node, 'pending', {})
var pending_count = 0;
var pending_id = 0;
var order = n.order || "ascending";
var as_num = n.as_num || false;
var key_is_payload = (n.keyType === 'payload');
@ -134,6 +135,32 @@ module.exports = function(RED) {
return false;
}
function clear_pending() {
for(var key in pending) {
node.log(RED._("sort.clear"), pending[key].msgs[0]);
delete pending[key];
}
pending_count = 0;
}
function remove_oldest_pending() {
var oldest = undefined;
var oldest_key = undefined;
for(var key in pending) {
var item = pending[key];
if((oldest === undefined) ||
(oldest.seq_no > item.seq_no)) {
oldest = item;
oldest_key = key;
}
}
if(oldest !== undefined) {
delete pending[oldest_key];
return oldest.msgs.length;
}
return 0;
}
function process_msg(msg) {
if (!msg.hasOwnProperty("parts")) {
if (sort_payload(msg)) {
@ -149,7 +176,8 @@ module.exports = function(RED) {
if (!pending.hasOwnProperty(gid)) {
pending[gid] = {
count: undefined,
msgs: []
msgs: [],
seq_no: pending_id++
};
}
var group = pending[gid];
@ -166,15 +194,18 @@ module.exports = function(RED) {
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
pending = {};
pending_count = 0;
node.error(RED._("sort.too-many"),msg);
pending_count -= remove_oldest_pending();
node.error(RED._("sort.too-many"), msg);
}
}
this.on("input", function(msg) {
process_msg(msg);
});
this.on("close", function() {
clear_pending();
})
}
RED.nodes.registerType("sort", SortNode);

View File

@ -207,4 +207,26 @@ describe('SORT node', function() {
});
});
it('should clear pending messages on close', function(done) {
var flow = [{id:"n1", type:"sort", order:"ascending", as_num:false, keyType:"payload", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(sortNode, flow, function() {
var n1 = helper.getNode("n1");
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "sort";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "n1");
evt.should.have.property('type', "sort");
evt.should.have.property('msg', "sort.clear");
done();
}, 150);
var msg = { payload: 0,
parts: { id: "X", index: 0, count: 2} };
n1.receive(msg);
n1.close();
});
});
});