Refactor `$oh` WS and SSE utils to TypeScript (#3414)

Signed-off-by: Florian Hotze <dev@florianhotze.com>
pull/3368/head
Florian Hotze 2025-10-31 17:43:28 +01:00 committed by GitHub
parent 5b2ad7ee5c
commit 295963e99b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 520 additions and 368 deletions

View File

@ -50,7 +50,7 @@
"esbuild": "^0.25.6",
"escaper": "^3.0.6",
"eslint-linter-browserify": "^9.33.0",
"event-source-polyfill": "^1.0.22",
"event-source-polyfill": "^1.0.31",
"fast-deep-equal": "^3.1.3",
"framework7": "^7.1.5",
"framework7-icons": "^5.0.5",
@ -100,6 +100,7 @@
"@intlify/unplugin-vue-i18n": "^6.0.8",
"@tsconfig/node22": "^22.0.2",
"@types/css-modules": "^1.0.5",
"@types/event-source-polyfill": "^1.0.5",
"@types/node": "^24.0.14",
"@typescript-eslint/eslint-plugin": "^8.39.0",
"@typescript-eslint/parser": "^8.39.0",
@ -4366,6 +4367,13 @@
"dev": true,
"license": "MIT"
},
"node_modules/@types/event-source-polyfill": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/@types/event-source-polyfill/-/event-source-polyfill-1.0.5.tgz",
"integrity": "sha512-iaiDuDI2aIFft7XkcwMzDWLqo7LVDixd2sR6B4wxJut9xcp/Ev9bO4EFg4rm6S9QxATLBj5OPxdeocgmhjwKaw==",
"dev": true,
"license": "MIT"
},
"node_modules/@types/events": {
"version": "3.0.3",
"license": "MIT"
@ -8053,6 +8061,8 @@
},
"node_modules/event-source-polyfill": {
"version": "1.0.31",
"resolved": "https://registry.npmjs.org/event-source-polyfill/-/event-source-polyfill-1.0.31.tgz",
"integrity": "sha512-4IJSItgS/41IxN5UVAVuAyczwZF7ZIEsM1XAoUzIHA6A+xzusEZUutdXz2Nr+MQPLxfTiCvqE79/C8HT8fKFvA==",
"license": "MIT"
},
"node_modules/events": {

View File

@ -94,7 +94,7 @@
"esbuild": "^0.25.6",
"escaper": "^3.0.6",
"eslint-linter-browserify": "^9.33.0",
"event-source-polyfill": "^1.0.22",
"event-source-polyfill": "^1.0.31",
"fast-deep-equal": "^3.1.3",
"framework7": "^7.1.5",
"framework7-icons": "^5.0.5",
@ -144,6 +144,7 @@
"@intlify/unplugin-vue-i18n": "^6.0.8",
"@tsconfig/node22": "^22.0.2",
"@types/css-modules": "^1.0.5",
"@types/event-source-polyfill": "^1.0.5",
"@types/node": "^24.0.14",
"@typescript-eslint/eslint-plugin": "^8.39.0",
"@typescript-eslint/parser": "^8.39.0",

View File

@ -1,21 +0,0 @@
declare namespace _default {
function connect(
path: string,
topics: null,
messageCallback: (evt: MessageEvent) => void,
errorCallback: () => void,
heartbeatCallback: (tbd: boolean) => void
): any
function connectStateTracker(
path: string,
readyCallback: (data: any) => void,
updateCallback: (data: any) => void,
errorCallback: () => void,
heartbeatCallback: (healthy: boolean) => void
): any
function close(
client: EventSource & { clearKeepalive?: () => void },
callback?: (() => void) | null
): void
}
export default _default

View File

@ -1,115 +0,0 @@
import { EventSourcePolyfill, NativeEventSource } from 'event-source-polyfill'
import { getAccessToken, getTokenInCustomHeader, getBasicCredentials, getRequireToken } from './auth'
let openSSEClients = []
function newSSEConnection (path, readyCallback, messageCallback, errorCallback, heartbeatCallback) {
let eventSource
let reconnectSeconds = 1
const headers = {}
if (getAccessToken() && getRequireToken()) {
if (getTokenInCustomHeader()) {
headers['X-OPENHAB-TOKEN'] = getAccessToken()
} else {
headers['Authorization'] = 'Bearer ' + getAccessToken()
}
}
if (getBasicCredentials()) {
const creds = getBasicCredentials()
headers['Authorization'] = 'Basic ' + btoa(creds.id + ':' + creds.password)
}
function initEventSource () {
if (Object.keys(headers).length > 0) {
eventSource = new EventSourcePolyfill(path, { headers })
} else {
eventSource = new NativeEventSource(path)
}
eventSource.addEventListener('ready', (e) => {
readyCallback(e.data)
})
eventSource.addEventListener('alive', (e) => {
let evt = JSON.parse(e.data)
eventSource.setKeepalive(evt.interval)
if (heartbeatCallback) {
heartbeatCallback(true)
}
})
eventSource.onmessage = (event) => {
let evt = JSON.parse(event.data)
messageCallback(evt)
}
eventSource.onopen = (event) => {
reconnectSeconds = 1
}
eventSource.onerror = () => {
console.warn('SSE error')
eventSource.clearKeepalive()
if (errorCallback) {
errorCallback()
}
if (eventSource.readyState === 2) {
console.log('%c=!= Event source connection broken...', 'background-color: red; color: white')
console.debug(`Attempting SSE reconnection in ${reconnectSeconds} seconds...`)
setTimeout(() => {
if (eventSource.readyState === 2) {
reconnectSeconds = reconnectSeconds * 2
if (reconnectSeconds > 10) reconnectSeconds = 10
eventSource.close()
eventSource.clearKeepalive()
eventSource = initEventSource()
}
}, reconnectSeconds * 1000)
}
}
eventSource.setKeepalive = (seconds = 10) => {
console.debug('Setting keepalive interval seconds', seconds)
eventSource.clearKeepalive()
eventSource.keepaliveTimer = setTimeout(() => {
console.warn('SSE timeout error')
if (heartbeatCallback) {
heartbeatCallback(false)
}
}, (seconds + 2) * 1000)
}
eventSource.clearKeepalive = () => {
if (eventSource.keepaliveTimer) clearTimeout(eventSource.keepaliveTimer)
delete eventSource.keepaliveTimer
}
return eventSource
}
eventSource = initEventSource()
openSSEClients.push(eventSource)
console.debug(`new SSE connection: ${eventSource.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
return eventSource
}
export default {
connect (path, topics, messageCallback, errorCallback, heartbeatCallback) {
return newSSEConnection(path, null, messageCallback, errorCallback, heartbeatCallback)
},
connectStateTracker (path, readyCallback, updateCallback, errorCallback, heartbeatCallback) {
return newSSEConnection(path, readyCallback, updateCallback, errorCallback, heartbeatCallback)
},
close (client, callback) {
if (!client) return
if (openSSEClients.indexOf(client) >= 0) {
openSSEClients.splice(openSSEClients.indexOf(client), 1)
}
console.debug(`SSE connection closed: ${client.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
client.close()
client.clearKeepalive()
}
}

View File

@ -0,0 +1,216 @@
import { EventSourcePolyfill, NativeEventSource } from 'event-source-polyfill'
import { getAccessToken, getTokenInCustomHeader, getBasicCredentials, getRequireToken } from './auth'
/**
* An EventSource that is extended with a keepalive/heartbeat mechanism.
*/
interface KeepaliveEventSource extends EventSource {
keepaliveTimer?: number;
setKeepalive: (seconds?: number) => void;
clearKeepalive: () => void;
}
let openSSEClients: KeepaliveEventSource[] = []
type ReadyCallback = (data: string) => void;
type MessageCallback = (data: any) => void;
type ErrorCallback = () => void;
type HeartbeatCallback = (isAlive: boolean) => void;
/**
* Creates and initializes a new Server-Sent Events (SSE) connection.
*/
function newSSEConnection (
path: string,
readyCallback: ReadyCallback | undefined,
messageCallback: MessageCallback,
errorCallback: ErrorCallback,
heartbeatCallback: HeartbeatCallback | undefined
): KeepaliveEventSource {
let eventSource: KeepaliveEventSource
let reconnectSeconds = 1
const headers: Record<string, string> = {}
// Setup headers for authentication
const accessToken = getAccessToken()
if (accessToken && getRequireToken()) {
if (getTokenInCustomHeader()) {
headers['X-OPENHAB-TOKEN'] = accessToken
} else {
headers['Authorization'] = 'Bearer ' + accessToken
}
}
const basicCreds = getBasicCredentials()
if (basicCreds) {
headers['Authorization'] = 'Basic ' + btoa(basicCreds.id + ':' + basicCreds.password)
}
// Core initialization logic
function initEventSource (): KeepaliveEventSource {
let newEventSource: EventSource
if (Object.keys(headers).length > 0) {
// Use EventSourcePolyfill when headers are needed
newEventSource = new EventSourcePolyfill(path, { headers })
} else {
// Use NativeEventSource when no custom headers are needed
newEventSource = new NativeEventSource(path)
}
// Type assertion to treat the EventSource as our extended interface,
// allowing us to add custom methods/properties.
const es = newEventSource as KeepaliveEventSource
// Add keepalive/heartbeat mechanism
es.setKeepalive = (seconds: number = 10) => {
console.debug('Setting keepalive interval seconds', seconds)
es.clearKeepalive()
es.keepaliveTimer = setTimeout(() => {
console.warn('SSE timeout error')
if (heartbeatCallback) {
heartbeatCallback(false)
}
}, (seconds + 2) * 1000)
}
es.clearKeepalive = () => {
if (es.keepaliveTimer) clearTimeout(es.keepaliveTimer)
delete es.keepaliveTimer
}
// Event handlers
if (readyCallback) {
es.addEventListener('ready', (e: MessageEvent) => {
readyCallback(e.data)
})
}
es.addEventListener('alive', (e: MessageEvent) => {
// Type 'e.data' is string, parse to get the object with 'interval'
let evt: { interval: number }
try {
evt = JSON.parse(e.data)
es.setKeepalive(evt.interval)
} catch (error) {
console.error('Failed to parse "alive" message data:', error)
if (heartbeatCallback) heartbeatCallback(false)
return
}
if (heartbeatCallback) heartbeatCallback(true)
})
es.onmessage = (event: MessageEvent) => {
let evt: any
try {
evt = JSON.parse(event.data)
} catch (error) {
console.error('Failed to parse SSE message data:', error)
return
}
messageCallback(evt)
}
es.onopen = (event: Event) => {
reconnectSeconds = 1 // Reset reconnection delay on successful open
}
es.onerror = (event: Event) => {
console.warn('SSE error')
es.clearKeepalive()
if (errorCallback) {
errorCallback()
}
// Handle reconnection logic
// Note: readyState === 2 is defined as CLOSED in EventSource spec
if (es.readyState === 2) {
console.log('%c=!= Event source connection broken...', 'background-color: red; color: white')
console.debug(`Attempting SSE reconnection in ${reconnectSeconds} seconds...`)
setTimeout(() => {
// Check state again before reconnecting
if (es.readyState === 2) {
reconnectSeconds = reconnectSeconds * 2
if (reconnectSeconds > 10) reconnectSeconds = 10
// Close the current broken connection
es.close()
es.clearKeepalive()
// Reinitialize the connection
eventSource = initEventSource() // Reassign the outer scope's eventSource
}
}, reconnectSeconds * 1000)
}
}
return es
}
eventSource = initEventSource()
openSSEClients.push(eventSource)
console.debug(`new SSE connection: ${eventSource.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
return eventSource
}
const SSEService = {
/**
* Connect to a generic SSE endpoint.
* @param path path to connect to, e.g. `/rest/events`
* @param topics array of event topics)
* @param messageCallback callback to handle incoming messages
* @param errorCallback error callback
* @param heartbeatCallback heartbeat callback
*/
connect (
path: string,
topics: string[],
messageCallback: MessageCallback,
errorCallback: ErrorCallback,
heartbeatCallback?: HeartbeatCallback
): KeepaliveEventSource {
return newSSEConnection(path, undefined, messageCallback, errorCallback, heartbeatCallback)
},
/**
* Connect to the state tracking SSE endpoint (e.g., for item states).
* @param path path to connect to
* @param readyCallback ready callback
* @param updateCallback callback to handle state updates (messages)
* @param errorCallback error callback
* @param [heartbeatCallback] heartbeat callback
*/
connectStateTracker (
path: string,
readyCallback: ReadyCallback,
updateCallback: MessageCallback,
errorCallback: ErrorCallback,
heartbeatCallback?: HeartbeatCallback
): KeepaliveEventSource {
return newSSEConnection(path, readyCallback, updateCallback, errorCallback, heartbeatCallback)
},
/**
* Close the given SSE connection.
* @param es the SSE connection to close
*/
close (es: EventSource): void {
if (!es) return
const keepaliveEventSource = es as KeepaliveEventSource
const index = openSSEClients.indexOf(keepaliveEventSource)
if (index >= 0) {
openSSEClients.splice(index, 1)
}
console.debug(`SSE connection closed: ${keepaliveEventSource.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
keepaliveEventSource.clearKeepalive()
keepaliveEventSource.close()
}
}
export default SSEService

View File

@ -1,34 +0,0 @@
declare namespace _default {
/**
* Connect to the WebSocket at the given path.
* This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback.
*
* @param {string} path path to connect to, e.g. `/ws`
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} heartbeatCallback heartbeat callback
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @param {number} [heartbeatInterval=5] heartbeat interval in seconds
* @return {WebSocket}
*/
function connect(path: string, messageCallback: (event: object) => void, heartbeatCallback: () => void, readyCallback?: (event: object) => void, errorCallback?: (event: object) => void, heartbeatInterval?: number): WebSocket;
/**
* Connect to the event WebSocket, which provides direct access to the EventBus.
* This convenience method takes care of the keepalive mechanism as well as filter setup.
*
* @param {string[]} topics array of event topics to filter by, if empty all events are received
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @return {WebSocket}
*/
function events(topics: string[], messageCallback: (event: object) => void, readyCallback?: (event: object) => void, errorCallback?: (event: object) => void): WebSocket;
/**
* Close the given WebSocket connection.
*
* @param {WebSocket} socket
* @param {fn} [callback=null] callback to execute on connection close
*/
function close(socket: WebSocket, callback?: (event: object) => void): void;
}
export default _default

View File

@ -1,195 +0,0 @@
import { f7 } from 'framework7-vue'
import { getAccessToken } from './auth'
/**
* Build a heartbeat message for the given WebSocket client id.
* @param {string} id WS client id
* @return {string}
*/
function heartbeatMessage (id) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/heartbeat',
payload: 'PING',
source: id
})
}
function arrayToSerialisedString (arr) {
return '[' + arr.map((e) => '"' + e + '"').join(',') + ']'
}
/**
* Build a event source filter message for the given WebSocket client id and the given sources.
* Source filters can be used to remove events from a specific source from the event WS.
* @param {string} id WS client id
* @param {string[]} sources event sources to exclude
* @return {string}
*/
function eventSourceFilterMessage (id, sources) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/source',
payload: arrayToSerialisedString(sources),
source: id
})
}
/**
* Build an event type filter message for the given WebSocket client id and the given event types.
* Event type filters can be used to select a sub-set of all available events for the event WS.
* @param {string} id WS client id
* @param {string[]} types event types to include
* @return {string}
*/
function eventTypeFilterMessage (id, types) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/type',
payload: arrayToSerialisedString(types),
source: id
})
}
/**
* Build an event topic filter message for the given WebSocket client id and the given event topics.
* Event topic filters can be used to select a sub-set of all available events for the event WS.
* @param {string} id WS client id
* @param {string[]} topics event topics to include
* @returns {string}
*/
function eventTopicFilterMesssage (id, topics) {
return JSON.stringify({
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/topic',
payload: arrayToSerialisedString(topics),
source: id
})
}
const openWSConnections = []
function newWSConnection (path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval) {
const encodedToken = btoa(getAccessToken()).replace(/=*$/, '')
// Create a new WebSocket connection
const socket = new WebSocket(path, [`org.openhab.ws.accessToken.base64.${encodedToken}`, 'org.openhab.ws.protocol.default'])
socket.id = 'ui-' + f7.utils.id()
// Handle WebSocket connection opened
socket.onopen = (event) => {
socket.setKeepalive(heartbeatInterval)
if (readyCallback) readyCallback(event)
}
// Handle WebSocket message received
socket.onmessage = (event) => {
let evt = event.data
try {
evt = JSON.parse(event.data)
} catch (e) {
console.error('Error while parsing message', e)
}
messageCallback(evt)
}
// Handle WebSocket error
socket.onerror = (event) => {
console.error('WebSocket error', event)
if (errorCallback) {
errorCallback(event)
}
}
// WebSocket keep alive
socket.setKeepalive = (seconds) => {
if (!heartbeatCallback) return
console.debug('Setting keepalive interval seconds', seconds)
socket.clearKeepalive()
socket.keepaliveTimer = setInterval(() => {
heartbeatCallback()
}, seconds * 1000)
}
socket.clearKeepalive = () => {
if (socket.keepaliveTimer) clearInterval(socket.keepaliveTimer)
delete socket.keepaliveTimer
}
// Add the new WebSocket connection to the list
openWSConnections.push(socket)
console.debug(`new WS connection: ${socket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)
return socket
}
export default {
/**
* Connect to the WebSocket at the given path.
* This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback.
*
* @param {string} path path to connect to, e.g. `/ws`
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} heartbeatCallback heartbeat callback
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @param {number} [heartbeatInterval=5] heartbeat interval in seconds
* @return {WebSocket}
*/
connect (path, messageCallback, heartbeatCallback, readyCallback, errorCallback, heartbeatInterval = 5) {
return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval)
},
/**
* Connect to the event WebSocket, which provides direct access to the EventBus.
* This convenience method takes care of the keepalive mechanism as well as filter setup.
*
* @param {string[]} topics array of event topics to filter by, if empty all events are received
* @param {fn} messageCallback message callback to handle incoming messages
* @param {fn} [readyCallback] ready callback
* @param {fn} [errorCallback] error callback
* @return {WebSocket}
*/
events (topics, messageCallback, readyCallback, errorCallback) {
let socket
const extendedMessageCallback = (event) => {
if (event.type === 'WebSocketEvent') return
messageCallback(event)
}
const extendedReadyCallback = (event) => {
socket.send(eventSourceFilterMessage(socket.id, [socket.id]))
if (Array.isArray(topics) && topics.length > 0) socket.send(eventTopicFilterMesssage(socket.id, topics))
if (readyCallback) readyCallback(event)
}
const heartbeatCallback = () => {
socket.send(heartbeatMessage(socket.id))
}
socket = this.connect('/ws/events', extendedMessageCallback, heartbeatCallback, extendedReadyCallback, errorCallback)
return socket
},
/**
* Close the given WebSocket connection.
*
* @param {WebSocket} socket
* @param {fn} [callback=null] callback to execute on connection close
*/
close (socket, callback = null) {
if (!socket) return
if (openWSConnections.indexOf(socket) >= 0) {
openWSConnections.splice(openWSConnections.indexOf(socket), 1)
}
console.debug(`WS connection closed: ${socket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)
socket.onclose = (event) => {
if (callback) {
callback(event)
}
}
socket.clearKeepalive()
socket.close()
}
}

