"""The tests for the device tracker component."""
from datetime import datetime, timedelta
import json
import logging
import os
from unittest.mock import Mock, call

from asynctest import patch
import pytest

from homeassistant.components import zone
import homeassistant.components.device_tracker as device_tracker
from homeassistant.components.device_tracker import const, legacy
from homeassistant.const import (
    ATTR_ENTITY_PICTURE,
    ATTR_FRIENDLY_NAME,
    ATTR_GPS_ACCURACY,
    ATTR_ICON,
    ATTR_LATITUDE,
    ATTR_LONGITUDE,
    CONF_PLATFORM,
    STATE_HOME,
    STATE_NOT_HOME,
)
from homeassistant.core import State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import discovery
from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util

from tests.common import (
    assert_setup_component,
    async_fire_time_changed,
    mock_registry,
    mock_restore_cache,
    patch_yaml_files,
)
from tests.components.device_tracker import common

# Minimal configuration that loads the "test" device tracker platform.
TEST_PLATFORM = {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}

_LOGGER = logging.getLogger(__name__)


@pytest.fixture(name="yaml_devices")
def mock_yaml_devices(hass):
    """Get a path for storing yaml devices.

    Any existing file at that path is removed both before the test runs and
    after it finishes, so each test starts and ends without a devices file.
    """
    yaml_devices = hass.config.path(legacy.YAML_DEVICES)
    if os.path.isfile(yaml_devices):
        os.remove(yaml_devices)
    yield yaml_devices
    if os.path.isfile(yaml_devices):
        os.remove(yaml_devices)


async def test_is_on(hass):
    """Test is_on method."""
    entity_id = f"{const.DOMAIN}.test"

    hass.states.async_set(entity_id, STATE_HOME)
    assert device_tracker.is_on(hass, entity_id)

    hass.states.async_set(entity_id, STATE_NOT_HOME)
    assert not device_tracker.is_on(hass, entity_id)


async def test_reading_broken_yaml_config(hass):
    """Test when known devices contains invalid data."""
    files = {
        "empty.yaml": "",
        "nodict.yaml": "100",
        "badkey.yaml": "@:\n name: Device",
        "noname.yaml": "my_device:\n",
        "allok.yaml": "My Device:\n name: Device",
        "oneok.yaml": ("My Device!:\n name: Device\nbad_device:\n nme: Device"),
    }
    args = {"hass": hass, "consider_home": timedelta(seconds=60)}
    with patch_yaml_files(files):
        # Files without any usable device entry load as an empty list.
        assert await legacy.async_load_config("empty.yaml", **args) == []
        assert await legacy.async_load_config("nodict.yaml", **args) == []
        assert await legacy.async_load_config("noname.yaml", **args) == []
        assert await legacy.async_load_config("badkey.yaml", **args) == []

        res = await legacy.async_load_config("allok.yaml", **args)
        assert len(res) == 1
        assert res[0].name == "Device"
        assert res[0].dev_id == "my_device"

        # Only the valid entry is kept; the misspelled one is dropped.
        res = await legacy.async_load_config("oneok.yaml", **args)
        assert len(res) == 1
        assert res[0].name == "Device"
        assert res[0].dev_id == "my_device"


async def test_reading_yaml_config(hass, yaml_devices):
    """Test the rendering of the YAML configuration."""
    dev_id = "test"
    device = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        dev_id,
        "AB:CD:EF:GH:IJ",
        "Test name",
        picture="http://test.picture",
        icon="mdi:kettle",
    )
    # update_config is a synchronous function, so run it in the executor.
    await hass.async_add_executor_job(
        legacy.update_config, yaml_devices, dev_id, device
    )
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
    config = (await legacy.async_load_config(yaml_devices, hass, device.consider_home))[
        0
    ]
    assert device.dev_id == config.dev_id
    assert device.track == config.track
    assert device.mac == config.mac
    assert device.config_picture == config.config_picture
    assert device.consider_home == config.consider_home
    assert device.icon == config.icon


