Support energy/power sensors in the WeMo component (#53419)

pull/53508/head
Eric Severance 2021-07-26 08:50:22 -07:00 committed by GitHub
parent 9a000a1183
commit a6331d85ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 344 additions and 38 deletions

View File

@ -10,6 +10,7 @@ from homeassistant import config_entries
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.fan import DOMAIN as FAN_DOMAIN
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DISCOVERY, EVENT_HOMEASSISTANT_STOP
@ -28,17 +29,17 @@ MAX_CONCURRENCY = 3
# Mapping from Wemo model_name to domain.
WEMO_MODEL_DISPATCH = {
"Bridge": LIGHT_DOMAIN,
"CoffeeMaker": SWITCH_DOMAIN,
"Dimmer": LIGHT_DOMAIN,
"Humidifier": FAN_DOMAIN,
"Insight": SWITCH_DOMAIN,
"LightSwitch": SWITCH_DOMAIN,
"Maker": SWITCH_DOMAIN,
"Motion": BINARY_SENSOR_DOMAIN,
"OutdoorPlug": SWITCH_DOMAIN,
"Sensor": BINARY_SENSOR_DOMAIN,
"Socket": SWITCH_DOMAIN,
"Bridge": [LIGHT_DOMAIN],
"CoffeeMaker": [SWITCH_DOMAIN],
"Dimmer": [LIGHT_DOMAIN],
"Humidifier": [FAN_DOMAIN],
"Insight": [SENSOR_DOMAIN, SWITCH_DOMAIN],
"LightSwitch": [SWITCH_DOMAIN],
"Maker": [SWITCH_DOMAIN],
"Motion": [BINARY_SENSOR_DOMAIN],
"OutdoorPlug": [SWITCH_DOMAIN],
"Sensor": [BINARY_SENSOR_DOMAIN],
"Socket": [SWITCH_DOMAIN],
}
_LOGGER = logging.getLogger(__name__)
@ -151,32 +152,31 @@ class WemoDispatcher:
if wemo.serialnumber in self._added_serial_numbers:
return
component = WEMO_MODEL_DISPATCH.get(wemo.model_name, SWITCH_DOMAIN)
device = await async_register_device(hass, self._config_entry, wemo)
for component in WEMO_MODEL_DISPATCH.get(wemo.model_name, [SWITCH_DOMAIN]):
# Three cases:
# - First time we see component, we need to load it and initialize the backlog
# - Component is being loaded, add to backlog
# - Component is loaded, backlog is gone, dispatch discovery
# Three cases:
# - First time we see component, we need to load it and initialize the backlog
# - Component is being loaded, add to backlog
# - Component is loaded, backlog is gone, dispatch discovery
if component not in self._loaded_components:
hass.data[DOMAIN]["pending"][component] = [device]
self._loaded_components.add(component)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(
self._config_entry, component
if component not in self._loaded_components:
hass.data[DOMAIN]["pending"][component] = [device]
self._loaded_components.add(component)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(
self._config_entry, component
)
)
)
elif component in hass.data[DOMAIN]["pending"]:
hass.data[DOMAIN]["pending"][component].append(device)
elif component in hass.data[DOMAIN]["pending"]:
hass.data[DOMAIN]["pending"][component].append(device)
else:
async_dispatcher_send(
hass,
f"{DOMAIN}.{component}",
device,
)
else:
async_dispatcher_send(
hass,
f"{DOMAIN}.{component}",
device,
)
self._added_serial_numbers.add(wemo.serialnumber)

View File

