Add metadata to logbook live stream websocket endpoint (#72394)

pull/72403/head
J. Nick Koston 2022-05-24 00:37:47 -05:00 committed by GitHub
parent abbfed24a4
commit 1c25e1d7b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 121 additions and 47 deletions

View File

@ -75,6 +75,7 @@ async def _async_send_historical_events(
end_time: dt,
formatter: Callable[[int, Any], dict[str, Any]],
event_processor: EventProcessor,
partial: bool,
) -> dt | None:
"""Select historical data from the database and deliver it to the websocket.
@ -94,88 +95,107 @@ async def _async_send_historical_events(
)
if not is_big_query:
message, last_event_time = await _async_get_ws_formatted_events(
message, last_event_time = await _async_get_ws_stream_events(
hass,
msg_id,
start_time,
end_time,
formatter,
event_processor,
partial,
)
# If there is no last_time, there are no historical
# results, but we still send an empty message so
# If there is no last_event_time, there are no historical
# results, but we still send an empty message
# if its the last one (not partial) so
# consumers of the api know their request was
# answered but there were no results
connection.send_message(message)
if last_event_time or not partial:
connection.send_message(message)
return last_event_time
# This is a big query so we deliver
# the first three hours and then
# we fetch the old data
recent_query_start = end_time - timedelta(hours=BIG_QUERY_RECENT_HOURS)
recent_message, recent_query_last_event_time = await _async_get_ws_formatted_events(
recent_message, recent_query_last_event_time = await _async_get_ws_stream_events(
hass,
msg_id,
recent_query_start,
end_time,
formatter,
event_processor,
partial=True,
)
if recent_query_last_event_time:
connection.send_message(recent_message)
older_message, older_query_last_event_time = await _async_get_ws_formatted_events(
older_message, older_query_last_event_time = await _async_get_ws_stream_events(
hass,
msg_id,
start_time,
recent_query_start,
formatter,
event_processor,
partial,
)
# If there is no last_time, there are no historical
# results, but we still send an empty message so
# If there is no last_event_time, there are no historical
# results, but we still send an empty message
# if its the last one (not partial) so
# consumers of the api know their request was
# answered but there were no results
if older_query_last_event_time or not recent_query_last_event_time:
if older_query_last_event_time or not partial:
connection.send_message(older_message)
# Returns the time of the newest event
return recent_query_last_event_time or older_query_last_event_time
async def _async_get_ws_formatted_events(
async def _async_get_ws_stream_events(
hass: HomeAssistant,
msg_id: int,
start_time: dt,
end_time: dt,
formatter: Callable[[int, Any], dict[str, Any]],
event_processor: EventProcessor,
partial: bool,
) -> tuple[str, dt | None]:
"""Async wrapper around _ws_formatted_get_events."""
return await get_instance(hass).async_add_executor_job(
_ws_formatted_get_events,
_ws_stream_get_events,
msg_id,
start_time,
end_time,
formatter,
event_processor,
partial,
)
def _ws_formatted_get_events(
def _ws_stream_get_events(
msg_id: int,
start_day: dt,
end_day: dt,
formatter: Callable[[int, Any], dict[str, Any]],
event_processor: EventProcessor,
partial: bool,
) -> tuple[str, dt | None]:
"""Fetch events and convert them to json in the executor."""
events = event_processor.get_events(start_day, end_day)
last_time = None
if events:
last_time = dt_util.utc_from_timestamp(events[-1]["when"])
result = formatter(msg_id, events)
return JSON_DUMP(result), last_time
message = {
"events": events,
"start_time": dt_util.utc_to_timestamp(start_day),
"end_time": dt_util.utc_to_timestamp(end_day),
}
if partial:
# This is a hint to consumers of the api that
# we are about to send a another block of historical
# data in case the UI needs to show that historical
# data is still loading in the future
message["partial"] = True
return JSON_DUMP(formatter(msg_id, message)), last_time
async def _async_events_consumer(
@ -209,7 +229,7 @@ async def _async_events_consumer(
JSON_DUMP(
messages.event_message(
msg_id,
logbook_events,
{"events": logbook_events},
)
)
)
@ -279,6 +299,7 @@ async def ws_event_stream(
end_time,
messages.event_message,
event_processor,
partial=False,
)
return
@ -331,6 +352,7 @@ async def ws_event_stream(
subscriptions_setup_complete_time,
messages.event_message,
event_processor,
partial=True,
)
await _async_wait_for_recorder_sync(hass)
@ -353,16 +375,16 @@ async def ws_event_stream(
second_fetch_start_time = max(
last_event_time or max_recorder_behind, max_recorder_behind
)
message, final_cutoff_time = await _async_get_ws_formatted_events(
await _async_send_historical_events(
hass,
connection,
msg_id,
second_fetch_start_time,
subscriptions_setup_complete_time,
messages.event_message,
event_processor,
partial=False,
)
if final_cutoff_time: # Only sends results if we have them
connection.send_message(message)
if not subscriptions:
# Unsubscribe happened while waiting for formatted events
@ -379,6 +401,20 @@ async def ws_event_stream(
)
def _ws_formatted_get_events(
msg_id: int,
start_time: dt,
end_time: dt,
event_processor: EventProcessor,
) -> str:
"""Fetch events and convert them to json in the executor."""
return JSON_DUMP(
messages.result_message(
msg_id, event_processor.get_events(start_time, end_time)
)
)
@websocket_api.websocket_command(
{
vol.Required("type"): "logbook/get_events",
@ -438,12 +474,12 @@ async def ws_get_events(
include_entity_name=False,
)
message, _ = await _async_get_ws_formatted_events(
hass,
msg["id"],
start_time,
end_time,
messages.result_message,
event_processor,
connection.send_message(
await hass.async_add_executor_job(
_ws_formatted_get_events,
msg["id"],
start_time,
end_time,
event_processor,
)
)
connection.send_message(message)

View File

@ -492,13 +492,16 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
"when": state.last_updated.timestamp(),
}
]
assert msg["event"]["start_time"] == now.timestamp()
assert msg["event"]["end_time"] > msg["event"]["start_time"]
assert msg["event"]["partial"] is True
hass.states.async_set("light.alpha", "on")
hass.states.async_set("light.alpha", "off")
@ -520,7 +523,14 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == []
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]["events"]
assert msg["event"]["events"] == [
{
"entity_id": "light.alpha",
"state": "off",
@ -560,7 +570,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"context_id": ANY,
"domain": "automation",
@ -624,7 +634,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"context_id": "ac5bd62de45711eaaeb351041eec8dd9",
"context_user_id": "b400facee45711eaa9308bfd3d19e474",
@ -686,7 +696,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
@ -716,7 +726,7 @@ async def test_subscribe_unsubscribe_logbook_stream(
msg = await websocket_client.receive_json()
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"context_domain": "automation",
"context_entity_id": "automation.alarm",
@ -788,7 +798,10 @@ async def test_subscribe_unsubscribe_logbook_stream_entities(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert "start_time" in msg["event"]
assert "end_time" in msg["event"]
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
@ -805,7 +818,16 @@ async def test_subscribe_unsubscribe_logbook_stream_entities(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert "start_time" in msg["event"]
assert "end_time" in msg["event"]
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == [
{
"entity_id": "light.small",
"state": "off",
@ -871,7 +893,8 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
@ -888,7 +911,14 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert "partial" not in msg["event"]
assert msg["event"]["events"] == []
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert "partial" not in msg["event"]
assert msg["event"]["events"] == [
{
"entity_id": "light.small",
"state": "off",
@ -962,7 +992,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_past_only(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
@ -1048,7 +1078,7 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "on",
@ -1060,7 +1090,8 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["partial"] is True
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.four_days_ago",
"state": "off",
@ -1068,6 +1099,13 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query(
}
]
# And finally a response without partial set to indicate no more
# historical data is coming
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == []
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
@ -1123,7 +1161,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == []
assert msg["event"]["events"] == []
hass.bus.async_fire("mock_event", {"device_id": device.id})
await hass.async_block_till_done()
@ -1131,7 +1169,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{"domain": "test", "message": "is on fire", "name": "device name", "when": ANY}
]
@ -1250,7 +1288,7 @@ async def test_live_stream_with_one_second_commit_interval(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"])
recieved_rows.extend(msg["event"]["events"])
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "6"})
@ -1262,7 +1300,7 @@ async def test_live_stream_with_one_second_commit_interval(
msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5)
assert msg["id"] == 7
assert msg["type"] == "event"
recieved_rows.extend(msg["event"])
recieved_rows.extend(msg["event"]["events"])
# Make sure we get rows back in order
assert recieved_rows == [
@ -1326,7 +1364,7 @@ async def test_subscribe_disconnected(hass, recorder_mock, hass_ws_client):
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{
"entity_id": "binary_sensor.is_light",
"state": "off",
@ -1437,7 +1475,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == []
assert msg["event"]["events"] == []
hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"})
await hass.async_block_till_done()
@ -1445,7 +1483,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{"domain": "test", "message": "1", "name": "device name", "when": ANY}
]
@ -1455,7 +1493,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"] == [
assert msg["event"]["events"] == [
{"domain": "test", "message": "2", "name": "device name", "when": ANY}
]