mirror of https://github.com/node-red/node-red.git
Add session awareness to WebSocket node
This allows a websocket-in node to receive data, process it in a flow and then send it back to the originating websocket client via a websocket-out node.pull/101/head^2
parent
ab04fcf7c0
commit
9690ebe9c1
|
@ -43,6 +43,8 @@ function WebSocketListenerNode(n) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node._clients = {};
|
||||||
|
|
||||||
RED.server.addListener('newListener',storeListener);
|
RED.server.addListener('newListener',storeListener);
|
||||||
|
|
||||||
// Create a WebSocket Server
|
// Create a WebSocket Server
|
||||||
|
@ -53,8 +55,14 @@ function WebSocketListenerNode(n) {
|
||||||
RED.server.removeListener('newListener',storeListener);
|
RED.server.removeListener('newListener',storeListener);
|
||||||
|
|
||||||
node.server.on('connection', function(socket){
|
node.server.on('connection', function(socket){
|
||||||
|
var id = (1+Math.random()*4294967295).toString(16);
|
||||||
|
node._clients[id] = socket;
|
||||||
|
|
||||||
|
socket.on('close',function() {
|
||||||
|
delete node._clients[id];
|
||||||
|
});
|
||||||
socket.on('message',function(data,flags){
|
socket.on('message',function(data,flags){
|
||||||
node.handleEvent(socket,'message',data,flags);
|
node.handleEvent(id,socket,'message',data,flags);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -80,9 +88,9 @@ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){
|
||||||
this._inputNodes.push(handler);
|
this._inputNodes.push(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
WebSocketListenerNode.prototype.handleEvent = function(/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
|
WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
|
||||||
for (var i = 0; i < this._inputNodes.length; i++) {
|
for (var i = 0; i < this._inputNodes.length; i++) {
|
||||||
this._inputNodes[i].send({payload:data});
|
this._inputNodes[i].send({session:{type:"websocket",id:id},payload:data});
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +100,13 @@ WebSocketListenerNode.prototype.broadcast = function(data){
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WebSocketListenerNode.prototype.send = function(id,data){
|
||||||
|
var session = this._clients[id];
|
||||||
|
if (session) {
|
||||||
|
session.send(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function WebSocketInNode(n) {
|
function WebSocketInNode(n) {
|
||||||
RED.nodes.createNode(this,n);
|
RED.nodes.createNode(this,n);
|
||||||
this.server = n.server;
|
this.server = n.server;
|
||||||
|
@ -114,11 +129,23 @@ function WebSocketOutNode(n) {
|
||||||
this.error("Missing server configuration");
|
this.error("Missing server configuration");
|
||||||
}
|
}
|
||||||
this.on("input", function(msg) {
|
this.on("input", function(msg) {
|
||||||
node.serverConfig.broadcast(msg.payload,function(error){
|
var payload = msg.payload;
|
||||||
|
if (Buffer.isBuffer(payload)) {
|
||||||
|
payload = payload.toString();
|
||||||
|
} else if (typeof payload === "object") {
|
||||||
|
payload = JSON.stringify(payload);
|
||||||
|
} else if (typeof payload !== "string") {
|
||||||
|
payload = ""+payload;
|
||||||
|
}
|
||||||
|
if (msg.session && msg.session.type == "websocket") {
|
||||||
|
node.serverConfig.send(msg.session.id,payload);
|
||||||
|
} else {
|
||||||
|
node.serverConfig.broadcast(payload,function(error){
|
||||||
if(!!error){
|
if(!!error){
|
||||||
node.warn("An error occurred while sending:" + inspect(error));
|
node.warn("An error occurred while sending:" + inspect(error));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
RED.nodes.registerType("websocket out",WebSocketOutNode);
|
RED.nodes.registerType("websocket out",WebSocketOutNode);
|
||||||
|
|
Loading…
Reference in New Issue