226 lines
7.4 KiB
Python
226 lines
7.4 KiB
Python
"""Support for getting data from websites with scraping."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from bs4 import BeautifulSoup
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.rest.data import RestData
|
|
from homeassistant.components.sensor import (
|
|
CONF_STATE_CLASS,
|
|
DEVICE_CLASSES_SCHEMA,
|
|
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
|
|
STATE_CLASSES_SCHEMA,
|
|
SensorEntity,
|
|
)
|
|
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
|
from homeassistant.const import (
|
|
CONF_ATTRIBUTE,
|
|
CONF_AUTHENTICATION,
|
|
CONF_DEVICE_CLASS,
|
|
CONF_HEADERS,
|
|
CONF_NAME,
|
|
CONF_PASSWORD,
|
|
CONF_RESOURCE,
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
CONF_USERNAME,
|
|
CONF_VALUE_TEMPLATE,
|
|
CONF_VERIFY_SSL,
|
|
HTTP_BASIC_AUTHENTICATION,
|
|
HTTP_DIGEST_AUTHENTICATION,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.device_registry import DeviceEntryType
|
|
from homeassistant.helpers.entity import DeviceInfo
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.template import Template
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
|
|
from .const import CONF_INDEX, CONF_SELECT, DEFAULT_NAME, DEFAULT_VERIFY_SSL, DOMAIN
|
|
|
|
# Module-level logger shared by the platform setup code and the sensor entity.
_LOGGER = logging.getLogger(__name__)

# Icon used by every scrape sensor.
ICON = "mdi:web"

# YAML platform schema: the parent sensor platform schema extended with the
# scrape-specific options. Only the resource and the selector are mandatory.
PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
    {
        # What to fetch and which element to pick from it.
        vol.Required(CONF_RESOURCE): cv.string,
        vol.Required(CONF_SELECT): cv.string,
        # Optional attribute of the selected element to read.
        vol.Optional(CONF_ATTRIBUTE): cv.string,
        # Position within the list of selector matches; defaults to the first.
        vol.Optional(CONF_INDEX, default=0): cv.positive_int,
        # Only basic and digest HTTP authentication are accepted.
        vol.Optional(CONF_AUTHENTICATION): vol.In(
            [HTTP_BASIC_AUTHENTICATION, HTTP_DIGEST_AUTHENTICATION]
        ),
        vol.Optional(CONF_HEADERS): vol.Schema({cv.string: cv.string}),
        vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
        vol.Optional(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
        vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
        vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
        vol.Optional(CONF_USERNAME): cv.string,
        # Validated here as a plain string only.
        vol.Optional(CONF_VALUE_TEMPLATE): cv.string,
        vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
    }
)
|
|
|
|
|
|
async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Set up the Web scrape sensor from a YAML platform configuration.

    YAML setup is deprecated: this function only logs a deprecation warning
    and hands the configuration to the config entry import flow. It does not
    add any entities itself, so ``async_add_entities`` and ``discovery_info``
    are unused.
    """
    # Config flow added in Home Assistant Core 2022.7, remove import flow in 2022.9
    _LOGGER.warning(
        "Loading Scrape via platform setup has been deprecated in Home Assistant "
        "2022.7. Your configuration has been automatically imported and you can "
        "remove it from your configuration.yaml"
    )

    if config.get(CONF_VALUE_TEMPLATE):
        # Check the template before importing it, then store the value of the
        # Template object's ``template`` attribute back into the config.
        # NOTE(review): presumably ensure_valid() raises on a malformed
        # template and ``.template`` is the raw template string - confirm.
        template: Template = Template(config[CONF_VALUE_TEMPLATE])
        template.ensure_valid()
        config[CONF_VALUE_TEMPLATE] = template.template

    # Start the import flow as a task rather than awaiting it here.
    hass.async_create_task(
        hass.config_entries.flow.async_init(
            DOMAIN,
            context={"source": SOURCE_IMPORT},
            data=config,
        )
    )
