Subscribe to Traccar Server events (#111262)

* Subscribe to Traccar Server events

* No need to unsubscribe on error

* typo

* rename _attrs

* arg type

* reorder return type

* more spesific

* Update stale docstring
pull/111277/head^2
Joakim Sørensen 2024-02-24 12:35:32 +01:00 committed by GitHub
parent ee57c924f2
commit 79572c0a5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 167 additions and 37 deletions

View File

@ -1,6 +1,9 @@
"""The Traccar Server integration."""
from __future__ import annotations
from datetime import timedelta
from aiohttp import CookieJar
from pytraccar import ApiClient
from homeassistant.config_entries import ConfigEntry
@ -14,7 +17,8 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant.helpers.event import async_track_time_interval
from .const import (
CONF_CUSTOM_ATTRIBUTES,
@ -30,10 +34,16 @@ PLATFORMS: list[Platform] = [Platform.DEVICE_TRACKER]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Traccar Server from a config entry."""
client_session = async_create_clientsession(
hass,
cookie_jar=CookieJar(
unsafe=not entry.data[CONF_SSL] or not entry.data[CONF_VERIFY_SSL]
),
)
coordinator = TraccarServerCoordinator(
hass=hass,
client=ApiClient(
client_session=async_get_clientsession(hass),
client_session=client_session,
host=entry.data[CONF_HOST],
port=entry.data[CONF_PORT],
username=entry.data[CONF_USERNAME],
@ -54,6 +64,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_reload_entry))
if entry.options.get(CONF_EVENTS):
entry.async_on_unload(
async_track_time_interval(
hass,
coordinator.import_events,
timedelta(seconds=30),
cancel_on_shutdown=True,
name="traccar_server_import_events",
)
)
return True

View File

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
from datetime import datetime
from typing import TYPE_CHECKING, Any, TypedDict
from pytraccar import (
@ -10,11 +10,14 @@ from pytraccar import (
DeviceModel,
GeofenceModel,
PositionModel,
SubscriptionData,
TraccarException,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
@ -31,7 +34,7 @@ class TraccarServerCoordinatorDataDevice(TypedDict):
attributes: dict[str, Any]
TraccarServerCoordinatorData = dict[str, TraccarServerCoordinatorDataDevice]
TraccarServerCoordinatorData = dict[int, TraccarServerCoordinatorDataDevice]
class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorData]):
@ -54,14 +57,17 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
hass=hass,
logger=LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=30),
update_interval=None,
)
self.client = client
self.custom_attributes = custom_attributes
self.events = events
self.max_accuracy = max_accuracy
self.skip_accuracy_filter_for = skip_accuracy_filter_for
self._geofences: list[GeofenceModel] = []
self._last_event_import: datetime | None = None
self._subscription: asyncio.Task | None = None
self._should_log_subscription_error: bool = True
async def _async_update_data(self) -> TraccarServerCoordinatorData:
"""Fetch data from Traccar Server."""
@ -85,35 +91,21 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
assert isinstance(positions, list[PositionModel]) # type: ignore[misc]
assert isinstance(geofences, list[GeofenceModel]) # type: ignore[misc]
self._geofences = geofences
for position in positions:
if (device := get_device(position["deviceId"], devices)) is None:
continue
attr = {}
skip_accuracy_filter = False
for custom_attr in self.custom_attributes:
attr[custom_attr] = device["attributes"].get(
custom_attr,
position["attributes"].get(custom_attr, None),
)
if custom_attr in self.skip_accuracy_filter_for:
skip_accuracy_filter = True
accuracy = position["accuracy"] or 0.0
if (
not skip_accuracy_filter
and self.max_accuracy > 0
and accuracy > self.max_accuracy
):
LOGGER.debug(
"Excluded position by accuracy filter: %f (%s)",
accuracy,
device["id"],
attr
:= self._return_custom_attributes_if_not_filtered_by_accuracy_configuration(
device, position
)
) is None:
continue
data[device["uniqueId"]] = {
data[device["id"]] = {
"device": device,
"geofence": get_first_geofence(
geofences,
@ -123,12 +115,57 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
"attributes": attr,
}
if self.events:
self.hass.async_create_task(self.import_events(devices))
await self.subscribe()
return data
async def import_events(self, devices: list[DeviceModel]) -> None:
async def handle_subscription_data(self, data: SubscriptionData) -> None:
"""Handle subscription data."""
self.logger.debug("Received subscription data: %s", data)
self._should_log_subscription_error = True
update_devices = set()
for device in data.get("devices") or []:
device_id = device["id"]
if device_id not in self.data:
continue
if (
attr
:= self._return_custom_attributes_if_not_filtered_by_accuracy_configuration(
device, self.data[device_id]["position"]
)
) is None:
continue
self.data[device_id]["device"] = device
self.data[device_id]["attributes"] = attr
update_devices.add(device_id)
for position in data.get("positions") or []:
device_id = position["deviceId"]
if device_id not in self.data:
continue
if (
attr
:= self._return_custom_attributes_if_not_filtered_by_accuracy_configuration(
self.data[device_id]["device"], position
)
) is None:
continue
self.data[device_id]["position"] = position
self.data[device_id]["attributes"] = attr
self.data[device_id]["geofence"] = get_first_geofence(
self._geofences,
position["geofenceIds"] or [],
)
update_devices.add(device_id)
for device_id in update_devices:
dispatcher_send(self.hass, f"{DOMAIN}_{device_id}")
async def import_events(self, _: datetime) -> None:
"""Import events from Traccar."""
start_time = dt_util.utcnow().replace(tzinfo=None)
end_time = None
@ -137,7 +174,7 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
end_time = start_time - (start_time - self._last_event_import)
events = await self.client.get_reports_events(
devices=[device["id"] for device in devices],
devices=list(self.data),
start_time=start_time,
end_time=end_time,
event_types=self.events,
@ -147,7 +184,7 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
self._last_event_import = start_time
for event in events:
device = get_device(event["deviceId"], devices)
device = self.data[event["deviceId"]]["device"]
self.hass.bus.async_fire(
# This goes against two of the HA core guidelines:
# 1. Event names should be prefixed with the domain name of
@ -165,3 +202,57 @@ class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorDat
"attributes": event["attributes"],
},
)
async def unsubscribe(self, *args) -> None:
"""Unsubscribe from Traccar Server."""
if self._subscription is None:
return
self._should_log_subscription_error = False
self._subscription.cancel()
self._subscription = None
async def subscribe(self) -> None:
"""Subscribe to events."""
if self._subscription is not None:
return
async def _subscriber():
try:
await self.client.subscribe(self.handle_subscription_data)
except TraccarException as ex:
if self._should_log_subscription_error:
self._should_log_subscription_error = False
LOGGER.error("Error while subscribing to Traccar: %s", ex)
# Retry after 10 seconds
await asyncio.sleep(10)
await _subscriber()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.unsubscribe)
self.config_entry.async_on_unload(self.unsubscribe)
self._subscription = asyncio.create_task(_subscriber())
def _return_custom_attributes_if_not_filtered_by_accuracy_configuration(
self,
device: DeviceModel,
position: PositionModel,
) -> dict[str, Any] | None:
"""Return a dictionary of custom attributes if not filtered by accuracy configuration."""
attr = {}
skip_accuracy_filter = False
for custom_attr in self.custom_attributes:
if custom_attr in self.skip_accuracy_filter_for:
skip_accuracy_filter = True
attr[custom_attr] = device["attributes"].get(
custom_attr,
position["attributes"].get(custom_attr, None),
)
accuracy = position["accuracy"] or 0.0
if (
not skip_accuracy_filter
and self.max_accuracy > 0
and accuracy > self.max_accuracy
):
return None
return attr

View File

@ -30,6 +30,7 @@ async def async_get_config_entry_diagnostics(
return async_redact_data(
{
"subscription_status": coordinator.client.subscription_status,
"config_entry_options": dict(config_entry.options),
"coordinator_data": coordinator.data,
"entities": [
@ -63,6 +64,7 @@ async def async_get_device_diagnostics(
return async_redact_data(
{
"subscription_status": coordinator.client.subscription_status,
"config_entry_options": dict(entry.options),
"coordinator_data": coordinator.data,
"entities": [

View File

@ -6,6 +6,7 @@ from typing import Any
from pytraccar import DeviceModel, GeofenceModel, PositionModel
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
@ -22,7 +23,7 @@ class TraccarServerEntity(CoordinatorEntity[TraccarServerCoordinator]):
) -> None:
"""Initialize the Traccar Server entity."""
super().__init__(coordinator)
self.device_id = device["uniqueId"]
self.device_id = device["id"]
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device["uniqueId"])},
model=device["model"],
@ -57,3 +58,14 @@ class TraccarServerEntity(CoordinatorEntity[TraccarServerCoordinator]):
def traccar_attributes(self) -> dict[str, Any]:
"""Return the attributes."""
return self.coordinator.data[self.device_id]["attributes"]
async def async_added_to_hass(self) -> None:
"""Entity added to hass."""
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{DOMAIN}_{self.device_id}",
self.async_write_ha_state,
)
)
await super().async_added_to_hass()

View File

@ -4,6 +4,6 @@
"codeowners": ["@ludeeus"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/traccar_server",
"iot_class": "local_polling",
"iot_class": "local_push",
"requirements": ["pytraccar==2.1.1"]
}

View File

@ -6208,7 +6208,7 @@
"traccar_server": {
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_polling",
"iot_class": "local_push",
"name": "Traccar Server"
}
}

View File

@ -3,7 +3,7 @@ from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
from pytraccar import ApiClient
from pytraccar import ApiClient, SubscriptionStatus
from homeassistant.components.traccar_server.const import (
CONF_CUSTOM_ATTRIBUTES,
@ -39,6 +39,7 @@ def mock_traccar_api_client() -> Generator[AsyncMock, None, None]:
new=mock_client,
):
client: ApiClient = mock_client.return_value
client.subscription_status = SubscriptionStatus.DISCONNECTED
client.get_devices.return_value = load_json_array_fixture(
"traccar_server/devices.json"
)
@ -55,6 +56,8 @@ def mock_traccar_api_client() -> Generator[AsyncMock, None, None]:
"traccar_server/reports_events.json"
)
client.subscribe = AsyncMock()
yield client

View File

@ -13,7 +13,7 @@
]),
}),
'coordinator_data': dict({
'abc123': dict({
'0': dict({
'attributes': dict({
'custom_attr_1': 'custom_attr_1_value',
}),
@ -96,6 +96,7 @@
}),
}),
]),
'subscription_status': 'disconnected',
})
# ---
# name: test_entry_diagnostics[entry]
@ -112,7 +113,7 @@
]),
}),
'coordinator_data': dict({
'abc123': dict({
'0': dict({
'attributes': dict({
'custom_attr_1': 'custom_attr_1_value',
}),
@ -195,5 +196,6 @@
}),
}),
]),
'subscription_status': 'disconnected',
})
# ---