From d8d82e2ba3f617545fcc8d1fa38945d5e0575538 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Mon, 9 Jul 2018 23:06:51 +0100 Subject: [PATCH] Update sort node for async use of jsonata --- nodes/core/logic/18-sort.js | 216 ++++++++++++++++++++---------------- 1 file changed, 118 insertions(+), 98 deletions(-) diff --git a/nodes/core/logic/18-sort.js b/nodes/core/logic/18-sort.js index e30535dc1..a3023e5f1 100644 --- a/nodes/core/logic/18-sort.js +++ b/nodes/core/logic/18-sort.js @@ -17,7 +17,7 @@ module.exports = function(RED) { "use strict"; - var _max_kept_msgs_count = undefined; + var _max_kept_msgs_count; function max_kept_msgs_count(node) { if (_max_kept_msgs_count === undefined) { @@ -32,30 +32,20 @@ module.exports = function(RED) { return _max_kept_msgs_count; } - function eval_jsonata(node, code, val) { - try { - return RED.util.evaluateJSONataExpression(code, val); - } - catch (e) { - node.error(RED._("sort.invalid-exp")); - throw e; - } - } - - function get_context_val(node, name, dval) { - var context = node.context(); - var val = context.get(name); - if (val === undefined) { - context.set(name, dval); - return dval; - } - return val; - } + // function get_context_val(node, name, dval) { + // var context = node.context(); + // var val = context.get(name); + // if (val === undefined) { + // context.set(name, dval); + // return dval; + // } + // return val; + // } function SortNode(n) { RED.nodes.createNode(this, n); var node = this; - var pending = get_context_val(node, 'pending', {}) + var pending = {};//get_context_val(node, 'pending', {}) var pending_count = 0; var pending_id = 0; var order = n.order || "ascending"; @@ -76,11 +66,10 @@ module.exports = function(RED) { } } var dir = (order === "ascending") ? 1 : -1; - var conv = as_num - ? function(x) { return Number(x); } - : function(x) { return x; }; + var conv = as_num ? function(x) { return Number(x); } + : function(x) { return x; }; - function gen_comp(key) { + function generateComparisonFunction(key) { return function(x, y) { var xp = conv(key(x)); var yp = conv(key(y)); @@ -90,74 +79,97 @@ module.exports = function(RED) { }; } - function send_group(group) { - var key = key_is_exp - ? function(msg) { - return eval_jsonata(node, key_exp, msg); - } - : function(msg) { - return RED.util.getMessageProperty(msg, key_prop); - }; - var comp = gen_comp(key); + function sortMessageGroup(group) { + var promise; var msgs = group.msgs; - try { - msgs.sort(comp); - } - catch (e) { - return; // not send when error - } - for (var i = 0; i < msgs.length; i++) { - var msg = msgs[i]; - msg.parts.index = i; - node.send(msg); - } - } - - function sort_payload(msg) { - var data = RED.util.getMessageProperty(msg, target_prop); - if (Array.isArray(data)) { - var key = key_is_exp - ? function(elem) { - return eval_jsonata(node, key_exp, elem); - } - : function(elem) { return elem; }; - var comp = gen_comp(key); + if (key_is_exp) { + var evaluatedDataPromises = msgs.map(msg => { + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(key_exp, msg, (err, result) => { + resolve({ + item: msg, + sortValue: result + }) + }); + }) + }); + promise = Promise.all(evaluatedDataPromises).then(evaluatedElements => { + // Once all of the sort keys are evaluated, sort by them + var comp = generateComparisonFunction(elem=>elem.sortValue); + return evaluatedElements.sort(comp).map(elem=>elem.item); + }); + } else { + var key = function(msg) { + return ; + } + var comp = generateComparisonFunction(msg => RED.util.getMessageProperty(msg, key_prop)); try { - data.sort(comp); + msgs.sort(comp); } catch (e) { - return false; + return; // not send when error } - return true; + promise = Promise.resolve(msgs); } - return false; + return promise.then(msgs => { + for (var i = 0; i < msgs.length; i++) { + var msg = msgs[i]; + msg.parts.index = i; + node.send(msg); + } + }); } - function check_parts(parts) { - if (parts.hasOwnProperty("id") && - parts.hasOwnProperty("index")) { - return true; + function sortMessageProperty(msg) { + var data = RED.util.getMessageProperty(msg, target_prop); + if (Array.isArray(data)) { + if (key_is_exp) { + // key is an expression. Evaluated the expression for each item + // to get its sort value. As this could be async, need to do + // it first. + var evaluatedDataPromises = data.map(elem => { + return new Promise((resolve,reject) => { + RED.util.evaluateJSONataExpression(key_exp, elem, (err, result) => { + resolve({ + item: elem, + sortValue: result + }) + }); + }) + }) + return Promise.all(evaluatedDataPromises).then(evaluatedElements => { + // Once all of the sort keys are evaluated, sort by them + // and reconstruct the original message item with the newly + // sorted values. + var comp = generateComparisonFunction(elem=>elem.sortValue); + data = evaluatedElements.sort(comp).map(elem=>elem.item); + RED.util.setMessageProperty(msg, target_prop,data); + return true; + }) + } else { + var comp = generateComparisonFunction(elem=>elem); + try { + data.sort(comp); + } catch (e) { + return Promise.resolve(false); + } + return Promise.resolve(true); + } } - return false; + return Promise.resolve(false); } - function clear_pending() { + function removeOldestPending() { + var oldest; + var oldest_key; for(var key in pending) { - node.log(RED._("sort.clear"), pending[key].msgs[0]); - delete pending[key]; - } - pending_count = 0; - } - - function remove_oldest_pending() { - var oldest = undefined; - var oldest_key = undefined; - for(var key in pending) { - var item = pending[key]; - if((oldest === undefined) || - (oldest.seq_no > item.seq_no)) { - oldest = item; - oldest_key = key; + if (pending.hasOwnProperty(key)) { + var item = pending[key]; + if((oldest === undefined) || + (oldest.seq_no > item.seq_no)) { + oldest = item; + oldest_key = key; + } } } if(oldest !== undefined) { @@ -166,16 +178,18 @@ module.exports = function(RED) { } return 0; } - - function process_msg(msg) { + + function processMessage(msg) { if (target_is_prop) { - if (sort_payload(msg)) { - node.send(msg); - } - return; + sortMessageProperty(msg).then(send => { + if (send) { + node.send(msg); + } + }).catch(err => { + }); } var parts = msg.parts; - if (!check_parts(parts)) { + if (!parts.hasOwnProperty("id") || !parts.hasOwnProperty("index")) { return; } var gid = parts.id; @@ -195,23 +209,29 @@ module.exports = function(RED) { pending_count++; if (group.count === msgs.length) { delete pending[gid] - send_group(group); + sortMessageGroup(group); pending_count -= msgs.length; - } - var max_msgs = max_kept_msgs_count(node); - if ((max_msgs > 0) && (pending_count > max_msgs)) { - pending_count -= remove_oldest_pending(); - node.error(RED._("sort.too-many"), msg); + } else { + var max_msgs = max_kept_msgs_count(node); + if ((max_msgs > 0) && (pending_count > max_msgs)) { + pending_count -= removeOldestPending(); + node.error(RED._("sort.too-many"), msg); + } } } - + this.on("input", function(msg) { - process_msg(msg); + processMessage(msg); }); this.on("close", function() { - clear_pending(); - }) + for(var key in pending) { + if (pending.hasOwnProperty(key)) { + node.log(RED._("sort.clear"), pending[key].msgs[0]); + delete pending[key]; + } + } + pending_count = 0; }) } RED.nodes.registerType("sort", SortNode);