From d7c8adfd82e0b90479660ebf6f95ea903404b508 Mon Sep 17 00:00:00 2001 From: Hiroyasu Nishiyama Date: Thu, 7 Dec 2017 04:44:46 +0900 Subject: [PATCH] Fix handling of too many pending messages in SORT node (#1514) * initial support of SORT node minor fix of sort node fixed error message of sort node fixed error handling of SORT node add test case for SORT node make limit of messages count computed once in SORT node * update type in message & info description * fix handling of pending messages in SORT node --- nodes/core/locales/en-US/messages.json | 3 +- nodes/core/logic/18-sort.js | 41 ++++++++++++++++++++++---- test/nodes/core/logic/18-sort_spec.js | 22 ++++++++++++++ 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/nodes/core/locales/en-US/messages.json b/nodes/core/locales/en-US/messages.json index 80c9b88a9..8562b961f 100644 --- a/nodes/core/locales/en-US/messages.json +++ b/nodes/core/locales/en-US/messages.json @@ -857,6 +857,7 @@ "descending" : "descending", "as-number" : "as number", "invalid-exp" : "invalid JSONata expression in sort node", - "too-many" : "too many pending messages in sort node" + "too-many" : "too many pending messages in sort node", + "clear" : "clear pending message in sort node" } } diff --git a/nodes/core/logic/18-sort.js b/nodes/core/logic/18-sort.js index 664cfc5d8..529e83f37 100644 --- a/nodes/core/logic/18-sort.js +++ b/nodes/core/logic/18-sort.js @@ -57,6 +57,7 @@ module.exports = function(RED) { var node = this; var pending = get_context_val(node, 'pending', {}) var pending_count = 0; + var pending_id = 0; var order = n.order || "ascending"; var as_num = n.as_num || false; var key_is_payload = (n.keyType === 'payload'); @@ -134,6 +135,32 @@ module.exports = function(RED) { return false; } + function clear_pending() { + for(var key in pending) { + node.log(RED._("sort.clear"), pending[key].msgs[0]); + delete pending[key]; + } + pending_count = 0; + } + + function remove_oldest_pending() { + var oldest = undefined; + var oldest_key = undefined; + for(var key in pending) { + var item = pending[key]; + if((oldest === undefined) || + (oldest.seq_no > item.seq_no)) { + oldest = item; + oldest_key = key; + } + } + if(oldest !== undefined) { + delete pending[oldest_key]; + return oldest.msgs.length; + } + return 0; + } + function process_msg(msg) { if (!msg.hasOwnProperty("parts")) { if (sort_payload(msg)) { @@ -149,7 +176,8 @@ module.exports = function(RED) { if (!pending.hasOwnProperty(gid)) { pending[gid] = { count: undefined, - msgs: [] + msgs: [], + seq_no: pending_id++ }; } var group = pending[gid]; @@ -166,15 +194,18 @@ module.exports = function(RED) { } var max_msgs = max_kept_msgs_count(node); if ((max_msgs > 0) && (pending_count > max_msgs)) { - pending = {}; - pending_count = 0; - node.error(RED._("sort.too-many"),msg); + pending_count -= remove_oldest_pending(); + node.error(RED._("sort.too-many"), msg); } } - + this.on("input", function(msg) { process_msg(msg); }); + + this.on("close", function() { + clear_pending(); + }) } RED.nodes.registerType("sort", SortNode); diff --git a/test/nodes/core/logic/18-sort_spec.js b/test/nodes/core/logic/18-sort_spec.js index 93581cc34..1695a7439 100644 --- a/test/nodes/core/logic/18-sort_spec.js +++ b/test/nodes/core/logic/18-sort_spec.js @@ -207,4 +207,26 @@ describe('SORT node', function() { }); }); + it('should clear pending messages on close', function(done) { + var flow = [{id:"n1", type:"sort", order:"ascending", as_num:false, keyType:"payload", wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(sortNode, flow, function() { + var n1 = helper.getNode("n1"); + setTimeout(function() { + var logEvents = helper.log().args.filter(function (evt) { + return evt[0].type == "sort"; + }); + var evt = logEvents[0][0]; + evt.should.have.property('id', "n1"); + evt.should.have.property('type', "sort"); + evt.should.have.property('msg', "sort.clear"); + done(); + }, 150); + var msg = { payload: 0, + parts: { id: "X", index: 0, count: 2} }; + n1.receive(msg); + n1.close(); + }); + }); + });