# core/homeassistant/components/cert_expiry/sensor.py
#
# 162 lines
# 4.7 KiB
# Python
# Raw Normal View History

"""Counter for the days until an HTTPS (TLS) certificate will expire."""
from datetime import timedelta
import logging
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import (
CONF_HOST,
CONF_PORT,
EVENT_HOMEASSISTANT_START,
TIME_DAYS,
)
from homeassistant.core import callback
from homeassistant.exceptions import PlatformNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_call_later
from .const import DEFAULT_PORT, DOMAIN
from .errors import TemporaryFailure, ValidationFailure
from .helper import get_cert_time_to_expiry
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Twelve-hour interval. NOTE(review): Home Assistant's entity platform looks
# up a module-level SCAN_INTERVAL to decide how often to poll; confirm
# against the developer documentation.
SCAN_INTERVAL = timedelta(hours=12)

# YAML schema for this platform: the host is mandatory, the port is optional
# and falls back to DEFAULT_PORT.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    }
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up certificate expiry sensor.

    The YAML configuration is not turned into an entity here. Instead a
    one-shot listener for EVENT_HOMEASSISTANT_START is registered; once that
    event fires, a config-entry import flow carrying a copy of ``config`` is
    scheduled ten seconds later.
    """

    @callback
    def _start_import_flow(_now):
        """Process YAML import."""
        # The configuration is copied at call time, exactly when the flow
        # is created.
        import_flow = hass.config_entries.flow.async_init(
            DOMAIN, context={"source": SOURCE_IMPORT}, data=dict(config)
        )
        hass.async_create_task(import_flow)

    @callback
    def _defer_import(_event):
        """Schedule delayed import after HA is fully started."""
        async_call_later(hass, 10, _start_import_flow)

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, _defer_import)
# 2019-07-31 19:25:30 +00:00  (timestamp residue from the source viewer, not code)
async def async_setup_entry(hass, entry, async_add_entities):
    """Add cert-expiry entry.

    Reads the host and port from ``entry.data``, assigns the entry a
    ``"<host>:<port>"`` unique id when it has none, performs a first
    certificate check and registers a single SSLCertificate entity.

    Raises:
        PlatformNotReady: when the first check fails with TemporaryFailure.
            The original error is attached as the cause.

    Returns:
        True once the entity has been handed to ``async_add_entities``.
    """
    hostname = entry.data[CONF_HOST]
    port = entry.data[CONF_PORT]

    if entry.unique_id is None:
        hass.config_entries.async_update_entry(entry, unique_id=f"{hostname}:{port}")

    # Defaults used when the certificate fails validation: the entity is
    # still created, with 0 days and the validation error attached.
    days = 0
    error = None
    try:
        days = await get_cert_time_to_expiry(hass, hostname, port)
    except TemporaryFailure as err:
        _LOGGER.error(err)
        # Chain explicitly so the underlying failure is kept as __cause__.
        raise PlatformNotReady from err
    except ValidationFailure as err:
        error = err

    # Second argument False: no update is requested before the entity is
    # added, since the state was fetched just above.
    async_add_entities([SSLCertificate(hostname, port, days, error)], False)
    return True
class SSLCertificate(Entity):
    """Implementation of the certificate expiry sensor.

    The state is the value returned by ``get_cert_time_to_expiry`` for the
    configured host and port, reported with the TIME_DAYS unit. A validation
    failure keeps the entity available but forces the state to 0 and flags
    the certificate as invalid; any other failure marks the entity
    unavailable and leaves the previous state untouched.
    """

    def __init__(self, server_name, server_port, days, error):
        """Initialize the sensor.

        Args:
            server_name: Host whose certificate is checked.
            server_port: Port to connect to; omitted from the display name
                when it equals DEFAULT_PORT.
            days: Initial state value.
            error: Validation error from the initial check, or None when the
                certificate validated.
        """
        self.server_name = server_name
        self.server_port = server_port
        display_port = f":{server_port}" if server_port != DEFAULT_PORT else ""
        self._name = f"Cert Expiry ({self.server_name}{display_port})"
        self._available = True
        self._error = error
        self._state = days
        # The certificate counts as valid exactly when no error was supplied.
        self._valid = error is None

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def unique_id(self):
        """Return a unique id for the sensor."""
        return f"{self.server_name}:{self.server_port}"

    @property
    def unit_of_measurement(self):
        """Return the unit this state is expressed in."""
        return TIME_DAYS

    @property
    def state(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return "mdi:certificate"

    @property
    def available(self):
        """Return the availability of the sensor."""
        return self._available

    async def async_update(self):
        """Fetch the certificate information.

        All failures are handled here and logged; nothing is re-raised.
        """
        try:
            days_to_expiry = await get_cert_time_to_expiry(
                self.hass, self.server_name, self.server_port
            )
        except TemporaryFailure as err:
            # Flip availability before logging, and do not index into an
            # empty ``args`` tuple: an IndexError raised inside this handler
            # would not be caught by the sibling ``except`` clauses and
            # would escape the update.
            self._available = False
            _LOGGER.error(err.args[0] if err.args else err)
            return
        except ValidationFailure as err:
            _LOGGER.error(
                "Certificate validation error: %s [%s]", self.server_name, err
            )
            # The host answered, so the entity stays available; the
            # certificate itself is reported as invalid with 0 days left.
            self._available = True
            self._error = err
            self._state = 0
            self._valid = False
            return
        except Exception:  # pylint: disable=broad-except
            _LOGGER.exception(
                "Unknown error checking %s:%s", self.server_name, self.server_port
            )
            self._available = False
            return

        self._available = True
        self._error = None
        self._state = days_to_expiry
        self._valid = True

    @property
    def device_state_attributes(self):
        """Return additional sensor state attributes."""
        # str() is applied unconditionally, so the absence of an error is
        # reported as the string "None".
        return {"is_valid": self._valid, "error": str(self._error)}