Add energy integration (#52001)

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
Co-authored-by: Erik <erik@montnemery.com>
pull/53513/head
Bram Kragten 2021-07-26 18:37:37 +02:00 committed by GitHub
parent 4b189bd8c5
commit fcc6ea7497
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1137 additions and 8 deletions

View File

@ -34,6 +34,7 @@ homeassistant.components.dsmr.*
homeassistant.components.dunehd.*
homeassistant.components.elgato.*
homeassistant.components.esphome.*
homeassistant.components.energy.*
homeassistant.components.fastdotcom.*
homeassistant.components.fitbit.*
homeassistant.components.forecast_solar.*

View File

@ -141,6 +141,7 @@ homeassistant/components/emby/* @mezz64
homeassistant/components/emoncms/* @borpin
homeassistant/components/emonitor/* @bdraco
homeassistant/components/emulated_kasa/* @kbickar
homeassistant/components/energy/* @home_assistant/core
homeassistant/components/enigma2/* @fbradyirl
homeassistant/components/enocean/* @bdurrer
homeassistant/components/enphase_envoy/* @gtdiehl

View File

@ -7,6 +7,7 @@
"cloud",
"counter",
"dhcp",
"energy",
"frontend",
"history",
"input_boolean",

View File

@ -0,0 +1,25 @@
"""The Energy integration."""
from __future__ import annotations
from homeassistant.components import frontend
from homeassistant.core import HomeAssistant
from homeassistant.helpers import discovery
from homeassistant.helpers.typing import ConfigType
from . import websocket_api
from .const import DOMAIN
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up Energy."""
websocket_api.async_setup(hass)
frontend.async_register_built_in_panel(hass, DOMAIN, DOMAIN, "mdi:lightning-bolt")
hass.async_create_task(
discovery.async_load_platform(hass, "sensor", DOMAIN, {}, config)
)
hass.data[DOMAIN] = {
"cost_sensors": {},
}
return True

View File

@ -0,0 +1,3 @@
"""Constants for the Energy integration."""
DOMAIN = "energy"

View File

@ -0,0 +1,264 @@
"""Energy data."""
from __future__ import annotations
import asyncio
from collections import Counter
from collections.abc import Awaitable
from typing import Callable, Literal, Optional, TypedDict, Union, cast
import voluptuous as vol
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv, singleton, storage
from .const import DOMAIN
STORAGE_VERSION = 1
STORAGE_KEY = DOMAIN
@singleton.singleton(f"{DOMAIN}_manager")
async def async_get_manager(hass: HomeAssistant) -> EnergyManager:
"""Return an initialized data manager."""
manager = EnergyManager(hass)
await manager.async_initialize()
return manager
class FlowFromGridSourceType(TypedDict):
"""Dictionary describing the 'from' stat for the grid source."""
# statistic_id of a an energy meter (kWh)
stat_energy_from: str
# statistic_id of costs ($) incurred from the energy meter
# If set to None and entity_energy_from and entity_energy_price are configured,
# an EnergyCostSensor will be automatically created
stat_cost: str | None
# Used to generate costs if stat_cost is set to None
entity_energy_from: str | None # entity_id of an energy meter (kWh), entity_id of the energy meter for stat_energy_from
entity_energy_price: str | None # entity_id of an entity providing price ($/kWh)
number_energy_price: float | None # Price for energy ($/kWh)
class FlowToGridSourceType(TypedDict):
"""Dictionary describing the 'to' stat for the grid source."""
# kWh meter
stat_energy_to: str
# statistic_id of compensation ($) received for contributing back
# If set to None and entity_energy_from and entity_energy_price are configured,
# an EnergyCostSensor will be automatically created
stat_compensation: str | None
# Used to generate costs if stat_compensation is set to None
entity_energy_from: str | None # entity_id of an energy meter (kWh), entity_id of the energy meter for stat_energy_from
entity_energy_price: str | None # entity_id of an entity providing price ($/kWh)
number_energy_price: float | None # Price for energy ($/kWh)
class GridSourceType(TypedDict):
"""Dictionary holding the source of grid energy consumption."""
type: Literal["grid"]
flow_from: list[FlowFromGridSourceType]
flow_to: list[FlowToGridSourceType]
cost_adjustment_day: float
class SolarSourceType(TypedDict):
"""Dictionary holding the source of energy production."""
type: Literal["solar"]
stat_energy_from: str
config_entry_solar_forecast: list[str] | None
SourceType = Union[GridSourceType, SolarSourceType]
class DeviceConsumption(TypedDict):
"""Dictionary holding the source of individual device consumption."""
# This is an ever increasing value
stat_consumption: str
class EnergyPreferences(TypedDict):
"""Dictionary holding the energy data."""
currency: str
energy_sources: list[SourceType]
device_consumption: list[DeviceConsumption]
class EnergyPreferencesUpdate(EnergyPreferences, total=False):
"""all types optional."""
def _flow_from_ensure_single_price(
val: FlowFromGridSourceType,
) -> FlowFromGridSourceType:
"""Ensure we use a single price source."""
if (
val["entity_energy_price"] is not None
and val["number_energy_price"] is not None
):
raise vol.Invalid("Define either an entity or a fixed number for the price")
return val
FLOW_FROM_GRID_SOURCE_SCHEMA = vol.All(
vol.Schema(
{
vol.Required("stat_energy_from"): str,
vol.Optional("stat_cost"): vol.Any(str, None),
vol.Optional("entity_energy_from"): vol.Any(str, None),
vol.Optional("entity_energy_price"): vol.Any(str, None),
vol.Optional("number_energy_price"): vol.Any(vol.Coerce(float), None),
}
),
_flow_from_ensure_single_price,
)
FLOW_TO_GRID_SOURCE_SCHEMA = vol.Schema(
{
vol.Required("stat_energy_to"): str,
vol.Optional("stat_compensation"): vol.Any(str, None),
vol.Optional("entity_energy_to"): vol.Any(str, None),
vol.Optional("entity_energy_price"): vol.Any(str, None),
vol.Optional("number_energy_price"): vol.Any(vol.Coerce(float), None),
}
)
def _generate_unique_value_validator(key: str) -> Callable[[list[dict]], list[dict]]:
"""Generate a validator that ensures a value is only used once."""
def validate_uniqueness(
val: list[dict],
) -> list[dict]:
"""Ensure that the user doesn't add duplicate values."""
counts = Counter(flow_from[key] for flow_from in val)
for value, count in counts.items():
if count > 1:
raise vol.Invalid(f"Cannot specify {value} more than once")
return val
return validate_uniqueness
GRID_SOURCE_SCHEMA = vol.Schema(
{
vol.Required("type"): "grid",
vol.Required("flow_from"): vol.All(
[FLOW_FROM_GRID_SOURCE_SCHEMA],
_generate_unique_value_validator("stat_energy_from"),
),
vol.Required("flow_to"): vol.All(
[FLOW_TO_GRID_SOURCE_SCHEMA],
_generate_unique_value_validator("stat_energy_to"),
),
vol.Required("cost_adjustment_day"): vol.Coerce(float),
}
)
SOLAR_SOURCE_SCHEMA = vol.Schema(
{
vol.Required("type"): "solar",
vol.Required("stat_energy_from"): str,
vol.Optional("config_entry_solar_forecast"): vol.Any([str], None),
}
)
def check_type_limits(value: list[SourceType]) -> list[SourceType]:
"""Validate that we don't have too many of certain types."""
types = Counter([val["type"] for val in value])
if types.get("grid", 0) > 1:
raise vol.Invalid("You cannot have more than 1 grid source")
return value
ENERGY_SOURCE_SCHEMA = vol.All(
vol.Schema(
[
cv.key_value_schemas(
"type",
{
"grid": GRID_SOURCE_SCHEMA,
"solar": SOLAR_SOURCE_SCHEMA,
},
)
]
),
check_type_limits,
)
DEVICE_CONSUMPTION_SCHEMA = vol.Schema(
{
vol.Required("stat_consumption"): str,
}
)
class EnergyManager:
"""Manage the instance energy prefs."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize energy manager."""
self._hass = hass
self._store = storage.Store(hass, STORAGE_VERSION, STORAGE_KEY)
self.data: EnergyPreferences | None = None
self._update_listeners: list[Callable[[], Awaitable]] = []
async def async_initialize(self) -> None:
"""Initialize the energy integration."""
self.data = cast(Optional[EnergyPreferences], await self._store.async_load())
@staticmethod
def default_preferences() -> EnergyPreferences:
"""Return default preferences."""
return {
"currency": "",
"energy_sources": [],
"device_consumption": [],
}
async def async_update(self, update: EnergyPreferencesUpdate) -> None:
"""Update the preferences."""
if self.data is None:
data = EnergyManager.default_preferences()
else:
data = self.data.copy()
for key in (
"currency",
"energy_sources",
"device_consumption",
):
if key in update:
data[key] = update[key] # type: ignore
self.data = data
self._store.async_delay_save(lambda: cast(dict, self.data), 60)
if not self._update_listeners:
return
await asyncio.gather(*(listener() for listener in self._update_listeners))
@callback
def async_listen_updates(self, update_listener: Callable[[], Awaitable]) -> None:
"""Listen for data updates."""
self._update_listeners.append(update_listener)

View File

@ -0,0 +1,8 @@
{
"domain": "energy",
"name": "Energy",
"documentation": "https://www.home-assistant.io/integrations/energy",
"codeowners": ["@home_assistant/core"],
"iot_class": "calculated",
"dependencies": ["websocket_api", "history"]
}

View File

@ -0,0 +1,258 @@
"""Helper sensor for calculating utility costs."""
from __future__ import annotations
from dataclasses import dataclass
from functools import partial
from typing import Any, Final, Literal, TypeVar, cast
from homeassistant.components.sensor import (
ATTR_LAST_RESET,
DEVICE_CLASS_MONETARY,
STATE_CLASS_MEASUREMENT,
SensorEntity,
)
from homeassistant.core import HomeAssistant, State, callback, split_entity_id
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import homeassistant.util.dt as dt_util
from .const import DOMAIN
from .data import EnergyManager, async_get_manager
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the energy sensors."""
manager = await async_get_manager(hass)
process_now = partial(_process_manager_data, hass, manager, async_add_entities, {})
manager.async_listen_updates(process_now)
if manager.data:
await process_now()
T = TypeVar("T")
@dataclass
class FlowAdapter:
"""Adapter to allow flows to be used as sensors."""
flow_type: Literal["flow_from", "flow_to"]
stat_energy_key: Literal["stat_energy_from", "stat_energy_to"]
entity_energy_key: Literal["entity_energy_from", "entity_energy_to"]
total_money_key: Literal["stat_cost", "stat_compensation"]
name_suffix: str
entity_id_suffix: str
FLOW_ADAPTERS: Final = (
FlowAdapter(
"flow_from",
"stat_energy_from",
"entity_energy_from",
"stat_cost",
"Cost",
"cost",
),
FlowAdapter(
"flow_to",
"stat_energy_to",
"entity_energy_to",
"stat_compensation",
"Compensation",
"compensation",
),
)
async def _process_manager_data(
hass: HomeAssistant,
manager: EnergyManager,
async_add_entities: AddEntitiesCallback,
current_entities: dict[tuple[str, str], EnergyCostSensor],
) -> None:
"""Process updated data."""
to_add: list[SensorEntity] = []
to_remove = dict(current_entities)
async def finish() -> None:
if to_add:
async_add_entities(to_add)
for key, entity in to_remove.items():
current_entities.pop(key)
await entity.async_remove()
if not manager.data:
await finish()
return
for energy_source in manager.data["energy_sources"]:
if energy_source["type"] != "grid":
continue
for adapter in FLOW_ADAPTERS:
for flow in energy_source[adapter.flow_type]:
# Opting out of the type complexity because can't get it to work
untyped_flow = cast(dict, flow)
# No need to create an entity if we already have a cost stat
if untyped_flow.get(adapter.total_money_key) is not None:
continue
# This is unique among all flow_from's
key = (adapter.flow_type, untyped_flow[adapter.stat_energy_key])
# Make sure the right data is there
# If the entity existed, we don't pop it from to_remove so it's removed
if untyped_flow.get(adapter.entity_energy_key) is None or (
untyped_flow.get("entity_energy_price") is None
and untyped_flow.get("number_energy_price") is None
):
continue
current_entity = to_remove.pop(key, None)
if current_entity:
current_entity.update_config(untyped_flow)
continue
current_entities[key] = EnergyCostSensor(
adapter,
manager.data["currency"],
untyped_flow,
)
to_add.append(current_entities[key])
await finish()
class EnergyCostSensor(SensorEntity):
"""Calculate costs incurred by consuming energy.
This is intended as a fallback for when no specific cost sensor is available for the
utility.
"""
def __init__(
self,
adapter: FlowAdapter,
currency: str,
flow: dict,
) -> None:
"""Initialize the sensor."""
super().__init__()
self._adapter = adapter
self.entity_id = f"{flow[adapter.entity_energy_key]}_{adapter.entity_id_suffix}"
self._attr_device_class = DEVICE_CLASS_MONETARY
self._attr_state_class = STATE_CLASS_MEASUREMENT
self._attr_unit_of_measurement = currency
self._flow = flow
self._last_energy_sensor_state: State | None = None
def _reset(self, energy_state: State) -> None:
"""Reset the cost sensor."""
self._attr_state = 0.0
self._attr_last_reset = dt_util.utcnow()
self._last_energy_sensor_state = energy_state
self.async_write_ha_state()
@callback
def _update_cost(self) -> None:
"""Update incurred costs."""
energy_state = self.hass.states.get(
cast(str, self._flow[self._adapter.entity_energy_key])
)
if energy_state is None or ATTR_LAST_RESET not in energy_state.attributes:
return
try:
energy = float(energy_state.state)
except ValueError:
return
# Determine energy price
if self._flow["entity_energy_price"] is not None:
energy_price_state = self.hass.states.get(self._flow["entity_energy_price"])
if energy_price_state is None:
return
try:
energy_price = float(energy_price_state.state)
except ValueError:
return
else:
energy_price_state = None
energy_price = cast(float, self._flow["number_energy_price"])
if self._last_energy_sensor_state is None:
# Initialize as it's the first time all required entities are in place.
self._reset(energy_state)
return
cur_value = cast(float, self._attr_state)
if (
energy_state.attributes[ATTR_LAST_RESET]
!= self._last_energy_sensor_state.attributes[ATTR_LAST_RESET]
):
# Energy meter was reset, reset cost sensor too
self._reset(energy_state)
else:
# Update with newly incurred cost
old_energy_value = float(self._last_energy_sensor_state.state)
self._attr_state = cur_value + (energy - old_energy_value) * energy_price
self._last_energy_sensor_state = energy_state
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
energy_state = self.hass.states.get(self._flow[self._adapter.entity_energy_key])
if energy_state:
name = energy_state.name
else:
name = split_entity_id(self._flow[self._adapter.entity_energy_key])[
0
].replace("_", " ")
self._attr_name = f"{name} {self._adapter.name_suffix}"
self._update_cost()
# Store stat ID in hass.data so frontend can look it up
self.hass.data[DOMAIN]["cost_sensors"][
self._flow[self._adapter.entity_energy_key]
] = self.entity_id
@callback
def async_state_changed_listener(*_: Any) -> None:
"""Handle child updates."""
self._update_cost()
self.async_write_ha_state()
self.async_on_remove(
async_track_state_change_event(
self.hass,
cast(str, self._flow[self._adapter.entity_energy_key]),
async_state_changed_listener,
)
)
async def async_will_remove_from_hass(self) -> None:
"""Handle removing from hass."""
self.hass.data[DOMAIN]["cost_sensors"].pop(
self._flow[self._adapter.entity_energy_key]
)
await super().async_will_remove_from_hass()
@callback
def update_config(self, flow: dict) -> None:
"""Update the config."""
self._flow = flow

View File

@ -0,0 +1,3 @@
{
"title": "Energy"
}

View File

@ -0,0 +1,3 @@
{
"title": "Energy"
}

View File

@ -0,0 +1,116 @@
"""The Energy websocket API."""
from __future__ import annotations
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, cast
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN
from .data import (
DEVICE_CONSUMPTION_SCHEMA,
ENERGY_SOURCE_SCHEMA,
EnergyManager,
EnergyPreferencesUpdate,
async_get_manager,
)
EnergyWebSocketCommandHandler = Callable[
[HomeAssistant, websocket_api.ActiveConnection, Dict[str, Any], "EnergyManager"],
None,
]
AsyncEnergyWebSocketCommandHandler = Callable[
[HomeAssistant, websocket_api.ActiveConnection, Dict[str, Any], "EnergyManager"],
Awaitable[None],
]
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Set up the energy websocket API."""
websocket_api.async_register_command(hass, ws_get_prefs)
websocket_api.async_register_command(hass, ws_save_prefs)
websocket_api.async_register_command(hass, ws_info)
def _ws_with_manager(
func: Any,
) -> websocket_api.WebSocketCommandHandler:
"""Decorate a function to pass in a manager."""
@websocket_api.async_response
@functools.wraps(func)
async def with_manager(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
manager = await async_get_manager(hass)
result = func(hass, connection, msg, manager)
if asyncio.iscoroutine(result):
await result
return with_manager
@websocket_api.websocket_command(
{
vol.Required("type"): "energy/get_prefs",
}
)
@_ws_with_manager
@callback
def ws_get_prefs(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
manager: EnergyManager,
) -> None:
"""Handle get prefs command."""
if manager.data is None:
connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "No prefs")
return
connection.send_result(msg["id"], manager.data)
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "energy/save_prefs",
vol.Optional("currency"): str,
vol.Optional("energy_sources"): ENERGY_SOURCE_SCHEMA,
vol.Optional("device_consumption"): [DEVICE_CONSUMPTION_SCHEMA],
}
)
@_ws_with_manager
async def ws_save_prefs(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
manager: EnergyManager,
) -> None:
"""Handle get prefs command."""
msg_id = msg.pop("id")
msg.pop("type")
await manager.async_update(cast(EnergyPreferencesUpdate, msg))
connection.send_result(msg_id, manager.data)
@websocket_api.websocket_command(
{
vol.Required("type"): "energy/info",
}
)
@callback
def ws_info(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Handle get info command."""
connection.send_result(msg["id"], hass.data[DOMAIN])

View File

@ -93,8 +93,10 @@ def ws_list_forecasts(
for config_entry_id, coordinator in hass.data[DOMAIN].items():
forecasts[config_entry_id] = {
timestamp.isoformat(): val
for timestamp, val in coordinator.data.watts.items()
"wh_hours": {
timestamp.isoformat(): val
for timestamp, val in coordinator.data.wh_hours.items()
}
}
connection.send_result(msg["id"], forecasts)

View File

@ -164,7 +164,7 @@ async def ws_get_statistics_during_period(
@websocket_api.websocket_command(
{
vol.Required("type"): "history/list_statistic_ids",
vol.Optional("statistic_type"): str,
vol.Optional("statistic_type"): vol.Any("sum", "mean"),
}
)
@websocket_api.require_admin

View File

@ -21,6 +21,8 @@ from .const import ( # noqa: F401
ERR_UNAUTHORIZED,
ERR_UNKNOWN_COMMAND,
ERR_UNKNOWN_ERROR,
AsyncWebSocketCommandHandler,
WebSocketCommandHandler,
)
from .decorators import ( # noqa: F401
async_response,

View File

@ -385,6 +385,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.energy.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.fastdotcom.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -0,0 +1 @@
"""Tests for the Energy integration."""

View File

@ -0,0 +1,220 @@
"""Test the Energy sensors."""
import copy
from datetime import timedelta
from unittest.mock import patch
import pytest
from homeassistant.components.energy import data
from homeassistant.components.sensor import (
ATTR_LAST_RESET,
ATTR_STATE_CLASS,
STATE_CLASS_MEASUREMENT,
)
from homeassistant.components.sensor.recorder import compile_statistics
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
DEVICE_CLASS_MONETARY,
)
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_init_recorder_component
from tests.components.recorder.common import async_wait_recording_done_without_instance
async def setup_integration(hass):
"""Set up the integration."""
assert await async_setup_component(
hass, "energy", {"recorder": {"db_url": "sqlite://"}}
)
await hass.async_block_till_done()
async def test_cost_sensor_no_states(hass, hass_storage) -> None:
"""Test sensors are created."""
energy_data = data.EnergyManager.default_preferences()
energy_data["energy_sources"].append(
{
"type": "grid",
"flow_from": [
{
"stat_energy_from": "foo",
"entity_energy_from": "foo",
"stat_cost": None,
"entity_energy_price": "bar",
"number_energy_price": None,
}
],
"cost_adjustment_day": 0,
}
)
hass_storage[data.STORAGE_KEY] = {
"version": 1,
"data": energy_data,
}
await setup_integration(hass)
# TODO: No states, should the cost entity refuse to setup?
@pytest.mark.parametrize("initial_energy,initial_cost", [(0, "0.0"), (None, "unknown")])
@pytest.mark.parametrize(
"price_entity,fixed_price", [("sensor.energy_price", None), (None, 1)]
)
@pytest.mark.parametrize(
"usage_sensor_entity_id,cost_sensor_entity_id,flow_type",
[
("sensor.energy_consumption", "sensor.energy_consumption_cost", "flow_from"),
(
"sensor.energy_production",
"sensor.energy_production_compensation",
"flow_to",
),
],
)
async def test_cost_sensor_price_entity(
hass,
hass_storage,
hass_ws_client,
initial_energy,
initial_cost,
price_entity,
fixed_price,
usage_sensor_entity_id,
cost_sensor_entity_id,
flow_type,
) -> None:
"""Test energy cost price from sensor entity."""
def _compile_statistics(_):
return compile_statistics(hass, now, now + timedelta(seconds=1))
await async_init_recorder_component(hass)
energy_data = data.EnergyManager.default_preferences()
energy_data["energy_sources"].append(
{
"type": "grid",
"flow_from": [
{
"stat_energy_from": "sensor.energy_consumption",
"entity_energy_from": "sensor.energy_consumption",
"stat_cost": None,
"entity_energy_price": price_entity,
"number_energy_price": fixed_price,
}
]
if flow_type == "flow_from"
else [],
"flow_to": [
{
"stat_energy_to": "sensor.energy_production",
"entity_energy_to": "sensor.energy_production",
"stat_compensation": None,
"entity_energy_price": price_entity,
"number_energy_price": fixed_price,
}
]
if flow_type == "flow_to"
else [],
"cost_adjustment_day": 0,
}
)
hass_storage[data.STORAGE_KEY] = {
"version": 1,
"data": energy_data,
}
now = dt_util.utcnow()
last_reset = dt_util.utc_from_timestamp(0).isoformat()
# Optionally initialize dependent entities
if initial_energy is not None:
hass.states.async_set(
usage_sensor_entity_id, initial_energy, {"last_reset": last_reset}
)
hass.states.async_set("sensor.energy_price", "1")
with patch("homeassistant.util.dt.utcnow", return_value=now):
await setup_integration(hass)
state = hass.states.get(cost_sensor_entity_id)
assert state.state == initial_cost
assert state.attributes[ATTR_DEVICE_CLASS] == DEVICE_CLASS_MONETARY
if initial_cost != "unknown":
assert state.attributes[ATTR_LAST_RESET] == now.isoformat()
assert state.attributes[ATTR_STATE_CLASS] == STATE_CLASS_MEASUREMENT
assert state.attributes[ATTR_UNIT_OF_MEASUREMENT] == ""
# Optional late setup of dependent entities
if initial_energy is None:
with patch("homeassistant.util.dt.utcnow", return_value=now):
hass.states.async_set(
usage_sensor_entity_id, "0", {"last_reset": last_reset}
)
await hass.async_block_till_done()
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "0.0"
assert state.attributes[ATTR_DEVICE_CLASS] == DEVICE_CLASS_MONETARY
assert state.attributes[ATTR_LAST_RESET] == now.isoformat()
assert state.attributes[ATTR_STATE_CLASS] == STATE_CLASS_MEASUREMENT
assert state.attributes[ATTR_UNIT_OF_MEASUREMENT] == ""
# # Unique ID temp disabled
# # entity_registry = er.async_get(hass)
# # entry = entity_registry.async_get(cost_sensor_entity_id)
# # assert entry.unique_id == "energy_energy_consumption cost"
# Energy use bumped to 10 kWh
hass.states.async_set(usage_sensor_entity_id, "10", {"last_reset": last_reset})
await hass.async_block_till_done()
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "10.0" # 0 € + (10-0) kWh * 1 €/kWh = 10 €
# Nothing happens when price changes
if price_entity is not None:
hass.states.async_set(price_entity, "2")
await hass.async_block_till_done()
else:
energy_data = copy.deepcopy(energy_data)
energy_data["energy_sources"][0][flow_type][0]["number_energy_price"] = 2
client = await hass_ws_client(hass)
await client.send_json({"id": 5, "type": "energy/save_prefs", **energy_data})
msg = await client.receive_json()
assert msg["success"]
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "10.0" # 10 € + (10-10) kWh * 2 €/kWh = 10 €
# Additional consumption is using the new price
hass.states.async_set(usage_sensor_entity_id, "14.5", {"last_reset": last_reset})
await hass.async_block_till_done()
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "19.0" # 10 € + (14.5-10) kWh * 2 €/kWh = 19 €
# Check generated statistics
await async_wait_recording_done_without_instance(hass)
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
assert cost_sensor_entity_id in statistics
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 19.0
# Energy sensor is reset, with start point at 4kWh
last_reset = (now + timedelta(seconds=1)).isoformat()
hass.states.async_set(usage_sensor_entity_id, "4", {"last_reset": last_reset})
await hass.async_block_till_done()
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "0.0" # 0 € + (4-4) kWh * 2 €/kWh = 0 €
# Energy use bumped to 10 kWh
hass.states.async_set(usage_sensor_entity_id, "10", {"last_reset": last_reset})
await hass.async_block_till_done()
state = hass.states.get(cost_sensor_entity_id)
assert state.state == "12.0" # 0 € + (10-4) kWh * 2 €/kWh = 12 €
# Check generated statistics
await async_wait_recording_done_without_instance(hass)
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
assert cost_sensor_entity_id in statistics
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 31.0

View File

@ -0,0 +1,209 @@
"""Test the Energy websocket API."""
import pytest
from homeassistant.components.energy import data
from homeassistant.setup import async_setup_component
from tests.common import flush_store
@pytest.fixture(autouse=True)
async def setup_integration(hass):
"""Set up the integration."""
assert await async_setup_component(
hass, "energy", {"recorder": {"db_url": "sqlite://"}}
)
async def test_get_preferences_no_data(hass, hass_ws_client) -> None:
"""Test we get error if no preferences set."""
client = await hass_ws_client(hass)
await client.send_json({"id": 5, "type": "energy/get_prefs"})
msg = await client.receive_json()
assert msg["id"] == 5
assert not msg["success"]
assert msg["error"] == {"code": "not_found", "message": "No prefs"}
async def test_get_preferences_default(hass, hass_ws_client, hass_storage) -> None:
"""Test we get preferences."""
manager = await data.async_get_manager(hass)
manager.data = data.EnergyManager.default_preferences()
client = await hass_ws_client(hass)
await client.send_json({"id": 5, "type": "energy/get_prefs"})
msg = await client.receive_json()
assert msg["id"] == 5
assert msg["success"]
assert msg["result"] == data.EnergyManager.default_preferences()
async def test_save_preferences(hass, hass_ws_client, hass_storage) -> None:
"""Test we can save preferences."""
client = await hass_ws_client(hass)
# Test saving default prefs is also valid.
default_prefs = data.EnergyManager.default_preferences()
await client.send_json({"id": 5, "type": "energy/save_prefs", **default_prefs})
msg = await client.receive_json()
assert msg["id"] == 5
assert msg["success"]
assert msg["result"] == default_prefs
new_prefs = {
"currency": "$",
"energy_sources": [
{
"type": "grid",
"flow_from": [
{
"stat_energy_from": "sensor.heat_pump_meter",
"stat_cost": "heat_pump_kwh_cost",
"entity_energy_from": None,
"entity_energy_price": None,
"number_energy_price": None,
},
{
"stat_energy_from": "sensor.heat_pump_meter_2",
"stat_cost": None,
"entity_energy_from": "sensor.heat_pump_meter_2",
"entity_energy_price": None,
"number_energy_price": 0.20,
},
],
"flow_to": [
{
"stat_energy_to": "sensor.return_to_grid_peak",
"stat_compensation": None,
"entity_energy_to": None,
"entity_energy_price": None,
"number_energy_price": None,
},
{
"stat_energy_to": "sensor.return_to_grid_offpeak",
"stat_compensation": None,
"entity_energy_to": "sensor.return_to_grid_offpeak",
"entity_energy_price": None,
"number_energy_price": 0.20,
},
],
"cost_adjustment_day": 1.2,
},
{
"type": "solar",
"stat_energy_from": "my_solar_production",
"config_entry_solar_forecast": ["predicted_config_entry"],
},
],
"device_consumption": [{"stat_consumption": "some_device_usage"}],
}
await client.send_json({"id": 6, "type": "energy/save_prefs", **new_prefs})
msg = await client.receive_json()
assert msg["id"] == 6
assert msg["success"]
assert msg["result"] == new_prefs
assert data.STORAGE_KEY not in hass_storage, "expected not to be written yet"
await flush_store((await data.async_get_manager(hass))._store)
assert hass_storage[data.STORAGE_KEY]["data"] == new_prefs
# Verify info reflects data.
await client.send_json({"id": 7, "type": "energy/info"})
msg = await client.receive_json()
assert msg["id"] == 7
assert msg["success"]
assert msg["result"] == {
"cost_sensors": {
"sensor.heat_pump_meter_2": "sensor.heat_pump_meter_2_cost",
"sensor.return_to_grid_offpeak": "sensor.return_to_grid_offpeak_compensation",
}
}
# Prefs with limited options
new_prefs_2 = {
"energy_sources": [
{
"type": "grid",
"flow_from": [
{
"stat_energy_from": "sensor.heat_pump_meter",
"stat_cost": None,
"entity_energy_from": None,
"entity_energy_price": None,
"number_energy_price": None,
}
],
"flow_to": [],
"cost_adjustment_day": 1.2,
},
{
"type": "solar",
"stat_energy_from": "my_solar_production",
"config_entry_solar_forecast": None,
},
],
}
await client.send_json({"id": 8, "type": "energy/save_prefs", **new_prefs_2})
msg = await client.receive_json()
assert msg["id"] == 8
assert msg["success"]
assert msg["result"] == {**new_prefs, **new_prefs_2}
async def test_handle_duplicate_from_stat(hass, hass_ws_client) -> None:
"""Test we handle duplicate from stats."""
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 5,
"type": "energy/save_prefs",
"energy_sources": [
{
"type": "grid",
"flow_from": [
{
"stat_energy_from": "sensor.heat_pump_meter",
"stat_cost": None,
"entity_energy_from": None,
"entity_energy_price": None,
"number_energy_price": None,
},
{
"stat_energy_from": "sensor.heat_pump_meter",
"stat_cost": None,
"entity_energy_from": None,
"entity_energy_price": None,
"number_energy_price": None,
},
],
"flow_to": [],
"cost_adjustment_day": 0,
},
],
}
)
msg = await client.receive_json()
assert msg["id"] == 5
assert not msg["success"]
assert msg["error"]["code"] == "invalid_format"

View File

@ -18,7 +18,7 @@ async def test_load_unload_config_entry(
hass_ws_client,
) -> None:
"""Test the Forecast.Solar configuration entry loading/unloading."""
mock_forecast_solar.estimate.return_value.watts = {
mock_forecast_solar.estimate.return_value.wh_hours = {
datetime(2021, 6, 27, 13, 0, tzinfo=timezone.utc): 12,
datetime(2021, 6, 27, 14, 0, tzinfo=timezone.utc): 8,
}
@ -41,8 +41,10 @@ async def test_load_unload_config_entry(
assert result["success"]
assert result["result"] == {
mock_config_entry.entry_id: {
"2021-06-27T13:00:00+00:00": 12,
"2021-06-27T14:00:00+00:00": 8,
"wh_hours": {
"2021-06-27T13:00:00+00:00": 12,
"2021-06-27T14:00:00+00:00": 8,
}
}
}

View File

@ -1025,8 +1025,7 @@ async def test_list_statistic_ids(hass, hass_ws_client, units, attributes, unit)
{"id": 4, "type": "history/list_statistic_ids", "statistic_type": "dogs"}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == []
assert not response["success"]
await client.send_json(
{"id": 5, "type": "history/list_statistic_ids", "statistic_type": "mean"}