Add sensor to group (#83186)

pull/86558/head
G Johansson 2023-01-24 20:12:27 +01:00 committed by GitHub
parent 886d2fc3a1
commit b3c5c6ae9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 927 additions and 16 deletions

View File

@ -76,6 +76,7 @@ PLATFORMS = [
Platform.LOCK,
Platform.MEDIA_PLAYER,
Platform.NOTIFY,
Platform.SENSOR,
Platform.SWITCH,
]

View File

@ -7,7 +7,7 @@ from typing import Any, cast
import voluptuous as vol
from homeassistant.const import CONF_ENTITIES
from homeassistant.const import CONF_ENTITIES, CONF_TYPE
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er, selector
from homeassistant.helpers.schema_config_entry_flow import (
@ -21,11 +21,21 @@ from homeassistant.helpers.schema_config_entry_flow import (
from . import DOMAIN
from .binary_sensor import CONF_ALL
from .const import CONF_HIDE_MEMBERS
from .const import CONF_HIDE_MEMBERS, CONF_IGNORE_NON_NUMERIC
_STATISTIC_MEASURES = [
selector.SelectOptionDict(value="min", label="Minimum"),
selector.SelectOptionDict(value="max", label="Maximum"),
selector.SelectOptionDict(value="mean", label="Arithmetic mean"),
selector.SelectOptionDict(value="median", label="Median"),
selector.SelectOptionDict(value="last", label="Most recently updated"),
selector.SelectOptionDict(value="range", label="Statistical range"),
selector.SelectOptionDict(value="sum", label="Sum"),
]
async def basic_group_options_schema(
domain: str, handler: SchemaCommonFlowHandler
domain: str | list[str], handler: SchemaCommonFlowHandler
) -> vol.Schema:
"""Generate options schema."""
return vol.Schema(
@ -39,7 +49,7 @@ async def basic_group_options_schema(
)
def basic_group_config_schema(domain: str) -> vol.Schema:
def basic_group_config_schema(domain: str | list[str]) -> vol.Schema:
"""Generate config schema."""
return vol.Schema(
{
@ -67,6 +77,32 @@ BINARY_SENSOR_CONFIG_SCHEMA = basic_group_config_schema("binary_sensor").extend(
}
)
SENSOR_CONFIG_EXTENDS = {
vol.Required(CONF_TYPE): selector.SelectSelector(
selector.SelectSelectorConfig(options=_STATISTIC_MEASURES),
),
}
SENSOR_OPTIONS = {
vol.Optional(CONF_IGNORE_NON_NUMERIC, default=False): selector.BooleanSelector(),
vol.Required(CONF_TYPE): selector.SelectSelector(
selector.SelectSelectorConfig(options=_STATISTIC_MEASURES),
),
}
async def sensor_options_schema(
domain: str, handler: SchemaCommonFlowHandler
) -> vol.Schema:
"""Generate options schema."""
return (
await basic_group_options_schema(["sensor", "number", "input_number"], handler)
).extend(SENSOR_OPTIONS)
SENSOR_CONFIG_SCHEMA = basic_group_config_schema(
["sensor", "number", "input_number"]
).extend(SENSOR_CONFIG_EXTENDS)
async def light_switch_options_schema(
domain: str, handler: SchemaCommonFlowHandler
@ -88,6 +124,7 @@ GROUP_TYPES = [
"light",
"lock",
"media_player",
"sensor",
"switch",
]
@ -139,6 +176,10 @@ CONFIG_FLOW = {
basic_group_config_schema("media_player"),
validate_user_input=set_group_type("media_player"),
),
"sensor": SchemaFlowFormStep(
SENSOR_CONFIG_SCHEMA,
validate_user_input=set_group_type("sensor"),
),
"switch": SchemaFlowFormStep(
basic_group_config_schema("switch"),
validate_user_input=set_group_type("switch"),
@ -156,6 +197,7 @@ OPTIONS_FLOW = {
"media_player": SchemaFlowFormStep(
partial(basic_group_options_schema, "media_player")
),
"sensor": SchemaFlowFormStep(partial(sensor_options_schema, "sensor")),
"switch": SchemaFlowFormStep(partial(light_switch_options_schema, "switch")),
}

View File

@ -1,3 +1,4 @@
"""Constants for the Group integration."""
CONF_HIDE_MEMBERS = "hide_members"
CONF_IGNORE_NON_NUMERIC = "ignore_non_numeric"

View File

@ -0,0 +1,410 @@
"""This platform allows several sensors to be grouped into one sensor to provide numeric combinations."""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import logging
import statistics
from typing import TYPE_CHECKING, Any
import voluptuous as vol
from homeassistant.components.input_number import DOMAIN as INPUT_NUMBER_DOMAIN
from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
DEVICE_CLASSES_SCHEMA,
DOMAIN,
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
STATE_CLASSES_SCHEMA,
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_DEVICE_CLASS,
CONF_ENTITIES,
CONF_NAME,
CONF_TYPE,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant, State, callback
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from . import GroupEntity
from .const import CONF_IGNORE_NON_NUMERIC
DEFAULT_NAME = "Sensor Group"
ATTR_MIN_VALUE = "min_value"
ATTR_MIN_ENTITY_ID = "min_entity_id"
ATTR_MAX_VALUE = "max_value"
ATTR_MAX_ENTITY_ID = "max_entity_id"
ATTR_MEAN = "mean"
ATTR_MEDIAN = "median"
ATTR_LAST = "last"
ATTR_LAST_ENTITY_ID = "last_entity_id"
ATTR_RANGE = "range"
ATTR_SUM = "sum"
SENSOR_TYPES = {
ATTR_MIN_VALUE: "min",
ATTR_MAX_VALUE: "max",
ATTR_MEAN: "mean",
ATTR_MEDIAN: "median",
ATTR_LAST: "last",
ATTR_RANGE: "range",
ATTR_SUM: "sum",
}
SENSOR_TYPE_TO_ATTR = {v: k for k, v in SENSOR_TYPES.items()}
# No limit on parallel updates to enable a group calling another group
PARALLEL_UPDATES = 0
PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_ENTITIES): cv.entities_domain(
[DOMAIN, NUMBER_DOMAIN, INPUT_NUMBER_DOMAIN]
),
vol.Required(CONF_TYPE): vol.All(cv.string, vol.In(SENSOR_TYPES.values())),
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_IGNORE_NON_NUMERIC, default=False): cv.boolean,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): str,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
}
)
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Switch Group platform."""
async_add_entities(
[
SensorGroup(
config.get(CONF_UNIQUE_ID),
config[CONF_NAME],
config[CONF_ENTITIES],
config[CONF_IGNORE_NON_NUMERIC],
config[CONF_TYPE],
config.get(CONF_UNIT_OF_MEASUREMENT),
config.get(CONF_STATE_CLASS),
config.get(CONF_DEVICE_CLASS),
)
]
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Initialize Switch Group config entry."""
registry = er.async_get(hass)
entities = er.async_validate_entity_ids(
registry, config_entry.options[CONF_ENTITIES]
)
async_add_entities(
[
SensorGroup(
config_entry.entry_id,
config_entry.title,
entities,
config_entry.options.get(CONF_IGNORE_NON_NUMERIC, True),
config_entry.options[CONF_TYPE],
None,
None,
None,
)
]
)
def calc_min(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate min value."""
val: float | None = None
entity_id: str | None = None
for sensor_id, sensor_value, _ in sensor_values:
if val is None or val > sensor_value:
entity_id, val = sensor_id, sensor_value
attributes = {ATTR_MIN_ENTITY_ID: entity_id}
if TYPE_CHECKING:
assert val is not None
return attributes, val
def calc_max(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate max value."""
val: float | None = None
entity_id: str | None = None
for sensor_id, sensor_value, _ in sensor_values:
if val is None or val < sensor_value:
entity_id, val = sensor_id, sensor_value
attributes = {ATTR_MAX_ENTITY_ID: entity_id}
if TYPE_CHECKING:
assert val is not None
return attributes, val
def calc_mean(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate mean value."""
result = (sensor_value for _, sensor_value, _ in sensor_values)
value: float = statistics.mean(result)
return {}, value
def calc_median(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate median value."""
result = (sensor_value for _, sensor_value, _ in sensor_values)
value: float = statistics.median(result)
return {}, value
def calc_last(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate last value."""
last_updated: datetime | None = None
last_entity_id: str | None = None
for entity_id, state_f, state in sensor_values:
if last_updated is None or state.last_updated > last_updated:
last_updated = state.last_updated
last = state_f
last_entity_id = entity_id
attributes = {ATTR_LAST_ENTITY_ID: last_entity_id}
return attributes, last
def calc_range(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate range value."""
max_result = max((sensor_value for _, sensor_value, _ in sensor_values))
min_result = min((sensor_value for _, sensor_value, _ in sensor_values))
value: float = max_result - min_result
return {}, value
def calc_sum(
sensor_values: list[tuple[str, float, State]]
) -> tuple[dict[str, str | None], float]:
"""Calculate a sum of values."""
result = 0.0
for _, sensor_value, _ in sensor_values:
result += sensor_value
return {}, result
CALC_TYPES: dict[
str,
Callable[[list[tuple[str, float, State]]], tuple[dict[str, str | None], float]],
] = {
"min": calc_min,
"max": calc_max,
"mean": calc_mean,
"median": calc_median,
"last": calc_last,
"range": calc_range,
"sum": calc_sum,
}
class SensorGroup(GroupEntity, SensorEntity):
"""Representation of a sensor group."""
_attr_available = False
_attr_should_poll = False
_attr_icon = "mdi:calculator"
def __init__(
self,
unique_id: str | None,
name: str,
entity_ids: list[str],
mode: bool,
sensor_type: str,
unit_of_measurement: str | None,
state_class: SensorStateClass | None,
device_class: SensorDeviceClass | None,
) -> None:
"""Initialize a sensor group."""
self._entity_ids = entity_ids
self._sensor_type = sensor_type
self._attr_state_class = state_class
self.calc_state_class: SensorStateClass | None = None
self._attr_device_class = device_class
self.calc_device_class: SensorDeviceClass | None = None
self._attr_native_unit_of_measurement = unit_of_measurement
self.calc_unit_of_measurement: str | None = None
self._attr_name = name
if name == DEFAULT_NAME:
self._attr_name = f"{DEFAULT_NAME} {sensor_type}".capitalize()
self._attr_extra_state_attributes = {ATTR_ENTITY_ID: entity_ids}
self._attr_unique_id = unique_id
self.mode = all if mode is False else any
self._state_calc: Callable[
[list[tuple[str, float, State]]],
tuple[dict[str, str | None], float | None],
] = CALC_TYPES[self._sensor_type]
self._state_incorrect: set[str] = set()
self._extra_state_attribute: dict[str, Any] = {}
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
@callback
def async_state_changed_listener(event: Event) -> None:
"""Handle child updates."""
self.async_set_context(event.context)
self.async_defer_or_update_ha_state()
self.async_on_remove(
async_track_state_change_event(
self.hass, self._entity_ids, async_state_changed_listener
)
)
await super().async_added_to_hass()
@callback
def async_update_group_state(self) -> None:
"""Query all members and determine the sensor group state."""
states: list[StateType] = []
valid_states: list[bool] = []
sensor_values: list[tuple[str, float, State]] = []
for entity_id in self._entity_ids:
if (state := self.hass.states.get(entity_id)) is not None:
states.append(state.state)
try:
sensor_values.append((entity_id, float(state.state), state))
if entity_id in self._state_incorrect:
self._state_incorrect.remove(entity_id)
except ValueError:
valid_states.append(False)
if entity_id not in self._state_incorrect:
self._state_incorrect.add(entity_id)
_LOGGER.warning(
"Unable to use state. Only numerical states are supported,"
" entity %s with value %s excluded from calculation",
entity_id,
state.state,
)
continue
valid_states.append(True)
# Set group as unavailable if all members do not have numeric values
self._attr_available = any(numeric_state for numeric_state in valid_states)
valid_state = self.mode(
state not in (STATE_UNKNOWN, STATE_UNAVAILABLE) for state in states
)
valid_state_numeric = self.mode(numeric_state for numeric_state in valid_states)
if not valid_state or not valid_state_numeric:
self._attr_native_value = None
return
# Calculate values
self._calculate_entity_properties()
self._extra_state_attribute, self._attr_native_value = self._state_calc(
sensor_values
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the sensor."""
return {ATTR_ENTITY_ID: self._entity_ids, **self._extra_state_attribute}
@property
def device_class(self) -> SensorDeviceClass | None:
"""Return device class."""
if self._attr_device_class is not None:
return self._attr_device_class
return self.calc_device_class
@property
def state_class(self) -> SensorStateClass | str | None:
"""Return state class."""
if self._attr_state_class is not None:
return self._attr_state_class
return self.calc_state_class
@property
def native_unit_of_measurement(self) -> str | None:
"""Return native unit of measurement."""
if self._attr_native_unit_of_measurement is not None:
return self._attr_native_unit_of_measurement
return self.calc_unit_of_measurement
def _calculate_entity_properties(self) -> None:
"""Calculate device_class, state_class and unit of measurement."""
device_classes = []
state_classes = []
unit_of_measurements = []
if (
self._attr_device_class
and self._attr_state_class
and self._attr_native_unit_of_measurement
):
return
for entity_id in self._entity_ids:
if (state := self.hass.states.get(entity_id)) is not None:
device_classes.append(state.attributes.get("device_class"))
state_classes.append(state.attributes.get("state_class"))
unit_of_measurements.append(state.attributes.get("unit_of_measurement"))
self.calc_device_class = None
self.calc_state_class = None
self.calc_unit_of_measurement = None
# Calculate properties and save if all same
if (
not self._attr_device_class
and device_classes
and all(x == device_classes[0] for x in device_classes)
):
self.calc_device_class = device_classes[0]
if (
not self._attr_state_class
and state_classes
and all(x == state_classes[0] for x in state_classes)
):
self.calc_state_class = state_classes[0]
if (
not self._attr_unit_of_measurement
and unit_of_measurements
and all(x == unit_of_measurements[0] for x in unit_of_measurements)
):
self.calc_unit_of_measurement = unit_of_measurements[0]

View File

@ -12,6 +12,7 @@
"light": "Light group",
"lock": "Lock group",
"media_player": "Media player group",
"sensor": "Sensor group",
"switch": "Switch group"
}
},
@ -65,6 +66,21 @@
"name": "[%key:component::group::config::step::binary_sensor::data::name%]"
}
},
"sensor": {
"title": "[%key:component::group::config::step::user::title%]",
"description": "If \"ignore non-numeric\" is enabled, the group's state is calculated if at least one member has a numerical value. If \"ignore non-numeric\" is disabled, the group's state is calculated only if all group members have numerical values.",
"data": {
"ignore_non_numeric": "Ignore non-numeric",
"entities": "Members",
"hide_members": "Hide members",
"name": "Name",
"type": "Type",
"round_digits": "Round value to number of decimals",
"device_class": "Device class",
"state_class": "State class",
"unit_of_measurement": "Unit of Measurement"
}
},
"switch": {
"title": "[%key:component::group::config::step::user::title%]",
"data": {
@ -117,6 +133,19 @@
"hide_members": "[%key:component::group::config::step::binary_sensor::data::hide_members%]"
}
},
"sensor": {
"description": "[%key:component::group::config::step::sensor::description%]",
"data": {
"ignore_non_numeric": "[%key:component::group::config::step::sensor::data::ignore_non_numeric%]",
"entities": "[%key:component::group::config::step::sensor::data::entities%]",
"hide_members": "[%key:component::group::config::step::sensor::data::hide_members%]",
"type": "[%key:component::group::config::step::sensor::data::type%]",
"round_digits": "[%key:component::group::config::step::sensor::data::round_digits%]",
"device_class": "[%key:component::group::config::step::sensor::data::device_class%]",
"state_class": "[%key:component::group::config::step::sensor::data::state_class%]",
"unit_of_measurement": "[%key:component::group::config::step::sensor::data::unit_of_measurement%]"
}
},
"switch": {
"description": "[%key:component::group::config::step::binary_sensor::description%]",
"data": {

View File

@ -51,6 +51,21 @@
},
"title": "Add Group"
},
"sensor": {
"data": {
"device_class": "Device class",
"entities": "Members",
"hide_members": "Hide members",
"ignore_non_numeric": "Ignore non-numeric",
"name": "Name",
"round_digits": "Round value to number of decimals",
"state_class": "State class",
"type": "Type",
"unit_of_measurement": "Unit of Measurement"
},
"description": "If \"ignore non-numeric\" is enabled, the group's state is calculated if at least one member has a numerical value. If \"ignore non-numeric\" is disabled, the group's state is calculated only if all group members have numerical values.",
"title": "Add Group"
},
"switch": {
"data": {
"entities": "Members",
@ -68,6 +83,7 @@
"light": "Light group",
"lock": "Lock group",
"media_player": "Media player group",
"sensor": "Sensor group",
"switch": "Switch group"
},
"title": "Add Group"
@ -116,6 +132,19 @@
"hide_members": "Hide members"
}
},
"sensor": {
"data": {
"device_class": "Device class",
"entities": "Members",
"hide_members": "Hide members",
"ignore_non_numeric": "Ignore non-numeric",
"round_digits": "Round value to number of decimals",
"state_class": "State class",
"type": "Type",
"unit_of_measurement": "Unit of Measurement"
},
"description": "If \"ignore non-numeric\" is enabled, the group's state is calculated if at least one member has a numerical value. If \"ignore non-numeric\" is disabled, the group's state is calculated only if all group members have numerical values."
},
"switch": {
"data": {
"all": "All entities",

View File

@ -0,0 +1,7 @@
sensor:
- platform: group
entities:
- sensor.test_1
- sensor.test_2
name: second_test
type: mean

View File

@ -22,6 +22,15 @@ from tests.common import MockConfigEntry
("light", "on", "on", {}, {}, {}, {}),
("lock", "locked", "locked", {}, {}, {}, {}),
("media_player", "on", "on", {}, {}, {}, {}),
(
"sensor",
"20.0",
"10",
{},
{"type": "sum"},
{"type": "sum"},
{},
),
("switch", "on", "on", {}, {}, {}, {}),
),
)
@ -171,19 +180,25 @@ def get_suggested(schema, key):
@pytest.mark.parametrize(
"group_type,member_state,extra_options",
"group_type,member_state,extra_options,options_options",
(
("binary_sensor", "on", {"all": False}),
("cover", "open", {}),
("fan", "on", {}),
("light", "on", {"all": False}),
("lock", "locked", {}),
("media_player", "on", {}),
("switch", "on", {"all": False}),
("binary_sensor", "on", {"all": False}, {}),
("cover", "open", {}, {}),
("fan", "on", {}, {}),
("light", "on", {"all": False}, {}),
("lock", "locked", {}, {}),
("media_player", "on", {}, {}),
(
"sensor",
"10",
{"ignore_non_numeric": False, "type": "sum"},
{"ignore_non_numeric": False, "type": "sum"},
),
("switch", "on", {"all": False}, {}),
),
)
async def test_options(
hass: HomeAssistant, group_type, member_state, extra_options
hass: HomeAssistant, group_type, member_state, extra_options, options_options
) -> None:
"""Test reconfiguring."""
members1 = [f"{group_type}.one", f"{group_type}.two"]
@ -226,9 +241,7 @@ async def test_options(
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"entities": members2,
},
user_input={"entities": members2, **options_options},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"] == {

View File

@ -1429,6 +1429,16 @@ async def test_plant_group(hass):
("fan", "on", {}),
("light", "on", {"all": False}),
("media_player", "on", {}),
(
"sensor",
"1",
{
"all": True,
"type": "max",
"round_digits": 2.0,
"state_class": "measurement",
},
),
),
)
async def test_setup_and_remove_config_entry(

View File

@ -0,0 +1,369 @@
"""The tests for the Group Sensor platform."""
from __future__ import annotations
import statistics
from typing import Any
from unittest.mock import patch
import pytest
from pytest import LogCaptureFixture
from homeassistant import config as hass_config
from homeassistant.components.group import DOMAIN as GROUP_DOMAIN
from homeassistant.components.group.sensor import (
ATTR_LAST_ENTITY_ID,
ATTR_MAX_ENTITY_ID,
ATTR_MIN_ENTITY_ID,
DEFAULT_NAME,
)
from homeassistant.components.sensor import (
ATTR_STATE_CLASS,
DOMAIN as SENSOR_DOMAIN,
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_UNIT_OF_MEASUREMENT,
SERVICE_RELOAD,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
import homeassistant.helpers.entity_registry as er
from homeassistant.setup import async_setup_component
from tests.common import get_fixture_path
VALUES = [17, 20, 15.3]
VALUES_ERROR = [17, "string", 15.3]
COUNT = len(VALUES)
MIN_VALUE = min(VALUES)
MAX_VALUE = max(VALUES)
MEAN = statistics.mean(VALUES)
MEDIAN = statistics.median(VALUES)
RANGE = max(VALUES) - min(VALUES)
SUM_VALUE = sum(VALUES)
@pytest.mark.parametrize(
"sensor_type, result, attributes",
[
("min", MIN_VALUE, {ATTR_MIN_ENTITY_ID: "sensor.test_3"}),
("max", MAX_VALUE, {ATTR_MAX_ENTITY_ID: "sensor.test_2"}),
("mean", MEAN, {}),
("median", MEDIAN, {}),
("last", VALUES[2], {ATTR_LAST_ENTITY_ID: "sensor.test_3"}),
("range", RANGE, {}),
("sum", SUM_VALUE, {}),
],
)
async def test_sensors(
hass: HomeAssistant,
sensor_type: str,
result: str,
attributes: dict[str, Any],
) -> None:
"""Test the sensors."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": DEFAULT_NAME,
"type": sensor_type,
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"unique_id": "very_unique_id",
}
}
entity_ids = config["sensor"]["entities"]
for entity_id, value in dict(zip(entity_ids, VALUES)).items():
hass.states.async_set(
entity_id,
value,
{
ATTR_DEVICE_CLASS: SensorDeviceClass.VOLUME,
ATTR_STATE_CLASS: SensorStateClass.MEASUREMENT,
ATTR_UNIT_OF_MEASUREMENT: "L",
},
)
await hass.async_block_till_done()
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get(f"sensor.sensor_group_{sensor_type}")
assert float(state.state) == pytest.approx(float(result))
assert state.attributes.get(ATTR_ENTITY_ID) == entity_ids
for key, value in attributes.items():
assert state.attributes.get(key) == value
assert state.attributes.get(ATTR_DEVICE_CLASS) == SensorDeviceClass.VOLUME
assert state.attributes.get(ATTR_STATE_CLASS) == SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "L"
entity_reg = er.async_get(hass)
entity = entity_reg.async_get(f"sensor.sensor_group_{sensor_type}")
assert entity.unique_id == "very_unique_id"
async def test_sensors_attributes_defined(hass: HomeAssistant) -> None:
"""Test the sensors."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": DEFAULT_NAME,
"type": "sum",
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"unique_id": "very_unique_id",
"device_class": SensorDeviceClass.WATER,
"state_class": SensorStateClass.TOTAL_INCREASING,
"unit_of_measurement": "",
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entity_ids = config["sensor"]["entities"]
for entity_id, value in dict(zip(entity_ids, VALUES)).items():
hass.states.async_set(
entity_id,
value,
{
ATTR_DEVICE_CLASS: SensorDeviceClass.VOLUME,
ATTR_STATE_CLASS: SensorStateClass.MEASUREMENT,
ATTR_UNIT_OF_MEASUREMENT: "L",
},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.sensor_group_sum")
assert state.state == str(float(SUM_VALUE))
assert state.attributes.get(ATTR_ENTITY_ID) == entity_ids
assert state.attributes.get(ATTR_DEVICE_CLASS) == SensorDeviceClass.WATER
assert state.attributes.get(ATTR_STATE_CLASS) == SensorStateClass.TOTAL_INCREASING
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == ""
async def test_not_enough_sensor_value(hass: HomeAssistant) -> None:
"""Test that there is nothing done if not enough values available."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": "test_max",
"type": "max",
"ignore_non_numeric": True,
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"state_class": SensorStateClass.MEASUREMENT,
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entity_ids = config["sensor"]["entities"]
hass.states.async_set(entity_ids[0], STATE_UNKNOWN)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_max")
assert state.state == STATE_UNAVAILABLE
assert state.attributes.get("min_entity_id") is None
assert state.attributes.get("max_entity_id") is None
hass.states.async_set(entity_ids[1], VALUES[1])
await hass.async_block_till_done()
state = hass.states.get("sensor.test_max")
assert state.state not in [STATE_UNAVAILABLE, STATE_UNKNOWN]
assert entity_ids[1] == state.attributes.get("max_entity_id")
hass.states.async_set(entity_ids[2], STATE_UNKNOWN)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_max")
assert state.state not in [STATE_UNAVAILABLE, STATE_UNKNOWN]
assert entity_ids[1] == state.attributes.get("max_entity_id")
hass.states.async_set(entity_ids[1], STATE_UNAVAILABLE)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_max")
assert state.state == STATE_UNAVAILABLE
assert state.attributes.get("min_entity_id") is None
assert state.attributes.get("max_entity_id") is None
async def test_reload(hass: HomeAssistant) -> None:
"""Verify we can reload sensors."""
hass.states.async_set("sensor.test_1", 12345)
hass.states.async_set("sensor.test_2", 45678)
await async_setup_component(
hass,
"sensor",
{
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": "test_sensor",
"type": "mean",
"entities": ["sensor.test_1", "sensor.test_2"],
"state_class": SensorStateClass.MEASUREMENT,
}
},
)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 3
assert hass.states.get("sensor.test_sensor")
yaml_path = get_fixture_path("sensor_configuration.yaml", "group")
with patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path):
await hass.services.async_call(
GROUP_DOMAIN,
SERVICE_RELOAD,
{},
blocking=True,
)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 3
assert hass.states.get("sensor.test_sensor") is None
assert hass.states.get("sensor.second_test")
async def test_sensor_incorrect_state(
hass: HomeAssistant, caplog: LogCaptureFixture
) -> None:
"""Test the min sensor."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": "test_failure",
"type": "min",
"ignore_non_numeric": True,
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"unique_id": "very_unique_id",
"state_class": SensorStateClass.MEASUREMENT,
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entity_ids = config["sensor"]["entities"]
for entity_id, value in dict(zip(entity_ids, VALUES_ERROR)).items():
hass.states.async_set(entity_id, value)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_failure")
assert state.state == "15.3"
assert (
"Unable to use state. Only numerical states are supported, entity sensor.test_2 with value string excluded from calculation"
in caplog.text
)
for entity_id, value in dict(zip(entity_ids, VALUES)).items():
hass.states.async_set(entity_id, value)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_failure")
assert state.state == "15.3"
async def test_sensor_require_all_states(hass: HomeAssistant) -> None:
"""Test the sum sensor with missing state require all."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": "test_sum",
"type": "sum",
"ignore_non_numeric": False,
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"unique_id": "very_unique_id_sum_sensor",
"state_class": SensorStateClass.MEASUREMENT,
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entity_ids = config["sensor"]["entities"]
for entity_id, value in dict(zip(entity_ids, VALUES_ERROR)).items():
hass.states.async_set(entity_id, value)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_sum")
assert state.state == STATE_UNKNOWN
async def test_sensor_calculated_properties(hass: HomeAssistant) -> None:
"""Test the sensor calculating device_class, state_class and unit of measurement."""
config = {
SENSOR_DOMAIN: {
"platform": GROUP_DOMAIN,
"name": "test_sum",
"type": "sum",
"entities": ["sensor.test_1", "sensor.test_2", "sensor.test_3"],
"unique_id": "very_unique_id_sum_sensor",
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entity_ids = config["sensor"]["entities"]
hass.states.async_set(
entity_ids[0],
VALUES[0],
{
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.MEASUREMENT,
"unit_of_measurement": "kWh",
},
)
hass.states.async_set(
entity_ids[1],
VALUES[1],
{
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.MEASUREMENT,
"unit_of_measurement": "kWh",
},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_sum")
assert state.state == str(float(sum([VALUES[0], VALUES[1]])))
assert state.attributes.get("device_class") == "energy"
assert state.attributes.get("state_class") == "measurement"
assert state.attributes.get("unit_of_measurement") == "kWh"
hass.states.async_set(
entity_ids[2],
VALUES[2],
{
"device_class": SensorDeviceClass.BATTERY,
"state_class": SensorStateClass.TOTAL,
"unit_of_measurement": None,
},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_sum")
assert state.state == str(sum(VALUES))
assert state.attributes.get("device_class") is None
assert state.attributes.get("state_class") is None
assert state.attributes.get("unit_of_measurement") is None