core/homeassistant/components/sensor/haveibeenpwned.py

180 lines
6.0 KiB
Python
Raw Normal View History

"""
Support for haveibeenpwned (email breaches) sensor.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.haveibeenpwned/
"""
from datetime import timedelta
import logging
from urllib.parse import quote

import requests
import voluptuous as vol

from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL)
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_point_in_time
from homeassistant.util import Throttle
import homeassistant.util.dt as dt_util

# Logger for this module; used for request progress and failure messages.
_LOGGER = logging.getLogger(__name__)
# strftime pattern applied to a breach's "AddedDate" when it is rendered as a
# sensor state attribute.
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"
# Sent as the "User-agent" header on every request to the breach service.
USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"
# First interval handed to the Throttle decorator on the data update method
# (the regular update spacing).
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
# Second interval handed to the Throttle decorator (used for forced updates)
# and also the delay before a sensor without data schedules its next retry.
MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5)
# Platform configuration: the email option is required; its value is passed
# through cv.ensure_list and every item is validated with cv.string.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]),
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up one HaveIBeenPwnedSensor per configured email address.

    Every sensor shares a single HaveIBeenPwnedData instance, which looks up
    one email per request and rotates through the configured list.

    hass: the Home Assistant instance, handed to each sensor.
    config: platform configuration; the CONF_EMAIL entry holds the emails.
    add_devices: callback that registers the created sensors.
    discovery_info: unused.
    """
    addresses = config.get(CONF_EMAIL)
    shared_data = HaveIBeenPwnedData(addresses)

    sensors = [HaveIBeenPwnedSensor(shared_data, hass, address)
               for address in addresses]
    add_devices(sensors)

    # Request initial data right away: this bypasses the regular 15 minute
    # throttle and is limited only by the 5 second forced-update throttle.
    # Sensors that get no data from this call reschedule themselves.
    for entity in sensors:
        entity.update_nothrottle()
class HaveIBeenPwnedSensor(Entity):
    """Sensor whose state is the number of breaches known for one email."""

    def __init__(self, data, hass, email):
        """Remember the shared data object, hass and the email to watch.

        data: shared object exposing a ``data`` mapping of email -> breaches
            plus ``update`` / ``update_no_throttle`` methods.
        hass: Home Assistant instance, used to schedule retries.
        email: the address this sensor reports on.
        """
        self._email = email
        self._hass = hass
        self._data = data
        # No lookup has completed yet for this address.
        self._state = STATE_UNKNOWN
        self._unit_of_measurement = "Breaches"

    @property
    def name(self):
        """Return the sensor name, which embeds the watched email."""
        return "Breaches {}".format(self._email)

    @property
    def unit_of_measurement(self):
        """Return the unit the state is expressed in."""
        return self._unit_of_measurement

    @property
    def state(self):
        """Return the breach count, or STATE_UNKNOWN before the first result."""
        return self._state

    @property
    def device_state_attributes(self):
        """Return the attributes of the sensor.

        One "breach N" entry is produced per stored breach, in stored order,
        holding the breach title followed by its added date converted to
        local time. An empty dict is returned while no data exists for this
        sensor's email.
        """
        if self._email not in self._data.data:
            return {}

        attributes = {}
        for position, breach in enumerate(self._data.data[self._email], 1):
            label = "breach {}".format(position)
            title = breach["Title"]
            added = dt_util.as_local(
                dt_util.parse_datetime(breach["AddedDate"]))
            attributes[label] = "{} {}".format(
                title, added.strftime(DATE_STR_FORMAT))
        return attributes

    def update_nothrottle(self, dummy=None):
        """Force a data refresh and publish the result, retrying if needed.

        dummy: ignored; it lets this method be used as a scheduled callback
            that is invoked with one positional argument.
        """
        self._data.update_no_throttle()

        if self._email in self._data.data:
            self._state = len(self._data.data[self._email])
            self.update_ha_state()
            return

        # The shared data object fetches a single email per call, so this
        # sensor's email may not have had its turn yet. Try again after the
        # forced-update interval; spacing the retries also avoids HTTP
        # "too many requests" errors. Once data exists, the regular update()
        # path takes over.
        retry_at = dt_util.now() + MIN_TIME_BETWEEN_FORCED_UPDATES
        track_point_in_time(self._hass, self.update_nothrottle, retry_at)

    def update(self):
        """Refresh the shared data and pick up the count for our email."""
        self._data.update()
        known = self._data.data
        if self._email in known:
            self._state = len(known[self._email])
class HaveIBeenPwnedData(object):
    """Retrieve and cache breach data, one email address per request.

    The public ``data`` dict maps each email that has been looked up to its
    list of breaches, newest first by "AddedDate" (an empty list when the
    service answered 404). Each successful lookup advances to the next
    configured email, so repeated updates cycle through the whole list.
    """

    def __init__(self, emails):
        """Initialize the data object.

        emails: non-empty list of email addresses to cycle through.
        """
        self._email_count = len(emails)
        self._current_index = 0
        self.data = {}
        self._email = emails[0]
        self._emails = emails

    def set_next_email(self):
        """Advance to the next email, wrapping around at the end."""
        self._current_index = (self._current_index + 1) % self._email_count
        self._email = self._emails[self._current_index]

    def update_no_throttle(self):
        """Look up the current email using the forced-update throttle."""
        self.update(no_throttle=True)

    @Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES)
    def update(self, **kwargs):
        """Get the latest data for the current email from the REST service.

        kwargs: consumed by the Throttle decorator (``no_throttle=True``
            selects the shorter interval); not used by the body.

        On failure (network error, unparsable body, unexpected HTTP status)
        an error is logged and the current email is kept, so the next call
        retries the same address.
        """
        # The address is user-supplied text placed in the URL path; the
        # service requires it to be URL-encoded, and characters such as
        # '#', '?' or '/' would otherwise change the meaning of the URL.
        url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}".format(
            quote(self._email, safe=""))
        _LOGGER.info("Checking for breaches for email %s", self._email)

        try:
            req = requests.get(url, headers={"User-agent": USER_AGENT},
                               allow_redirects=True, timeout=5)
        except requests.exceptions.RequestException:
            _LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'",
                          self._email)
            return

        if req.status_code == 200:
            try:
                breaches = req.json()
            except ValueError:
                # A 200 response whose body is not valid JSON must not
                # propagate out of the entity update.
                _LOGGER.error("failed parsing HaveIBeenPwned Data for '%s'",
                              self._email)
                return
            self.data[self._email] = sorted(breaches,
                                            key=lambda k: k["AddedDate"],
                                            reverse=True)
        elif req.status_code == 404:
            # The service has no breach record for this account.
            self.data[self._email] = []
        else:
            _LOGGER.error("failed fetching HaveIBeenPwned Data for '%s' "
                          "(HTTP Status_code = %d)", self._email,
                          req.status_code)
            return

        # Only move on once a result was stored, so that forced updates keep
        # retrying the current email until it succeeds.
        self.set_next_email()