Add expiration timestamp to cert_expiry sensors (#36399)
* Add expiration timestamp to cert_expiry sensors * Clear timestamp if cert becomes invalid * Use timezone-aware timestamps * Use DataUpdateCoordinator, split timestamp to separate sensor * Use UTC, simpler add/remove handling * Review fixes * Fix incomplete mock that fails in 3.8 * Use static timestamps, improve helper method namepull/36919/head
parent
f69fc79fd1
commit
e92e26b73a
|
@ -1,6 +1,20 @@
|
|||
"""The cert_expiry component."""
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
|
||||
from .const import DEFAULT_PORT, DOMAIN
|
||||
from .errors import TemporaryFailure, ValidationFailure
|
||||
from .helper import get_cert_expiry_timestamp
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
SCAN_INTERVAL = timedelta(hours=12)
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
|
@ -10,6 +24,20 @@ async def async_setup(hass, config):
|
|||
|
||||
async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry):
|
||||
"""Load the saved entities."""
|
||||
host = entry.data[CONF_HOST]
|
||||
port = entry.data[CONF_PORT]
|
||||
|
||||
coordinator = CertExpiryDataUpdateCoordinator(hass, host, port)
|
||||
await coordinator.async_refresh()
|
||||
|
||||
if not coordinator.last_update_success:
|
||||
raise ConfigEntryNotReady
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
hass.data[DOMAIN][entry.entry_id] = coordinator
|
||||
|
||||
if entry.unique_id is None:
|
||||
hass.config_entries.async_update_entry(entry, unique_id=f"{host}:{port}")
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.async_forward_entry_setup(entry, "sensor")
|
||||
|
@ -20,3 +48,37 @@ async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry):
|
|||
async def async_unload_entry(hass, entry):
|
||||
"""Unload a config entry."""
|
||||
return await hass.config_entries.async_forward_entry_unload(entry, "sensor")
|
||||
|
||||
|
||||
class CertExpiryDataUpdateCoordinator(DataUpdateCoordinator):
|
||||
"""Class to manage fetching Cert Expiry data from single endpoint."""
|
||||
|
||||
def __init__(self, hass, host, port):
|
||||
"""Initialize global Cert Expiry data updater."""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.cert_error = None
|
||||
self.is_cert_valid = False
|
||||
|
||||
display_port = f":{port}" if port != DEFAULT_PORT else ""
|
||||
name = f"{self.host}{display_port}"
|
||||
|
||||
super().__init__(
|
||||
hass, _LOGGER, name=name, update_interval=SCAN_INTERVAL,
|
||||
)
|
||||
|
||||
async def _async_update_data(self):
|
||||
"""Fetch certificate."""
|
||||
try:
|
||||
timestamp = await get_cert_expiry_timestamp(self.hass, self.host, self.port)
|
||||
except TemporaryFailure as err:
|
||||
raise UpdateFailed(err.args[0])
|
||||
except ValidationFailure as err:
|
||||
self.cert_error = err
|
||||
self.is_cert_valid = False
|
||||
_LOGGER.error("Certificate validation error: %s [%s]", self.host, err)
|
||||
return None
|
||||
|
||||
self.cert_error = None
|
||||
self.is_cert_valid = True
|
||||
return timestamp
|
||||
|
|
|
@ -13,7 +13,7 @@ from .errors import (
|
|||
ResolveFailed,
|
||||
ValidationFailure,
|
||||
)
|
||||
from .helper import get_cert_time_to_expiry
|
||||
from .helper import get_cert_expiry_timestamp
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -31,7 +31,7 @@ class CertexpiryConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
async def _test_connection(self, user_input=None):
|
||||
"""Test connection to the server and try to get the certificate."""
|
||||
try:
|
||||
await get_cert_time_to_expiry(
|
||||
await get_cert_expiry_timestamp(
|
||||
self.hass,
|
||||
user_input[CONF_HOST],
|
||||
user_input.get(CONF_PORT, DEFAULT_PORT),
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
"""Helper functions for the Cert Expiry platform."""
|
||||
from datetime import datetime
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
from homeassistant.util import dt
|
||||
|
||||
from .const import TIMEOUT
|
||||
from .errors import (
|
||||
ConnectionRefused,
|
||||
|
@ -23,8 +24,8 @@ def get_cert(host, port):
|
|||
return cert
|
||||
|
||||
|
||||
async def get_cert_time_to_expiry(hass, hostname, port):
|
||||
"""Return the certificate's time to expiry in days."""
|
||||
async def get_cert_expiry_timestamp(hass, hostname, port):
|
||||
"""Return the certificate's expiration timestamp."""
|
||||
try:
|
||||
cert = await hass.async_add_executor_job(get_cert, hostname, port)
|
||||
except socket.gaierror:
|
||||
|
@ -39,6 +40,4 @@ async def get_cert_time_to_expiry(hass, hostname, port):
|
|||
raise ValidationFailure(err.args[0])
|
||||
|
||||
ts_seconds = ssl.cert_time_to_seconds(cert["notAfter"])
|
||||
timestamp = datetime.fromtimestamp(ts_seconds)
|
||||
expiry = timestamp - datetime.today()
|
||||
return expiry.days
|
||||
return dt.utc_from_timestamp(ts_seconds)
|
||||
|
|
|
@ -9,18 +9,17 @@ from homeassistant.config_entries import SOURCE_IMPORT
|
|||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
CONF_PORT,
|
||||
DEVICE_CLASS_TIMESTAMP,
|
||||
EVENT_HOMEASSISTANT_START,
|
||||
TIME_DAYS,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.exceptions import PlatformNotReady
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.helpers.event import async_call_later
|
||||
from homeassistant.util import dt
|
||||
|
||||
from .const import DEFAULT_PORT, DOMAIN
|
||||
from .errors import TemporaryFailure, ValidationFailure
|
||||
from .helper import get_cert_time_to_expiry
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -56,63 +55,37 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
|||
|
||||
async def async_setup_entry(hass, entry, async_add_entities):
|
||||
"""Add cert-expiry entry."""
|
||||
days = 0
|
||||
error = None
|
||||
hostname = entry.data[CONF_HOST]
|
||||
port = entry.data[CONF_PORT]
|
||||
coordinator = hass.data[DOMAIN][entry.entry_id]
|
||||
|
||||
if entry.unique_id is None:
|
||||
hass.config_entries.async_update_entry(entry, unique_id=f"{hostname}:{port}")
|
||||
sensors = [
|
||||
SSLCertificateDays(coordinator),
|
||||
SSLCertificateTimestamp(coordinator),
|
||||
]
|
||||
|
||||
try:
|
||||
days = await get_cert_time_to_expiry(hass, hostname, port)
|
||||
except TemporaryFailure as err:
|
||||
_LOGGER.error(err)
|
||||
raise PlatformNotReady
|
||||
except ValidationFailure as err:
|
||||
error = err
|
||||
|
||||
async_add_entities(
|
||||
[SSLCertificate(hostname, port, days, error)], False,
|
||||
)
|
||||
return True
|
||||
async_add_entities(sensors, True)
|
||||
|
||||
|
||||
class SSLCertificate(Entity):
|
||||
"""Implementation of the certificate expiry sensor."""
|
||||
class CertExpiryEntity(Entity):
|
||||
"""Defines a base Cert Expiry entity."""
|
||||
|
||||
def __init__(self, server_name, server_port, days, error):
|
||||
"""Initialize the sensor."""
|
||||
self.server_name = server_name
|
||||
self.server_port = server_port
|
||||
display_port = f":{server_port}" if server_port != DEFAULT_PORT else ""
|
||||
self._name = f"Cert Expiry ({self.server_name}{display_port})"
|
||||
self._available = True
|
||||
self._error = error
|
||||
self._state = days
|
||||
self._valid = False
|
||||
if error is None:
|
||||
self._valid = True
|
||||
def __init__(self, coordinator):
|
||||
"""Initialize the Cert Expiry entity."""
|
||||
self.coordinator = coordinator
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""Connect to dispatcher listening for entity data notifications."""
|
||||
self.async_on_remove(
|
||||
self.coordinator.async_add_listener(self.async_write_ha_state)
|
||||
)
|
||||
|
||||
async def async_update(self):
|
||||
"""Update Cert Expiry entity."""
|
||||
await self.coordinator.async_request_refresh()
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Return a unique id for the sensor."""
|
||||
return f"{self.server_name}:{self.server_port}"
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self):
|
||||
"""Return the unit this state is expressed in."""
|
||||
return TIME_DAYS
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the sensor."""
|
||||
return self._state
|
||||
def available(self):
|
||||
"""Return True if entity is available."""
|
||||
return self.coordinator.last_update_success
|
||||
|
||||
@property
|
||||
def icon(self):
|
||||
|
@ -120,42 +93,68 @@ class SSLCertificate(Entity):
|
|||
return "mdi:certificate"
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
"""Return the availability of the sensor."""
|
||||
return self._available
|
||||
|
||||
async def async_update(self):
|
||||
"""Fetch the certificate information."""
|
||||
try:
|
||||
days_to_expiry = await get_cert_time_to_expiry(
|
||||
self.hass, self.server_name, self.server_port
|
||||
)
|
||||
except TemporaryFailure as err:
|
||||
_LOGGER.error(err.args[0])
|
||||
self._available = False
|
||||
return
|
||||
except ValidationFailure as err:
|
||||
_LOGGER.error(
|
||||
"Certificate validation error: %s [%s]", self.server_name, err
|
||||
)
|
||||
self._available = True
|
||||
self._error = err
|
||||
self._state = 0
|
||||
self._valid = False
|
||||
return
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception(
|
||||
"Unknown error checking %s:%s", self.server_name, self.server_port
|
||||
)
|
||||
self._available = False
|
||||
return
|
||||
|
||||
self._available = True
|
||||
self._error = None
|
||||
self._state = days_to_expiry
|
||||
self._valid = True
|
||||
def should_poll(self):
|
||||
"""Return the polling requirement of the entity."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
"""Return additional sensor state attributes."""
|
||||
return {"is_valid": self._valid, "error": str(self._error)}
|
||||
return {
|
||||
"is_valid": self.coordinator.is_cert_valid,
|
||||
"error": str(self.coordinator.cert_error),
|
||||
}
|
||||
|
||||
|
||||
class SSLCertificateDays(CertExpiryEntity):
|
||||
"""Implementation of the Cert Expiry days sensor."""
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return f"Cert Expiry ({self.coordinator.name})"
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the sensor."""
|
||||
if not self.coordinator.is_cert_valid:
|
||||
return 0
|
||||
|
||||
expiry = self.coordinator.data - dt.utcnow()
|
||||
return expiry.days
|
||||
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Return a unique id for the sensor."""
|
||||
return f"{self.coordinator.host}:{self.coordinator.port}"
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self):
|
||||
"""Return the unit this state is expressed in."""
|
||||
return TIME_DAYS
|
||||
|
||||
|
||||
class SSLCertificateTimestamp(CertExpiryEntity):
|
||||
"""Implementation of the Cert Expiry timestamp sensor."""
|
||||
|
||||
@property
|
||||
def device_class(self):
|
||||
"""Return the device class of the sensor."""
|
||||
return DEVICE_CLASS_TIMESTAMP
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return f"Cert Expiry Timestamp ({self.coordinator.name})"
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the sensor."""
|
||||
if self.coordinator.data:
|
||||
return self.coordinator.data.isoformat()
|
||||
return None
|
||||
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Return a unique id for the sensor."""
|
||||
return f"{self.coordinator.host}:{self.coordinator.port}-timestamp"
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
"""Helpers for Cert Expiry tests."""
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from homeassistant.util import dt
|
||||
|
||||
|
||||
def static_datetime():
|
||||
"""Build a datetime object for testing in the correct timezone."""
|
||||
return dt.as_utc(datetime(2020, 6, 12, 8, 0, 0))
|
||||
|
||||
|
||||
def future_timestamp(days):
|
||||
"""Create timestamp object for requested days in future."""
|
||||
delta = timedelta(days=days, minutes=1)
|
||||
return static_datetime() + delta
|
|
@ -7,6 +7,7 @@ from homeassistant.components.cert_expiry.const import DEFAULT_PORT, DOMAIN
|
|||
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
|
||||
|
||||
from .const import HOST, PORT
|
||||
from .helpers import future_timestamp
|
||||
|
||||
from tests.async_mock import patch
|
||||
from tests.common import MockConfigEntry
|
||||
|
@ -21,7 +22,7 @@ async def test_user(hass):
|
|||
assert result["step_id"] == "user"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry"
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={CONF_HOST: HOST, CONF_PORT: PORT}
|
||||
|
@ -65,12 +66,15 @@ async def test_user_with_bad_cert(hass):
|
|||
async def test_import_host_only(hass):
|
||||
"""Test import with host only."""
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry",
|
||||
return_value=1,
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
), patch(
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "import"}, data={CONF_HOST: HOST}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
|
@ -78,21 +82,21 @@ async def test_import_host_only(hass):
|
|||
assert result["data"][CONF_PORT] == DEFAULT_PORT
|
||||
assert result["result"].unique_id == f"{HOST}:{DEFAULT_PORT}"
|
||||
|
||||
with patch("homeassistant.components.cert_expiry.sensor.async_setup_entry"):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_import_host_and_port(hass):
|
||||
"""Test import with host and port."""
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry",
|
||||
return_value=1,
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
), patch(
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": "import"},
|
||||
data={CONF_HOST: HOST, CONF_PORT: PORT},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
|
@ -100,18 +104,19 @@ async def test_import_host_and_port(hass):
|
|||
assert result["data"][CONF_PORT] == PORT
|
||||
assert result["result"].unique_id == f"{HOST}:{PORT}"
|
||||
|
||||
with patch("homeassistant.components.cert_expiry.sensor.async_setup_entry"):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_import_non_default_port(hass):
|
||||
"""Test import with host and non-default port."""
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry"
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
), patch(
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "import"}, data={CONF_HOST: HOST, CONF_PORT: 888}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["title"] == f"{HOST}:888"
|
||||
|
@ -119,21 +124,21 @@ async def test_import_non_default_port(hass):
|
|||
assert result["data"][CONF_PORT] == 888
|
||||
assert result["result"].unique_id == f"{HOST}:888"
|
||||
|
||||
with patch("homeassistant.components.cert_expiry.sensor.async_setup_entry"):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_import_with_name(hass):
|
||||
"""Test import with name (deprecated)."""
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry",
|
||||
return_value=1,
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
), patch(
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": "import"},
|
||||
data={CONF_NAME: "legacy", CONF_HOST: HOST, CONF_PORT: PORT},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
|
@ -141,9 +146,6 @@ async def test_import_with_name(hass):
|
|||
assert result["data"][CONF_PORT] == PORT
|
||||
assert result["result"].unique_id == f"{HOST}:{PORT}"
|
||||
|
||||
with patch("homeassistant.components.cert_expiry.sensor.async_setup_entry"):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_bad_import(hass):
|
||||
"""Test import step."""
|
||||
|
|
|
@ -9,6 +9,7 @@ from homeassistant.setup import async_setup_component
|
|||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import HOST, PORT
|
||||
from .helpers import future_timestamp, static_datetime
|
||||
|
||||
from tests.async_mock import patch
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
@ -30,11 +31,10 @@ async def test_setup_with_config(hass):
|
|||
async_fire_time_changed(hass, next_update)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.config_flow.get_cert_expiry_timestamp"
|
||||
), patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
@ -52,8 +52,8 @@ async def test_update_unique_id(hass):
|
|||
assert not entry.unique_id
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(1),
|
||||
):
|
||||
assert await async_setup_component(hass, DOMAIN, {}) is True
|
||||
await hass.async_block_till_done()
|
||||
|
@ -62,7 +62,8 @@ async def test_update_unique_id(hass):
|
|||
assert entry.unique_id == f"{HOST}:{PORT}"
|
||||
|
||||
|
||||
async def test_unload_config_entry(hass):
|
||||
@patch("homeassistant.util.dt.utcnow", return_value=static_datetime())
|
||||
async def test_unload_config_entry(mock_now, hass):
|
||||
"""Test unloading a config entry."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
|
@ -76,8 +77,8 @@ async def test_unload_config_entry(hass):
|
|||
assert entry is config_entries[0]
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(100),
|
||||
):
|
||||
assert await async_setup_component(hass, DOMAIN, {}) is True
|
||||
await hass.async_block_till_done()
|
||||
|
|
|
@ -3,16 +3,19 @@ from datetime import timedelta
|
|||
import socket
|
||||
import ssl
|
||||
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT, STATE_UNAVAILABLE
|
||||
from homeassistant.config_entries import ENTRY_STATE_SETUP_RETRY
|
||||
from homeassistant.const import CONF_HOST, CONF_PORT, STATE_UNAVAILABLE, STATE_UNKNOWN
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import HOST, PORT
|
||||
from .helpers import future_timestamp, static_datetime
|
||||
|
||||
from tests.async_mock import patch
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
|
||||
async def test_async_setup_entry(hass):
|
||||
@patch("homeassistant.util.dt.utcnow", return_value=static_datetime())
|
||||
async def test_async_setup_entry(mock_now, hass):
|
||||
"""Test async_setup_entry."""
|
||||
entry = MockConfigEntry(
|
||||
domain="cert_expiry",
|
||||
|
@ -20,9 +23,11 @@ async def test_async_setup_entry(hass):
|
|||
unique_id=f"{HOST}:{PORT}",
|
||||
)
|
||||
|
||||
timestamp = future_timestamp(100)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=timestamp,
|
||||
):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
|
@ -35,6 +40,13 @@ async def test_async_setup_entry(hass):
|
|||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_timestamp_example_com")
|
||||
assert state is not None
|
||||
assert state.state != STATE_UNAVAILABLE
|
||||
assert state.state == timestamp.isoformat()
|
||||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
|
||||
async def test_async_setup_entry_bad_cert(hass):
|
||||
"""Test async_setup_entry with a bad/expired cert."""
|
||||
|
@ -73,11 +85,10 @@ async def test_async_setup_entry_host_unavailable(hass):
|
|||
side_effect=socket.gaierror,
|
||||
):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id) is False
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_example_com")
|
||||
assert state is None
|
||||
assert entry.state == ENTRY_STATE_SETUP_RETRY
|
||||
|
||||
next_update = dt_util.utcnow() + timedelta(seconds=45)
|
||||
async_fire_time_changed(hass, next_update)
|
||||
|
@ -91,7 +102,8 @@ async def test_async_setup_entry_host_unavailable(hass):
|
|||
assert state is None
|
||||
|
||||
|
||||
async def test_update_sensor(hass):
|
||||
@patch("homeassistant.util.dt.utcnow", return_value=static_datetime())
|
||||
async def test_update_sensor(mock_now, hass):
|
||||
"""Test async_update for sensor."""
|
||||
entry = MockConfigEntry(
|
||||
domain="cert_expiry",
|
||||
|
@ -99,9 +111,11 @@ async def test_update_sensor(hass):
|
|||
unique_id=f"{HOST}:{PORT}",
|
||||
)
|
||||
|
||||
timestamp = future_timestamp(100)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=timestamp,
|
||||
):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
|
@ -114,12 +128,21 @@ async def test_update_sensor(hass):
|
|||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_timestamp_example_com")
|
||||
assert state is not None
|
||||
assert state.state != STATE_UNAVAILABLE
|
||||
assert state.state == timestamp.isoformat()
|
||||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
timestamp2 = future_timestamp(99)
|
||||
|
||||
next_update = dt_util.utcnow() + timedelta(hours=12)
|
||||
async_fire_time_changed(hass, next_update)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=99,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=timestamp2,
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
@ -130,8 +153,16 @@ async def test_update_sensor(hass):
|
|||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_timestamp_example_com")
|
||||
assert state is not None
|
||||
assert state.state != STATE_UNAVAILABLE
|
||||
assert state.state == timestamp2.isoformat()
|
||||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
async def test_update_sensor_network_errors(hass):
|
||||
|
||||
@patch("homeassistant.util.dt.utcnow", return_value=static_datetime())
|
||||
async def test_update_sensor_network_errors(mock_now, hass):
|
||||
"""Test async_update for sensor."""
|
||||
entry = MockConfigEntry(
|
||||
domain="cert_expiry",
|
||||
|
@ -139,9 +170,11 @@ async def test_update_sensor_network_errors(hass):
|
|||
unique_id=f"{HOST}:{PORT}",
|
||||
)
|
||||
|
||||
timestamp = future_timestamp(100)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=100,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=timestamp,
|
||||
):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
|
@ -154,6 +187,13 @@ async def test_update_sensor_network_errors(hass):
|
|||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_timestamp_example_com")
|
||||
assert state is not None
|
||||
assert state.state != STATE_UNAVAILABLE
|
||||
assert state.state == timestamp.isoformat()
|
||||
assert state.attributes.get("error") == "None"
|
||||
assert state.attributes.get("is_valid")
|
||||
|
||||
next_update = dt_util.utcnow() + timedelta(hours=12)
|
||||
async_fire_time_changed(hass, next_update)
|
||||
|
||||
|
@ -170,8 +210,8 @@ async def test_update_sensor_network_errors(hass):
|
|||
async_fire_time_changed(hass, next_update)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.cert_expiry.sensor.get_cert_time_to_expiry",
|
||||
return_value=99,
|
||||
"homeassistant.components.cert_expiry.get_cert_expiry_timestamp",
|
||||
return_value=future_timestamp(99),
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
@ -198,6 +238,12 @@ async def test_update_sensor_network_errors(hass):
|
|||
assert state.attributes.get("error") == "something bad"
|
||||
assert not state.attributes.get("is_valid")
|
||||
|
||||
state = hass.states.get("sensor.cert_expiry_timestamp_example_com")
|
||||
assert state is not None
|
||||
assert state.state == STATE_UNKNOWN
|
||||
assert state.attributes.get("error") == "something bad"
|
||||
assert not state.attributes.get("is_valid")
|
||||
|
||||
next_update = dt_util.utcnow() + timedelta(hours=12)
|
||||
async_fire_time_changed(hass, next_update)
|
||||
|
||||
|
|
Loading…
Reference in New Issue