Allow scoped flow to cancel shutdown of other flows

pull/5346/head
GogoVega 2025-11-05 11:44:48 +01:00
parent ff5c2a264d
commit 0cd00d8410
No known key found for this signature in database
GPG Key ID: E1E048B63AC5AC2B
1 changed files with 18 additions and 0 deletions

View File

@ -81,7 +81,11 @@ class Flow {
/** @type {Number} */
this.shutdownTimeout = 10000; // 10s
/** @type {null|(() => void)} */
this.cancelShutdown = null;
/** @type {null|(() => void)} */
this.shutdown = null;
/** @type {boolean} */
this.failFast = false;
/** @type {string[]} */
this.messageInitiatorNodes = [];
/** @type {Record<string, string[]>} */
@ -366,6 +370,9 @@ class Flow {
}
}
this.shutdown = null;
this.cancelShutdown = null;
this.failFast = this.flow.failFast || false;
this.messageInitiatorNodes = this.flow.messageInitiatorNodes || [];
this.shutdownScope = this.flow.shutdownScope?.map((id) => activeFlows[id]) || null;
if (typeof this.flow.shutdownTimeout === "number") {
@ -526,6 +533,9 @@ class Flow {
/** @type {NodeJS.Timeout|null} */
let timer = null;
const timeoutPromise = new Promise((_resolve, reject) => {
if (this.failFast && this.shutdownScope) {
this.cancelShutdown = () => reject("Graceful shutdown cancelled");
}
timer = setTimeout(() => reject("Graceful shutdown timed out"), this.shutdownTimeout);
});
const start = Date.now();
@ -537,6 +547,14 @@ class Flow {
}).catch((error) => {
clearTimeout(timer);
this.error(Log._("nodes.flows.stopping-error", { message: error }));
if (this.failFast && this.shutdownScope) {
this.shutdownScope.forEach((flow) => {
if (flow instanceof Flow && flow.cancelShutdown && flow.id !== this.id) {
// Cancel other flow shutdown if this one fails
flow.cancelShutdown();
}
});
}
});
if (this.shutdown) {