mirror of https://github.com/node-red/node-red.git
Avoid communication queue overflow with warning and throttling mechanisms
parent
f63b825fdc
commit
9201e6bd9e
|
|
@ -36,6 +36,9 @@ var anonymousUser;
|
|||
var heartbeatTimer;
|
||||
var lastSentTime;
|
||||
|
||||
const MAX_STACK_SIZE = 10_000;
|
||||
const MAX_STACK_SIZE_WARN_COOLDOWN_MILLIS = 30_000;
|
||||
|
||||
function init(_server,_settings,_runtimeAPI) {
|
||||
server = _server;
|
||||
settings = _settings;
|
||||
|
|
@ -62,6 +65,8 @@ function CommsConnection(ws, user) {
|
|||
this.stack = [];
|
||||
this.user = user;
|
||||
this.lastSentTime = 0;
|
||||
this.lastOverflowWarning = 0;
|
||||
this.totalDroppedSinceLastWarning = 0;
|
||||
var self = this;
|
||||
|
||||
log.audit({event: "comms.open"});
|
||||
|
|
@ -172,7 +177,40 @@ function CommsConnection(ws, user) {
|
|||
|
||||
CommsConnection.prototype.send = function(topic,data) {
|
||||
if (topic && data) {
|
||||
this.stack.push({topic:topic,data:data});
|
||||
if (this.stack.length < MAX_STACK_SIZE) {
|
||||
this.stack.push({topic:topic,data:data});
|
||||
} else {
|
||||
// Overflow.
|
||||
this.totalDroppedSinceLastWarning++;
|
||||
|
||||
var now = Date.now();
|
||||
if (this.lastOverflowWarning === 0 || now - this.lastOverflowWarning > MAX_STACK_SIZE_WARN_COOLDOWN_MILLIS) {
|
||||
// Set the timestamp now so that the call to log.warn will not create a
|
||||
// stack overflow due to the if-condition matching.
|
||||
this.lastOverflowWarning = now;
|
||||
|
||||
// Assemble warning message.
|
||||
let lastDroppedMessage = {topic:topic,data:data}
|
||||
let warnMessage = `Communication queue overflow for session ${this.session} - dropped ${this.totalDroppedSinceLastWarning} message(s) in last period (limit: ${MAX_STACK_SIZE}).\nCheck extensive usage of debug nodes or status updates from third-party nodes.\nLast dropped message: ${JSON.stringify(lastDroppedMessage)}`
|
||||
log.warn(warnMessage);
|
||||
|
||||
// Send notification directly via WebSocket for immediate feedback (bypassing the queue).
|
||||
try {
|
||||
this.ws.send(JSON.stringify([{
|
||||
topic: "debug",
|
||||
data: {
|
||||
level: log.WARN,
|
||||
format: "warning",
|
||||
msg: warnMessage,
|
||||
}
|
||||
}]));
|
||||
} catch(err) {
|
||||
// Silently ignore WebSocket send errors during overflow.
|
||||
}
|
||||
// Reset the counter.
|
||||
this.totalDroppedSinceLastWarning = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
this._queueSend();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue