Remove modbus pragma no cover and solve nan (#99221)

* Remove pragma no cover.

* Ruff !

* Review comments.

* update test.

* Review.

* review.

* Add slave test.
pull/100223/head
jan iversen 2023-09-12 16:05:59 +02:00 committed by Paulus Schoutsen
parent 2dcf6a6f5b
commit d5ff05bdf5
2 changed files with 131 additions and 13 deletions

View File

@ -188,10 +188,14 @@ class BaseStructPlatform(BasePlatform, RestoreEntity):
registers.reverse()
return registers
def __process_raw_value(self, entry: float | int | str) -> float | int | str | None:
def __process_raw_value(
self, entry: float | int | str | bytes
) -> float | int | str | bytes | None:
"""Process value from sensor with NaN handling, scaling, offset, min/max etc."""
if self._nan_value and entry in (self._nan_value, -self._nan_value):
return None
if isinstance(entry, bytes):
return entry
val: float | int = self._scale * entry + self._offset
if self._min_value is not None and val < self._min_value:
return self._min_value
@ -232,14 +236,20 @@ class BaseStructPlatform(BasePlatform, RestoreEntity):
if isinstance(v_temp, int) and self._precision == 0:
v_result.append(str(v_temp))
elif v_temp is None:
v_result.append("") # pragma: no cover
v_result.append("0")
elif v_temp != v_temp: # noqa: PLR0124
# NaN float detection replace with None
v_result.append("nan") # pragma: no cover
v_result.append("0")
else:
v_result.append(f"{float(v_temp):.{self._precision}f}")
return ",".join(map(str, v_result))
# NaN float detection replace with None
if val[0] != val[0]: # noqa: PLR0124
return None
if byte_string == b"nan\x00":
return None
# Apply scale, precision, limits to floats and ints
val_result = self.__process_raw_value(val[0])
@ -249,15 +259,10 @@ class BaseStructPlatform(BasePlatform, RestoreEntity):
if val_result is None:
return None
# NaN float detection replace with None
if val_result != val_result: # noqa: PLR0124
return None # pragma: no cover
if isinstance(val_result, int) and self._precision == 0:
return str(val_result)
if isinstance(val_result, str):
if val_result == "nan":
val_result = None # pragma: no cover
return val_result
if isinstance(val_result, bytes):
return val_result.decode()
return f"{float(val_result):.{self._precision}f}"

View File

@ -1,4 +1,6 @@
"""The tests for the Modbus sensor component."""
import struct
from freezegun.api import FrozenDateTimeFactory
import pytest
@ -625,6 +627,21 @@ async def test_all_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> None:
@pytest.mark.parametrize(
("config_addon", "register_words", "do_exception", "expected"),
[
(
{
CONF_SLAVE_COUNT: 1,
CONF_UNIQUE_ID: SLAVE_UNIQUE_ID,
CONF_DATA_TYPE: DataType.FLOAT32,
},
[
0x5102,
0x0304,
int.from_bytes(struct.pack(">f", float("nan"))[0:2]),
int.from_bytes(struct.pack(">f", float("nan"))[2:4]),
],
False,
["34899771392", "0"],
),
(
{
CONF_SLAVE_COUNT: 0,
@ -902,6 +919,65 @@ async def test_wrong_unpack(hass: HomeAssistant, mock_do_cycle) -> None:
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
@pytest.mark.parametrize(
"do_config",
[
{
CONF_SENSORS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 51,
CONF_SCAN_INTERVAL: 1,
},
],
},
],
)
@pytest.mark.parametrize(
("config_addon", "register_words", "expected"),
[
(
{
CONF_DATA_TYPE: DataType.FLOAT32,
},
[
int.from_bytes(struct.pack(">f", float("nan"))[0:2]),
int.from_bytes(struct.pack(">f", float("nan"))[2:4]),
],
STATE_UNAVAILABLE,
),
(
{
CONF_DATA_TYPE: DataType.FLOAT32,
},
[0x6E61, 0x6E00],
STATE_UNAVAILABLE,
),
(
{
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_COUNT: 2,
CONF_STRUCTURE: "4s",
},
[0x6E61, 0x6E00],
STATE_UNAVAILABLE,
),
(
{
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_COUNT: 2,
CONF_STRUCTURE: "4s",
},
[0x6161, 0x6100],
"aaa\x00",
),
],
)
async def test_unpack_ok(hass: HomeAssistant, mock_do_cycle, expected) -> None:
"""Run test for sensor."""
assert hass.states.get(ENTITY_ID).state == expected
@pytest.mark.parametrize(
"do_config",
[
@ -965,10 +1041,35 @@ async def test_lazy_error_sensor(
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_STRUCTURE: ">4f",
},
# floats: 7.931250095367432, 10.600000381469727,
# floats: nan, 10.600000381469727,
# 1.000879611487865e-28, 10.566553115844727
[0x40FD, 0xCCCD, 0x4129, 0x999A, 0x10FD, 0xC0CD, 0x4129, 0x109A],
"7.93,10.60,0.00,10.57",
[
int.from_bytes(struct.pack(">f", float("nan"))[0:2]),
int.from_bytes(struct.pack(">f", float("nan"))[2:4]),
0x4129,
0x999A,
0x10FD,
0xC0CD,
0x4129,
0x109A,
],
"0,10.60,0.00,10.57",
),
(
{
CONF_COUNT: 4,
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_STRUCTURE: ">2i",
CONF_NAN_VALUE: 0x0000000F,
},
# int: nan, 10,
[
0x0000,
0x000F,
0x0000,
0x000A,
],
"0,10",
),
(
{
@ -988,6 +1089,18 @@ async def test_lazy_error_sensor(
[0x0101],
"257",
),
(
{
CONF_COUNT: 8,
CONF_PRECISION: 2,
CONF_DATA_TYPE: DataType.CUSTOM,
CONF_STRUCTURE: ">4f",
},
# floats: 7.931250095367432, 10.600000381469727,
# 1.000879611487865e-28, 10.566553115844727
[0x40FD, 0xCCCD, 0x4129, 0x999A, 0x10FD, 0xC0CD, 0x4129, 0x109A],
"7.93,10.60,0.00,10.57",
),
],
)
async def test_struct_sensor(hass: HomeAssistant, mock_do_cycle, expected) -> None: