# core/homeassistant/components/netatmo/data_handler.py
#
# 189 lines
# 6.1 KiB
# Python

"""The Netatmo data handler."""
from __future__ import annotations
import asyncio
from collections import deque
from datetime import timedelta
from itertools import islice
import logging
from time import time
import pyatmo
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.event import async_track_time_interval
from .const import (
AUTH,
DOMAIN,
MANUFACTURER,
WEBHOOK_ACTIVATION,
WEBHOOK_DEACTIVATION,
WEBHOOK_NACAMERA_CONNECTION,
WEBHOOK_PUSH_TYPE,
)
_LOGGER = logging.getLogger(__name__)

# Names identifying the supported data classes; used as keys of the
# DATA_CLASSES and DEFAULT_INTERVALS mappings below.
CAMERA_DATA_CLASS_NAME = "AsyncCameraData"
WEATHERSTATION_DATA_CLASS_NAME = "AsyncWeatherStationData"
HOMECOACH_DATA_CLASS_NAME = "AsyncHomeCoachData"
HOMEDATA_DATA_CLASS_NAME = "AsyncHomeData"
HOMESTATUS_DATA_CLASS_NAME = "AsyncHomeStatus"
PUBLICDATA_DATA_CLASS_NAME = "AsyncPublicData"

# Key under which a data class entry stores the timestamp of its next poll.
NEXT_SCAN = "next_scan"

# Data class name -> pyatmo class instantiated when the data class is registered.
DATA_CLASSES = {
    WEATHERSTATION_DATA_CLASS_NAME: pyatmo.AsyncWeatherStationData,
    HOMECOACH_DATA_CLASS_NAME: pyatmo.AsyncHomeCoachData,
    CAMERA_DATA_CLASS_NAME: pyatmo.AsyncCameraData,
    HOMEDATA_DATA_CLASS_NAME: pyatmo.AsyncHomeData,
    HOMESTATUS_DATA_CLASS_NAME: pyatmo.AsyncHomeStatus,
    PUBLICDATA_DATA_CLASS_NAME: pyatmo.AsyncPublicData,
}

# Number of queued data classes examined during one update cycle.
BATCH_SIZE = 3

# Data class name -> seconds to wait between two polls of that data class.
DEFAULT_INTERVALS = {
    HOMEDATA_DATA_CLASS_NAME: 900,
    HOMESTATUS_DATA_CLASS_NAME: 300,
    CAMERA_DATA_CLASS_NAME: 900,
    WEATHERSTATION_DATA_CLASS_NAME: 600,
    HOMECOACH_DATA_CLASS_NAME: 300,
    PUBLICDATA_DATA_CLASS_NAME: 600,
}

