Migrate Flu Near You to DataUpdateCoordinator (#42594)

* Migrate Flu Near You to DataUpdateCoordinator

* Prefer partial over lambda

* Remove overlap with CoordinatorEntity
pull/43077/head
Aaron Bach 2020-11-07 19:45:14 -07:00 committed by GitHub
parent 2b2d7558de
commit 8dbddd874f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 188 additions and 269 deletions

View File

@ -1,182 +1,91 @@
"""The flunearyou component."""
import asyncio
from datetime import timedelta
from functools import partial
from pyflunearyou import Client
from pyflunearyou.errors import FluNearYouError
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from homeassistant.core import callback
from homeassistant.helpers import aiohttp_client, config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CATEGORY_CDC_REPORT,
CATEGORY_USER_REPORT,
DATA_CLIENT,
DATA_COORDINATOR,
DOMAIN,
LOGGER,
SENSORS,
TOPIC_UPDATE,
)
DATA_LISTENER = "listener"
DEFAULT_SCAN_INTERVAL = timedelta(minutes=30)
DEFAULT_UPDATE_INTERVAL = timedelta(minutes=30)
CONFIG_SCHEMA = cv.deprecated(DOMAIN, invalidation_version="0.119")
@callback
def async_get_api_category(sensor_type):
"""Get the category that a particular sensor type belongs to."""
try:
return next(
(
category
for category, sensors in SENSORS.items()
for sensor in sensors
if sensor[0] == sensor_type
)
)
except StopIteration as err:
raise ValueError(f"Can't find category sensor type: {sensor_type}") from err
PLATFORMS = ["sensor"]
async def async_setup(hass, config):
"""Set up the Flu Near You component."""
hass.data[DOMAIN] = {DATA_CLIENT: {}, DATA_LISTENER: {}}
hass.data[DOMAIN] = {DATA_COORDINATOR: {}}
return True
async def async_setup_entry(hass, config_entry):
"""Set up Flu Near You as config entry."""
hass.data[DOMAIN][DATA_COORDINATOR][config_entry.entry_id] = {}
websession = aiohttp_client.async_get_clientsession(hass)
client = Client(websession)
fny = FluNearYouData(
hass,
Client(websession),
config_entry.data.get(CONF_LATITUDE, hass.config.latitude),
config_entry.data.get(CONF_LONGITUDE, hass.config.longitude),
)
await fny.async_update()
hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id] = fny
latitude = config_entry.data.get(CONF_LATITUDE, hass.config.latitude)
longitude = config_entry.data.get(CONF_LONGITUDE, hass.config.longitude)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
)
async def async_update(api_category):
"""Get updated date from the API based on category."""
try:
if api_category == CATEGORY_CDC_REPORT:
return await client.cdc_reports.status_by_coordinates(
latitude, longitude
)
return await client.user_reports.status_by_coordinates(latitude, longitude)
except FluNearYouError as err:
raise UpdateFailed(err) from err
async def refresh(event_time):
"""Refresh data from Flu Near You."""
await fny.async_update()
data_init_tasks = []
for api_category in [CATEGORY_CDC_REPORT, CATEGORY_USER_REPORT]:
coordinator = hass.data[DOMAIN][DATA_COORDINATOR][config_entry.entry_id][
api_category
] = DataUpdateCoordinator(
hass,
LOGGER,
name=f"{api_category} ({latitude}, {longitude})",
update_interval=DEFAULT_UPDATE_INTERVAL,
update_method=partial(async_update, api_category),
)
data_init_tasks.append(coordinator.async_refresh())
hass.data[DOMAIN][DATA_LISTENER][config_entry.entry_id] = async_track_time_interval(
hass, refresh, DEFAULT_SCAN_INTERVAL
)
await asyncio.gather(*data_init_tasks)
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, component)
)
return True
async def async_unload_entry(hass, config_entry):
"""Unload an Flu Near You config entry."""
hass.data[DOMAIN][DATA_CLIENT].pop(config_entry.entry_id)
unload_ok = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(config_entry, component)
for component in PLATFORMS
]
)
)
if unload_ok:
hass.data[DOMAIN][DATA_COORDINATOR].pop(config_entry.entry_id)
remove_listener = hass.data[DOMAIN][DATA_LISTENER].pop(config_entry.entry_id)
remove_listener()
await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
return True
class FluNearYouData:
"""Define a data object to retrieve info from Flu Near You."""
def __init__(self, hass, client, latitude, longitude):
"""Initialize."""
self._async_cancel_time_interval_listener = None
self._client = client
self._hass = hass
self.data = {}
self.latitude = latitude
self.longitude = longitude
self._api_category_count = {
CATEGORY_CDC_REPORT: 0,
CATEGORY_USER_REPORT: 0,
}
self._api_category_locks = {
CATEGORY_CDC_REPORT: asyncio.Lock(),
CATEGORY_USER_REPORT: asyncio.Lock(),
}
async def _async_get_data_from_api(self, api_category):
"""Update and save data for a particular API category."""
if self._api_category_count[api_category] == 0:
return
if api_category == CATEGORY_CDC_REPORT:
api_coro = self._client.cdc_reports.status_by_coordinates(
self.latitude, self.longitude
)
else:
api_coro = self._client.user_reports.status_by_coordinates(
self.latitude, self.longitude
)
try:
self.data[api_category] = await api_coro
except FluNearYouError as err:
LOGGER.error("Unable to get %s data: %s", api_category, err)
self.data[api_category] = None
async def _async_update_listener_action(self, now):
"""Define an async_track_time_interval action to update data."""
await self.async_update()
@callback
def async_deregister_api_interest(self, sensor_type):
"""Decrement the number of entities with data needs from an API category."""
# If this deregistration should leave us with no registration at all, remove the
# time interval:
if sum(self._api_category_count.values()) == 0:
if self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener()
self._async_cancel_time_interval_listener = None
return
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] -= 1
async def async_register_api_interest(self, sensor_type):
"""Increment the number of entities with data needs from an API category."""
# If this is the first registration we have, start a time interval:
if not self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener = async_track_time_interval(
self._hass,
self._async_update_listener_action,
DEFAULT_SCAN_INTERVAL,
)
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] += 1
# If a sensor registers interest in a particular API call and the data doesn't
# exist for it yet, make the API call and grab the data:
async with self._api_category_locks[api_category]:
if api_category not in self.data:
await self._async_get_data_from_api(api_category)
async def async_update(self):
"""Update Flu Near You data."""
tasks = [
self._async_get_data_from_api(api_category)
for api_category in self._api_category_count
]
await asyncio.gather(*tasks)
LOGGER.debug("Received new data")
async_dispatcher_send(self._hass, TOPIC_UPDATE)
return unload_ok