@ -0,0 +1,132 @@
"""Support for power sensors in WeMo Insight devices."""
import asyncio
from datetime import datetime, timedelta
from typing import Callable
from homeassistant.components.sensor import STATE_CLASS_MEASUREMENT, SensorEntity
from homeassistant.const import (
DEVICE_CLASS_ENERGY,
DEVICE_CLASS_POWER,
ENERGY_KILO_WATT_HOUR,
POWER_WATT,
STATE_UNAVAILABLE,
)
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.typing import StateType
from homeassistant.util import Throttle, convert, dt
from .const import DOMAIN as WEMO_DOMAIN
from .entity import WemoSubscriptionEntity
from .wemo_device import DeviceWrapper
SCAN_INTERVAL = timedelta(seconds=10)
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up WeMo sensors."""
async def _discovered_wemo(device: DeviceWrapper):
"""Handle a discovered Wemo device."""
@Throttle(SCAN_INTERVAL)
def update_insight_params():
device.wemo.update_insight_params()
async_add_entities(
[
InsightCurrentPower(device, update_insight_params),
InsightTodayEnergy(device, update_insight_params),
]
)
async_dispatcher_connect(hass, f"{WEMO_DOMAIN}.sensor", _discovered_wemo)
await asyncio.gather(
*(
_discovered_wemo(device)
for device in hass.data[WEMO_DOMAIN]["pending"].pop("sensor")
)
)
class InsightSensor(WemoSubscriptionEntity, SensorEntity):
"""Common base for WeMo Insight power sensors."""
def __init__(
self,
device: DeviceWrapper,
update_insight_params: Callable,
name_suffix: str,
device_class: str,
unit_of_measurement: str,
) -> None:
"""Initialize the WeMo Insight power sensor."""
super().__init__(device)
self._update_insight_params = update_insight_params
self._name_suffix = name_suffix
self._attr_device_class = device_class
self._attr_state_class = STATE_CLASS_MEASUREMENT
self._attr_unit_of_measurement = unit_of_measurement
@property
def name(self) -> str:
"""Return the name of the entity if any."""
return f"{self.wemo.name} {self._name_suffix}"
@property
def unique_id(self) -> str:
"""Return the id of this entity."""
return f"{self.wemo.serialnumber}_{self._name_suffix}"
def _update(self, force_update=True) -> None:
with self._wemo_exception_handler("update status"):
if force_update or not self.wemo.insight_params:
self._update_insight_params()
class InsightCurrentPower(InsightSensor):
"""Current instantaineous power consumption."""
def __init__(self, device: DeviceWrapper, update_insight_params: Callable) -> None:
"""Initialize the WeMo Insight power sensor."""
super().__init__(
device,
update_insight_params,
"Current Power",
DEVICE_CLASS_POWER,
POWER_WATT,
)
@property
def state(self) -> StateType:
"""Return the current power consumption."""
if "currentpower" not in self.wemo.insight_params:
return STATE_UNAVAILABLE
return convert(self.wemo.insight_params["currentpower"], float, 0.0) / 1000.0
class InsightTodayEnergy(InsightSensor):
"""Energy used today."""
def __init__(self, device: DeviceWrapper, update_insight_params: Callable) -> None:
"""Initialize the WeMo Insight power sensor."""
super().__init__(
device,
update_insight_params,
"Today Energy",
DEVICE_CLASS_ENERGY,
ENERGY_KILO_WATT_HOUR,
)
@property
def last_reset(self) -> datetime:
"""Return the time when the sensor was initialized."""
return dt.start_of_local_day()
@property
def state(self) -> StateType:
"""Return the current energy use today."""
if "todaymw" not in self.wemo.insight_params:
return STATE_UNAVAILABLE
miliwatts = convert(self.wemo.insight_params["todaymw"], float, 0.0)
return round(miliwatts / (1000.0 * 1000.0 * 60), 2)

View File

@ -19,7 +19,7 @@ MOCK_SERIAL_NUMBER = "WemoSerialNumber"
@pytest.fixture(name="pywemo_model")
def pywemo_model_fixture():
"""Fixture containing a pywemo class name used by pywemo_device_fixture."""
return "Insight"
return "LightSwitch"
@pytest.fixture(name="pywemo_registry")

View File

@ -7,6 +7,7 @@ import threading
from unittest.mock import patch
import async_timeout
import pywemo
from pywemo.ouimeaux_device.api.service import ActionException
from homeassistant.components.homeassistant import (
@ -139,10 +140,14 @@ async def test_async_update_locked_multiple_callbacks(
async def test_async_locked_update_with_exception(
hass, wemo_entity, pywemo_device, update_polling_method=None
hass,
wemo_entity,
pywemo_device,
update_polling_method=None,
expected_state=STATE_OFF,
):
"""Test that the entity becomes unavailable when communication is lost."""
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
assert hass.states.get(wemo_entity.entity_id).state == expected_state
await async_setup_component(hass, HA_DOMAIN, {})
update_polling_method = update_polling_method or pywemo_device.get_state
update_polling_method.side_effect = ActionException
@ -157,9 +162,11 @@ async def test_async_locked_update_with_exception(
assert hass.states.get(wemo_entity.entity_id).state == STATE_UNAVAILABLE
async def test_async_update_with_timeout_and_recovery(hass, wemo_entity, pywemo_device):
async def test_async_update_with_timeout_and_recovery(
hass, wemo_entity, pywemo_device, expected_state=STATE_OFF
):
"""Test that the entity becomes unavailable after a timeout, and that it recovers."""
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
assert hass.states.get(wemo_entity.entity_id).state == expected_state
await async_setup_component(hass, HA_DOMAIN, {})
event = threading.Event()
@ -170,6 +177,8 @@ async def test_async_update_with_timeout_and_recovery(hass, wemo_entity, pywemo_
if hasattr(pywemo_device, "bridge_update"):
pywemo_device.bridge_update.side_effect = get_state
elif isinstance(pywemo_device, pywemo.Insight):
pywemo_device.update_insight_params.side_effect = get_state
else:
pywemo_device.get_state.side_effect = get_state
timeout = async_timeout.timeout(0)
@ -187,4 +196,4 @@ async def test_async_update_with_timeout_and_recovery(hass, wemo_entity, pywemo_
# Check that the entity recovers and is available after the update succeeds.
event.set()
await hass.async_block_till_done()
assert hass.states.get(wemo_entity.entity_id).state == STATE_OFF
assert hass.states.get(wemo_entity.entity_id).state == expected_state

View File

@ -0,0 +1,165 @@
"""Tests for the Wemo sensor entity."""
import pytest
from homeassistant.components.homeassistant import (
DOMAIN as HA_DOMAIN,
SERVICE_UPDATE_ENTITY,
)
from homeassistant.components.wemo import CONF_DISCOVERY, CONF_STATIC
from homeassistant.components.wemo.const import DOMAIN
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNAVAILABLE
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from . import entity_test_helpers
from .conftest import MOCK_HOST, MOCK_PORT
@pytest.fixture
def pywemo_model():
"""Pywemo LightSwitch models use the switch platform."""
return "Insight"
@pytest.fixture(name="pywemo_device")
def pywemo_device_fixture(pywemo_device):
"""Fixture for WeMoDevice instances."""
pywemo_device.insight_params = {
"currentpower": 1.0,
"todaymw": 200000000.0,
"state": 0,
"onfor": 0,
"ontoday": 0,
"ontotal": 0,
"powerthreshold": 0,
}
yield pywemo_device
class InsightTestTemplate:
"""Base class for testing WeMo Insight Sensors."""
ENTITY_ID_SUFFIX: str
EXPECTED_STATE_VALUE: str
INSIGHT_PARAM_NAME: str
@pytest.fixture(name="wemo_entity")
@classmethod
async def async_wemo_entity_fixture(cls, hass, pywemo_device):
"""Fixture for a Wemo entity in hass."""
assert await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
CONF_DISCOVERY: False,
CONF_STATIC: [f"{MOCK_HOST}:{MOCK_PORT}"],
},
},
)
await hass.async_block_till_done()
entity_registry = er.async_get(hass)
correct_entity = None
to_remove = []
for entry in entity_registry.entities.values():
if entry.entity_id.endswith(cls.ENTITY_ID_SUFFIX):
correct_entity = entry
else:
to_remove.append(entry.entity_id)
for removal in to_remove:
entity_registry.async_remove(removal)
assert len(entity_registry.entities) == 1
return correct_entity
# Tests that are in common among wemo platforms. These test methods will be run
# in the scope of this test module. They will run using the pywemo_model from
# this test module (Insight).
async def test_async_update_locked_multiple_updates(
self, hass, pywemo_registry, wemo_entity, pywemo_device
):
"""Test that two hass async_update state updates do not proceed at the same time."""
pywemo_device.subscription_update.return_value = False
await entity_test_helpers.test_async_update_locked_multiple_updates(
hass,
pywemo_registry,
wemo_entity,
pywemo_device,
update_polling_method=pywemo_device.update_insight_params,
)
async def test_async_update_locked_multiple_callbacks(
self, hass, pywemo_registry, wemo_entity, pywemo_device
):
"""Test that two device callback state updates do not proceed at the same time."""
pywemo_device.subscription_update.return_value = False
await entity_test_helpers.test_async_update_locked_multiple_callbacks(
hass,
pywemo_registry,
wemo_entity,
pywemo_device,
update_polling_method=pywemo_device.update_insight_params,
)
async def test_async_update_locked_callback_and_update(
self, hass, pywemo_registry, wemo_entity, pywemo_device
):
"""Test that a callback and a state update request can't both happen at the same time."""
pywemo_device.subscription_update.return_value = False
await entity_test_helpers.test_async_update_locked_callback_and_update(
hass,
pywemo_registry,
wemo_entity,
pywemo_device,
update_polling_method=pywemo_device.update_insight_params,
)
async def test_async_locked_update_with_exception(
self, hass, wemo_entity, pywemo_device
):
"""Test that the entity becomes unavailable when communication is lost."""
await entity_test_helpers.test_async_locked_update_with_exception(
hass,
wemo_entity,
pywemo_device,
update_polling_method=pywemo_device.update_insight_params,
expected_state=self.EXPECTED_STATE_VALUE,
)
async def test_async_update_with_timeout_and_recovery(
self, hass, wemo_entity, pywemo_device
):
"""Test that the entity becomes unavailable after a timeout, and that it recovers."""
await entity_test_helpers.test_async_update_with_timeout_and_recovery(
hass, wemo_entity, pywemo_device, expected_state=self.EXPECTED_STATE_VALUE
)
async def test_state_unavailable(self, hass, wemo_entity, pywemo_device):
"""Test that there is no failure if the insight_params is not populated."""
del pywemo_device.insight_params[self.INSIGHT_PARAM_NAME]
await async_setup_component(hass, HA_DOMAIN, {})
await hass.services.async_call(
HA_DOMAIN,
SERVICE_UPDATE_ENTITY,
{ATTR_ENTITY_ID: [wemo_entity.entity_id]},
blocking=True,
)
assert hass.states.get(wemo_entity.entity_id).state == STATE_UNAVAILABLE
class TestInsightCurrentPower(InsightTestTemplate):
"""Test the InsightCurrentPower class."""
ENTITY_ID_SUFFIX = "_current_power"
EXPECTED_STATE_VALUE = "0.001"
INSIGHT_PARAM_NAME = "currentpower"
class TestInsightTodayEnergy(InsightTestTemplate):
"""Test the InsightTodayEnergy class."""
ENTITY_ID_SUFFIX = "_today_energy"
EXPECTED_STATE_VALUE = "3.33"
INSIGHT_PARAM_NAME = "todaymw"