core/homeassistant/components/iqvia/__init__.py

161 lines
5.1 KiB
Python
Raw Normal View History

"""Support for IQVIA."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from datetime import timedelta
from functools import partial
2022-01-11 20:23:26 +00:00
from typing import Any, cast
from pyiqvia import Client
from pyiqvia.errors import IQVIAError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import aiohttp_client
2021-09-12 15:34:44 +00:00
from homeassistant.helpers.entity import EntityDescription
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
2019-07-31 19:25:30 +00:00
)
from .const import (
2019-07-31 19:25:30 +00:00
CONF_ZIP_CODE,
DOMAIN,
LOGGER,
2019-07-31 19:25:30 +00:00
TYPE_ALLERGY_FORECAST,
TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_OUTLOOK,
TYPE_ASTHMA_FORECAST,
TYPE_ASTHMA_INDEX,
TYPE_DISEASE_FORECAST,
TYPE_DISEASE_INDEX,
)
2019-07-31 19:25:30 +00:00
DEFAULT_ATTRIBUTION = "Data provided by IQVIA™"
DEFAULT_SCAN_INTERVAL = timedelta(minutes=30)
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up IQVIA as config entry."""
if not entry.unique_id:
# If the config entry doesn't already have a unique ID, set one:
hass.config_entries.async_update_entry(
entry, **{"unique_id": entry.data[CONF_ZIP_CODE]}
)
websession = aiohttp_client.async_get_clientsession(hass)
2020-12-18 20:12:16 +00:00
client = Client(entry.data[CONF_ZIP_CODE], session=websession)
# We disable the client's request retry abilities here to avoid a lengthy (and
# blocking) startup:
client.disable_request_retries()
async def async_get_data_from_api(
api_coro: Callable[..., Awaitable]
) -> dict[str, Any]:
"""Get data from a particular API coroutine."""
try:
data = await api_coro()
except IQVIAError as err:
raise UpdateFailed from err
2022-01-11 20:23:26 +00:00
return cast(dict[str, Any], data)
coordinators = {}
init_data_update_tasks = []
for sensor_type, api_coro in (
(TYPE_ALLERGY_FORECAST, client.allergens.extended),
(TYPE_ALLERGY_INDEX, client.allergens.current),
(TYPE_ALLERGY_OUTLOOK, client.allergens.outlook),
(TYPE_ASTHMA_FORECAST, client.asthma.extended),
(TYPE_ASTHMA_INDEX, client.asthma.current),
(TYPE_DISEASE_FORECAST, client.disease.extended),
(TYPE_DISEASE_INDEX, client.disease.current),
):
coordinator = coordinators[sensor_type] = DataUpdateCoordinator(
hass,
LOGGER,
name=f"{entry.data[CONF_ZIP_CODE]} {sensor_type}",
update_interval=DEFAULT_SCAN_INTERVAL,
update_method=partial(async_get_data_from_api, api_coro),
)
init_data_update_tasks.append(coordinator.async_refresh())
results = await asyncio.gather(*init_data_update_tasks, return_exceptions=True)
if all(isinstance(result, Exception) for result in results):
# The IQVIA API can be selectively flaky, meaning that any number of the setup
# API calls could fail. We only retry integration setup if *all* of the initial
# API calls fail:
raise ConfigEntryNotReady()
# Once we've successfully authenticated, we re-enable client request retries:
client.enable_request_retries()
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = coordinators
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload an OpenUV config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
2021-09-12 15:34:44 +00:00
class IQVIAEntity(CoordinatorEntity):
"""Define a base IQVIA entity."""
def __init__(
self,
coordinator: DataUpdateCoordinator,
entry: ConfigEntry,
2021-09-12 15:34:44 +00:00
description: EntityDescription,
) -> None:
"""Initialize."""
super().__init__(coordinator)
self._attr_extra_state_attributes = {}
2021-09-12 15:34:44 +00:00
self._attr_unique_id = f"{entry.data[CONF_ZIP_CODE]}_{description.key}"
self._entry = entry
2021-09-12 15:34:44 +00:00
self.entity_description = description
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
if not self.coordinator.last_update_success:
return
self.update_from_latest_data()
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
await super().async_added_to_hass()
2021-09-12 15:34:44 +00:00
if self.entity_description.key == TYPE_ALLERGY_FORECAST:
self.async_on_remove(
self.hass.data[DOMAIN][self._entry.entry_id][
TYPE_ALLERGY_OUTLOOK
].async_add_listener(self._handle_coordinator_update)
)
self.update_from_latest_data()
@callback
def update_from_latest_data(self) -> None:
"""Update the entity from the latest data."""
raise NotImplementedError