View File

@ -4,35 +4,7 @@ import logging
DOMAIN = "flunearyou"
LOGGER = logging.getLogger(__package__)
DATA_CLIENT = "client"
DATA_COORDINATOR = "coordinator"
CATEGORY_CDC_REPORT = "cdc_report"
CATEGORY_USER_REPORT = "user_report"
TOPIC_UPDATE = "flunearyou_update"
TYPE_CDC_LEVEL = "level"
TYPE_CDC_LEVEL2 = "level2"
TYPE_USER_CHICK = "chick"
TYPE_USER_DENGUE = "dengue"
TYPE_USER_FLU = "flu"
TYPE_USER_LEPTO = "lepto"
TYPE_USER_NO_SYMPTOMS = "none"
TYPE_USER_SYMPTOMS = "symptoms"
TYPE_USER_TOTAL = "total"
SENSORS = {
CATEGORY_CDC_REPORT: [
(TYPE_CDC_LEVEL, "CDC Level", "mdi:biohazard", None),
(TYPE_CDC_LEVEL2, "CDC Level 2", "mdi:biohazard", None),
],
CATEGORY_USER_REPORT: [
(TYPE_USER_CHICK, "Avian Flu Symptoms", "mdi:alert", "reports"),
(TYPE_USER_DENGUE, "Dengue Fever Symptoms", "mdi:alert", "reports"),
(TYPE_USER_FLU, "Flu Symptoms", "mdi:alert", "reports"),
(TYPE_USER_LEPTO, "Leptospirosis Symptoms", "mdi:alert", "reports"),
(TYPE_USER_NO_SYMPTOMS, "No Symptoms", "mdi:alert", "reports"),
(TYPE_USER_SYMPTOMS, "Flu-like Symptoms", "mdi:alert", "reports"),
(TYPE_USER_TOTAL, "Total Symptoms", "mdi:alert", "reports"),
],
}

View File

