Monitor SSE Connection Health (#1499)

Signed-off-by: Dan Cunningham <dan@digitaldan.com>
pull/1512/head
Dan Cunningham 2022-10-07 13:15:52 -07:00 committed by GitHub
parent ce8d46a66c
commit 09adec507b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 7 deletions

View File

@ -385,6 +385,19 @@ export default {
return (this.serverUrl || window.location.origin)
}
},
watch: {
'$store.state.states.sseConnected': {
handler: function (connected) {
console.debug('sseConnected', connected)
if (window.OHApp && typeof window.OHApp.sseConnected === 'function') {
try {
window.OHApp.sseConnected(connected)
} catch {}
}
},
immediate: true // provides initial (not changed yet) state
}
},
methods: {
loadData (useCredentials) {
const useCredentialsPromise = (useCredentials) ? this.setBasicCredentials() : Promise.resolve()

View File

@ -3,7 +3,7 @@ import { getAccessToken, getTokenInCustomHeader, getBasicCredentials, getRequire
let openSSEClients = []
function newSSEConnection (path, readyCallback, messageCallback, errorCallback) {
function newSSEConnection (path, readyCallback, messageCallback, errorCallback, heartbeatCallback) {
let eventSource
const headers = {}
if (getAccessToken() && getRequireToken()) {
@ -27,6 +27,14 @@ function newSSEConnection (path, readyCallback, messageCallback, errorCallback)
readyCallback(e.data)
})
eventSource.addEventListener('alive', (e) => {
let evt = JSON.parse(e.data)
eventSource.setKeepalive(evt.interval)
if (heartbeatCallback) {
heartbeatCallback(true)
}
})
eventSource.onmessage = (event) => {
let evt = JSON.parse(event.data)
messageCallback(evt)
@ -37,6 +45,7 @@ function newSSEConnection (path, readyCallback, messageCallback, errorCallback)
eventSource.onerror = () => {
console.warn('SSE error')
eventSource.clearKeepalive()
if (errorCallback) {
errorCallback()
}
@ -45,6 +54,24 @@ function newSSEConnection (path, readyCallback, messageCallback, errorCallback)
}
}
eventSource.setKeepalive = (seconds = 10) => {
console.debug('Setting keepalive interval seconds', seconds)
eventSource.clearKeepalive()
eventSource.keepaliveTimer = setTimeout(() => {
console.warn('SSE timeout error')
if (heartbeatCallback) {
heartbeatCallback(false)
}
}, (seconds + 2) * 1000)
if (heartbeatCallback) {
heartbeatCallback(true)
}
}
eventSource.clearKeepalive = () => {
if (eventSource.keepaliveTimer) clearTimeout(eventSource.keepaliveTimer)
}
openSSEClients.push(eventSource)
console.debug(`new SSE connection: ${eventSource.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
@ -52,11 +79,11 @@ function newSSEConnection (path, readyCallback, messageCallback, errorCallback)
}
export default {
connect (path, topics, messageCallback, errorCallback) {
return newSSEConnection(path, null, messageCallback, errorCallback)
connect (path, topics, messageCallback, errorCallback, heartbeatCallback) {
return newSSEConnection(path, null, messageCallback, errorCallback, heartbeatCallback)
},
connectStateTracker (path, readyCallback, updateCallback, errorCallback) {
return newSSEConnection(path, readyCallback, updateCallback, errorCallback)
connectStateTracker (path, readyCallback, updateCallback, errorCallback, heartbeatCallback) {
return newSSEConnection(path, readyCallback, updateCallback, errorCallback, heartbeatCallback)
},
close (client, callback) {
if (!client) return
@ -65,7 +92,7 @@ export default {
}
console.debug(`SSE connection closed: ${client.url}, ${openSSEClients.length} open`)
console.debug(openSSEClients)
client.close()
client.clearKeepalive()
}
}

View File

@ -6,7 +6,8 @@ const state = {
trackerConnectionId: null,
trackerEventSource: null,
pendingTrackingListUpdate: false,
keepConnectionOpen: false
keepConnectionOpen: false,
sseConnected: false
}
let stateTrackingProxy = null
@ -73,11 +74,18 @@ const actions = {
const trackingListJson = JSON.stringify(context.state.trackingList)
console.debug('Setting initial tracking list: ' + trackingListJson)
this._vm.$oh.api.postPlain('/rest/events/states/' + connectionId, JSON.stringify(context.state.trackingList), 'text/plain', 'application/json')
context.commit('sseConnected', true)
},
(updates) => {
for (const item in updates) {
context.commit('setItemState', { itemName: item, itemState: updates[item] })
}
},
() => {
context.commit('sseConnected', false)
},
(healthy) => {
context.commit('sseConnected', healthy)
})
context.commit('setTrackingEventSource', eventSource)
},
@ -139,6 +147,9 @@ const mutations = {
keepConnectionOpen (state, value) {
state.keepConnectionOpen = value
},
sseConnected (state, value) {
state.sseConnected = value
},
setItemState (state, { itemName, itemState }) {
Vue.set(state.itemStates, itemName.toString(), itemState)
// state.itemStates[itemName] = itemState