Allow graceful shutdown for node/flow deployment

pull/5346/head
GogoVega 2025-11-03 15:46:30 +01:00
parent cdd2abc4c1
commit 09e177bbbf
No known key found for this signature in database
GPG Key ID: E1E048B63AC5AC2B
2 changed files with 97 additions and 24 deletions

View File

@ -74,12 +74,16 @@ class Flow {
// Graceful Shutdown props
/** @type {Number} */
this.pendingMsgCount = 0;
/** @type {Record<string, number>} */
this.msgInProgressNodeMap = {};
/** @type {object[]|null} */
this.shutdownScope = null;
/** @type {Number} */
this.shutdownTimeout = 10000; // 10s
/** @type {object[]} */
this.messageInitiatorNodes = [];
/** @type {Record<string, string[]>} */
this.wiredNodeMap = {};
}
/**
@ -150,14 +154,37 @@ class Flow {
this.parent.log(msg);
}
decrementPendindMsgCount() {
/**
* This method is called when a node finishes processing a message.
* e.g. `node.done()` or `node.done(error)`.
* @param {object} node
*/
decrementPendingMsgCount(node) {
this.pendingMsgCount--;
this.debug("Decrement pending messages count to: " + this.pendingMsgCount);
if (this.msgInProgressNodeMap.hasOwnProperty(node.id)) {
this.msgInProgressNodeMap[node.id]--;
if (this.msgInProgressNodeMap[node.id] <= 0) {
delete this.msgInProgressNodeMap[node.id];
}
}
this.trace("decrement pending messages count to: " + this.pendingMsgCount);
this.trace(`decrement in-progress msg to: ${this.msgInProgressNodeMap[node.id] || 0} for: ${node.id}`);
}
incrementPendingMsgCount() {
/**
* This method is called when a node sends a message.
* e.g. `node.send(msg)` or `node.receive(msg)`.
* @param {object} node
*/
incrementPendingMsgCount(node) {
this.pendingMsgCount++;
this.debug("Increment pending messages count to: " + this.pendingMsgCount);
if (this.msgInProgressNodeMap[node.id]) {
this.msgInProgressNodeMap[node.id]++;
} else {
this.msgInProgressNodeMap[node.id] = 1;
}
this.trace("increment pending messages count to: " + this.pendingMsgCount);
this.trace(`increment in-progress msg to: ${this.msgInProgressNodeMap[node.id]} for: ${node.id}`);
}
/**
@ -280,6 +307,7 @@ class Flow {
}
}
this.wiredNodeMap = {};
for (id in this.flow.nodes) {
if (this.flow.nodes.hasOwnProperty(id)) {
node = this.flow.nodes[id];
@ -321,6 +349,15 @@ class Flow {
}
}
}
if (node.wires) {
// Create a map with incoming wires for each node
node.wires.forEach((output) => {
output.forEach((wire) => {
this.wiredNodeMap[wire] = this.wiredNodeMap[wire] || [];
this.wiredNodeMap[wire].push(node.id);
});
});
}
} else {
this.debug("not starting disabled node : "+id);
}
@ -383,7 +420,9 @@ class Flow {
*/
stop(stopList, removedList) {
this.trace("stop "+this.TYPE);
let fullStop = false;
if (!stopList) {
fullStop = true;
stopList = Object.keys(this.activeNodes);
}
// this.trace(" stopList: "+stopList.join(","))
@ -425,29 +464,63 @@ class Flow {
}
}
/** @type {Set<string>} */
const lookupNodes = new Set();
if (!fullStop) {
// Create a list of nodes wired upstream of nodes to be stopped.
const list = stopList.slice();
while (list.length) {
const id = list.pop();
if (lookupNodes.has(id)) {
continue;
}
lookupNodes.add(id);
if (this.wiredNodeMap[id]) {
list.push(...this.wiredNodeMap[id]);
}
}
}
shutdownPromise = new Promise((resolve, _reject) => {
let remainingPeriod = this.shutdownTimeout;
const loop = () => {
this.debug(`Pending message count: ${this.pendingMsgCount}`);
if (this.shutdownScope === null) {
if (this.pendingMsgCount === 0) {
this.pendingMsgCount = 0;
return resolve();
}
} else if (this.shutdownScope.length) {
let count = 0;
this.shutdownScope.forEach((flow) => {
if (flow && "pendingMsgCount" in flow) {
count += flow.pendingMsgCount;
if (fullStop) {
this.debug(`Pending message count: ${this.pendingMsgCount}`);
if (this.shutdownScope === null) {
if (this.pendingMsgCount === 0) {
return resolve();
}
});
if (count === 0) {
this.pendingMsgCount = 0;
} else if (this.shutdownScope.length) {
let count = 0;
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow) {
count += flow.pendingMsgCount;
}
});
if (count === 0) {
return resolve();
}
}
if (remainingPeriod < 0) {
this.error("Timeout for graceful shutdown has expired");
return resolve();
}
} else {
let inProgress = false;
for (const id of Object.keys(this.msgInProgressNodeMap)) {
if (lookupNodes.has(id)) {
inProgress = true;
break;
}
}
if (!inProgress) {
return resolve();
}
if (remainingPeriod < 0) {
this.error("Timeout for graceful shutdown has expired");
return resolve();
}
} else if (remainingPeriod < 0) {
this.pendingMsgCount = 0;
return resolve();
}
remainingPeriod -= 1000;
setTimeout(loop, 1000); // polling every 1000ms
@ -914,7 +987,7 @@ function handlePreDeliver(flow,sendEvent, reportError) {
} else if (err !== false) {
// node.send(msg) called
// Do it now to avoid next loop and an offbeat count
flow.incrementPendingMsgCount();
flow.incrementPendingMsgCount(sendEvent.destination.node);
if (asyncMessageDelivery) {
setImmediate(function() {
deliverMessageToDestination(sendEvent)

View File

@ -133,7 +133,7 @@ Node.prototype._complete = function(msg,error) {
this.metric("done",msg);
if (this._flow) {
// done() or done(error) called
this._flow.decrementPendindMsgCount();
this._flow.decrementPendingMsgCount(this);
}
hooks.trigger("onComplete",{ msg: msg, error: error, node: { id: this.id, node: this }}, err => {
if (err) {
@ -513,7 +513,7 @@ Node.prototype._receive = function (msg) {
Node.prototype.receive = function (msg) {
if (this._flow) {
// Equivalent to node.send(msg) without handleOnSend
this._flow.incrementPendingMsgCount();
this._flow.incrementPendingMsgCount(this);
}
this._receive(msg);
};