@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_duplicate_mac_dev_id(mock_warning, hass):
    """Test adding duplicate MACs or device IDs to DeviceTracker."""
    # Two devices sharing the same MAC address.
    devices = [
        legacy.Device(
            hass, True, True, "my_device", "AB:01", "My device", None, None, False
        ),
        legacy.Device(
            hass, True, True, "your_device", "AB:01", "Your device", None, None, False
        ),
    ]
    legacy.DeviceTracker(hass, False, True, {}, devices)
    _LOGGER.debug(mock_warning.call_args_list)
    assert (
        mock_warning.call_count == 1
    ), "The only warning call should be duplicates (check DEBUG)"
    args, _ = mock_warning.call_args
    assert "Duplicate device MAC" in args[0], "Duplicate MAC warning expected"

    mock_warning.reset_mock()

    # Two devices sharing the same device id.
    devices = [
        legacy.Device(
            hass, True, True, "my_device", "AB:01", "My device", None, None, False
        ),
        legacy.Device(
            hass, True, True, "my_device", None, "Your device", None, None, False
        ),
    ]
    legacy.DeviceTracker(hass, False, True, {}, devices)

    _LOGGER.debug(mock_warning.call_args_list)
    assert (
        mock_warning.call_count == 1
    ), "The only warning call should be duplicates (check DEBUG)"
    args, _ = mock_warning.call_args
    assert "Duplicate device IDs" in args[0], "Duplicate device IDs warning expected"


async def test_setup_without_yaml_file(hass):
    """Test with no YAML file."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)


async def test_gravatar(hass):
    """Test the Gravatar generation."""
    dev_id = "test"
    device = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        dev_id,
        "AB:CD:EF:GH:IJ",
        "Test name",
        gravatar="test@example.com",
    )
    gravatar_url = (
        "https://www.gravatar.com/avatar/"
        "55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
    )
    assert device.config_picture == gravatar_url


async def test_gravatar_and_picture(hass):
    """Test that Gravatar overrides picture."""
    dev_id = "test"
    device = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        dev_id,
        "AB:CD:EF:GH:IJ",
        "Test name",
        picture="http://test.picture",
        gravatar="test@example.com",
    )
    gravatar_url = (
        "https://www.gravatar.com/avatar/"
        "55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
    )
    assert device.config_picture == gravatar_url


# Decorators are applied bottom-up, so the innermost patch (setup_scanner)
# is passed as the first mock argument.
@patch("homeassistant.components.device_tracker.legacy.DeviceTracker.see")
@patch("homeassistant.components.demo.device_tracker.setup_scanner", autospec=True)
async def test_discover_platform(mock_demo_setup_scanner, mock_see, hass):
    """Test discovery of device_tracker demo platform."""
    await discovery.async_load_platform(
        hass, device_tracker.DOMAIN, "demo", {"test_key": "test_val"}, {"bla": {}}
    )
    await hass.async_block_till_done()
    assert device_tracker.DOMAIN in hass.config.components
    assert mock_demo_setup_scanner.called
    assert mock_demo_setup_scanner.call_args[0] == (
        hass,
        {},
        mock_see,
        {"test_key": "test_val"},
    )


async def test_update_stale(hass, mock_device_tracker_conf):
    """Test stalled update."""
    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("DEV1")

    register_time = datetime(2015, 9, 15, 23, tzinfo=dt_util.UTC)
    # One minute later, which exceeds the 59 second consider_home below.
    scan_time = datetime(2015, 9, 15, 23, 1, tzinfo=dt_util.UTC)

    with patch(
        "homeassistant.components.device_tracker.legacy.dt_util.utcnow",
        return_value=register_time,
    ):
        with assert_setup_component(1, device_tracker.DOMAIN):
            assert await async_setup_component(
                hass,
                device_tracker.DOMAIN,
                {
                    device_tracker.DOMAIN: {
                        CONF_PLATFORM: "test",
                        device_tracker.CONF_CONSIDER_HOME: 59,
                    }
                },
            )
            await hass.async_block_till_done()

    assert hass.states.get("device_tracker.dev1").state == STATE_HOME

    scanner.leave_home("DEV1")

    with patch(
        "homeassistant.components.device_tracker.legacy.dt_util.utcnow",
        return_value=scan_time,
    ):
        async_fire_time_changed(hass, scan_time)
        await hass.async_block_till_done()

    assert hass.states.get("device_tracker.dev1").state == STATE_NOT_HOME


async def test_entity_attributes(hass, mock_device_tracker_conf):
    """Test the entity attributes."""
    devices = mock_device_tracker_conf
    dev_id = "test_entity"
    entity_id = f"{const.DOMAIN}.{dev_id}"
    friendly_name = "Paulus"
    picture = "http://placehold.it/200x200"
    icon = "mdi:kettle"

    device = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        dev_id,
        None,
        friendly_name,
        picture,
        icon=icon,
    )
    devices.append(device)

    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    attrs = hass.states.get(entity_id).attributes

    assert attrs.get(ATTR_FRIENDLY_NAME) == friendly_name
    assert attrs.get(ATTR_ICON) == icon
    assert attrs.get(ATTR_ENTITY_PICTURE) == picture


@patch("homeassistant.components.device_tracker.legacy.DeviceTracker.async_see")
async def test_see_service(mock_see, hass):
    """Test the see service with a unicode dev_id and NO MAC."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
    params = {
        "dev_id": "some_device",
        "host_name": "example.com",
        "location_name": "Work",
        "gps": [0.3, 0.8],
        "attributes": {"test": "test"},
    }
    common.async_see(hass, **params)
    await hass.async_block_till_done()
    assert mock_see.call_count == 1
    assert mock_see.call_args == call(**params)

    mock_see.reset_mock()
    params["dev_id"] += chr(233)  # e' acute accent from icloud

    common.async_see(hass, **params)
    await hass.async_block_till_done()
    assert mock_see.call_count == 1
    assert mock_see.call_args == call(**params)


async def test_see_service_guard_config_entry(hass, mock_device_tracker_conf):
    """Test the guard if the device is registered in the entity registry."""
    mock_entry = Mock()
    dev_id = "test"
    entity_id = f"{const.DOMAIN}.{dev_id}"
    mock_registry(hass, {entity_id: mock_entry})
    devices = mock_device_tracker_conf
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
    params = {"dev_id": dev_id, "gps": [0.3, 0.8]}

    common.async_see(hass, **params)
    await hass.async_block_till_done()

    # The entity id is already in the registry, so no legacy device is added.
    assert not devices


async def test_new_device_event_fired(hass, mock_device_tracker_conf):
    """Test that the device tracker will fire an event."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
    test_events = []

    @callback
    def listener(event):
        """Record that our event got called."""
        test_events.append(event)

    hass.bus.async_listen("device_tracker_new_device", listener)

    # The same device is reported twice on purpose: only one new-device
    # event is expected.
    common.async_see(hass, "mac_1", host_name="hello")
    common.async_see(hass, "mac_1", host_name="hello")

    await hass.async_block_till_done()

    assert len(test_events) == 1

    # Assert we can serialize the event
    json.dumps(test_events[0].as_dict(), cls=JSONEncoder)

    assert test_events[0].data == {
        "entity_id": "device_tracker.hello",
        "host_name": "hello",
        "mac": "MAC_1",
    }


async def test_duplicate_yaml_keys(hass, mock_device_tracker_conf):
    """Test that the device tracker will not generate invalid YAML."""
    devices = mock_device_tracker_conf
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    # Two different MACs that share a host name must get distinct dev ids.
    common.async_see(hass, "mac_1", host_name="hello")
    common.async_see(hass, "mac_2", host_name="hello")

    await hass.async_block_till_done()

    assert len(devices) == 2
    assert devices[0].dev_id != devices[1].dev_id


async def test_invalid_dev_id(hass, mock_device_tracker_conf):
    """Test that the device tracker will not allow invalid dev ids."""
    devices = mock_device_tracker_conf
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    common.async_see(hass, dev_id="hello-world")
    await hass.async_block_till_done()

    assert not devices


async def test_see_state(hass, yaml_devices):
    """Test device tracker see records state correctly."""
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    params = {
        "mac": "AA:BB:CC:DD:EE:FF",
        "dev_id": "some_device",
        "host_name": "example.com",
        "location_name": "Work",
        "gps": [0.3, 0.8],
        "gps_accuracy": 1,
        "battery": 100,
        "attributes": {"test": "test", "number": 1},
    }

    common.async_see(hass, **params)
    await hass.async_block_till_done()

    config = await legacy.async_load_config(yaml_devices, hass, timedelta(seconds=0))
    assert len(config) == 1

    state = hass.states.get("device_tracker.example_com")
    attrs = state.attributes
    assert state.state == "Work"
    assert state.object_id == "example_com"
    assert state.name == "example.com"
    assert attrs["friendly_name"] == "example.com"
    assert attrs["battery"] == 100
    assert attrs["latitude"] == 0.3
    assert attrs["longitude"] == 0.8
    assert attrs["test"] == "test"
    assert attrs["gps_accuracy"] == 1
    assert attrs["source_type"] == "gps"
    assert attrs["number"] == 1


async def test_see_passive_zone_state(hass, mock_device_tracker_conf):
    """Test that the device tracker sets gps for passive trackers."""
    register_time = datetime(2015, 9, 15, 23, tzinfo=dt_util.UTC)
    # One minute later, which exceeds the 59 second consider_home below.
    scan_time = datetime(2015, 9, 15, 23, 1, tzinfo=dt_util.UTC)

    with assert_setup_component(1, zone.DOMAIN):
        zone_info = {
            "name": "Home",
            "latitude": 1,
            "longitude": 2,
            "radius": 250,
            "passive": False,
        }

        assert await async_setup_component(hass, zone.DOMAIN, {"zone": zone_info})

    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("dev1")

    with patch(
        "homeassistant.components.device_tracker.legacy.dt_util.utcnow",
        return_value=register_time,
    ):
        with assert_setup_component(1, device_tracker.DOMAIN):
            assert await async_setup_component(
                hass,
                device_tracker.DOMAIN,
                {
                    device_tracker.DOMAIN: {
                        CONF_PLATFORM: "test",
                        device_tracker.CONF_CONSIDER_HOME: 59,
                    }
                },
            )
            await hass.async_block_till_done()

    state = hass.states.get("device_tracker.dev1")
    attrs = state.attributes
    assert state.state == STATE_HOME
    assert state.object_id == "dev1"
    assert state.name == "dev1"
    assert attrs.get("friendly_name") == "dev1"
    # While home, the coordinates match the zone configured above.
    assert attrs.get("latitude") == 1
    assert attrs.get("longitude") == 2
    assert attrs.get("gps_accuracy") == 0
    assert attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER

    scanner.leave_home("dev1")

    with patch(
        "homeassistant.components.device_tracker.legacy.dt_util.utcnow",
        return_value=scan_time,
    ):
        async_fire_time_changed(hass, scan_time)
        await hass.async_block_till_done()

    state = hass.states.get("device_tracker.dev1")
    attrs = state.attributes
    assert state.state == STATE_NOT_HOME
    assert state.object_id == "dev1"
    assert state.name == "dev1"
    assert attrs.get("friendly_name") == "dev1"
    assert attrs.get("latitude") is None
    assert attrs.get("longitude") is None
    assert attrs.get("gps_accuracy") is None
    assert attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER


@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_see_failures(mock_warning, hass, mock_device_tracker_conf):
    """Test that the device tracker see failures."""
    devices = mock_device_tracker_conf
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), 0, {}, [])

    # MAC is not a string (but added)
    await tracker.async_see(mac=567, host_name="Number MAC")

    # No device id or MAC(not added)
    with pytest.raises(HomeAssistantError):
        await tracker.async_see()
    assert mock_warning.call_count == 0

    # Ignore gps on invalid GPS (both added & warnings)
    await tracker.async_see(mac="mac_1_bad_gps", gps=1)
    await tracker.async_see(mac="mac_2_bad_gps", gps=[1])
    await tracker.async_see(mac="mac_3_bad_gps", gps="gps")
    await hass.async_block_till_done()

    assert mock_warning.call_count == 3

    # One numeric-MAC device plus the three bad-GPS devices.
    assert len(devices) == 4


async def test_async_added_to_hass(hass):
    """Test restoring state."""
    attr = {
        ATTR_LONGITUDE: 18,
        ATTR_LATITUDE: -33,
        const.ATTR_SOURCE_TYPE: "gps",
        ATTR_GPS_ACCURACY: 2,
        const.ATTR_BATTERY: 100,
    }
    mock_restore_cache(hass, [State("device_tracker.jk", "home", attr)])

    path = hass.config.path(legacy.YAML_DEVICES)

    files = {path: "jk:\n name: JK Phone\n track: True"}
    with patch_yaml_files(files):
        assert await async_setup_component(hass, device_tracker.DOMAIN, {})

    state = hass.states.get("device_tracker.jk")
    assert state
    assert state.state == "home"

    for key, val in attr.items():
        atr = state.attributes.get(key)
        assert atr == val, f"{key}={atr} expected: {val}"


async def test_bad_platform(hass):
    """Test bad platform."""
    config = {"device_tracker": [{"platform": "bad_platform"}]}
    with assert_setup_component(0, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, config)


async def test_adding_unknown_device_to_config(mock_device_tracker_conf, hass):
    """Test the adding of unknown devices to configuration file."""
    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("DEV1")

    assert await async_setup_component(
        hass, device_tracker.DOMAIN, {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}
    )

    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 1
    device = mock_device_tracker_conf[0]
    assert device.dev_id == "dev1"
    assert device.track


async def test_picture_and_icon_on_see_discovery(mock_device_tracker_conf, hass):
    """Test that picture and icon are set in initial see."""
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), False, {}, [])
    await tracker.async_see(dev_id=11, picture="pic_url", icon="mdi:icon")
    await hass.async_block_till_done()
    assert len(mock_device_tracker_conf) == 1
    assert mock_device_tracker_conf[0].icon == "mdi:icon"
    assert mock_device_tracker_conf[0].entity_picture == "pic_url"


async def test_backward_compatibility_for_track_new(mock_device_tracker_conf, hass):
    """Test backward compatibility for track new."""
    tracker = legacy.DeviceTracker(
        hass, timedelta(seconds=60), False, {device_tracker.CONF_TRACK_NEW: True}, []
    )
    await tracker.async_see(dev_id=13)
    await hass.async_block_till_done()
    assert len(mock_device_tracker_conf) == 1
    assert mock_device_tracker_conf[0].track is False


async def test_old_style_track_new_is_skipped(mock_device_tracker_conf, hass):
    """Test old style config is skipped."""
    tracker = legacy.DeviceTracker(
        hass, timedelta(seconds=60), None, {device_tracker.CONF_TRACK_NEW: False}, []
    )
    await tracker.async_see(dev_id=14)
    await hass.async_block_till_done()
    assert len(mock_device_tracker_conf) == 1
    assert mock_device_tracker_conf[0].track is False


def test_see_schema_allowing_ios_calls():
    """Test SEE service schema allows extra keys.

    Temp work around because the iOS app sends incorrect data.
    """
    # The schema call raises on invalid input, so returning normally is the
    # assertion here.
    device_tracker.SERVICE_SEE_PAYLOAD_SCHEMA(
        {
            "dev_id": "Test",
            "battery": 35,
            "battery_status": "Not Charging",
            "gps": [10.0, 10.0],
            "gps_accuracy": 300,
            "hostname": "beer",
        }
    )