review comments. (#52337)

pull/52341/head
jan iversen 2021-06-30 14:34:33 +02:00 committed by GitHub
parent 0476c7f9ee
commit c0751c060f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 193 additions and 193 deletions

View File

@ -21,8 +21,8 @@ from homeassistant.core import State
from .conftest import ReadResult, base_test, prepare_service_update
sensor_name = "test_binary_sensor"
entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
SENSOR_NAME = "test_binary_sensor"
ENTITY_ID = f"{SENSOR_DOMAIN}.{SENSOR_NAME}"
@pytest.mark.parametrize(
@ -31,7 +31,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_BINARY_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
}
]
@ -39,7 +39,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_BINARY_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_SLAVE: 10,
CONF_INPUT_TYPE: CALL_TYPE_DISCRETE,
@ -88,8 +88,8 @@ async def test_all_binary_sensor(hass, do_type, regs, expected):
"""Run test for given config."""
state = await base_test(
hass,
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, CONF_INPUT_TYPE: do_type},
sensor_name,
{CONF_NAME: SENSOR_NAME, CONF_ADDRESS: 1234, CONF_INPUT_TYPE: do_type},
SENSOR_NAME,
SENSOR_DOMAIN,
CONF_BINARY_SENSORS,
None,
@ -107,7 +107,7 @@ async def test_service_binary_sensor_update(hass, mock_pymodbus):
config = {
CONF_BINARY_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 1234,
CONF_INPUT_TYPE: CALL_TYPE_COIL,
}
@ -119,21 +119,21 @@ async def test_service_binary_sensor_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
mock_pymodbus.read_coils.return_value = ReadResult([0x01])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
@pytest.mark.parametrize(
"mock_test_state",
[(State(entity_id, STATE_ON),)],
[(State(ENTITY_ID, STATE_ON),)],
indirect=True,
)
@pytest.mark.parametrize(
@ -142,7 +142,7 @@ async def test_service_binary_sensor_update(hass, mock_pymodbus):
{
CONF_BINARY_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
}
]
@ -151,4 +151,4 @@ async def test_service_binary_sensor_update(hass, mock_pymodbus):
)
async def test_restore_state_binary_sensor(hass, mock_test_state, mock_modbus):
"""Run test for binary sensor restore state."""
assert hass.states.get(entity_id).state == mock_test_state[0].state
assert hass.states.get(ENTITY_ID).state == mock_test_state[0].state

View File

