core/tests/components/device_tracker/test_init.py

630 lines
20 KiB
Python

"""The tests for the device tracker component."""
from datetime import datetime, timedelta
import json
import logging
import os
from unittest.mock import Mock, call, patch
import pytest
from homeassistant.components import zone
import homeassistant.components.device_tracker as device_tracker
from homeassistant.components.device_tracker import const, legacy
from homeassistant.const import (
ATTR_ENTITY_PICTURE,
ATTR_FRIENDLY_NAME,
ATTR_GPS_ACCURACY,
ATTR_ICON,
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_PLATFORM,
STATE_HOME,
STATE_NOT_HOME,
)
from homeassistant.core import State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import discovery
from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import (
assert_setup_component,
async_fire_time_changed,
mock_registry,
mock_restore_cache,
patch_yaml_files,
)
from tests.components.device_tracker import common
TEST_PLATFORM = {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}
_LOGGER = logging.getLogger(__name__)
@pytest.fixture(name="yaml_devices")
def mock_yaml_devices(hass):
"""Get a path for storing yaml devices."""
yaml_devices = hass.config.path(legacy.YAML_DEVICES)
if os.path.isfile(yaml_devices):
os.remove(yaml_devices)
yield yaml_devices
if os.path.isfile(yaml_devices):
os.remove(yaml_devices)
async def test_is_on(hass):
"""Test is_on method."""
entity_id = f"{const.DOMAIN}.test"
hass.states.async_set(entity_id, STATE_HOME)
assert device_tracker.is_on(hass, entity_id)
hass.states.async_set(entity_id, STATE_NOT_HOME)
assert not device_tracker.is_on(hass, entity_id)
async def test_reading_broken_yaml_config(hass):
"""Test when known devices contains invalid data."""
files = {
"empty.yaml": "",
"nodict.yaml": "100",
"badkey.yaml": "@:\n name: Device",
"noname.yaml": "my_device:\n",
"allok.yaml": "My Device:\n name: Device",
"oneok.yaml": ("My Device!:\n name: Device\nbad_device:\n nme: Device"),
}
args = {"hass": hass, "consider_home": timedelta(seconds=60)}
with patch_yaml_files(files):
assert await legacy.async_load_config("empty.yaml", **args) == []
assert await legacy.async_load_config("nodict.yaml", **args) == []
assert await legacy.async_load_config("noname.yaml", **args) == []
assert await legacy.async_load_config("badkey.yaml", **args) == []
res = await legacy.async_load_config("allok.yaml", **args)
assert len(res) == 1
assert res[0].name == "Device"
assert res[0].dev_id == "my_device"
res = await legacy.async_load_config("oneok.yaml", **args)
assert len(res) == 1
assert res[0].name == "Device"
assert res[0].dev_id == "my_device"
async def test_reading_yaml_config(hass, yaml_devices):
"""Test the rendering of the YAML configuration."""
dev_id = "test"
device = legacy.Device(
hass,
timedelta(seconds=180),
True,
dev_id,
"AB:CD:EF:GH:IJ",
"Test name",
picture="http://test.picture",
icon="mdi:kettle",
)
await hass.async_add_executor_job(
legacy.update_config, yaml_devices, dev_id, device
)
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
config = (await legacy.async_load_config(yaml_devices, hass, device.consider_home))[
0
]
assert device.dev_id == config.dev_id
assert device.track == config.track
assert device.mac == config.mac
assert device.config_picture == config.config_picture
assert device.consider_home == config.consider_home
assert device.icon == config.icon
assert f"{device_tracker.DOMAIN}.test" in hass.config.components
@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_duplicate_mac_dev_id(mock_warning, hass):
"""Test adding duplicate MACs or device IDs to DeviceTracker."""
devices = [
legacy.Device(
hass, True, True, "my_device", "AB:01", "My device", None, None, False
),
legacy.Device(
hass, True, True, "your_device", "AB:01", "Your device", None, None, False
),
]
legacy.DeviceTracker(hass, False, True, {}, devices)
_LOGGER.debug(mock_warning.call_args_list)
assert (
mock_warning.call_count == 1
), "The only warning call should be duplicates (check DEBUG)"
args, _ = mock_warning.call_args
assert "Duplicate device MAC" in args[0], "Duplicate MAC warning expected"
mock_warning.reset_mock()
devices = [
legacy.Device(
hass, True, True, "my_device", "AB:01", "My device", None, None, False
),
legacy.Device(
hass, True, True, "my_device", None, "Your device", None, None, False
),
]
legacy.DeviceTracker(hass, False, True, {}, devices)
_LOGGER.debug(mock_warning.call_args_list)
assert (
mock_warning.call_count == 1
), "The only warning call should be duplicates (check DEBUG)"
args, _ = mock_warning.call_args
assert "Duplicate device IDs" in args[0], "Duplicate device IDs warning expected"
async def test_setup_without_yaml_file(hass):
"""Test with no YAML file."""
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
async def test_gravatar(hass):
"""Test the Gravatar generation."""
dev_id = "test"
device = legacy.Device(
hass,
timedelta(seconds=180),
True,
dev_id,
"AB:CD:EF:GH:IJ",
"Test name",
gravatar="test@example.com",
)
gravatar_url = (
"https://www.gravatar.com/avatar/"
"55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
)
assert device.config_picture == gravatar_url
async def test_gravatar_and_picture(hass):
"""Test that Gravatar overrides picture."""
dev_id = "test"
device = legacy.Device(
hass,
timedelta(seconds=180),
True,
dev_id,
"AB:CD:EF:GH:IJ",
"Test name",
picture="http://test.picture",
gravatar="test@example.com",
)
gravatar_url = (
"https://www.gravatar.com/avatar/"
"55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
)
assert device.config_picture == gravatar_url
@patch("homeassistant.components.device_tracker.legacy.DeviceTracker.see")
@patch("homeassistant.components.demo.device_tracker.setup_scanner", autospec=True)
async def test_discover_platform(mock_demo_setup_scanner, mock_see, hass):
"""Test discovery of device_tracker demo platform."""
await discovery.async_load_platform(
hass, device_tracker.DOMAIN, "demo", {"test_key": "test_val"}, {"bla": {}}
)
await hass.async_block_till_done()
assert device_tracker.DOMAIN in hass.config.components
assert mock_demo_setup_scanner.called
assert mock_demo_setup_scanner.call_args[0] == (
hass,
{},
mock_see,
{"test_key": "test_val"},
)
async def test_update_stale(hass, mock_device_tracker_conf):
"""Test stalled update."""
scanner = getattr(hass.components, "test.device_tracker").SCANNER
scanner.reset()
scanner.come_home("DEV1")
now = dt_util.utcnow()
register_time = datetime(now.year + 1, 9, 15, 23, tzinfo=dt_util.UTC)
scan_time = datetime(now.year + 1, 9, 15, 23, 1, tzinfo=dt_util.UTC)
with patch(
"homeassistant.components.device_tracker.legacy.dt_util.utcnow",
return_value=register_time,
), assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(
hass,
device_tracker.DOMAIN,
{
device_tracker.DOMAIN: {
CONF_PLATFORM: "test",
device_tracker.CONF_CONSIDER_HOME: 59,
}
},
)
await hass.async_block_till_done()
assert hass.states.get("device_tracker.dev1").state == STATE_HOME
scanner.leave_home("DEV1")
with patch(
"homeassistant.components.device_tracker.legacy.dt_util.utcnow",
return_value=scan_time,
):
async_fire_time_changed(hass, scan_time)
await hass.async_block_till_done()
assert hass.states.get("device_tracker.dev1").state == STATE_NOT_HOME
async def test_entity_attributes(hass, mock_device_tracker_conf):
"""Test the entity attributes."""
devices = mock_device_tracker_conf
dev_id = "test_entity"
entity_id = f"{const.DOMAIN}.{dev_id}"
friendly_name = "Paulus"
picture = "http://placehold.it/200x200"
icon = "mdi:kettle"
device = legacy.Device(
hass,
timedelta(seconds=180),
True,
dev_id,
None,
friendly_name,
picture,
icon=icon,
)
devices.append(device)
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
attrs = hass.states.get(entity_id).attributes
assert friendly_name == attrs.get(ATTR_FRIENDLY_NAME)
assert icon == attrs.get(ATTR_ICON)
assert picture == attrs.get(ATTR_ENTITY_PICTURE)
@patch("homeassistant.components.device_tracker.legacy." "DeviceTracker.async_see")
async def test_see_service(mock_see, hass):
"""Test the see service with a unicode dev_id and NO MAC."""
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
params = {
"dev_id": "some_device",
"host_name": "example.com",
"location_name": "Work",
"gps": [0.3, 0.8],
"attributes": {"test": "test"},
}
common.async_see(hass, **params)
await hass.async_block_till_done()
assert mock_see.call_count == 1
assert mock_see.call_count == 1
assert mock_see.call_args == call(**params)
mock_see.reset_mock()
params["dev_id"] += chr(233) # e' acute accent from icloud
common.async_see(hass, **params)
await hass.async_block_till_done()
assert mock_see.call_count == 1
assert mock_see.call_count == 1
assert mock_see.call_args == call(**params)
async def test_see_service_guard_config_entry(hass, mock_device_tracker_conf):
"""Test the guard if the device is registered in the entity registry."""
mock_entry = Mock()
dev_id = "test"
entity_id = f"{const.DOMAIN}.{dev_id}"
mock_registry(hass, {entity_id: mock_entry})
devices = mock_device_tracker_conf
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
params = {"dev_id": dev_id, "gps": [0.3, 0.8]}
common.async_see(hass, **params)
await hass.async_block_till_done()
assert not devices
async def test_new_device_event_fired(hass, mock_device_tracker_conf):
"""Test that the device tracker will fire an event."""
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
test_events = []
@callback
def listener(event):
"""Record that our event got called."""
test_events.append(event)
hass.bus.async_listen("device_tracker_new_device", listener)
common.async_see(hass, "mac_1", host_name="hello")
common.async_see(hass, "mac_1", host_name="hello")
await hass.async_block_till_done()
assert len(test_events) == 1
# Assert we can serialize the event
json.dumps(test_events[0].as_dict(), cls=JSONEncoder)
assert test_events[0].data == {
"entity_id": "device_tracker.hello",
"host_name": "hello",
"mac": "MAC_1",
}
async def test_duplicate_yaml_keys(hass, mock_device_tracker_conf):
"""Test that the device tracker will not generate invalid YAML."""
devices = mock_device_tracker_conf
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
common.async_see(hass, "mac_1", host_name="hello")
common.async_see(hass, "mac_2", host_name="hello")
await hass.async_block_till_done()
assert len(devices) == 2
assert devices[0].dev_id != devices[1].dev_id
async def test_invalid_dev_id(hass, mock_device_tracker_conf):
"""Test that the device tracker will not allow invalid dev ids."""
devices = mock_device_tracker_conf
with assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
common.async_see(hass, dev_id="hello-world")
await hass.async_block_till_done()
assert not devices
async def test_see_state(hass, yaml_devices):
"""Test device tracker see records state correctly."""
assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
params = {
"mac": "AA:BB:CC:DD:EE:FF",
"dev_id": "some_device",
"host_name": "example.com",
"location_name": "Work",
"gps": [0.3, 0.8],
"gps_accuracy": 1,
"battery": 100,
"attributes": {"test": "test", "number": 1},
}
common.async_see(hass, **params)
await hass.async_block_till_done()
config = await legacy.async_load_config(yaml_devices, hass, timedelta(seconds=0))
assert len(config) == 1
state = hass.states.get("device_tracker.example_com")
attrs = state.attributes
assert state.state == "Work"
assert state.object_id == "example_com"
assert state.name == "example.com"
assert attrs["friendly_name"] == "example.com"
assert attrs["battery"] == 100
assert attrs["latitude"] == 0.3
assert attrs["longitude"] == 0.8
assert attrs["test"] == "test"
assert attrs["gps_accuracy"] == 1
assert attrs["source_type"] == "gps"
assert attrs["number"] == 1
async def test_see_passive_zone_state(hass, mock_device_tracker_conf):
"""Test that the device tracker sets gps for passive trackers."""
now = dt_util.utcnow()
register_time = datetime(now.year + 1, 9, 15, 23, tzinfo=dt_util.UTC)
scan_time = datetime(now.year + 1, 9, 15, 23, 1, tzinfo=dt_util.UTC)
with assert_setup_component(1, zone.DOMAIN):
zone_info = {
"name": "Home",
"latitude": 1,
"longitude": 2,
"radius": 250,
"passive": False,
}
await async_setup_component(hass, zone.DOMAIN, {"zone": zone_info})
scanner = getattr(hass.components, "test.device_tracker").SCANNER
scanner.reset()
scanner.come_home("dev1")
with patch(
"homeassistant.components.device_tracker.legacy.dt_util.utcnow",
return_value=register_time,
), assert_setup_component(1, device_tracker.DOMAIN):
assert await async_setup_component(
hass,
device_tracker.DOMAIN,
{
device_tracker.DOMAIN: {
CONF_PLATFORM: "test",
device_tracker.CONF_CONSIDER_HOME: 59,
}
},
)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.dev1")
attrs = state.attributes
assert state.state == STATE_HOME
assert state.object_id == "dev1"
assert state.name == "dev1"
assert attrs.get("friendly_name") == "dev1"
assert attrs.get("latitude") == 1
assert attrs.get("longitude") == 2
assert attrs.get("gps_accuracy") == 0
assert attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER
scanner.leave_home("dev1")
with patch(
"homeassistant.components.device_tracker.legacy.dt_util.utcnow",
return_value=scan_time,
):
async_fire_time_changed(hass, scan_time)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.dev1")
attrs = state.attributes
assert state.state == STATE_NOT_HOME
assert state.object_id == "dev1"
assert state.name == "dev1"
assert attrs.get("friendly_name") == "dev1"
assert attrs.get("latitude") is None
assert attrs.get("longitude") is None
assert attrs.get("gps_accuracy") is None
assert attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER
@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_see_failures(mock_warning, hass, mock_device_tracker_conf):
"""Test that the device tracker see failures."""
devices = mock_device_tracker_conf
tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), 0, {}, [])
# MAC is not a string (but added)
await tracker.async_see(mac=567, host_name="Number MAC")
# No device id or MAC(not added)
with pytest.raises(HomeAssistantError):
await tracker.async_see()
assert mock_warning.call_count == 0
# Ignore gps on invalid GPS (both added & warnings)
await tracker.async_see(mac="mac_1_bad_gps", gps=1)
await tracker.async_see(mac="mac_2_bad_gps", gps=[1])
await tracker.async_see(mac="mac_3_bad_gps", gps="gps")
await hass.async_block_till_done()
assert mock_warning.call_count == 3
assert len(devices) == 4
async def test_async_added_to_hass(hass):
"""Test restoring state."""
attr = {
ATTR_LONGITUDE: 18,
ATTR_LATITUDE: -33,
const.ATTR_SOURCE_TYPE: "gps",
ATTR_GPS_ACCURACY: 2,
const.ATTR_BATTERY: 100,
}
mock_restore_cache(hass, [State("device_tracker.jk", "home", attr)])
path = hass.config.path(legacy.YAML_DEVICES)
files = {path: "jk:\n name: JK Phone\n track: True"}
with patch_yaml_files(files):
assert await async_setup_component(hass, device_tracker.DOMAIN, {})
state = hass.states.get("device_tracker.jk")
assert state
assert state.state == "home"
for key, val in attr.items():
atr = state.attributes.get(key)
assert atr == val, f"{key}={atr} expected: {val}"
async def test_bad_platform(hass):
"""Test bad platform."""
config = {"device_tracker": [{"platform": "bad_platform"}]}
with assert_setup_component(0, device_tracker.DOMAIN):
assert await async_setup_component(hass, device_tracker.DOMAIN, config)
assert f"{device_tracker.DOMAIN}.bad_platform" not in hass.config.components
async def test_adding_unknown_device_to_config(mock_device_tracker_conf, hass):
"""Test the adding of unknown devices to configuration file."""
scanner = getattr(hass.components, "test.device_tracker").SCANNER
scanner.reset()
scanner.come_home("DEV1")
await async_setup_component(
hass, device_tracker.DOMAIN, {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}
)
await hass.async_block_till_done()
assert len(mock_device_tracker_conf) == 1
device = mock_device_tracker_conf[0]
assert device.dev_id == "dev1"
assert device.track
async def test_picture_and_icon_on_see_discovery(mock_device_tracker_conf, hass):
"""Test that picture and icon are set in initial see."""
tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), False, {}, [])
await tracker.async_see(dev_id=11, picture="pic_url", icon="mdi:icon")
await hass.async_block_till_done()
assert len(mock_device_tracker_conf) == 1
assert mock_device_tracker_conf[0].icon == "mdi:icon"
assert mock_device_tracker_conf[0].entity_picture == "pic_url"
async def test_backward_compatibility_for_track_new(mock_device_tracker_conf, hass):
"""Test backward compatibility for track new."""
tracker = legacy.DeviceTracker(
hass, timedelta(seconds=60), False, {device_tracker.CONF_TRACK_NEW: True}, []
)
await tracker.async_see(dev_id=13)
await hass.async_block_till_done()
assert len(mock_device_tracker_conf) == 1
assert mock_device_tracker_conf[0].track is False
async def test_old_style_track_new_is_skipped(mock_device_tracker_conf, hass):
"""Test old style config is skipped."""
tracker = legacy.DeviceTracker(
hass, timedelta(seconds=60), None, {device_tracker.CONF_TRACK_NEW: False}, []
)
await tracker.async_see(dev_id=14)
await hass.async_block_till_done()
assert len(mock_device_tracker_conf) == 1
assert mock_device_tracker_conf[0].track is False
def test_see_schema_allowing_ios_calls():
"""Test SEE service schema allows extra keys.
Temp work around because the iOS app sends incorrect data.
"""
device_tracker.SERVICE_SEE_PAYLOAD_SCHEMA(
{
"dev_id": "Test",
"battery": 35,
"battery_status": "Not Charging",
"gps": [10.0, 10.0],
"gps_accuracy": 300,
"hostname": "beer",
}
)