Fix tests for device tracker

pull/345/head
Paulus Schoutsen 2015-09-12 09:15:28 -07:00
parent f9b17ab026
commit 4ccedca3e5
6 changed files with 214 additions and 190 deletions

View File

@ -1,6 +1,6 @@
"""
homeassistant.components.tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
homeassistant.components.device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides functionality to keep track of devices.
@ -91,8 +91,8 @@ def setup(hass, config):
track_new = util.convert(conf.get(CONF_TRACK_NEW), bool,
DEFAULT_CONF_TRACK_NEW)
devices = load_yaml_config_file(yaml_path)
tracker = DeviceTracker(hass, devices, consider_home, track_new)
devices = load_config(yaml_path, hass, consider_home)
tracker = DeviceTracker(hass, consider_home, track_new, devices)
def setup_platform(p_type, p_config, disc_info=None):
""" Setup a device tracker platform. """
@ -127,9 +127,9 @@ def setup(hass, config):
discovery.listen(hass, DISCOVERY_PLATFORMS.keys(),
device_tracker_discovered)
def update_stale(event):
def update_stale(now):
""" Clean up stale devices. """
tracker.update_stale()
tracker.update_stale(now)
track_utc_time_change(hass, update_stale, second=range(0, 60, 5))
return True
@ -137,26 +137,17 @@ def setup(hass, config):
class DeviceTracker(object):
""" Track devices """
def __init__(self, hass, config, consider_home, track_new):
def __init__(self, hass, consider_home, track_new, devices):
self.hass = hass
self.devices = {}
self.mac_to_dev = {}
self.devices = {dev.dev_id: dev for dev in devices}
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
self.consider_home = timedelta(seconds=consider_home)
self.track_new = track_new
self.lock = threading.Lock()
# Load config
for dev_id, device_dict in config.items():
dev_id = str(dev_id)
device_dict = device_dict or {}
away_hide = device_dict.get(CONF_AWAY_HIDE, False)
device = Device(
hass, self.consider_home, device_dict.get('track', False),
dev_id, device_dict.get('mac'), device_dict.get('name'),
device_dict.get('picture'), away_hide)
if device.mac:
self.mac_to_dev[device.mac] = device
self.devices[dev_id] = device
for device in devices:
if device.track:
device.update_ha_state()
# pylint: disable=too-many-arguments
def see(self, mac=None, dev_id=None, host_name=None, location_name=None,
@ -169,7 +160,7 @@ class DeviceTracker(object):
mac = mac.upper()
device = self.mac_to_dev.get(mac)
if not device:
dev_id = util.slugify(host_name) or mac.replace(':', '')
dev_id = util.slugify(host_name or mac)
else:
dev_id = str(dev_id)
device = self.devices.get(dev_id)
@ -194,10 +185,9 @@ class DeviceTracker(object):
update_config(self.hass.config.path(YAML_DEVICES), dev_id, device)
def update_stale(self):
def update_stale(self, now):
""" Update stale devices. """
with self.lock:
now = dt_util.utcnow()
for device in self.devices.values():
if device.last_update_home and device.stale(now):
device.update_ha_state(True)
@ -261,6 +251,8 @@ class Device(Entity):
attr[ATTR_LATITUDE] = self.gps[0],
attr[ATTR_LONGITUDE] = self.gps[1],
return attr
@property
def hidden(self):
""" If device should be hidden. """
@ -307,18 +299,15 @@ def convert_csv_config(csv_path, yaml_path):
return True
def update_config(path, dev_id, device):
""" Add device to YAML config file. """
with open(path, 'a') as out:
out.write('\n')
out.write('{}:\n'.format(device.dev_id))
for key, value in (('name', device.name), ('mac', device.mac),
('picture', ''),
('track', 'yes' if device.track else 'no'),
(CONF_AWAY_HIDE,
'yes' if device.away_hide else 'no')):
out.write(' {}: {}\n'.format(key, '' if value is None else value))
def load_config(path, hass, consider_home):
""" Load devices from YAML config file. """
if not os.path.isfile(path):
return []
return [
Device(hass, consider_home, device.get('track', False),
str(dev_id), device.get('mac'), device.get('name'),
device.get('picture'), device.get(CONF_AWAY_HIDE, False))
for dev_id, device in load_yaml_config_file(path).items()]
def setup_scanner_platform(hass, config, scanner, see):
@ -343,3 +332,17 @@ def setup_scanner_platform(hass, config, scanner, see):
interval))
device_tracker_scan(None)
def update_config(path, dev_id, device):
""" Add device to YAML config file. """
with open(path, 'a') as out:
out.write('\n')
out.write('{}:\n'.format(device.dev_id))
for key, value in (('name', device.name), ('mac', device.mac),
('picture', device.config_picture),
('track', 'yes' if device.track else 'no'),
(CONF_AWAY_HIDE,
'yes' if device.away_hide else 'no')):
out.write(' {}: {}\n'.format(key, '' if value is None else value))

View File

@ -10,11 +10,11 @@ from unittest import mock
from homeassistant import core as ha, loader
import homeassistant.util.location as location_util
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.const import (
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
EVENT_STATE_CHANGED)
EVENT_STATE_CHANGED, EVENT_PLATFORM_DISCOVERED, ATTR_SERVICE,
ATTR_DISCOVERED)
from homeassistant.components import sun, mqtt
@ -86,10 +86,11 @@ def fire_time_changed(hass, time):
hass.bus.fire(EVENT_TIME_CHANGED, {'now': time})
def trigger_device_tracker_scan(hass):
""" Triggers the device tracker to scan. """
fire_time_changed(
hass, dt_util.utcnow().replace(second=0) + timedelta(hours=1))
def fire_service_discovered(hass, service, info):
hass.bus.fire(EVENT_PLATFORM_DISCOVERED, {
ATTR_SERVICE: service,
ATTR_DISCOVERED: info
})
def ensure_sun_risen(hass):

View File

@ -6,18 +6,20 @@ Tests the device tracker compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
from unittest.mock import patch
from datetime import timedelta
import os
import homeassistant.core as ha
import homeassistant.loader as loader
from homeassistant.config import load_yaml_config_file
from homeassistant.loader import get_component
import homeassistant.util.dt as dt_util
from homeassistant.const import (
STATE_HOME, STATE_NOT_HOME, ATTR_ENTITY_PICTURE, CONF_PLATFORM,
DEVICE_DEFAULT_NAME)
ATTR_ENTITY_PICTURE, ATTR_HIDDEN, STATE_HOME, STATE_NOT_HOME,
CONF_PLATFORM, ATTR_FRIENDLY_NAME)
import homeassistant.components.device_tracker as device_tracker
from tests.common import get_test_home_assistant
from tests.common import (
get_test_home_assistant, fire_time_changed, fire_service_discovered)
class TestComponentsDeviceTracker(unittest.TestCase):
@ -26,16 +28,16 @@ class TestComponentsDeviceTracker(unittest.TestCase):
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.known_dev_path = self.hass.config.path(
device_tracker.KNOWN_DEVICES_FILE)
self.yaml_devices = self.hass.config.path(device_tracker.YAML_DEVICES)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
try:
os.remove(self.yaml_devices)
except FileNotFoundError:
pass
if os.path.isfile(self.known_dev_path):
os.remove(self.known_dev_path)
self.hass.stop()
def test_is_on(self):
""" Test is_on method. """
@ -49,145 +51,145 @@ class TestComponentsDeviceTracker(unittest.TestCase):
self.assertFalse(device_tracker.is_on(self.hass, entity_id))
def test_setup(self):
""" Test setup method. """
# Bogus config
self.assertFalse(device_tracker.setup(self.hass, {}))
def test_migrating_config(self):
csv_devices = self.hass.config.path(device_tracker.CSV_DEVICES)
self.assertFalse(
device_tracker.setup(self.hass, {device_tracker.DOMAIN: {}}))
self.assertFalse(os.path.isfile(csv_devices))
self.assertFalse(os.path.isfile(self.yaml_devices))
# Test with non-existing component
self.assertFalse(device_tracker.setup(
self.hass, {device_tracker.DOMAIN: {CONF_PLATFORM: 'nonexisting'}}
))
person1 = {
'mac': 'AB:CD:EF:GH:IJ:KL',
'name': 'Paulus',
'track': True,
'picture': 'http://placehold.it/200x200',
}
person2 = {
'mac': 'MN:OP:QR:ST:UV:WX:YZ',
'name': 'Anne Therese',
'track': False,
'picture': None,
}
# Test with a bad known device file around
with open(self.known_dev_path, 'w') as fil:
fil.write("bad data\nbad data\n")
try:
with open(csv_devices, 'w') as fil:
fil.write('device,name,track,picture\n')
for pers in (person1, person2):
fil.write('{},{},{},{}\n'.format(
pers['mac'], pers['name'],
'1' if pers['track'] else '0', pers['picture'] or ''))
self.assertFalse(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
self.assertTrue(device_tracker.setup(self.hass, {}))
self.assertFalse(os.path.isfile(csv_devices))
self.assertTrue(os.path.isfile(self.yaml_devices))
def test_writing_known_devices_file(self):
""" Test the device tracker class. """
scanner = loader.get_component(
'device_tracker.test').get_scanner(None, None)
yaml_config = load_yaml_config_file(self.yaml_devices)
self.assertEqual(2, len(yaml_config))
for pers, yaml_pers in zip(
(person2, person1), sorted(yaml_config.values(),
key=lambda pers: pers['name'])):
for key, value in pers.items():
self.assertEqual(value, yaml_pers.get(key))
finally:
try:
os.remove(csv_devices)
except FileNotFoundError:
pass
def test_reading_yaml_config(self):
dev_id = 'test'
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, 'AB:CD:EF:GH:IJ',
'Test name', 'http://test.picture', True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
self.assertTrue(device_tracker.setup(self.hass, {}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
device.consider_home)[0]
self.assertEqual(device.dev_id, config.dev_id)
self.assertEqual(device.track, config.track)
self.assertEqual(device.mac, config.mac)
self.assertEqual(device.config_picture, config.config_picture)
self.assertEqual(device.away_hide, config.away_hide)
self.assertEqual(device.consider_home, config.consider_home)
def test_setup_without_yaml_file(self):
self.assertTrue(device_tracker.setup(self.hass, {}))
def test_adding_unknown_device_to_config(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
timedelta(seconds=0))[0]
self.assertEqual('DEV1', config.dev_id)
self.assertEqual(True, config.track)
def test_discovery(self):
scanner = get_component('device_tracker.test').SCANNER
with patch.dict(device_tracker.DISCOVERY_PLATFORMS, {'test': 'test'}):
with patch.object(scanner, 'scan_devices') as mock_scan:
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
fire_service_discovered(self.hass, 'test', {})
self.assertTrue(mock_scan.called)
def test_update_stale(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
scanner.come_home('DEV2')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
# Ensure a new known devices file has been created.
# Since the device_tracker uses a set internally we cannot
# know what the order of the devices in the known devices file is.
# To ensure all the three expected lines are there, we sort the file
with open(self.known_dev_path) as fil:
self.assertEqual(
['DEV1,{},0,\n'.format(DEVICE_DEFAULT_NAME), 'DEV2,dev2,0,\n',
'device,name,track,picture\n'],
sorted(fil))
# Write one where we track dev1, dev2
with open(self.known_dev_path, 'w') as fil:
fil.write('device,name,track,picture\n')
fil.write('DEV1,device 1,1,http://example.com/dev1.jpg\n')
fil.write('DEV2,device 2,1,http://example.com/dev2.jpg\n')
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
self.assertEqual(STATE_HOME,
self.hass.states.get('device_tracker.dev1').state)
scanner.leave_home('DEV1')
scanner.come_home('DEV3')
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
now = dt_util.utcnow().replace(second=0) + timedelta(hours=1)
self.hass.pool.block_till_done()
with patch('homeassistant.util.dt.utcnow', return_value=now):
fire_time_changed(self.hass, now)
self.hass.pool.block_till_done()
dev1 = device_tracker.ENTITY_ID_FORMAT.format('device_1')
dev2 = device_tracker.ENTITY_ID_FORMAT.format('device_2')
dev3 = device_tracker.ENTITY_ID_FORMAT.format('DEV3')
self.assertEqual(STATE_NOT_HOME,
self.hass.states.get('device_tracker.dev1').state)
now = dt_util.utcnow()
def test_entity_attributes(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
friendly_name = 'Paulus'
picture = 'http://placehold.it/200x200'
# Device scanner scans every 12 seconds. We need to sync our times to
# be every 12 seconds or else the time_changed event will be ignored.
nowAlmostMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowAlmostMinimumGone -= timedelta(
seconds=12+(nowAlmostMinimumGone.second % 12))
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
friendly_name, picture, away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
nowMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowMinimumGone += timedelta(seconds=12-(nowMinimumGone.second % 12))
self.assertTrue(device_tracker.setup(self.hass, {}))
# Test initial is correct
self.assertTrue(device_tracker.is_on(self.hass))
self.assertFalse(device_tracker.is_on(self.hass, dev1))
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertIsNone(self.hass.states.get(dev3))
attrs = self.hass.states.get(entity_id).attributes
self.assertEqual(
'http://example.com/dev1.jpg',
self.hass.states.get(dev1).attributes.get(ATTR_ENTITY_PICTURE))
self.assertEqual(
'http://example.com/dev2.jpg',
self.hass.states.get(dev2).attributes.get(ATTR_ENTITY_PICTURE))
self.assertEqual(friendly_name, attrs.get(ATTR_FRIENDLY_NAME))
self.assertEqual(picture, attrs.get(ATTR_ENTITY_PICTURE))
# Test if dev3 got added to known dev file
with open(self.known_dev_path) as fil:
self.assertEqual('DEV3,dev3,0,\n', list(fil)[-1])
def test_device_hidden(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
# Change dev3 to track
with open(self.known_dev_path, 'w') as fil:
fil.write("device,name,track,picture\n")
fil.write('DEV1,Device 1,1,http://example.com/picture.jpg\n')
fil.write('DEV2,Device 2,1,http://example.com/picture.jpg\n')
fil.write('DEV3,DEV3,1,\n')
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
scanner.leave_home('DEV2')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
# reload dev file
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
self.hass.pool.block_till_done()
# Test what happens if a device comes home and another leaves
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error margin on time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
# dev3 should be tracked now after we reload the known devices
self.assertTrue(device_tracker.is_on(self.hass, dev3))
self.assertIsNone(
self.hass.states.get(dev3).attributes.get(ATTR_ENTITY_PICTURE))
# Test if device leaves what happens, test the time span
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowAlmostMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))
# Now test if gone for longer then error margin
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
self.assertFalse(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))
self.assertTrue(self.hass.states.get(entity_id)
.attributes.get(ATTR_HIDDEN))

View File

@ -0,0 +1,37 @@
import unittest
import os
from homeassistant.components import device_tracker
from homeassistant.const import CONF_PLATFORM
from tests.common import (
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
class TestComponentsDeviceTrackerMQTT(unittest.TestCase):
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
mock_mqtt_component(self.hass)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
def test_new_message(self):
dev_id = 'paulus'
enttiy_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
topic = '/location/paulus'
location = 'work'
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'mqtt',
'devices': {dev_id: topic}
}}))
fire_mqtt_message(self.hass, topic, location)
self.hass.pool.block_till_done()
self.assertEqual(location, self.hass.states.get(enttiy_id).state)

View File

@ -34,14 +34,6 @@ class TestHelpersEntity(unittest.TestCase):
ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes)
def test_setting_hidden_to_true(self):
self.entity.hidden = True
self.entity.update_ha_state()
state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_true(self):
""" Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id,
@ -50,14 +42,3 @@ class TestHelpersEntity(unittest.TestCase):
state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_false(self):
""" Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id,
[ATTR_HIDDEN], [False])
self.entity.hidden = True
self.entity.update_ha_state()
self.assertNotIn(
ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes)