WS client: Add convenience method to connect to the event WebSocket (#2993)

Also improve the WS client code in general.

Signed-off-by: Florian Hotze <dev@florianhotze.com>
pull/2997/head
Florian Hotze 2025-01-09 00:28:10 +01:00 committed by GitHub
parent ea5121d77e
commit a9954c3ee9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 99 additions and 26 deletions

View File

@ -1,11 +1,55 @@
import Framework7 from 'framework7'
import { getAccessToken } from './auth'
const HEARTBEAT_MESSAGE = `{
"type": "WebSocketEvent",
"topic": "openhab/websocket/heartbeat",
"payload": "PING",
"source": "WebSocketTestInstance"
}`
/**
* Build a heartbeat message for the given WebSocket client id.
* @param {string} id WS client id
* @return {string}
*/
function heartbeatMessage (id) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/heartbeat',
payload: 'PING',
source: id
})
}
function arrayToSerialisedString (arr) {
return '[' + arr.map(e => '"' + e + '"').join(',') + ']'
}
/**
* Build a event source filter message for the given WebSocket client id and the given sources.
* Source filters can be used to remove events from a specific source from the event WS.
* @param {string} id WS client id
* @param {string[]} sources event sources to filter out
* @return {string}
*/
function eventSourceFilterMessage (id, sources) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/source',
payload: arrayToSerialisedString(sources),
source: id
})
}
/**
* Build an event type filter message for the given WebSocket client id and the given event types.
* Event type filters can be used to select a sub-set of all available events for the event WS.
* @param {string} id WS client id
* @param types
* @return {string}
*/
function eventTypeFilterMessage (id, types) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/type',
payload: arrayToSerialisedString(types),
source: id
})
}
const openWSConnections = []
@ -14,12 +58,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h
// Create a new WebSocket connection
const socket = new WebSocket(path, [`org.openhab.ws.accessToken.base64.${encodedToken}`, 'org.openhab.ws.protocol.default'])
socket.id = 'ui-' + Framework7.utils.id()
// Handle WebSocket connection opened
socket.onopen = (event) => {
socket.setKeepalive(heartbeatInterval)
if (readyCallback) {
readyCallback(event)
}
if (readyCallback) readyCallback(event)
}
// Handle WebSocket message received
@ -42,15 +86,12 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h
}
// WebSocket keep alive
socket.setKeepalive = (seconds = 5) => {
socket.setKeepalive = (seconds) => {
if (!heartbeatCallback) return
console.debug('Setting keepalive interval seconds', seconds)
socket.clearKeepalive()
socket.keepaliveTimer = setInterval(() => {
if (heartbeatCallback) {
heartbeatCallback()
} else {
socket.send(HEARTBEAT_MESSAGE)
}
heartbeatCallback()
}, seconds * 1000)
}
@ -69,21 +110,53 @@ function newWSConnection (path, messageCallback, readyCallback, errorCallback, h
export default {
/**
* Connect to the websocket at the given path.
* Connect to the WebSocket at the given path.
* This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback.
*
* @param {string} path path to connect to, e.g. `/ws`
* @param {fn} messageCallback
* @param {fn} [readyCallback=null]
* @param {fn} [errorCallback=null]
* @param {fn} [heartbeatCallback=null] heartbeat callback to use instead of the default PING/PONG
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} heartbeatCallback heartbeat callback
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @param {number} [heartbeatInterval=5] heartbeat interval in seconds
* @return {WebSocket}
*/
connect (path, messageCallback, readyCallback = null, errorCallback = null, heartbeatCallback = null, heartbeatInterval = 5) {
connect (path, messageCallback, heartbeatCallback, readyCallback, errorCallback, heartbeatInterval = 5) {
return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval)
},
/**
* Close the given websocket connection.
* Connect to the event WebSocket, which provides direct access to the EventBus.
* This convenience method takes care of the keepalive mechanism as well as filter setup.
*
* @param {string[]} types array of event types to filter by, if empty all events are received
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @return {WebSocket}
*/
events (types, messageCallback, readyCallback, errorCallback) {
let socket
const extendedMessageCallback = (event) => {
if (event.type === 'WebSocketEvent') return
messageCallback(event)
}
const extendedReadyCallback = (event) => {
socket.send(eventSourceFilterMessage(socket.id, [socket.id]))
if (Array.isArray(types) && types.length > 0) socket.send(eventTypeFilterMessage(socket.id, types))
if (readyCallback) readyCallback(event)
}
const heartbeatCallback = () => {
socket.send(heartbeatMessage(socket.id))
}
socket = this.connect('/ws/events', extendedMessageCallback, heartbeatCallback, extendedReadyCallback, errorCallback)
return socket
},
/**
* Close the given WebSocket connection.
*
* @param {WebSocket} socket
* @param {fn} [callback=null] callback to execute on connection close
@ -100,7 +173,7 @@ export default {
callback(event)
}
}
socket.close()
socket.clearKeepalive()
socket.close()
}
}

View File

@ -157,7 +157,7 @@ export default {
this.sseEvents = []
},
startWS () {
this.wsClient = this.$oh.ws.connect('/ws/events', (event) => {
this.wsClient = this.$oh.ws.events([], (event) => {
event.time = new Date()
this.wsEvents.unshift(...[event])
this.wsEvents.splice(5)

View File

@ -425,11 +425,11 @@ export default {
this.addLogEntry(event)
}
const keepaliveCallback = () => {
const heartbeatCallback = () => {
this.socket.send('[]')
}
this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, readyCallback, null, keepaliveCallback, 9)
this.socket = this.$oh.ws.connect('/ws/logs', messageCallback, heartbeatCallback, readyCallback, null, 9)
// TEMP
// for (let i = 0; i < 1980; i++) {