Fix modbus test_delay (#66993)

pull/67240/head
jan iversen 2022-02-25 17:06:25 +01:00 committed by GitHub
parent c856f673fb
commit c6f5633e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 23 deletions

View File

@ -711,34 +711,33 @@ async def test_delay(hass, mock_pymodbus):
]
}
mock_pymodbus.read_coils.return_value = ReadResult([0x01])
now = dt_util.utcnow()
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
start_time = dt_util.utcnow()
with mock.patch(
"homeassistant.helpers.event.dt_util.utcnow", return_value=start_time
):
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNKNOWN
# pass first scan_interval
start_time = now
now = now + timedelta(seconds=(set_scan_interval + 1))
with mock.patch(
"homeassistant.helpers.event.dt_util.utcnow", return_value=now, autospec=True
):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
stop_time = start_time + timedelta(seconds=(set_delay + 1))
step_timedelta = timedelta(seconds=1)
while now < stop_time:
now = now + step_timedelta
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
time_sensor_active = start_time + timedelta(seconds=2)
time_after_delay = start_time + timedelta(seconds=(set_delay))
time_after_scan = start_time + timedelta(seconds=(set_delay + set_scan_interval))
time_stop = time_after_scan + timedelta(seconds=10)
now = start_time
while now < time_stop:
now += timedelta(seconds=1)
with mock.patch(
"homeassistant.helpers.event.dt_util.utcnow",
return_value=now,
autospec=True,
):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
now = now + step_timedelta + timedelta(seconds=2)
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
if now > time_sensor_active:
if now <= time_after_delay:
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
elif now > time_after_scan:
assert hass.states.get(entity_id).state == STATE_ON
@pytest.mark.parametrize(