@ -1,24 +1,14 @@
"""Support for user- and CDC-based flu info sensors from Flu Near You."""
from homeassistant.const import ATTR_ATTRIBUTION, ATTR_STATE
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from .const import (
CATEGORY_CDC_REPORT,
CATEGORY_USER_REPORT,
DATA_CLIENT,
DOMAIN,
SENSORS,
TOPIC_UPDATE,
TYPE_USER_CHICK,
TYPE_USER_DENGUE,
TYPE_USER_FLU,
TYPE_USER_LEPTO,
TYPE_USER_NO_SYMPTOMS,
TYPE_USER_SYMPTOMS,
TYPE_USER_TOTAL,
from homeassistant.const import (
ATTR_ATTRIBUTION,
ATTR_STATE,
CONF_LATITUDE,
CONF_LONGITUDE,
)
from homeassistant.core import callback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import CATEGORY_CDC_REPORT, CATEGORY_USER_REPORT, DATA_COORDINATOR, DOMAIN
ATTR_CITY = "city"
ATTR_REPORTED_DATE = "reported_date"
@ -30,46 +20,85 @@ ATTR_ZIP_CODE = "zip_code"
DEFAULT_ATTRIBUTION = "Data provided by Flu Near You"
EXTENDED_TYPE_MAPPING = {
TYPE_USER_FLU: "ili",
TYPE_USER_NO_SYMPTOMS: "no_symptoms",
TYPE_USER_TOTAL: "total_surveys",
SENSOR_TYPE_CDC_LEVEL = "level"
SENSOR_TYPE_CDC_LEVEL2 = "level2"
SENSOR_TYPE_USER_CHICK = "chick"
SENSOR_TYPE_USER_DENGUE = "dengue"
SENSOR_TYPE_USER_FLU = "flu"
SENSOR_TYPE_USER_LEPTO = "lepto"
SENSOR_TYPE_USER_NO_SYMPTOMS = "none"
SENSOR_TYPE_USER_SYMPTOMS = "symptoms"
SENSOR_TYPE_USER_TOTAL = "total"
CDC_SENSORS = [
(SENSOR_TYPE_CDC_LEVEL, "CDC Level", "mdi:biohazard", None),
(SENSOR_TYPE_CDC_LEVEL2, "CDC Level 2", "mdi:biohazard", None),
]
USER_SENSORS = [
(SENSOR_TYPE_USER_CHICK, "Avian Flu Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_DENGUE, "Dengue Fever Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_FLU, "Flu Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_LEPTO, "Leptospirosis Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_NO_SYMPTOMS, "No Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_SYMPTOMS, "Flu-like Symptoms", "mdi:alert", "reports"),
(SENSOR_TYPE_USER_TOTAL, "Total Symptoms", "mdi:alert", "reports"),
]
EXTENDED_SENSOR_TYPE_MAPPING = {
SENSOR_TYPE_USER_FLU: "ili",
SENSOR_TYPE_USER_NO_SYMPTOMS: "no_symptoms",
SENSOR_TYPE_USER_TOTAL: "total_surveys",
}
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up Flu Near You sensors based on a config entry."""
fny = hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id]
coordinators = hass.data[DOMAIN][DATA_COORDINATOR][config_entry.entry_id]
async_add_entities(
[
FluNearYouSensor(fny, sensor_type, name, category, icon, unit)
for category, sensors in SENSORS.items()
for sensor_type, name, icon, unit in sensors
],
True,
)
sensors = []
for (sensor_type, name, icon, unit) in CDC_SENSORS:
sensors.append(
CdcSensor(
coordinators[CATEGORY_CDC_REPORT],
config_entry,
sensor_type,
name,
icon,
unit,
)
)
for (sensor_type, name, icon, unit) in USER_SENSORS:
sensors.append(
UserSensor(
coordinators[CATEGORY_USER_REPORT],
config_entry,
sensor_type,
name,
icon,
unit,
)
)
async_add_entities(sensors)
class FluNearYouSensor(Entity):
class FluNearYouSensor(CoordinatorEntity):
"""Define a base Flu Near You sensor."""
def __init__(self, fny, sensor_type, name, category, icon, unit):
def __init__(self, coordinator, config_entry, sensor_type, name, icon, unit):
"""Initialize the sensor."""
super().__init__(coordinator)
self._attrs = {ATTR_ATTRIBUTION: DEFAULT_ATTRIBUTION}
self._category = category
self._fny = fny
self._config_entry = config_entry
self._icon = icon
self._name = name
self._sensor_type = sensor_type
self._state = None
self._unit = unit
@property
def available(self):
"""Return True if entity is available."""
return bool(self._fny.data[self._category])
@property
def device_state_attributes(self):
"""Return the device state attributes."""
@ -93,79 +122,88 @@ class FluNearYouSensor(Entity):
@property
def unique_id(self):
"""Return a unique, Home Assistant friendly identifier for this entity."""
return f"{self._fny.latitude},{self._fny.longitude}_{self._sensor_type}"
return (
f"{self._config_entry.data[CONF_LATITUDE]},"
f"{self._config_entry.data[CONF_LONGITUDE]}_{self._sensor_type}"
)
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.update_from_latest_data()
self.async_write_ha_state()
async def async_added_to_hass(self):
"""Register callbacks."""
@callback
def update():
"""Update the state."""
self.update_from_latest_data()
self.async_write_ha_state()
self.async_on_remove(async_dispatcher_connect(self.hass, TOPIC_UPDATE, update))
await self._fny.async_register_api_interest(self._sensor_type)
await super().async_added_to_hass()
self.update_from_latest_data()
async def async_will_remove_from_hass(self) -> None:
"""Disconnect dispatcher listener when removed."""
self._fny.async_deregister_api_interest(self._sensor_type)
@callback
def update_from_latest_data(self):
"""Update the sensor."""
cdc_data = self._fny.data.get(CATEGORY_CDC_REPORT)
user_data = self._fny.data.get(CATEGORY_USER_REPORT)
raise NotImplementedError
if self._category == CATEGORY_CDC_REPORT and cdc_data:
self._attrs.update(
{
ATTR_REPORTED_DATE: cdc_data["week_date"],
ATTR_STATE: cdc_data["name"],
}
)
self._state = cdc_data[self._sensor_type]
elif self._category == CATEGORY_USER_REPORT and user_data:
self._attrs.update(
{
ATTR_CITY: user_data["local"]["city"].split("(")[0],
ATTR_REPORTED_LATITUDE: user_data["local"]["latitude"],
ATTR_REPORTED_LONGITUDE: user_data["local"]["longitude"],
ATTR_STATE: user_data["state"]["name"],
ATTR_ZIP_CODE: user_data["local"]["zip"],
}
)
if self._sensor_type in user_data["state"]["data"]:
states_key = self._sensor_type
elif self._sensor_type in EXTENDED_TYPE_MAPPING:
states_key = EXTENDED_TYPE_MAPPING[self._sensor_type]
class CdcSensor(FluNearYouSensor):
"""Define a sensor for CDC reports."""
self._attrs[ATTR_STATE_REPORTS_THIS_WEEK] = user_data["state"]["data"][
states_key
]
self._attrs[ATTR_STATE_REPORTS_LAST_WEEK] = user_data["state"][
"last_week_data"
][states_key]
@callback
def update_from_latest_data(self):
"""Update the sensor."""
self._attrs.update(
{
ATTR_REPORTED_DATE: self.coordinator.data["week_date"],
ATTR_STATE: self.coordinator.data["name"],
}
)
self._state = self.coordinator.data[self._sensor_type]
if self._sensor_type == TYPE_USER_TOTAL:
self._state = sum(
v
for k, v in user_data["local"].items()
if k
in (
TYPE_USER_CHICK,
TYPE_USER_DENGUE,
TYPE_USER_FLU,
TYPE_USER_LEPTO,
TYPE_USER_SYMPTOMS,
)
class UserSensor(FluNearYouSensor):
"""Define a sensor for user reports."""
@callback
def update_from_latest_data(self):
"""Update the sensor."""
self._attrs.update(
{
ATTR_CITY: self.coordinator.data["local"]["city"].split("(")[0],
ATTR_REPORTED_LATITUDE: self.coordinator.data["local"]["latitude"],
ATTR_REPORTED_LONGITUDE: self.coordinator.data["local"]["longitude"],
ATTR_STATE: self.coordinator.data["state"]["name"],
ATTR_ZIP_CODE: self.coordinator.data["local"]["zip"],
}
)
if self._sensor_type in self.coordinator.data["state"]["data"]:
states_key = self._sensor_type
elif self._sensor_type in EXTENDED_SENSOR_TYPE_MAPPING:
states_key = EXTENDED_SENSOR_TYPE_MAPPING[self._sensor_type]
self._attrs[ATTR_STATE_REPORTS_THIS_WEEK] = self.coordinator.data["state"][
"data"
][states_key]
self._attrs[ATTR_STATE_REPORTS_LAST_WEEK] = self.coordinator.data["state"][
"last_week_data"
][states_key]
if self._sensor_type == SENSOR_TYPE_USER_TOTAL:
self._state = sum(
v
for k, v in self.coordinator.data["local"].items()
if k
in (
SENSOR_TYPE_USER_CHICK,
SENSOR_TYPE_USER_DENGUE,
SENSOR_TYPE_USER_FLU,
SENSOR_TYPE_USER_LEPTO,
SENSOR_TYPE_USER_SYMPTOMS,
)
else:
self._state = user_data["local"][self._sensor_type]
)
else:
self._state = self.coordinator.data["local"][self._sensor_type]