# Seconds between two update cycles of the data handler.
SCAN_INTERVAL = 60
class NetatmoDataHandler:
    """Manages the Netatmo data handling.

    Registered data classes are kept in a queue that is polled in batches of
    up to BATCH_SIZE entries every SCAN_INTERVAL seconds. Subscribers of a
    data class are called back after a fetch, unless the fetch timed out.
    """

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Initialize self."""
        self.hass = hass
        self._auth = hass.data[DOMAIN][entry.entry_id][AUTH]
        # Unsubscribe callables that async_cleanup() invokes.
        self.listeners: list[CALLBACK_TYPE] = []
        # Entry name -> {"name", "interval", NEXT_SCAN, "subscriptions"}.
        self.data_classes: dict = {}
        # Entry name -> data object (set to None after a NoDevice error).
        self.data: dict = {}
        # Polling order; holds the very same dict objects as data_classes.
        self._queue: deque = deque()
        self._webhook: bool = False

    async def async_setup(self) -> None:
        """Set up the Netatmo data handler.

        Starts the periodic update timer and subscribes to webhook events.
        Both unsubscribe callables are stored so async_cleanup() can undo
        the setup.
        """
        # Keep the cancel callable: otherwise the timer keeps firing after
        # async_cleanup() has run.
        self.listeners.append(
            async_track_time_interval(
                self.hass, self.async_update, timedelta(seconds=SCAN_INTERVAL)
            )
        )
        self.listeners.append(
            async_dispatcher_connect(
                self.hass,
                f"signal-{DOMAIN}-webhook-None",
                self.handle_event,
            )
        )

    async def async_update(self, event_time) -> None:
        """Update device.

        We do up to BATCH_SIZE calls in one update in order
        to minimize the calls on the api service.
        """
        # Snapshot the batch: the queue may be rotated, extended or shrunk
        # while a fetch is awaited, and iterating a deque that is mutated
        # raises RuntimeError.
        batch = list(islice(self._queue, 0, BATCH_SIZE))

        for data_class in batch:
            name = data_class["name"]
            # Skip entries that were unregistered while an earlier fetch of
            # this batch was in flight.
            if self.data_classes.get(name) is not data_class:
                continue
            if data_class[NEXT_SCAN] > time():
                continue
            data_class[NEXT_SCAN] = time() + data_class["interval"]
            await self.async_fetch_data(name)

        self._queue.rotate(BATCH_SIZE)

    @callback
    def async_force_update(self, data_class_entry) -> None:
        """Prioritize data retrieval for given data class entry.

        Marks the entry as due and rotates the queue so it comes first.
        Does nothing if the entry is not registered.
        """
        data_class = self.data_classes.get(data_class_entry)
        if data_class is None:
            _LOGGER.debug(
                "Data class %s not registered, ignoring forced update",
                data_class_entry,
            )
            return

        data_class[NEXT_SCAN] = time()
        try:
            position = self._queue.index(data_class)
        except ValueError:
            # Registered but not queued yet: its initial fetch is still
            # running, so there is nothing to move.
            return
        self._queue.rotate(-position)

    async def async_cleanup(self) -> None:
        """Clean up the Netatmo data handler."""
        for listener in self.listeners:
            listener()
        # Drop the callables so a second cleanup does not call them again.
        self.listeners.clear()

    async def handle_event(self, event: dict) -> None:
        """Handle webhook events."""
        push_type = event["data"][WEBHOOK_PUSH_TYPE]

        if push_type == WEBHOOK_ACTIVATION:
            _LOGGER.info("%s webhook successfully registered", MANUFACTURER)
            self._webhook = True
        elif push_type == WEBHOOK_DEACTIVATION:
            _LOGGER.info("%s webhook unregistered", MANUFACTURER)
            self._webhook = False
        elif push_type == WEBHOOK_NACAMERA_CONNECTION:
            _LOGGER.debug("%s camera reconnected", MANUFACTURER)
            self.async_force_update(CAMERA_DATA_CLASS_NAME)

    async def async_fetch_data(self, data_class_entry) -> None:
        """Fetch data and notify.

        Subscribers are called after a successful update and after a
        NoDevice or ApiError failure, but not after a timeout. A NoDevice
        error replaces the stored data object with None.

        Raises KeyError if data_class_entry has no stored data object.
        """
        data = self.data[data_class_entry]
        if data is None:
            # A previous NoDevice error cleared the data object; there is
            # nothing left to call async_update() on.
            _LOGGER.debug("No data object for %s, skipping update", data_class_entry)
            return

        try:
            await data.async_update()
        except pyatmo.NoDevice as err:
            _LOGGER.debug(err)
            # Do not resurrect an entry that was unregistered meanwhile.
            if data_class_entry in self.data:
                self.data[data_class_entry] = None
        except pyatmo.ApiError as err:
            _LOGGER.debug(err)
        except asyncio.TimeoutError as err:
            _LOGGER.debug(err)
            return

        data_class = self.data_classes.get(data_class_entry)
        if data_class is None:
            # Unregistered while the update was awaited: nobody to notify.
            return

        for update_callback in data_class["subscriptions"]:
            # Subscriptions may hold falsy placeholders (e.g. None).
            if update_callback:
                update_callback()

    async def register_data_class(
        self,
        data_class_name,
        data_class_entry,
        update_callback: CALLBACK_TYPE | None,
        **kwargs,
    ) -> None:
        """Register data class.

        data_class_name selects the class and polling interval;
        data_class_entry is the key the instance is stored under. If the
        entry already exists only update_callback is added to it. Otherwise
        the data object is created with **kwargs, fetched once and queued
        for periodic polling.

        Raises KeyError if data_class_name is not a known data class.
        """
        if data_class_entry in self.data_classes:
            self.data_classes[data_class_entry]["subscriptions"].append(update_callback)
            return

        interval = DEFAULT_INTERVALS[data_class_name]
        data_class = {
            "name": data_class_entry,
            "interval": interval,
            NEXT_SCAN: time() + interval,
            "subscriptions": [update_callback],
        }
        self.data_classes[data_class_entry] = data_class
        self.data[data_class_entry] = DATA_CLASSES[data_class_name](
            self._auth, **kwargs
        )

        await self.async_fetch_data(data_class_entry)

        # The entry may have been unregistered during the initial fetch;
        # queueing it then would leave an orphan in the queue.
        if self.data_classes.get(data_class_entry) is not data_class:
            return

        self._queue.append(data_class)
        _LOGGER.debug("Data class %s added", data_class_entry)

    async def unregister_data_class(
        self, data_class_entry, update_callback: CALLBACK_TYPE | None
    ) -> None:
        """Unregister data class.

        Removes update_callback from the entry; once no subscription is
        left the entry is dropped from the queue and from the stored data.

        Raises KeyError if the entry is unknown and ValueError if
        update_callback was not subscribed to it.
        """
        data_class = self.data_classes[data_class_entry]
        data_class["subscriptions"].remove(update_callback)
        if data_class.get("subscriptions"):
            return

        try:
            self._queue.remove(data_class)
        except ValueError:
            # Not queued yet: the entry's initial fetch is still running.
            pass
        self.data_classes.pop(data_class_entry)
        self.data.pop(data_class_entry, None)
        _LOGGER.debug("Data class %s removed", data_class_entry)

    @property
    def webhook(self) -> bool:
        """Return the webhook state."""
        return self._webhook