mirror of https://github.com/node-red/node-red.git
CComms API updates
parent
f041a21f22
commit
068b93befa
|
@ -77,6 +77,53 @@ function CommsConnection(ws, user) {
|
|||
log.trace("comms.close "+self.session);
|
||||
removeActiveConnection(self);
|
||||
});
|
||||
|
||||
const handleAuthPacket = function(msg) {
|
||||
Tokens.get(msg.auth).then(function(client) {
|
||||
if (client) {
|
||||
Users.get(client.user).then(function(user) {
|
||||
if (user) {
|
||||
self.user = user;
|
||||
log.audit({event: "comms.auth",user:self.user});
|
||||
completeConnection(msg, client.scope,msg.auth,true);
|
||||
} else {
|
||||
log.audit({event: "comms.auth.fail"});
|
||||
completeConnection(msg, null,null,false);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
Users.tokens(msg.auth).then(function(user) {
|
||||
if (user) {
|
||||
self.user = user;
|
||||
log.audit({event: "comms.auth",user:self.user});
|
||||
completeConnection(msg, user.permissions,msg.auth,true);
|
||||
} else {
|
||||
log.audit({event: "comms.auth.fail"});
|
||||
completeConnection(msg, null,null,false);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
const completeConnection = function(msg, userScope, session, sendAck) {
|
||||
try {
|
||||
if (!userScope || !Permissions.hasPermission(userScope,"status.read")) {
|
||||
ws.send(JSON.stringify({auth:"fail"}));
|
||||
ws.close();
|
||||
} else {
|
||||
pendingAuth = false;
|
||||
addActiveConnection(self);
|
||||
self.token = msg.auth;
|
||||
if (sendAck) {
|
||||
ws.send(JSON.stringify({auth:"ok"}));
|
||||
}
|
||||
}
|
||||
} catch(err) {
|
||||
console.log(err.stack);
|
||||
// Just in case the socket closes before we attempt
|
||||
// to send anything.
|
||||
}
|
||||
}
|
||||
ws.on('message', function(data,flags) {
|
||||
var msg = null;
|
||||
try {
|
||||
|
@ -86,68 +133,34 @@ function CommsConnection(ws, user) {
|
|||
return;
|
||||
}
|
||||
if (!pendingAuth) {
|
||||
if (msg.subscribe) {
|
||||
if (msg.auth) {
|
||||
handleAuthPacket(msg)
|
||||
} else if (msg.subscribe) {
|
||||
self.subscribe(msg.subscribe);
|
||||
// handleRemoteSubscription(ws,msg.subscribe);
|
||||
} else if (msg.topic) {
|
||||
runtimeAPI.comms.receive({
|
||||
user: self.user,
|
||||
client: self,
|
||||
topic: msg.topic,
|
||||
data: msg.data
|
||||
})
|
||||
}
|
||||
} else {
|
||||
var completeConnection = function(userScope,session,sendAck) {
|
||||
try {
|
||||
if (!userScope || !Permissions.hasPermission(userScope,"status.read")) {
|
||||
ws.send(JSON.stringify({auth:"fail"}));
|
||||
ws.close();
|
||||
} else {
|
||||
pendingAuth = false;
|
||||
addActiveConnection(self);
|
||||
self.token = msg.auth;
|
||||
if (sendAck) {
|
||||
ws.send(JSON.stringify({auth:"ok"}));
|
||||
}
|
||||
}
|
||||
} catch(err) {
|
||||
console.log(err.stack);
|
||||
// Just in case the socket closes before we attempt
|
||||
// to send anything.
|
||||
}
|
||||
}
|
||||
if (msg.auth) {
|
||||
Tokens.get(msg.auth).then(function(client) {
|
||||
if (client) {
|
||||
Users.get(client.user).then(function(user) {
|
||||
if (user) {
|
||||
self.user = user;
|
||||
log.audit({event: "comms.auth",user:self.user});
|
||||
completeConnection(client.scope,msg.auth,true);
|
||||
} else {
|
||||
log.audit({event: "comms.auth.fail"});
|
||||
completeConnection(null,null,false);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
Users.tokens(msg.auth).then(function(user) {
|
||||
if (user) {
|
||||
self.user = user;
|
||||
log.audit({event: "comms.auth",user:self.user});
|
||||
completeConnection(user.permissions,msg.auth,true);
|
||||
} else {
|
||||
log.audit({event: "comms.auth.fail"});
|
||||
completeConnection(null,null,false);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
handleAuthPacket(msg)
|
||||
} else {
|
||||
if (anonymousUser) {
|
||||
log.audit({event: "comms.auth",user:anonymousUser});
|
||||
self.user = anonymousUser;
|
||||
completeConnection(anonymousUser.permissions,null,false);
|
||||
completeConnection(msg, anonymousUser.permissions, null, false);
|
||||
//TODO: duplicated code - pull non-auth message handling out
|
||||
if (msg.subscribe) {
|
||||
self.subscribe(msg.subscribe);
|
||||
}
|
||||
} else {
|
||||
log.audit({event: "comms.auth.fail"});
|
||||
completeConnection(null,null,false);
|
||||
completeConnection(msg, null,null,false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,15 @@ RED.comms = (function() {
|
|||
var reconnectAttempts = 0;
|
||||
var active = false;
|
||||
|
||||
RED.events.on('login', function(username) {
|
||||
// User has logged in
|
||||
// Need to upgrade the connection to be authenticated
|
||||
if (ws && ws.readyState == 1) {
|
||||
const auth_tokens = RED.settings.get("auth-tokens");
|
||||
ws.send(JSON.stringify({auth:auth_tokens.access_token}))
|
||||
}
|
||||
})
|
||||
|
||||
function connectWS() {
|
||||
active = true;
|
||||
var wspath;
|
||||
|
@ -56,6 +65,7 @@ RED.comms = (function() {
|
|||
ws.send(JSON.stringify({subscribe:t}));
|
||||
}
|
||||
}
|
||||
emit('connect')
|
||||
}
|
||||
|
||||
ws = new WebSocket(wspath);
|
||||
|
@ -180,9 +190,53 @@ RED.comms = (function() {
|
|||
}
|
||||
}
|
||||
|
||||
function send(topic, msg) {
|
||||
if (ws && ws.readyState == 1) {
|
||||
ws.send(JSON.stringify({
|
||||
topic,
|
||||
data: msg
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
const eventHandlers = {};
|
||||
function on(evt,func) {
|
||||
eventHandlers[evt] = eventHandlers[evt]||[];
|
||||
eventHandlers[evt].push(func);
|
||||
}
|
||||
function off(evt,func) {
|
||||
const handler = eventHandlers[evt];
|
||||
if (handler) {
|
||||
for (let i=0;i<handler.length;i++) {
|
||||
if (handler[i] === func) {
|
||||
handler.splice(i,1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
function emit() {
|
||||
const evt = arguments[0]
|
||||
const args = Array.prototype.slice.call(arguments,1);
|
||||
if (eventHandlers[evt]) {
|
||||
let cpyHandlers = [...eventHandlers[evt]];
|
||||
for (let i=0;i<cpyHandlers.length;i++) {
|
||||
try {
|
||||
cpyHandlers[i].apply(null, args);
|
||||
} catch(err) {
|
||||
console.warn("RED.comms.emit error: ["+evt+"] "+(err.toString()));
|
||||
console.warn(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
connect: connectWS,
|
||||
subscribe: subscribe,
|
||||
unsubscribe:unsubscribe
|
||||
unsubscribe:unsubscribe,
|
||||
on,
|
||||
off,
|
||||
send
|
||||
}
|
||||
})();
|
||||
|
|
|
@ -153,10 +153,6 @@ RED.envVar = (function() {
|
|||
}
|
||||
|
||||
function init(done) {
|
||||
if (!RED.user.hasPermission("settings.write")) {
|
||||
RED.notify(RED._("user.errors.settings"),"error");
|
||||
return;
|
||||
}
|
||||
RED.userSettings.add({
|
||||
id:'envvar',
|
||||
title: RED._("env-var.environment"),
|
||||
|
|
|
@ -187,6 +187,7 @@ RED.user = (function() {
|
|||
}
|
||||
|
||||
function logout() {
|
||||
RED.events.emit('logout')
|
||||
var tokens = RED.settings.get("auth-tokens");
|
||||
var token = tokens?tokens.access_token:"";
|
||||
$.ajax({
|
||||
|
@ -225,6 +226,7 @@ RED.user = (function() {
|
|||
});
|
||||
}
|
||||
});
|
||||
$('<i class="fa fa-user"></i>').appendTo("#red-ui-header-button-user");
|
||||
} else {
|
||||
RED.menu.addItem("red-ui-header-button-user",{
|
||||
id:"usermenu-item-username",
|
||||
|
@ -237,6 +239,15 @@ RED.user = (function() {
|
|||
RED.user.logout();
|
||||
}
|
||||
});
|
||||
const userMenu = $("#red-ui-header-button-user")
|
||||
userMenu.empty()
|
||||
if (RED.settings.user.image) {
|
||||
$('<span class="user-profile"></span>').css({
|
||||
backgroundImage: "url("+RED.settings.user.image+")",
|
||||
}).appendTo(userMenu);
|
||||
} else {
|
||||
$('<i class="fa fa-user"></i>').appendTo(userMenu);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -247,14 +258,6 @@ RED.user = (function() {
|
|||
|
||||
var userMenu = $('<li><a id="red-ui-header-button-user" class="button hide" href="#"></a></li>')
|
||||
.prependTo(".red-ui-header-toolbar");
|
||||
if (RED.settings.user.image) {
|
||||
$('<span class="user-profile"></span>').css({
|
||||
backgroundImage: "url("+RED.settings.user.image+")",
|
||||
}).appendTo(userMenu.find("a"));
|
||||
} else {
|
||||
$('<i class="fa fa-user"></i>').appendTo(userMenu.find("a"));
|
||||
}
|
||||
|
||||
RED.menu.init({id:"red-ui-header-button-user",
|
||||
options: []
|
||||
});
|
||||
|
|
|
@ -63,25 +63,29 @@
|
|||
}
|
||||
|
||||
.red-ui-header-toolbar {
|
||||
display: flex;
|
||||
align-items: stretch;
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
list-style: none;
|
||||
float: right;
|
||||
|
||||
> li {
|
||||
display: inline-block;
|
||||
display: inline-flex;
|
||||
align-items: stretch;
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
position: relative;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
.button {
|
||||
height: 100%;
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
min-width: 20px;
|
||||
text-align: center;
|
||||
line-height: 40px;
|
||||
display: inline-block;
|
||||
font-size: 20px;
|
||||
padding: 0px 12px;
|
||||
text-decoration: none;
|
||||
|
@ -271,13 +275,13 @@
|
|||
color: var(--red-ui-header-menu-heading-color);
|
||||
}
|
||||
|
||||
#red-ui-header-button-user .user-profile {
|
||||
.user-profile {
|
||||
background-position: center center;
|
||||
background-repeat: no-repeat;
|
||||
background-size: contain;
|
||||
display: inline-block;
|
||||
width: 40px;
|
||||
height: 35px;
|
||||
width: 30px;
|
||||
height: 30px;
|
||||
vertical-align: middle;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ var connections = [];
|
|||
const events = require("@node-red/util").events;
|
||||
|
||||
function handleCommsEvent(event) {
|
||||
publish(event.topic,event.data,event.retain);
|
||||
publish(event.topic,event.data,event.retain,event.session,event.excludeSession);
|
||||
}
|
||||
function handleStatusEvent(event) {
|
||||
if (!event.status) {
|
||||
|
@ -74,13 +74,17 @@ function handleEventLog(event) {
|
|||
publish("event-log/"+event.id,event.payload||{});
|
||||
}
|
||||
|
||||
function publish(topic,data,retain) {
|
||||
function publish(topic, data, retain, session, excludeSession) {
|
||||
if (retain) {
|
||||
retained[topic] = data;
|
||||
} else {
|
||||
delete retained[topic];
|
||||
}
|
||||
connections.forEach(connection => connection.send(topic,data))
|
||||
connections.forEach(connection => {
|
||||
if ((!session || connection.session === session) && (!excludeSession || connection.session !== excludeSession)) {
|
||||
connection.send(topic,data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
@ -109,6 +113,10 @@ var api = module.exports = {
|
|||
*/
|
||||
addConnection: async function(opts) {
|
||||
connections.push(opts.client);
|
||||
events.emit('comms:connection-added', {
|
||||
session: opts.client.session,
|
||||
user: opts.client.user
|
||||
})
|
||||
},
|
||||
|
||||
/**
|
||||
|
@ -126,6 +134,9 @@ var api = module.exports = {
|
|||
break;
|
||||
}
|
||||
}
|
||||
events.emit('comms:connection-removed', {
|
||||
session: opts.client.session
|
||||
})
|
||||
},
|
||||
|
||||
/**
|
||||
|
@ -157,5 +168,23 @@ var api = module.exports = {
|
|||
* @return {Promise<Object>} - resolves when complete
|
||||
* @memberof @node-red/runtime_comms
|
||||
*/
|
||||
unsubscribe: async function(opts) {}
|
||||
unsubscribe: async function(opts) {},
|
||||
|
||||
/**
|
||||
* @param {Object} opts
|
||||
* @param {User} opts.user - the user calling the api
|
||||
* @param {CommsConnection} opts.client - the client connection
|
||||
* @param {String} opts.topic - the message topic
|
||||
* @param {String} opts.data - the message data
|
||||
* @return {Promise<Object>} - resolves when complete
|
||||
*/
|
||||
receive: async function (opts) {
|
||||
if (opts.topic) {
|
||||
events.emit('comms:message:' + opts.topic, {
|
||||
session: opts.client.session,
|
||||
user: opts.user,
|
||||
data: opts.data
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue