Remove the polling loop + improvements

pull/5346/head
GogoVega 2025-11-04 12:15:55 +01:00
parent d52520250d
commit e17256934d
No known key found for this signature in database
GPG Key ID: E1E048B63AC5AC2B
2 changed files with 87 additions and 64 deletions

View File

@ -80,7 +80,9 @@ class Flow {
this.shutdownScope = null;
/** @type {Number} */
this.shutdownTimeout = 10000; // 10s
/** @type {object[]} */
/** @type {null|(() => void)} */
this.shutdown = null;
/** @type {string[]} */
this.messageInitiatorNodes = [];
/** @type {Record<string, string[]>} */
this.wiredNodeMap = {};
@ -169,6 +171,10 @@ class Flow {
}
this.trace("decrement pending messages count to: " + this.pendingMsgCount);
this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`);
if (this.shutdown) {
this.shutdown();
}
}
/**
@ -347,6 +353,7 @@ class Flow {
if (node.wires) {
// Create a map with incoming wires for each node
node.wires.forEach((output) => {
// TODO: Still an array?
output.forEach((wire) => {
this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || [];
this.wiredNodeMap[wire].push(node.id);
@ -359,7 +366,7 @@ class Flow {
}
}
this.messageInitiatorNodes = this.flow.messageInitiatorNodes?.map((id) => this.getNode(id)) || [];
this.messageInitiatorNodes = this.flow.messageInitiatorNodes || [];
this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null;
if (typeof this.flow.shutdownTimeout === "number") {
// TODO: Math.max
@ -416,9 +423,9 @@ class Flow {
* Stop this flow.
* The `stopList` argument helps define what needs to be stopped in the case
* of a modified-nodes/flows type deploy.
* @param {[type]} stopList [description]
* @param {[type]} removedList [description]
* @return {[type]} [description]
* @param {string[]} [stopList] [description]
* @param {string[]} [removedList] [description]
* @return {Promise<void>} [description]
*/
stop(stopList, removedList) {
this.trace("stop "+this.TYPE);
@ -451,20 +458,8 @@ class Flow {
/** @type {Promise<void>} */
let shutdownPromise = Promise.resolve();
if (gracefulShutdown && this.messageInitiatorNodes.length) {
for (const node of this.messageInitiatorNodes) {
if (node) {
delete this.activeNodes[node.id];
try {
const removed = removedMap[node.id];
if (removed) {
events.emit("node-status", { id: node.id });
}
initialPromises.push(stopNode(node, removed).catch(() => { }));
} catch (error) {
node.error(error);
}
}
}
// Start by closing initiator nodes
initialPromises.push(...this.stopNodes(this.messageInitiatorNodes, removedMap));
/** @type {Set<string>} */
const lookupNodes = new Set();
@ -484,11 +479,11 @@ class Flow {
}
}
shutdownPromise = new Promise((resolve, _reject) => {
let remainingPeriod = this.shutdownTimeout;
const loop = () => {
/** @type {Promise<void>} */
const closePromise = new Promise((resolve) => {
this.shutdown = () => {
if (fullStop) {
this.debug(`Pending message count: ${this.pendingMsgCount}`);
this.trace(`Pending message count: ${this.pendingMsgCount}`);
if (this.shutdownScope === null) {
if (this.pendingMsgCount === 0) {
return resolve();
@ -501,13 +496,15 @@ class Flow {
}
});
if (count === 0) {
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.shutdown && flow.id !== this.id) {
// Retrigger validation in each flow
flow.shutdown();
}
});
return resolve();
}
}
if (remainingPeriod < 0) {
this.error("Timeout for graceful shutdown has expired");
return resolve();
}
} else {
let inProgress = false;
for (const id of Object.keys(this.msgInProgressNodeMap)) {
@ -519,45 +516,64 @@ class Flow {
if (!inProgress) {
return resolve();
}
if (remainingPeriod < 0) {
this.error("Timeout for graceful shutdown has expired");
return resolve();
}
}
remainingPeriod -= 1000;
setTimeout(loop, 1000); // polling every 1000ms
};
setImmediate(loop);
});
}
/** @type {() => Promise<void>[]} */
const finalClose = () => {
const finalPromises = [];
for (var i=0;i<stopList.length;i++) {
const node = this.activeNodes[stopList[i]];
if (node) {
delete this.activeNodes[stopList[i]];
if (this.subflowInstanceNodes[stopList[i]]) {
delete this.subflowInstanceNodes[stopList[i]];
}
try {
const removed = removedMap[stopList[i]];
if (removed) {
events.emit("node-status", { id: node.id });
}
finalPromises.push(stopNode(node,removed).catch(() => { }));
} catch(err) {
node.error(err);
}
}
});
/** @type {NodeJS.Timeout|null} */
let timer = null;
const timeoutPromise = new Promise((_resolve, reject) => {
timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout);
});
const start = Date.now();
shutdownPromise = Promise.race([closePromise, timeoutPromise])
.then(() => {
clearTimeout(timer);
const delta = Date.now() - start;
this.trace(`Stopped gracefuly in ${delta}ms`);
}).catch((error) => {
clearTimeout(timer);
this.error(Log._("nodes.flows.stopping-error", { message: error }));
});
if (this.shutdown) {
setImmediate(this.shutdown);
}
return finalPromises;
};
}
return Promise.all(initialPromises)
.then(() => shutdownPromise)
.then(() => Promise.all(finalClose()));
.then(() => Promise.all(this.stopNodes(stopList, removedMap)));
}
/**
* Stop every node of stopList
* @param {string[]} stopList
* @param {Record<string, true>} removedMap
* @returns {Promise<void>[]}
*/
stopNodes(stopList, removedMap) {
const promises = [];
for (const nodeId of stopList) {
const node = this.activeNodes[nodeId];
if (node) {
delete this.activeNodes[nodeId];
if (this.subflowInstanceNodes[nodeId]) {
delete this.subflowInstanceNodes[nodeId];
}
try {
const removed = removedMap[nodeId];
if (removed) {
// Clears the node status
events.emit("node-status", { id: node.id });
}
promises.push(stopNode(node, removed).catch(() => { }));
} catch(error) {
node.error(error);
}
}
}
return promises;
}
/**
@ -912,9 +928,9 @@ class Flow {
/**
* Stop an individual node within this flow.
*
* @param {[type]} node [description]
* @param {[type]} removed [description]
* @return {[type]} [description]
* @param {object} node [description]
* @param {boolean} removed [description]
* @return {Promise<void>} [description]
*/
function stopNode(node,removed) {
Log.trace("Stopping node "+node.type+":"+node.id+(removed?" removed":""));
@ -989,7 +1005,7 @@ function handlePreDeliver(flow,sendEvent, reportError) {
} else if (err !== false) {
// node.send(msg) called
// Do it now to avoid next loop and an offbeat count
flow.incrementPendingMsgCount(sendEvent.destination.node || { id: sendEvent.destination.id });
flow.incrementPendingMsgCount(sendEvent.destination.node);
if (asyncMessageDelivery) {
setImmediate(function() {
deliverMessageToDestination(sendEvent)

View File

@ -203,6 +203,7 @@ Node.prototype.emit = function(event, ...args) {
* This will call all registered handlers for the 'input' event.
*/
Node.prototype._emitInput = function(arg) {
// TODO: inject node directly used the event to trigger a message, is this allowed?
var node = this;
this.metric("receive", arg);
let receiveEvent = { msg:arg, destination: { id: this.id, node: this } }
@ -221,6 +222,7 @@ Node.prototype._emitInput = function(arg) {
function(err) { node._complete(arg,err); }
);
if (node._expectedDoneCount === 0 && node._flow) {
// TODO: Call Complete node is expected?
// Ensure done() is called
node._complete(arg);
}
@ -245,6 +247,11 @@ Node.prototype._emitInput = function(arg) {
}
}
);
if (node._expectedDoneCount === 0 && node._flow) {
// TODO: Call Complete node is expected?
// Ensure done() is called
node._complete(arg);
}
} catch(err) {
node.error(err,arg);
}