View File

@ -0,0 +1,290 @@
import { f7 } from 'framework7-vue'
import { getAccessToken } from './auth'
/**
* A message that is sent over the openHAB WebSocket.
* Interface for the structure of the message payload sent over the WebSocket.
*/
interface WebSocketMessage {
type: string;
topic: string;
payload: string;
source: string;
}
/**
* A WebSocket that is extended with a keepalive/heartbeat mechanism.
*/
interface KeepaliveWebSocket extends WebSocket {
id: string;
keepaliveTimer?: number;
setKeepalive: (seconds: number) => void;
clearKeepalive: () => void;
}
/**
* Build a heartbeat message for the given WebSocket client id.
* @param {string} id WS client id
* @return {string}
*/
function heartbeatMessage (id: string): string {
const message: WebSocketMessage = {
type: 'WebSocketEvent',
topic: 'openhab/websocket/heartbeat',
payload: 'PING',
source: id
}
return JSON.stringify(message)
}
/**
* Serializes an array of strings into a string representation for a JSON payload: `["a","b"]`.
* @param {string[]} arr array of strings to serialize
* @returns {string} serialized string
*/
function arrayToSerialisedString (arr: string[]): string {
return '[' + arr.map((e) => '"' + e + '"').join(',') + ']'
}
/**
* Build an event source filter message for the given WebSocket client id and the given sources.
* Source filters can be used to remove events from a specific source from the event WS.
* @param {string} id WS client id
* @param {string[]} sources event sources to exclude
* @return {string}
*/
function eventSourceFilterMessage (id: string, sources: string[]): string {
const message: WebSocketMessage = {
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/source',
payload: arrayToSerialisedString(sources),
source: id
}
return JSON.stringify(message)
}
/**
* Build an event type filter message for the given WebSocket client id and the given event types.
* Event type filters can be used to select a subset of all available events for the event WS.
* @param {string} id WS client id
* @param {string[]} types event types to include
* @return {string}
*/
function eventTypeFilterMessage (id: string, types: string[]): string {
const message: WebSocketMessage = {
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/type',
payload: arrayToSerialisedString(types),
source: id
}
return JSON.stringify(message)
}
/**
* Build an event topic filter message for the given WebSocket client id and the given event topics.
* Event topic filters can be used to select a subset of all available events for the event WS.
* @param {string} id WS client id
* @param {string[]} topics event topics to include
* @returns {string}
*/
function eventTopicFilterMessage (id: string, topics: string[]): string {
const message: WebSocketMessage = {
type: 'WebSocketEvent',
topic: 'openhab/websocket/filter/topic',
payload: arrayToSerialisedString(topics),
source: id
}
return JSON.stringify(message)
}
const openWSConnections: KeepaliveWebSocket[] = []
type MessageCallback = (data: WebSocketMessage) => void;
type ReadyCallback = (event: Event) => void;
type ErrorCallback = (event: Event) => void;
type HeartbeatCallback = () => void;
type CloseCallback = (event: CloseEvent) => void;
/**
* Creates a new {@link KeepaliveWebSocket} connection.
* @param path the path to connect to, e.g. `/ws`
* @param messageCallback the callback to handle incoming messages
* @param readyCallback the callback to handle the connection being ready
* @param errorCallback the callback to handle errors
* @param heartbeatCallback the callback to handle heartbeats
* @param heartbeatInterval the interval in seconds for sending heartbeats
*/
function newWSConnection (
path: string,
messageCallback: MessageCallback,
readyCallback: ReadyCallback | undefined,
errorCallback: ErrorCallback | undefined,
heartbeatCallback: HeartbeatCallback | undefined,
heartbeatInterval: number
): KeepaliveWebSocket {
const encodedToken = btoa(getAccessToken()).replace(/=*$/, '')
// Create a new WebSocket connection and set the access token through the protocol field
const socket = new WebSocket(path, [
`org.openhab.ws.accessToken.base64.${encodedToken}`,
'org.openhab.ws.protocol.default'
]) as KeepaliveWebSocket
socket.id = 'ui-' + f7.utils.id()
// Implement the setKeepalive method on the socket object
socket.setKeepalive = (seconds: number) => {
if (!heartbeatCallback) return
console.debug('Setting keepalive interval seconds', seconds)
socket.clearKeepalive()
socket.keepaliveTimer = setInterval(() => {
heartbeatCallback()
}, seconds * 1000)
}
// Implement the clearKeepalive method on the socket object
socket.clearKeepalive = () => {
if (socket.keepaliveTimer) clearInterval(socket.keepaliveTimer)
delete socket.keepaliveTimer
}
// Handle WebSocket connection opened
socket.onopen = (event: Event) => {
socket.setKeepalive(heartbeatInterval)
if (readyCallback) readyCallback(event)
}
// Handle WebSocket message received
socket.onmessage = (event: MessageEvent) => {
let evt: WebSocketMessage
try {
// The message is expected to be JSON, but we handle the case where it's not.
evt = JSON.parse(event.data)
} catch (e) {
console.error('Error while parsing message', e)
return
}
messageCallback(evt)
}
// Handle WebSocket error
socket.onerror = (event: Event) => {
console.error('WebSocket error', event)
if (errorCallback) errorCallback(event)
}
// WebSocket closure is handled in the `close` function of the exported WebSocketService, not here.
// Add the new WebSocket connection to the list
openWSConnections.push(socket)
console.debug(`new WS connection: ${socket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)
return socket
}
const WebSocketService = {
/**
* Connect to the WebSocket at the given path.
* This method provides raw access to WebSockets, the caller has to take care of the keepalive mechanism by specifying a heartbeat callback.
*
* @param path path to connect to, e.g. `/ws`
* @param messageCallback message callback to handle incoming messages
* @param heartbeatCallback heartbeat callback
* @param readyCallback ready callback
* @param errorCallback error callback
* @param heartbeatInterval heartbeat interval in seconds (defaults to 5)
*/
connect (
path: string,
messageCallback: MessageCallback,
heartbeatCallback: HeartbeatCallback,
readyCallback?: ReadyCallback,
errorCallback?: ErrorCallback,
heartbeatInterval: number = 5
): KeepaliveWebSocket {
return newWSConnection(path, messageCallback, readyCallback, errorCallback, heartbeatCallback, heartbeatInterval)
},
/**
* Connect to the event WebSocket, which provides direct access to the EventBus.
* This convenience method takes care of the keepalive mechanism as well as filter setup.
*
* @param topics array of event topics to filter by, if empty all events are received
* @param messageCallback message callback to handle incoming messages
* @param readyCallback ready callback
* @param errorCallback error callback
*/
events (
topics: string[],
messageCallback: MessageCallback,
readyCallback?: ReadyCallback,
errorCallback?: ErrorCallback
): KeepaliveWebSocket {
let socket: KeepaliveWebSocket
/**
* Extends the message callback by filtering out WebSocketEvent messages, which are only relevant for managing the WebSocket connection.
* @param event
*/
const extendedMessageCallback: MessageCallback = (event: WebSocketMessage) => {
if (event.type === 'WebSocketEvent') return
messageCallback(event)
}
/**
* Extends the ready callback by sending the event source filter message and event topic filter message.
* @param event
*/
const extendedReadyCallback: ReadyCallback = (event: Event) => {
socket.send(eventSourceFilterMessage(socket.id, [socket.id]))
if (Array.isArray(topics) && topics.length > 0) socket.send(eventTopicFilterMessage(socket.id, topics))
if (readyCallback) readyCallback(event)
}
const heartbeatCallback: HeartbeatCallback = () => {
socket.send(heartbeatMessage(socket.id))
}
socket = this.connect(
'/ws/events',
extendedMessageCallback,
heartbeatCallback,
extendedReadyCallback,
errorCallback
)
return socket
},
/**
* Close the given WebSocket connection.
*
* @param socket the WebSocket connection to close
* @param callback callback to execute on connection close
*/
close (
socket: WebSocket,
callback?: CloseCallback): void {
if (!socket) return
const keepaliveWebSocket = socket as KeepaliveWebSocket
const index = openWSConnections.indexOf(keepaliveWebSocket)
if (index >= 0) {
openWSConnections.splice(index, 1)
}
console.debug(`WS connection closed: ${keepaliveWebSocket.url}, ${openWSConnections.length} open connections`)
console.debug(openWSConnections)
keepaliveWebSocket.onclose = (event: CloseEvent) => {
if (callback) {
callback(event)
}
}
keepaliveWebSocket.clearKeepalive()
keepaliveWebSocket.close()
}
}
export default WebSocketService

View File

@ -65,7 +65,7 @@ export const useStatesStore = defineStore('states', () => {
clearTrackingList()
if (trackerEventSource.value) {
console.debug('Closing existing state tracker connection')
openhab.sse.close(trackerEventSource.value, null)
openhab.sse.close(trackerEventSource.value)
clearStateTracker()
}
const eventSource = openhab.sse.connectStateTracker(