core/tests/components/locative/test_init.py

313 lines
9.1 KiB
Python
Raw Normal View History

2016-03-09 09:25:50 +00:00
"""The tests the for Locative device tracker platform."""
from http import HTTPStatus
2021-01-01 21:31:56 +00:00
from unittest.mock import patch
import pytest
2015-12-31 07:33:48 +00:00
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import locative
2019-07-31 19:25:30 +00:00
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER_DOMAIN
from homeassistant.components.locative import DOMAIN, TRACKER_UPDATE
from homeassistant.config import async_process_ha_core_config
from homeassistant.helpers.dispatcher import DATA_DISPATCHER
from homeassistant.setup import async_setup_component
2019-02-24 16:49:50 +00:00
# pylint: disable=redefined-outer-name
2015-12-31 07:33:48 +00:00
@pytest.fixture(autouse=True)
def mock_dev_track(mock_device_tracker_conf):
    """Apply the ``mock_device_tracker_conf`` fixture to every test here.

    The fixture has no body of its own; being ``autouse`` it simply makes
    each test in this module depend on ``mock_device_tracker_conf``.
    """
@pytest.fixture
async def locative_client(event_loop, hass, hass_client):
    """Set up the Locative integration and return an HTTP test client.

    The integration is set up with an empty configuration and all pending
    work is awaited before the client is created.
    """
    setup_ok = await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
    assert setup_ok
    await hass.async_block_till_done()

    # NOTE: the patch is only active while the client is being created; it is
    # undone as soon as this fixture returns.
    with patch("homeassistant.components.device_tracker.legacy.update_config"):
        client = await hass_client()
        return client
@pytest.fixture
async def webhook_id(hass, locative_client):
    """Create a Locative config entry and return its webhook id.

    Configures an internal URL on the core config, runs the user-initiated
    config flow for this integration through its form step, and returns the
    ``webhook_id`` stored in the resulting config entry's data.
    """
    # presumably the flow needs a known URL to build the webhook address;
    # verify against the integration's config flow.
    await async_process_ha_core_config(
        hass,
        {"internal_url": "http://example.local:8123"},
    )

    # Use the imported DOMAIN constant, as the other blocks in this file do,
    # rather than repeating the integration name as a string literal.
    result = await hass.config_entries.flow.async_init(
        DOMAIN, context={"source": config_entries.SOURCE_USER}
    )
    assert result["type"] == data_entry_flow.FlowResultType.FORM, result

    # Submit the form with no user input to finish the flow.
    result = await hass.config_entries.flow.async_configure(result["flow_id"], {})
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY

    await hass.async_block_till_done()
    return result["result"].data["webhook_id"]
async def test_missing_data(locative_client, webhook_id):
    """Check the webhook's status codes for incomplete or unusual payloads."""
    url = f"/api/webhook/{webhook_id}"
    payload = {
        "latitude": 1.0,
        "longitude": 1.1,
        "device": "123",
        "id": "Home",
        "trigger": "enter",
    }

    # No data
    resp = await locative_client.post(url)
    assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY

    # No latitude / no device / no location / no trigger, one at a time
    for missing_key in ("latitude", "device", "id", "trigger"):
        partial = {key: value for key, value in payload.items() if key != missing_key}
        resp = await locative_client.post(url, data=partial)
        assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY

    # Test message
    test_message = {**payload, "trigger": "test"}
    resp = await locative_client.post(url, data=test_message)
    assert resp.status == HTTPStatus.OK

    # Test message, no location
    test_message_no_location = {
        key: value for key, value in test_message.items() if key != "id"
    }
    resp = await locative_client.post(url, data=test_message_no_location)
    assert resp.status == HTTPStatus.OK

    # Unknown trigger
    unknown_trigger = {**payload, "trigger": "foobar"}
    resp = await locative_client.post(url, data=unknown_trigger)
    assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY
async def test_enter_and_exit(hass, locative_client, webhook_id):
    """Walk one device through enter/exit messages and check its state."""
    url = f"/api/webhook/{webhook_id}"
    payload = {
        "latitude": 40.7855,
        "longitude": -111.7367,
        "device": "123",
        "id": "Home",
        "trigger": "enter",
    }
    entity_id = f"{DEVICE_TRACKER_DOMAIN}.{payload['device']}"

    # Each step is (location id, trigger, expected tracker state); the id is
    # sent in varying letter case on purpose.
    steps = (
        ("Home", "enter", "home"),  # Enter the Home
        ("HOME", "exit", "not_home"),  # Exit Home
        ("hOmE", "enter", "home"),  # Enter Home again
        ("hOmE", "exit", "not_home"),  # Exit Home
        ("work", "enter", "work"),  # Enter Work
    )
    for location_id, trigger, expected_state in steps:
        payload["id"] = location_id
        payload["trigger"] = trigger

        resp = await locative_client.post(url, data=payload)
        await hass.async_block_till_done()
        assert resp.status == HTTPStatus.OK

        assert hass.states.get(entity_id).state == expected_state
