DSMR: Refactor sensor creation, added typing to sensors (#52153)

* DSMR: Refactor sensor creation, added typing to sensors

* Log from package level

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
pull/52159/head
Franck Nijhof 2021-06-24 16:56:43 +02:00 committed by GitHub
parent 04c9665241
commit 75c3daa45f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 249 additions and 151 deletions

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import asyncio
from functools import partial
import logging
import os
from typing import Any
@ -29,10 +28,9 @@ from .const import (
CONF_TIME_BETWEEN_UPDATE,
DEFAULT_TIME_BETWEEN_UPDATE,
DOMAIN,
LOGGER,
)
_LOGGER = logging.getLogger(__name__)
CONF_MANUAL_PATH = "Enter Manually"
@ -92,7 +90,7 @@ class DSMRConnection:
try:
transport, protocol = await asyncio.create_task(reader_factory())
except (serial.serialutil.SerialException, OSError):
_LOGGER.exception("Error connecting to DSMR")
LOGGER.exception("Error connecting to DSMR")
return False
if transport:

View File

@ -1,7 +1,16 @@
"""Constants for the DSMR integration."""
from __future__ import annotations
import logging
from dsmr_parser import obis_references
from .models import DSMRSensor
DOMAIN = "dsmr"
LOGGER = logging.getLogger(__package__)
PLATFORMS = ["sensor"]
CONF_DSMR_VERSION = "dsmr_version"
@ -28,3 +37,160 @@ ICON_GAS = "mdi:fire"
ICON_POWER = "mdi:flash"
ICON_POWER_FAILURE = "mdi:flash-off"
ICON_SWELL_SAG = "mdi:pulse"
SENSORS: list[DSMRSensor] = [
DSMRSensor(
name="Power Consumption",
obis_reference=obis_references.CURRENT_ELECTRICITY_USAGE,
force_update=True,
),
DSMRSensor(
name="Power Production",
obis_reference=obis_references.CURRENT_ELECTRICITY_DELIVERY,
force_update=True,
),
DSMRSensor(
name="Power Tariff",
obis_reference=obis_references.ELECTRICITY_ACTIVE_TARIFF,
),
DSMRSensor(
name="Energy Consumption (tarif 1)",
obis_reference=obis_references.ELECTRICITY_USED_TARIFF_1,
force_update=True,
),
DSMRSensor(
name="Energy Consumption (tarif 2)",
obis_reference=obis_references.ELECTRICITY_USED_TARIFF_2,
force_update=True,
),
DSMRSensor(
name="Energy Production (tarif 1)",
obis_reference=obis_references.ELECTRICITY_DELIVERED_TARIFF_1,
force_update=True,
),
DSMRSensor(
name="Energy Production (tarif 2)",
obis_reference=obis_references.ELECTRICITY_DELIVERED_TARIFF_2,
force_update=True,
),
DSMRSensor(
name="Power Consumption Phase L1",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE,
),
DSMRSensor(
name="Power Consumption Phase L2",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE,
),
DSMRSensor(
name="Power Consumption Phase L3",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE,
),
DSMRSensor(
name="Power Production Phase L1",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE,
),
DSMRSensor(
name="Power Production Phase L2",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE,
),
DSMRSensor(
name="Power Production Phase L3",
obis_reference=obis_references.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE,
),
DSMRSensor(
name="Short Power Failure Count",
obis_reference=obis_references.SHORT_POWER_FAILURE_COUNT,
),
DSMRSensor(
name="Long Power Failure Count",
obis_reference=obis_references.LONG_POWER_FAILURE_COUNT,
),
DSMRSensor(
name="Voltage Sags Phase L1",
obis_reference=obis_references.VOLTAGE_SAG_L1_COUNT,
),
DSMRSensor(
name="Voltage Sags Phase L2",
obis_reference=obis_references.VOLTAGE_SAG_L2_COUNT,
),
DSMRSensor(
name="Voltage Sags Phase L3",
obis_reference=obis_references.VOLTAGE_SAG_L3_COUNT,
),
DSMRSensor(
name="Voltage Swells Phase L1",
obis_reference=obis_references.VOLTAGE_SWELL_L1_COUNT,
),
DSMRSensor(
name="Voltage Swells Phase L2",
obis_reference=obis_references.VOLTAGE_SWELL_L2_COUNT,
),
DSMRSensor(
name="Voltage Swells Phase L3",
obis_reference=obis_references.VOLTAGE_SWELL_L3_COUNT,
),
DSMRSensor(
name="Voltage Phase L1",
obis_reference=obis_references.INSTANTANEOUS_VOLTAGE_L1,
),
DSMRSensor(
name="Voltage Phase L2",
obis_reference=obis_references.INSTANTANEOUS_VOLTAGE_L2,
),
DSMRSensor(
name="Voltage Phase L3",
obis_reference=obis_references.INSTANTANEOUS_VOLTAGE_L3,
),
DSMRSensor(
name="Current Phase L1",
obis_reference=obis_references.INSTANTANEOUS_CURRENT_L1,
),
DSMRSensor(
name="Current Phase L2",
obis_reference=obis_references.INSTANTANEOUS_CURRENT_L2,
),
DSMRSensor(
name="Current Phase L3",
obis_reference=obis_references.INSTANTANEOUS_CURRENT_L3,
),
DSMRSensor(
name="Energy Consumption (total)",
obis_reference=obis_references.LUXEMBOURG_ELECTRICITY_USED_TARIFF_GLOBAL,
dsmr_versions={"5L"},
force_update=True,
),
DSMRSensor(
name="Energy Production (total)",
obis_reference=obis_references.LUXEMBOURG_ELECTRICITY_DELIVERED_TARIFF_GLOBAL,
dsmr_versions={"5L"},
force_update=True,
),
DSMRSensor(
name="Energy Consumption (total)",
obis_reference=obis_references.ELECTRICITY_IMPORTED_TOTAL,
dsmr_versions={"2.2", "4", "5", "5B"},
force_update=True,
),
DSMRSensor(
name="Gas Consumption",
obis_reference=obis_references.HOURLY_GAS_METER_READING,
dsmr_versions={"4", "5", "5L"},
force_update=True,
is_gas=True,
),
DSMRSensor(
name="Gas Consumption",
obis_reference=obis_references.BELGIUM_HOURLY_GAS_METER_READING,
dsmr_versions={"5B"},
force_update=True,
is_gas=True,
),
DSMRSensor(
name="Gas Consumption",
obis_reference=obis_references.GAS_METER_READING,
dsmr_versions={"2.2"},
force_update=True,
is_gas=True,
),
]

View File

@ -0,0 +1,16 @@
"""Models for the DSMR integration."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class DSMRSensor:
"""Represents an DSMR Sensor."""
name: str
obis_reference: str
dsmr_versions: set[str] | None = None
force_update: bool = False
is_gas: bool = False

View File

@ -6,10 +6,11 @@ from asyncio import CancelledError
from contextlib import suppress
from datetime import timedelta
from functools import partial
import logging
from typing import Any
from dsmr_parser import obis_references as obis_ref
from dsmr_parser.clients.protocol import create_dsmr_reader, create_tcp_dsmr_reader
from dsmr_parser.objects import DSMRObject
import serial
import voluptuous as vol
@ -18,6 +19,8 @@ from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PORT, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import CoreState, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, StateType
from homeassistant.util import Throttle
from .const import (
@ -40,9 +43,10 @@ from .const import (
ICON_POWER,
ICON_POWER_FAILURE,
ICON_SWELL_SAG,
LOGGER,
SENSORS,
)
_LOGGER = logging.getLogger(__name__)
from .models import DSMRSensor
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
@ -57,7 +61,12 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: dict[str, Any] | None = None,
) -> None:
"""Import the platform into a config entry."""
hass.async_create_task(
hass.config_entries.flow.async_init(
@ -67,139 +76,37 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the DSMR sensor."""
config = entry.data
options = entry.options
dsmr_version = config[CONF_DSMR_VERSION]
# Define list of name,obis,force_update mappings to generate entities
obis_mapping = [
["Power Consumption", obis_ref.CURRENT_ELECTRICITY_USAGE, True],
["Power Production", obis_ref.CURRENT_ELECTRICITY_DELIVERY, True],
["Power Tariff", obis_ref.ELECTRICITY_ACTIVE_TARIFF, False],
["Energy Consumption (tarif 1)", obis_ref.ELECTRICITY_USED_TARIFF_1, True],
["Energy Consumption (tarif 2)", obis_ref.ELECTRICITY_USED_TARIFF_2, True],
["Energy Production (tarif 1)", obis_ref.ELECTRICITY_DELIVERED_TARIFF_1, True],
["Energy Production (tarif 2)", obis_ref.ELECTRICITY_DELIVERED_TARIFF_2, True],
[
"Power Consumption Phase L1",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE,
False,
],
[
"Power Consumption Phase L2",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE,
False,
],
[
"Power Consumption Phase L3",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE,
False,
],
[
"Power Production Phase L1",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE,
False,
],
[
"Power Production Phase L2",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE,
False,
],
[
"Power Production Phase L3",
obis_ref.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE,
False,
],
["Short Power Failure Count", obis_ref.SHORT_POWER_FAILURE_COUNT, False],
["Long Power Failure Count", obis_ref.LONG_POWER_FAILURE_COUNT, False],
["Voltage Sags Phase L1", obis_ref.VOLTAGE_SAG_L1_COUNT, False],
["Voltage Sags Phase L2", obis_ref.VOLTAGE_SAG_L2_COUNT, False],
["Voltage Sags Phase L3", obis_ref.VOLTAGE_SAG_L3_COUNT, False],
["Voltage Swells Phase L1", obis_ref.VOLTAGE_SWELL_L1_COUNT, False],
["Voltage Swells Phase L2", obis_ref.VOLTAGE_SWELL_L2_COUNT, False],
["Voltage Swells Phase L3", obis_ref.VOLTAGE_SWELL_L3_COUNT, False],
["Voltage Phase L1", obis_ref.INSTANTANEOUS_VOLTAGE_L1, False],
["Voltage Phase L2", obis_ref.INSTANTANEOUS_VOLTAGE_L2, False],
["Voltage Phase L3", obis_ref.INSTANTANEOUS_VOLTAGE_L3, False],
["Current Phase L1", obis_ref.INSTANTANEOUS_CURRENT_L1, False],
["Current Phase L2", obis_ref.INSTANTANEOUS_CURRENT_L2, False],
["Current Phase L3", obis_ref.INSTANTANEOUS_CURRENT_L3, False],
dsmr_version = entry.data[CONF_DSMR_VERSION]
entities = [
DSMREntity(sensor, entry)
for sensor in SENSORS
if (sensor.dsmr_versions is None or dsmr_version in sensor.dsmr_versions)
and (not sensor.is_gas or CONF_SERIAL_ID_GAS in entry.data)
]
if dsmr_version == "5L":
obis_mapping.extend(
[
[
"Energy Consumption (total)",
obis_ref.LUXEMBOURG_ELECTRICITY_USED_TARIFF_GLOBAL,
True,
],
[
"Energy Production (total)",
obis_ref.LUXEMBOURG_ELECTRICITY_DELIVERED_TARIFF_GLOBAL,
True,
],
]
)
else:
obis_mapping.extend(
[["Energy Consumption (total)", obis_ref.ELECTRICITY_IMPORTED_TOTAL, True]]
)
# Generate device entities
devices = [
DSMREntity(
name, DEVICE_NAME_ENERGY, config[CONF_SERIAL_ID], obis, config, force_update
)
for name, obis, force_update in obis_mapping
]
# Protocol version specific obis
if CONF_SERIAL_ID_GAS in config:
if dsmr_version in ("4", "5", "5L"):
gas_obis = obis_ref.HOURLY_GAS_METER_READING
elif dsmr_version in ("5B",):
gas_obis = obis_ref.BELGIUM_HOURLY_GAS_METER_READING
else:
gas_obis = obis_ref.GAS_METER_READING
# Add gas meter reading
devices += [
DSMREntity(
"Gas Consumption",
DEVICE_NAME_GAS,
config[CONF_SERIAL_ID_GAS],
gas_obis,
config,
True,
)
]
async_add_entities(devices)
async_add_entities(entities)
min_time_between_updates = timedelta(
seconds=options.get(CONF_TIME_BETWEEN_UPDATE, DEFAULT_TIME_BETWEEN_UPDATE)
seconds=entry.options.get(CONF_TIME_BETWEEN_UPDATE, DEFAULT_TIME_BETWEEN_UPDATE)
)
@Throttle(min_time_between_updates)
def update_entities_telegram(telegram):
def update_entities_telegram(telegram: dict[str, DSMRObject]):
"""Update entities with latest telegram and trigger state update."""
# Make all device entities aware of new telegram
for device in devices:
device.update_data(telegram)
for entity in entities:
entity.update_data(telegram)
# Creates an asyncio.Protocol factory for reading DSMR telegrams from
# serial and calls update_entities_telegram to update entities on arrival
if CONF_HOST in config:
if CONF_HOST in entry.data:
reader_factory = partial(
create_tcp_dsmr_reader,
config[CONF_HOST],
config[CONF_PORT],
config[CONF_DSMR_VERSION],
entry.data[CONF_HOST],
entry.data[CONF_PORT],
entry.data[CONF_DSMR_VERSION],
update_entities_telegram,
loop=hass.loop,
keep_alive_interval=60,
@ -207,13 +114,13 @@ async def async_setup_entry(
else:
reader_factory = partial(
create_dsmr_reader,
config[CONF_PORT],
config[CONF_DSMR_VERSION],
entry.data[CONF_PORT],
entry.data[CONF_DSMR_VERSION],
update_entities_telegram,
loop=hass.loop,
)
async def connect_and_reconnect():
async def connect_and_reconnect() -> None:
"""Connect to DSMR and keep reconnecting until Home Assistant stops."""
stop_listener = None
transport = None
@ -245,12 +152,12 @@ async def async_setup_entry(
update_entities_telegram({})
# throttle reconnect attempts
await asyncio.sleep(config[CONF_RECONNECT_INTERVAL])
await asyncio.sleep(entry.data[CONF_RECONNECT_INTERVAL])
except (serial.serialutil.SerialException, OSError):
# Log any error while establishing connection and drop to retry
# connection wait
_LOGGER.exception("Error connecting to DSMR")
LOGGER.exception("Error connecting to DSMR")
transport = None
protocol = None
except CancelledError:
@ -277,40 +184,48 @@ class DSMREntity(SensorEntity):
_attr_should_poll = False
def __init__(self, name, device_name, device_serial, obis, config, force_update):
def __init__(self, sensor: DSMRSensor, entry: ConfigEntry) -> None:
"""Initialize entity."""
self._obis = obis
self._config = config
self.telegram = {}
self._sensor = sensor
self._entry = entry
self.telegram: dict[str, DSMRObject] = {}
self._attr_name = name
self._attr_force_update = force_update
self._attr_unique_id = f"{device_serial}_{name}".replace(" ", "_")
device_serial = entry.data[CONF_SERIAL_ID]
device_name = DEVICE_NAME_ENERGY
if sensor.is_gas:
device_serial = entry.data[CONF_SERIAL_ID_GAS]
device_name = DEVICE_NAME_GAS
self._attr_name = sensor.name
self._attr_force_update = sensor.force_update
self._attr_unique_id = f"{device_serial}_{sensor.name}".replace(" ", "_")
self._attr_device_info = {
"identifiers": {(DOMAIN, device_serial)},
"name": device_name,
}
@callback
def update_data(self, telegram):
def update_data(self, telegram: dict[str, DSMRObject]) -> None:
"""Update data."""
self.telegram = telegram
if self.hass and self._obis in self.telegram:
if self.hass and self._sensor.obis_reference in self.telegram:
self.async_write_ha_state()
def get_dsmr_object_attr(self, attribute):
def get_dsmr_object_attr(self, attribute: str) -> str | None:
"""Read attribute from last received telegram for this DSMR object."""
# Make sure telegram contains an object for this entities obis
if self._obis not in self.telegram:
if self._sensor.obis_reference not in self.telegram:
return None
# Get the attribute value if the object has it
dsmr_object = self.telegram[self._obis]
dsmr_object = self.telegram[self._sensor.obis_reference]
return getattr(dsmr_object, attribute, None)
@property
def icon(self):
def icon(self) -> str | None:
"""Icon to use in the frontend, if any."""
if not self.name:
return None
if "Sags" in self.name or "Swells" in self.name:
return ICON_SWELL_SAG
if "Failure" in self.name:
@ -319,18 +234,21 @@ class DSMREntity(SensorEntity):
return ICON_POWER
if "Gas" in self.name:
return ICON_GAS
return None
@property
def state(self):
def state(self) -> StateType:
"""Return the state of sensor, if available, translate if needed."""
value = self.get_dsmr_object_attr("value")
if value is None:
return None
if self._obis == obis_ref.ELECTRICITY_ACTIVE_TARIFF:
return self.translate_tariff(value, self._config[CONF_DSMR_VERSION])
if self._sensor.obis_reference == obis_ref.ELECTRICITY_ACTIVE_TARIFF:
return self.translate_tariff(value, self._entry.data[CONF_DSMR_VERSION])
with suppress(TypeError):
value = round(
float(value), self._config.get(CONF_PRECISION, DEFAULT_PRECISION)
float(value), self._entry.data.get(CONF_PRECISION, DEFAULT_PRECISION)
)
if value is not None:
@ -339,16 +257,16 @@ class DSMREntity(SensorEntity):
return None
@property
def unit_of_measurement(self):
def unit_of_measurement(self) -> str | None:
"""Return the unit of measurement of this entity, if any."""
return self.get_dsmr_object_attr("unit")
@staticmethod
def translate_tariff(value, dsmr_version):
def translate_tariff(value: str, dsmr_version: str) -> str | None:
"""Convert 2/1 to normal/low depending on DSMR version."""
# DSMR V5B: Note: In Belgium values are swapped:
# Rate code 2 is used for low rate and rate code 1 is used for normal rate.
if dsmr_version in ("5B",):
if dsmr_version == "5B":
if value == "0001":
value = "0002"
elif value == "0002":