From a9954c3ee92509b08ccf5e96a370b48309b9dad9 Mon Sep 17 00:00:00 2001 From: Florian Hotze Date: Thu, 9 Jan 2025 00:28:10 +0100 Subject: [PATCH] WS client: Add convenience method to connect to the event WebSocket (#2993) Also improve the WS client code in general. Signed-off-by: Florian Hotze --- .../org.openhab.ui/web/src/js/openhab/ws.js | 119 ++++++++++++++---- .../src/pages/developer/developer-tools.vue | 2 +- .../web/src/pages/developer/log-viewer.vue | 4 +- 3 files changed, 99 insertions(+), 26 deletions(-) diff --git a/bundles/org.openhab.ui/web/src/js/openhab/ws.js b/bundles/org.openhab.ui/web/src/js/openhab/ws.js index a39d5e3c0..ec4270f7b 100644 --- a/bundles/org.openhab.ui/web/src/js/openhab/ws.js +++ b/bundles/org.openhab.ui/web/src/js/openhab/ws.js @@ -1,11 +1,55 @@ +import Framework7 from 'framework7' import { getAccessToken } from './auth' -const HEARTBEAT_MESSAGE = `{ - "type": "WebSocketEvent", - "topic": "openhab/websocket/heartbeat", - "payload": "PING", - "source": "WebSocketTestInstance" -}` +/** + * Build a heartbeat message for the given WebSocket client id. + * @param {string} id WS client id + * @return {string} + */ +function heartbeatMessage (id) { + return JSON.stringify({ + type: 'WebSocketEvent', + topic: 'openhab/websocket/heartbeat', + payload: 'PING', + source: id + }) +} + +function arrayToSerialisedString (arr) { + return '[' + arr.map(e => '"' + e + '"').join(',') + ']' +} + +/** + * Build a event source filter message for the given WebSocket client id and the given sources. + * Source filters can be used to remove events from a specific source from the event WS. + * @param {string} id WS client id + * @param {string[]} sources event sources to filter out + * @return {string} + */ +function eventSourceFilterMessage (id, sources) { + return JSON.stringify({ + type: 'WebSocketEvent', + topic: 'openhab/websocket/filter/source', + payload: arrayToSerialisedString(sources), + source: id + }) +} + +/** + * Build an event type filter message for the given WebSocket client id and the given event types. + * Event type filters can be used to select a sub-set of all available events for the event WS. + * @param {string} id WS client id + * @param types + * @return {string} + */ +function eventTypeFilterMessage (id, types) { + return JSON.stringify({ + type: 'WebSocketEvent', + topic: 'openhab/websocket/filter/type', + payload: arrayToSerialisedString(types), + source: id + }) +} const openWSConnections = [] @@ -14,12 +58,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h // Create a new WebSocket connection const socket = new WebSocket(path, [`org.openhab.ws.accessToken.base64.${encodedToken}`, 'org.openhab.ws.protocol.default']) + socket.id = 'ui-' + Framework7.utils.id() + // Handle WebSocket connection opened socket.onopen = (event) => { socket.setKeepalive(heartbeatInterval) - if (readyCallback) { - readyCallback(event) - } + if (readyCallback) readyCallback(event) } // Handle WebSocket message received @@ -42,15 +86,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h } // WebSocket keep alive - socket.setKeepalive = (seconds = 5) => { + socket.setKeepalive = (seconds) => { + if (!heartbeatCallback) return console.debug('Setting keepalive interval seconds', seconds) socket.clearKeepalive() socket.keepaliveTimer = setInterval(() => { - if (heartbeatCallback) { - heartbeatCallback() - } else { - socket.send(HEARTBEAT_MESSAGE) - } + heartbeatCallback() }, seconds * 1000) } @@ -69,21 +110,53 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h export default { /** - * Connect to the websocket at the given path. + * Connect to the WebSocket at the given path. + * This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback. * * @param {string} path path to connect to, e.g. `/ws` - * @param {fn} messageCallback - * @param {fn} [readyCallback=null] - * @param {fn} [errorCallback=null] - * @param {fn} [heartbeatCallback=null] heartbeat callback to use instead of the default PING/PONG + * @param {fn} messageCallback message callback to handle incoming messages + * @param {fn} heartbeatCallback heartbeat callback + * @param {fn} [readyCallback] ready callback + * @param {fn} [errorCallback] error callback * @param {number} [heartbeatInterval=5] heartbeat interval in seconds * @return {WebSocket} */ - connect (path, messageCallback, readyCallback = null, errorCallback = null, heartbeatCallback = null, heartbeatInterval = 5) { + connect (path, messageCallback, heartbeatCallback, readyCallback, errorCallback, heartbeatInterval = 5) { return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval) }, /** - * Close the given websocket connection. + * Connect to the event WebSocket, which provides direct access to the EventBus. + * This convenience method takes care of the keepalive mechanism as well as filter setup. + * + * @param {string[]} types array of event types to filter by, if empty all events are received + * @param {fn} messageCallback message callback to handle incoming messages + * @param {fn} [readyCallback] ready callback + * @param {fn} [errorCallback] error callback + * @return {WebSocket} + */ + events (types, messageCallback, readyCallback, errorCallback) { + let socket + + const extendedMessageCallback = (event) => { + if (event.type === 'WebSocketEvent') return + messageCallback(event) + } + + const extendedReadyCallback = (event) => { + socket.send(eventSourceFilterMessage(socket.id, [socket.id])) + if (Array.isArray(types) && types.length > 0) socket.send(eventTypeFilterMessage(socket.id, types)) + if (readyCallback) readyCallback(event) + } + + const heartbeatCallback = () => { + socket.send(heartbeatMessage(socket.id)) + } + + socket = this.connect('/ws/events', extendedMessageCallback, heartbeatCallback, extendedReadyCallback, errorCallback) + return socket + }, + /** + * Close the given WebSocket connection. * * @param {WebSocket} socket * @param {fn} [callback=null] callback to execute on connection close @@ -100,7 +173,7 @@ export default { callback(event) } } - socket.close() socket.clearKeepalive() + socket.close() } } diff --git a/bundles/org.openhab.ui/web/src/pages/developer/developer-tools.vue b/bundles/org.openhab.ui/web/src/pages/developer/developer-tools.vue index 97604fa19..78c81f42f 100644 --- a/bundles/org.openhab.ui/web/src/pages/developer/developer-tools.vue +++ b/bundles/org.openhab.ui/web/src/pages/developer/developer-tools.vue @@ -157,7 +157,7 @@ export default { this.sseEvents = [] }, startWS () { - this.wsClient = this.$oh.ws.connect('/ws/events', (event) => { + this.wsClient = this.$oh.ws.events([], (event) => { event.time = new Date() this.wsEvents.unshift(...[event]) this.wsEvents.splice(5) diff --git a/bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue b/bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue index b3c98292f..e630449ff 100644 --- a/bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue +++ b/bundles/org.openhab.ui/web/src/pages/developer/log-viewer.vue @@ -425,11 +425,11 @@ export default { this.addLogEntry(event) } - const keepaliveCallback = () => { + const heartbeatCallback = () => { this.socket.send('[]') } - this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, readyCallback, null, keepaliveCallback, 9) + this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, heartbeatCallback, readyCallback, null, 9) // TEMP // for (let i = 0; i < 1980; i++) {