async def test_exit_after_enter(hass, locative_client, webhook_id):
    """Check that a late exit for an earlier location keeps the current one."""
    url = f"/api/webhook/{webhook_id}"
    payload = {
        "latitude": 40.7855,
        "longitude": -111.7367,
        "device": "123",
        "id": "Home",
        "trigger": "enter",
    }
    entity_id = f"{DEVICE_TRACKER_DOMAIN}.{payload['device']}"

    async def post_and_read_state():
        """Send the current payload and return the tracker's state object."""
        resp = await locative_client.post(url, data=payload)
        await hass.async_block_till_done()
        assert resp.status == HTTPStatus.OK
        return hass.states.get(entity_id)

    # Enter Home
    tracker = await post_and_read_state()
    assert tracker.state == "home"

    # Enter Work
    payload["id"] = "Work"
    tracker = await post_and_read_state()
    assert tracker.state == "work"

    # Exit Home: the device is still expected to be reported at work.
    payload["id"] = "Home"
    payload["trigger"] = "exit"
    tracker = await post_and_read_state()
    assert tracker.state == "work"
async def test_exit_first(hass, locative_client, webhook_id):
    """Check a device whose very first message is an exit."""
    payload = {
        "latitude": 40.7855,
        "longitude": -111.7367,
        "device": "new_device",
        "id": "Home",
        "trigger": "exit",
    }
    entity_id = f"{DEVICE_TRACKER_DOMAIN}.{payload['device']}"

    # Exit Home
    resp = await locative_client.post(f"/api/webhook/{webhook_id}", data=payload)
    await hass.async_block_till_done()
    assert resp.status == HTTPStatus.OK

    tracker = hass.states.get(entity_id)
    assert tracker.state == "not_home"
async def test_two_devices(hass, locative_client, webhook_id):
    """Check that updates for two devices are tracked independently."""
    url = f"/api/webhook/{webhook_id}"

    first_payload = {
        "latitude": 40.7855,
        "longitude": -111.7367,
        "device": "device_1",
        "id": "Home",
        "trigger": "exit",
    }
    first_entity_id = f"{DEVICE_TRACKER_DOMAIN}.{first_payload['device']}"

    # Exit Home
    resp = await locative_client.post(url, data=first_payload)
    await hass.async_block_till_done()
    assert resp.status == HTTPStatus.OK
    assert hass.states.get(first_entity_id).state == "not_home"

    # Enter Home
    second_payload = {**first_payload, "device": "device_2", "trigger": "enter"}
    second_entity_id = f"{DEVICE_TRACKER_DOMAIN}.{second_payload['device']}"

    resp = await locative_client.post(url, data=second_payload)
    await hass.async_block_till_done()
    assert resp.status == HTTPStatus.OK
    assert hass.states.get(second_entity_id).state == "home"

    # The first device must be unaffected by the second device's update.
    assert hass.states.get(first_entity_id).state == "not_home"
@pytest.mark.xfail(
    reason="The device_tracker component does not support unloading yet."
)
async def test_load_unload_entry(hass, locative_client, webhook_id):
    """Check the tracker-update dispatcher entry is added, then removed on unload."""
    payload = {
        "latitude": 40.7855,
        "longitude": -111.7367,
        "device": "new_device",
        "id": "Home",
        "trigger": "exit",
    }
    entity_id = f"{DEVICE_TRACKER_DOMAIN}.{payload['device']}"

    # Exit Home
    resp = await locative_client.post(f"/api/webhook/{webhook_id}", data=payload)
    await hass.async_block_till_done()
    assert resp.status == HTTPStatus.OK
    assert hass.states.get(entity_id).state == "not_home"

    # Exactly one entry is registered for the tracker-update signal.
    assert len(hass.data[DATA_DISPATCHER][TRACKER_UPDATE]) == 1

    config_entry = hass.config_entries.async_entries(DOMAIN)[0]
    await locative.async_unload_entry(hass, config_entry)
    await hass.async_block_till_done()

    # After unloading, nothing should remain registered for that signal.
    assert not hass.data[DATA_DISPATCHER][TRACKER_UPDATE]