|
|
|
|
|
|
async def async_setup_entry(
    hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
    """Create the scrape sensor for a config entry and register it.

    All settings are read from ``entry.options``; the data object is looked up
    in ``hass.data[DOMAIN]`` under the entry id.
    """
    options = entry.options

    # Name, resource and index are read with plain indexing; the remaining
    # options are optional lookups.
    sensor_name: str = options[CONF_NAME]
    resource_url: str = options[CONF_RESOURCE]
    css_select: str | None = options.get(CONF_SELECT)
    attribute: str | None = options.get(CONF_ATTRIBUTE)
    match_index: int = int(options[CONF_INDEX])
    native_unit: str | None = options.get(CONF_UNIT_OF_MEASUREMENT)
    dev_class: str | None = options.get(CONF_DEVICE_CLASS)
    st_class: str | None = options.get(CONF_STATE_CLASS)
    raw_template: str | None = options.get(CONF_VALUE_TEMPLATE)

    # Wrap the stored template string in a Template bound to hass, if present.
    compiled_template: Template | None = (
        Template(raw_template, hass) if raw_template is not None else None
    )

    rest_data = hass.data[DOMAIN][entry.entry_id]

    sensor = ScrapeSensor(
        rest=rest_data,
        name=sensor_name,
        select=css_select,
        attr=attribute,
        index=match_index,
        value_template=compiled_template,
        unit=native_unit,
        device_class=dev_class,
        state_class=st_class,
        entry_id=entry.entry_id,
        resource=resource_url,
    )
    # Second argument True: request an update before the entity is added.
    async_add_entities([sensor], True)
|
|
|
|
|
|
class ScrapeSensor(SensorEntity):
    """Representation of a web scrape sensor.

    The sensor parses the document held by a ``RestData`` object, picks one
    element with a CSS selector and uses either that element's text or one of
    its attributes as the state, optionally rendered through a value template.
    """

    _attr_icon = ICON

    def __init__(
        self,
        rest: RestData,
        name: str,
        select: str | None,
        attr: str | None,
        index: int,
        value_template: Template | None,
        unit: str | None,
        device_class: str | None,
        state_class: str | None,
        entry_id: str,
        resource: str,
    ) -> None:
        """Initialize a web scrape sensor.

        Args:
            rest: Data object whose ``data`` attribute holds the fetched document.
            name: Entity name, also used as the device name.
            select: CSS selector passed to BeautifulSoup's ``select``.
            attr: Attribute of the selected tag to read; ``None`` reads its text.
            index: Position of the wanted tag among the selector matches.
            value_template: Optional template applied to the extracted value.
            unit: Native unit of measurement.
            device_class: Sensor device class.
            state_class: Sensor state class.
            entry_id: Config entry id; used as unique id and device identifier.
            resource: Scraped URL, exposed as the device configuration URL.
        """
        self.rest = rest
        self._attr_native_value = None
        self._select = select
        self._attr = attr
        self._index = index
        self._value_template = value_template
        self._attr_name = name
        self._attr_native_unit_of_measurement = unit
        self._attr_device_class = device_class
        self._attr_state_class = state_class
        self._attr_unique_id = entry_id
        self._attr_device_info = DeviceInfo(
            entry_type=DeviceEntryType.SERVICE,
            identifiers={(DOMAIN, entry_id)},
            manufacturer="Scrape",
            name=name,
            configuration_url=resource,
        )

    def _extract_value(self) -> Any:
        """Parse the html extraction in the executor.

        Returns:
            The configured attribute of the selected tag, or the tag's text
            when no attribute is configured. ``None`` is returned, after a
            warning is logged, when the selector has no match at the
            configured index or the tag lacks the configured attribute.
        """
        raw_data = BeautifulSoup(self.rest.data, "lxml")
        _LOGGER.debug(raw_data)

        try:
            # A missing match raises IndexError, a missing attribute KeyError.
            tag = raw_data.select(self._select)[self._index]
            if self._attr is not None:
                value = tag[self._attr]
            elif tag.name in ("style", "script", "template"):
                # NOTE(review): presumably .text does not return the content
                # of these tags, hence .string - confirm against bs4 docs.
                value = tag.string
            else:
                value = tag.text
        except IndexError:
            # Report the index that was out of range, not the attribute name.
            _LOGGER.warning("Index '%s' not found in %s", self._index, self.entity_id)
            value = None
        except KeyError:
            _LOGGER.warning(
                "Attribute '%s' not found in %s", self._attr, self.entity_id
            )
            value = None
        _LOGGER.debug(value)
        return value

    async def async_update(self) -> None:
        """Get the latest data from the source and updates the state."""
        await self.rest.async_update()
        await self._async_update_from_rest_data()

    async def async_added_to_hass(self) -> None:
        """Ensure the data from the initial update is reflected in the state."""
        await self._async_update_from_rest_data()

    async def _async_update_from_rest_data(self) -> None:
        """Update state from the rest data.

        When no data is available an error is logged and the current state is
        left untouched.
        """
        if self.rest.data is None:
            _LOGGER.error("Unable to retrieve data for %s", self.name)
            return

        # Run the parsing as an executor job instead of in this coroutine.
        value = await self.hass.async_add_executor_job(self._extract_value)

        if self._value_template is not None:
            self._attr_native_value = (
                self._value_template.async_render_with_possible_json_value(value, None)
            )
        else:
            self._attr_native_value = value
|