@ -16,8 +16,8 @@ from homeassistant.core import State
from .conftest import ReadResult, base_test, prepare_service_update
climate_name = "test_climate"
entity_id = f"{CLIMATE_DOMAIN}.{climate_name}"
CLIMATE_NAME = "test_climate"
ENTITY_ID = f"{CLIMATE_DOMAIN}.{CLIMATE_NAME}"
@pytest.mark.parametrize(
@ -26,7 +26,7 @@ entity_id = f"{CLIMATE_DOMAIN}.{climate_name}"
{
CONF_CLIMATES: [
{
CONF_NAME: climate_name,
CONF_NAME: CLIMATE_NAME,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
CONF_SLAVE: 10,
@ -36,7 +36,7 @@ entity_id = f"{CLIMATE_DOMAIN}.{climate_name}"
{
CONF_CLIMATES: [
{
CONF_NAME: climate_name,
CONF_NAME: CLIMATE_NAME,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
CONF_SLAVE: 10,
@ -63,18 +63,18 @@ async def test_config_climate(hass, mock_modbus):
)
async def test_temperature_climate(hass, regs, expected):
"""Run test for given config."""
climate_name = "modbus_test_climate"
CLIMATE_NAME = "modbus_test_climate"
return
state = await base_test(
hass,
{
CONF_NAME: climate_name,
CONF_NAME: CLIMATE_NAME,
CONF_SLAVE: 1,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
CONF_COUNT: 2,
},
climate_name,
CLIMATE_NAME,
CLIMATE_DOMAIN,
CONF_CLIMATES,
None,
@ -92,7 +92,7 @@ async def test_service_climate_update(hass, mock_pymodbus):
config = {
CONF_CLIMATES: [
{
CONF_NAME: climate_name,
CONF_NAME: CLIMATE_NAME,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
CONF_SLAVE: 10,
@ -105,12 +105,12 @@ async def test_service_climate_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == "auto"
assert hass.states.get(ENTITY_ID).state == "auto"
test_value = State(entity_id, 35)
test_value = State(ENTITY_ID, 35)
test_value.attributes = {ATTR_TEMPERATURE: 37}
@ -125,7 +125,7 @@ test_value.attributes = {ATTR_TEMPERATURE: 37}
{
CONF_CLIMATES: [
{
CONF_NAME: climate_name,
CONF_NAME: CLIMATE_NAME,
CONF_TARGET_TEMP: 117,
CONF_ADDRESS: 117,
}
@ -135,6 +135,6 @@ test_value.attributes = {ATTR_TEMPERATURE: 37}
)
async def test_restore_state_climate(hass, mock_test_state, mock_modbus):
"""Run test for sensor restore state."""
state = hass.states.get(entity_id)
state = hass.states.get(ENTITY_ID)
assert state.state == HVAC_MODE_AUTO
assert state.attributes[ATTR_TEMPERATURE] == 37

View File

@ -31,8 +31,8 @@ from homeassistant.core import State
from .conftest import ReadResult, base_test, prepare_service_update
cover_name = "test_cover"
entity_id = f"{COVER_DOMAIN}.{cover_name}"
COVER_NAME = "test_cover"
ENTITY_ID = f"{COVER_DOMAIN}.{COVER_NAME}"
@pytest.mark.parametrize(
@ -41,7 +41,7 @@ entity_id = f"{COVER_DOMAIN}.{cover_name}"
{
CONF_COVERS: [
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_ADDRESS: 1234,
CONF_INPUT_TYPE: CALL_TYPE_COIL,
}
@ -50,7 +50,7 @@ entity_id = f"{COVER_DOMAIN}.{cover_name}"
{
CONF_COVERS: [
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_ADDRESS: 1234,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_SLAVE: 10,
@ -95,12 +95,12 @@ async def test_coil_cover(hass, regs, expected):
state = await base_test(
hass,
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_INPUT_TYPE: CALL_TYPE_COIL,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
},
cover_name,
COVER_NAME,
COVER_DOMAIN,
CONF_COVERS,
None,
@ -142,11 +142,11 @@ async def test_register_cover(hass, regs, expected):
state = await base_test(
hass,
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
},
cover_name,
COVER_NAME,
COVER_DOMAIN,
CONF_COVERS,
None,
@ -164,7 +164,7 @@ async def test_service_cover_update(hass, mock_pymodbus):
config = {
CONF_COVERS: [
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_ADDRESS: 1234,
CONF_STATUS_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
}
@ -176,23 +176,23 @@ async def test_service_cover_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_CLOSED
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x01])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(ENTITY_ID).state == STATE_OPEN
@pytest.mark.parametrize(
"mock_test_state",
[
(State(entity_id, STATE_CLOSED),),
(State(entity_id, STATE_CLOSING),),
(State(entity_id, STATE_OPENING),),
(State(entity_id, STATE_OPEN),),
(State(ENTITY_ID, STATE_CLOSED),),
(State(ENTITY_ID, STATE_CLOSING),),
(State(ENTITY_ID, STATE_OPENING),),
(State(ENTITY_ID, STATE_OPEN),),
],
indirect=True,
)
@ -202,7 +202,7 @@ async def test_service_cover_update(hass, mock_pymodbus):
{
CONF_COVERS: [
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_INPUT_TYPE: CALL_TYPE_COIL,
CONF_ADDRESS: 1234,
CONF_STATE_OPEN: 1,
@ -219,22 +219,22 @@ async def test_service_cover_update(hass, mock_pymodbus):
async def test_restore_state_cover(hass, mock_test_state, mock_modbus):
"""Run test for cover restore state."""
test_state = mock_test_state[0].state
assert hass.states.get(entity_id).state == test_state
assert hass.states.get(ENTITY_ID).state == test_state
async def test_service_cover_move(hass, mock_pymodbus):
"""Run test for service homeassistant.update_entity."""
entity_id2 = f"{entity_id}2"
ENTITY_ID2 = f"{ENTITY_ID}2"
config = {
CONF_COVERS: [
{
CONF_NAME: cover_name,
CONF_NAME: COVER_NAME,
CONF_ADDRESS: 1234,
CONF_STATUS_REGISTER_TYPE: CALL_TYPE_REGISTER_HOLDING,
},
{
CONF_NAME: f"{cover_name}2",
CONF_NAME: f"{COVER_NAME}2",
CONF_INPUT_TYPE: CALL_TYPE_COIL,
CONF_ADDRESS: 1234,
},
@ -246,26 +246,26 @@ async def test_service_cover_move(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"cover", "open_cover", {"entity_id": entity_id}, blocking=True
"cover", "open_cover", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(ENTITY_ID).state == STATE_OPEN
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x00])
await hass.services.async_call(
"cover", "close_cover", {"entity_id": entity_id}, blocking=True
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_CLOSED
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
mock_pymodbus.reset()
mock_pymodbus.read_holding_registers.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"cover", "close_cover", {"entity_id": entity_id}, blocking=True
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
)
assert mock_pymodbus.read_holding_registers.called
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
mock_pymodbus.read_coils.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"cover", "close_cover", {"entity_id": entity_id2}, blocking=True
"cover", "close_cover", {"entity_id": ENTITY_ID2}, blocking=True
)
assert hass.states.get(entity_id2).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE

View File

@ -34,8 +34,8 @@ from homeassistant.setup import async_setup_component
from .conftest import ReadResult, base_test, prepare_service_update
fan_name = "test_fan"
entity_id = f"{FAN_DOMAIN}.{fan_name}"
FAN_NAME = "test_fan"
ENTITY_ID = f"{FAN_DOMAIN}.{FAN_NAME}"
@pytest.mark.parametrize(
@ -44,7 +44,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
}
]
@ -52,7 +52,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
}
@ -61,7 +61,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -78,7 +78,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -95,7 +95,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -112,7 +112,7 @@ entity_id = f"{FAN_DOMAIN}.{fan_name}"
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -164,13 +164,13 @@ async def test_all_fan(hass, call_type, regs, verify, expected):
state = await base_test(
hass,
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_WRITE_TYPE: call_type,
**verify,
},
fan_name,
FAN_NAME,
FAN_DOMAIN,
CONF_FANS,
None,
@ -184,7 +184,7 @@ async def test_all_fan(hass, call_type, regs, verify, expected):
@pytest.mark.parametrize(
"mock_test_state",
[(State(entity_id, STATE_ON),)],
[(State(ENTITY_ID, STATE_ON),)],
indirect=True,
)
@pytest.mark.parametrize(
@ -193,7 +193,7 @@ async def test_all_fan(hass, call_type, regs, verify, expected):
{
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
}
]
@ -202,13 +202,13 @@ async def test_all_fan(hass, call_type, regs, verify, expected):
)
async def test_restore_state_fan(hass, mock_test_state, mock_modbus):
"""Run test for fan restore state."""
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
async def test_fan_service_turn(hass, caplog, mock_pymodbus):
"""Run test for service turn_on/turn_off."""
entity_id2 = f"{FAN_DOMAIN}.{fan_name}2"
ENTITY_ID2 = f"{FAN_DOMAIN}.{FAN_NAME}2"
config = {
MODBUS_DOMAIN: {
CONF_TYPE: "tcp",
@ -216,12 +216,12 @@ async def test_fan_service_turn(hass, caplog, mock_pymodbus):
CONF_PORT: 5501,
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
},
{
CONF_NAME: f"{fan_name}2",
CONF_NAME: f"{FAN_NAME}2",
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_VERIFY: {},
@ -233,44 +233,44 @@ async def test_fan_service_turn(hass, caplog, mock_pymodbus):
await hass.async_block_till_done()
assert MODBUS_DOMAIN in hass.config.components
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
await hass.services.async_call(
"fan", "turn_on", service_data={"entity_id": entity_id}
"fan", "turn_on", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
await hass.services.async_call(
"fan", "turn_off", service_data={"entity_id": entity_id}
"fan", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x01])
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
await hass.services.async_call(
"fan", "turn_on", service_data={"entity_id": entity_id2}
"fan", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_ON
assert hass.states.get(ENTITY_ID2).state == STATE_ON
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x00])
await hass.services.async_call(
"fan", "turn_off", service_data={"entity_id": entity_id2}
"fan", "turn_off", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
mock_pymodbus.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"fan", "turn_on", service_data={"entity_id": entity_id2}
"fan", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE
mock_pymodbus.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"fan", "turn_off", service_data={"entity_id": entity_id}
"fan", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
async def test_service_fan_update(hass, mock_pymodbus):
@ -279,7 +279,7 @@ async def test_service_fan_update(hass, mock_pymodbus):
config = {
CONF_FANS: [
{
CONF_NAME: fan_name,
CONF_NAME: FAN_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
@ -292,11 +292,11 @@ async def test_service_fan_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
mock_pymodbus.read_coils.return_value = ReadResult([0x00])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF

View File

@ -34,8 +34,8 @@ from homeassistant.setup import async_setup_component
from .conftest import ReadResult, base_test, prepare_service_update
light_name = "test_light"
entity_id = f"{LIGHT_DOMAIN}.{light_name}"
LIGHT_NAME = "test_light"
ENTITY_ID = f"{LIGHT_DOMAIN}.{LIGHT_NAME}"
@pytest.mark.parametrize(
@ -44,7 +44,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
}
]
@ -52,7 +52,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
}
@ -61,7 +61,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -78,7 +78,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -95,7 +95,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -112,7 +112,7 @@ entity_id = f"{LIGHT_DOMAIN}.{light_name}"
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -164,13 +164,13 @@ async def test_all_light(hass, call_type, regs, verify, expected):
state = await base_test(
hass,
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_WRITE_TYPE: call_type,
**verify,
},
light_name,
LIGHT_NAME,
LIGHT_DOMAIN,
CONF_LIGHTS,
None,
@ -184,7 +184,7 @@ async def test_all_light(hass, call_type, regs, verify, expected):
@pytest.mark.parametrize(
"mock_test_state",
[(State(entity_id, STATE_ON),)],
[(State(ENTITY_ID, STATE_ON),)],
indirect=True,
)
@pytest.mark.parametrize(
@ -193,7 +193,7 @@ async def test_all_light(hass, call_type, regs, verify, expected):
{
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
}
]
@ -202,13 +202,13 @@ async def test_all_light(hass, call_type, regs, verify, expected):
)
async def test_restore_state_light(hass, mock_test_state, mock_modbus):
"""Run test for sensor restore state."""
assert hass.states.get(entity_id).state == mock_test_state[0].state
assert hass.states.get(ENTITY_ID).state == mock_test_state[0].state
async def test_light_service_turn(hass, caplog, mock_pymodbus):
"""Run test for service turn_on/turn_off."""
entity_id2 = f"{entity_id}2"
ENTITY_ID2 = f"{ENTITY_ID}2"
config = {
MODBUS_DOMAIN: {
CONF_TYPE: "tcp",
@ -216,12 +216,12 @@ async def test_light_service_turn(hass, caplog, mock_pymodbus):
CONF_PORT: 5501,
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
},
{
CONF_NAME: f"{light_name}2",
CONF_NAME: f"{LIGHT_NAME}2",
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_VERIFY: {},
@ -233,44 +233,44 @@ async def test_light_service_turn(hass, caplog, mock_pymodbus):
await hass.async_block_till_done()
assert MODBUS_DOMAIN in hass.config.components
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id}
"light", "turn_on", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id}
"light", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x01])
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id2}
"light", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_ON
assert hass.states.get(ENTITY_ID2).state == STATE_ON
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x00])
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id2}
"light", "turn_off", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
mock_pymodbus.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id2}
"light", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE
mock_pymodbus.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id}
"light", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
async def test_service_light_update(hass, mock_pymodbus):
@ -279,7 +279,7 @@ async def test_service_light_update(hass, mock_pymodbus):
config = {
CONF_LIGHTS: [
{
CONF_NAME: light_name,
CONF_NAME: LIGHT_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
@ -292,11 +292,11 @@ async def test_service_light_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
mock_pymodbus.read_coils.return_value = ReadResult([0x00])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF

View File

@ -38,8 +38,8 @@ from homeassistant.core import State
from .conftest import ReadResult, base_config_test, base_test, prepare_service_update
sensor_name = "test_sensor"
entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
SENSOR_NAME = "test_sensor"
ENTITY_ID = f"{SENSOR_DOMAIN}.{SENSOR_NAME}"
@pytest.mark.parametrize(
@ -48,7 +48,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
}
]
@ -56,7 +56,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_SLAVE: 10,
CONF_COUNT: 1,
@ -72,7 +72,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_SLAVE: 10,
CONF_COUNT: 1,
@ -88,7 +88,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_NONE,
@ -98,7 +98,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_COUNT: 1,
CONF_SWAP: CONF_SWAP_BYTE,
@ -108,7 +108,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD,
@ -118,7 +118,7 @@ entity_id = f"{SENSOR_DOMAIN}.{sensor_name}"
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
CONF_COUNT: 2,
CONF_SWAP: CONF_SWAP_WORD_BYTE,
@ -212,7 +212,7 @@ async def test_config_wrong_struct_sensor(
"""Run test for sensor with wrong struct."""
config_sensor = {
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
**do_config,
}
caplog.set_level(logging.WARNING)
@ -221,7 +221,7 @@ async def test_config_wrong_struct_sensor(
await base_config_test(
hass,
config_sensor,
sensor_name,
SENSOR_NAME,
SENSOR_DOMAIN,
CONF_SENSORS,
None,
@ -507,8 +507,8 @@ async def test_all_sensor(hass, cfg, regs, expected):
state = await base_test(
hass,
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, **cfg},
sensor_name,
{CONF_NAME: SENSOR_NAME, CONF_ADDRESS: 1234, **cfg},
SENSOR_NAME,
SENSOR_DOMAIN,
CONF_SENSORS,
CONF_REGISTERS,
@ -561,8 +561,8 @@ async def test_struct_sensor(hass, cfg, regs, expected):
state = await base_test(
hass,
{CONF_NAME: sensor_name, CONF_ADDRESS: 1234, **cfg},
sensor_name,
{CONF_NAME: SENSOR_NAME, CONF_ADDRESS: 1234, **cfg},
SENSOR_NAME,
SENSOR_DOMAIN,
CONF_SENSORS,
None,
@ -576,7 +576,7 @@ async def test_struct_sensor(hass, cfg, regs, expected):
@pytest.mark.parametrize(
"mock_test_state",
[(State(entity_id, "117"),)],
[(State(ENTITY_ID, "117"),)],
indirect=True,
)
@pytest.mark.parametrize(
@ -585,7 +585,7 @@ async def test_struct_sensor(hass, cfg, regs, expected):
{
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 51,
}
]
@ -594,7 +594,7 @@ async def test_struct_sensor(hass, cfg, regs, expected):
)
async def test_restore_state_sensor(hass, mock_test_state, mock_modbus):
"""Run test for sensor restore state."""
assert hass.states.get(entity_id).state == mock_test_state[0].state
assert hass.states.get(ENTITY_ID).state == mock_test_state[0].state
@pytest.mark.parametrize(
@ -602,11 +602,11 @@ async def test_restore_state_sensor(hass, mock_test_state, mock_modbus):
[
(
CONF_SWAP_WORD,
f"Error in sensor {sensor_name} swap(word) not possible due to the registers count: 1, needed: 2",
f"Error in sensor {SENSOR_NAME} swap(word) not possible due to the registers count: 1, needed: 2",
),
(
CONF_SWAP_WORD_BYTE,
f"Error in sensor {sensor_name} swap(word_byte) not possible due to the registers count: 1, needed: 2",
f"Error in sensor {SENSOR_NAME} swap(word_byte) not possible due to the registers count: 1, needed: 2",
),
],
)
@ -615,7 +615,7 @@ async def test_swap_sensor_wrong_config(
):
"""Run test for sensor swap."""
config = {
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 1234,
CONF_COUNT: 1,
CONF_SWAP: swap_type,
@ -627,7 +627,7 @@ async def test_swap_sensor_wrong_config(
await base_config_test(
hass,
config,
sensor_name,
SENSOR_NAME,
SENSOR_DOMAIN,
CONF_SENSORS,
None,
@ -642,7 +642,7 @@ async def test_service_sensor_update(hass, mock_pymodbus):
config = {
CONF_SENSORS: [
{
CONF_NAME: sensor_name,
CONF_NAME: SENSOR_NAME,
CONF_ADDRESS: 1234,
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
}
@ -654,11 +654,11 @@ async def test_service_sensor_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == "27"
assert hass.states.get(ENTITY_ID).state == "27"
mock_pymodbus.read_input_registers.return_value = ReadResult([32])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == "32"
assert hass.states.get(ENTITY_ID).state == "32"

View File

@ -43,8 +43,8 @@ from .conftest import ReadResult, base_test, prepare_service_update
from tests.common import async_fire_time_changed
switch_name = "test_switch"
entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
SWITCH_NAME = "test_switch"
ENTITY_ID = f"{SWITCH_DOMAIN}.{SWITCH_NAME}"
@pytest.mark.parametrize(
@ -53,7 +53,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
}
]
@ -61,7 +61,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
}
@ -70,7 +70,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -88,7 +88,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -107,7 +107,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -125,7 +125,7 @@ entity_id = f"{SWITCH_DOMAIN}.{switch_name}"
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
@ -179,13 +179,13 @@ async def test_all_switch(hass, call_type, regs, verify, expected):
state = await base_test(
hass,
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_WRITE_TYPE: call_type,
**verify,
},
switch_name,
SWITCH_NAME,
SWITCH_DOMAIN,
CONF_SWITCHES,
None,
@ -199,7 +199,7 @@ async def test_all_switch(hass, call_type, regs, verify, expected):
@pytest.mark.parametrize(
"mock_test_state",
[(State(entity_id, STATE_ON),)],
[(State(ENTITY_ID, STATE_ON),)],
indirect=True,
)
@pytest.mark.parametrize(
@ -208,7 +208,7 @@ async def test_all_switch(hass, call_type, regs, verify, expected):
{
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
}
]
@ -217,13 +217,13 @@ async def test_all_switch(hass, call_type, regs, verify, expected):
)
async def test_restore_state_switch(hass, mock_test_state, mock_modbus):
"""Run test for sensor restore state."""
assert hass.states.get(entity_id).state == mock_test_state[0].state
assert hass.states.get(ENTITY_ID).state == mock_test_state[0].state
async def test_switch_service_turn(hass, caplog, mock_pymodbus):
"""Run test for service turn_on/turn_off."""
entity_id2 = f"{SWITCH_DOMAIN}.{switch_name}2"
ENTITY_ID2 = f"{SWITCH_DOMAIN}.{SWITCH_NAME}2"
config = {
MODBUS_DOMAIN: {
CONF_TYPE: "tcp",
@ -231,12 +231,12 @@ async def test_switch_service_turn(hass, caplog, mock_pymodbus):
CONF_PORT: 5501,
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
},
{
CONF_NAME: f"{switch_name}2",
CONF_NAME: f"{SWITCH_NAME}2",
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_VERIFY: {},
@ -248,44 +248,44 @@ async def test_switch_service_turn(hass, caplog, mock_pymodbus):
await hass.async_block_till_done()
assert MODBUS_DOMAIN in hass.config.components
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
await hass.services.async_call(
"switch", "turn_on", service_data={"entity_id": entity_id}
"switch", "turn_on", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
await hass.services.async_call(
"switch", "turn_off", service_data={"entity_id": entity_id}
"switch", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x01])
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
await hass.services.async_call(
"switch", "turn_on", service_data={"entity_id": entity_id2}
"switch", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_ON
assert hass.states.get(ENTITY_ID2).state == STATE_ON
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x00])
await hass.services.async_call(
"switch", "turn_off", service_data={"entity_id": entity_id2}
"switch", "turn_off", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_OFF
assert hass.states.get(ENTITY_ID2).state == STATE_OFF
mock_pymodbus.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"switch", "turn_on", service_data={"entity_id": entity_id2}
"switch", "turn_on", service_data={"entity_id": ENTITY_ID2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE
mock_pymodbus.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"switch", "turn_off", service_data={"entity_id": entity_id}
"switch", "turn_off", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
async def test_service_switch_update(hass, mock_pymodbus):
@ -294,7 +294,7 @@ async def test_service_switch_update(hass, mock_pymodbus):
config = {
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
@ -307,14 +307,14 @@ async def test_service_switch_update(hass, mock_pymodbus):
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON
mock_pymodbus.read_coils.return_value = ReadResult([0x00])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
async def test_delay_switch(hass, mock_pymodbus):
@ -327,7 +327,7 @@ async def test_delay_switch(hass, mock_pymodbus):
CONF_PORT: 5501,
CONF_SWITCHES: [
{
CONF_NAME: switch_name,
CONF_NAME: SWITCH_NAME,
CONF_ADDRESS: 51,
CONF_SCAN_INTERVAL: 0,
CONF_VERIFY: {
@ -345,12 +345,12 @@ async def test_delay_switch(hass, mock_pymodbus):
assert await async_setup_component(hass, MODBUS_DOMAIN, config) is True
await hass.async_block_till_done()
await hass.services.async_call(
"switch", "turn_on", service_data={"entity_id": entity_id}
"switch", "turn_on", service_data={"entity_id": ENTITY_ID}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert hass.states.get(ENTITY_ID).state == STATE_OFF
now = now + timedelta(seconds=2)
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
assert hass.states.get(ENTITY_ID).state == STATE_ON