diff --git a/.coveragerc b/.coveragerc index aa5921526de..74faed07c4a 100644 --- a/.coveragerc +++ b/.coveragerc @@ -172,6 +172,7 @@ omit = homeassistant/components/device_tracker/trackr.py homeassistant/components/device_tracker/ubus.py homeassistant/components/device_tracker/volvooncall.py + homeassistant/components/device_tracker/xiaomi.py homeassistant/components/discovery.py homeassistant/components/downloader.py homeassistant/components/emoncms_history.py diff --git a/README.rst b/README.rst index 43517760ed7..1e25b6dcc90 100644 --- a/README.rst +++ b/README.rst @@ -26,7 +26,8 @@ Examples of devices Home Assistant can interface with: `Netgear `__, `DD-WRT `__, `TPLink `__, - `ASUSWRT `__ and any SNMP + `ASUSWRT `__, + `Xiaomi `__ and any SNMP capable Linksys WAP/WRT - `Philips Hue `__ lights, `WeMo `__ diff --git a/homeassistant/components/device_tracker/xiaomi.py b/homeassistant/components/device_tracker/xiaomi.py new file mode 100644 index 00000000000..ff53d1fe99f --- /dev/null +++ b/homeassistant/components/device_tracker/xiaomi.py @@ -0,0 +1,145 @@ +""" +Support for Xiaomi Mi routers. + +For more details about this platform, please refer to the documentation at +https://home-assistant.io/components/device_tracker.xiaomi/ +""" +import logging +import threading +from datetime import timedelta + +import requests +import voluptuous as vol + +import homeassistant.helpers.config_validation as cv +from homeassistant.components.device_tracker import ( + DOMAIN, PLATFORM_SCHEMA, DeviceScanner) +from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME +from homeassistant.util import Throttle + +# Return cached results if last scan was less then this time ago. +MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5) + +_LOGGER = logging.getLogger(__name__) + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Required(CONF_HOST): cv.string, + vol.Required(CONF_USERNAME, default='admin'): cv.string, + vol.Required(CONF_PASSWORD): cv.string +}) + + +def get_scanner(hass, config): + """Validate the configuration and return a Xiaomi Device Scanner.""" + scanner = XioamiDeviceScanner(config[DOMAIN]) + + return scanner if scanner.success_init else None + + +class XioamiDeviceScanner(DeviceScanner): + """This class queries a Xiaomi Mi router. + + Adapted from Luci scanner. + """ + + def __init__(self, config): + """Initialize the scanner.""" + host = config[CONF_HOST] + username, password = config[CONF_USERNAME], config[CONF_PASSWORD] + + self.lock = threading.Lock() + + self.last_results = {} + self.token = _get_token(host, username, password) + + self.host = host + + self.mac2name = None + self.success_init = self.token is not None + + def scan_devices(self): + """Scan for new devices and return a list with found device IDs.""" + self._update_info() + return self.last_results + + def get_device_name(self, device): + """Return the name of the given device or None if we don't know.""" + with self.lock: + if self.mac2name is None: + url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist" + url = url.format(self.host, self.token) + result = _get_device_list(url) + if result: + hosts = [x for x in result + if 'mac' in x and 'name' in x] + mac2name_list = [ + (x['mac'].upper(), x['name']) for x in hosts] + self.mac2name = dict(mac2name_list) + else: + # Error, handled in the _req_json_rpc + return + return self.mac2name.get(device.upper(), None) + + @Throttle(MIN_TIME_BETWEEN_SCANS) + def _update_info(self): + """Ensure the informations from the router are up to date. + + Returns true if scanning successful. + """ + if not self.success_init: + return False + + with self.lock: + _LOGGER.info('Refreshing device list') + url = "http://{}/cgi-bin/luci/;stok={}/api/misystem/devicelist" + url = url.format(self.host, self.token) + result = _get_device_list(url) + if result: + self.last_results = [] + for device_entry in result: + # Check if the device is marked as connected + if int(device_entry['online']) == 1: + self.last_results.append(device_entry['mac']) + + return True + + return False + + +def _get_device_list(url, **kwargs): + try: + res = requests.get(url, timeout=5, **kwargs) + except requests.exceptions.Timeout: + _LOGGER.exception('Connection to the router timed out') + return + return _extract_result(res, 'list') + + +def _get_token(host, username, password): + """Get authentication token for the given host+username+password.""" + url = 'http://{}/cgi-bin/luci/api/xqsystem/login'.format(host) + data = {'username': username, 'password': password} + try: + res = requests.post(url, data=data, timeout=5) + except requests.exceptions.Timeout: + _LOGGER.exception('Connection to the router timed out') + return + return _extract_result(res, 'token') + + +def _extract_result(res, key_name): + if res.status_code == 200: + try: + result = res.json() + except ValueError: + # If json decoder could not parse the response + _LOGGER.exception('Failed to parse response from mi router') + return + try: + return result[key_name] + except KeyError: + _LOGGER.exception('No %s in response from mi router. %s', + key_name, result) + return + else: + _LOGGER.error('Invalid response from mi router: %s', res) diff --git a/tests/components/device_tracker/test_xiaomi.py b/tests/components/device_tracker/test_xiaomi.py new file mode 100644 index 00000000000..482ed7c0c0d --- /dev/null +++ b/tests/components/device_tracker/test_xiaomi.py @@ -0,0 +1,221 @@ +"""The tests for the Xiaomi router device tracker platform.""" +import logging +import unittest +from unittest import mock +from unittest.mock import patch + +import requests + +from homeassistant.components.device_tracker import DOMAIN, xiaomi as xiaomi +from homeassistant.components.device_tracker.xiaomi import get_scanner +from homeassistant.const import (CONF_HOST, CONF_USERNAME, CONF_PASSWORD, + CONF_PLATFORM) +from tests.common import get_test_home_assistant + +_LOGGER = logging.getLogger(__name__) + +INVALID_USERNAME = 'bob' +URL_AUTHORIZE = 'http://192.168.0.1/cgi-bin/luci/api/xqsystem/login' +URL_LIST_END = 'api/misystem/devicelist' + + +def mocked_requests(*args, **kwargs): + """Mock requests.get invocations.""" + class MockResponse: + """Class to represent a mocked response.""" + + def __init__(self, json_data, status_code): + """Initialize the mock response class.""" + self.json_data = json_data + self.status_code = status_code + + def json(self): + """Return the json of the response.""" + return self.json_data + + @property + def content(self): + """Return the content of the response.""" + return self.json() + + def raise_for_status(self): + """Raise an HTTPError if status is not 200.""" + if self.status_code != 200: + raise requests.HTTPError(self.status_code) + + data = kwargs.get('data') + + if data and data.get('username', None) == INVALID_USERNAME: + return MockResponse({ + "code": "401", + "msg": "Invalid token" + }, 200) + elif str(args[0]).startswith(URL_AUTHORIZE): + print("deliver authorized") + return MockResponse({ + "url": "/cgi-bin/luci/;stok=ef5860/web/home", + "token": "ef5860", + "code": "0" + }, 200) + elif str(args[0]).endswith(URL_LIST_END): + return MockResponse({ + "mac": "1C:98:EC:0E:D5:A4", + "list": [ + { + "mac": "23:83:BF:F6:38:A0", + "oname": "12255ff", + "isap": 0, + "parent": "", + "authority": { + "wan": 1, + "pridisk": 0, + "admin": 1, + "lan": 0 + }, + "push": 0, + "online": 1, + "name": "Device1", + "times": 0, + "ip": [ + { + "downspeed": "0", + "online": "496957", + "active": 1, + "upspeed": "0", + "ip": "192.168.0.25" + } + ], + "statistics": { + "downspeed": "0", + "online": "496957", + "upspeed": "0" + }, + "icon": "", + "type": 1 + }, + { + "mac": "1D:98:EC:5E:D5:A6", + "oname": "CdddFG58", + "isap": 0, + "parent": "", + "authority": { + "wan": 1, + "pridisk": 0, + "admin": 1, + "lan": 0 + }, + "push": 0, + "online": 1, + "name": "Device2", + "times": 0, + "ip": [ + { + "downspeed": "0", + "online": "347325", + "active": 1, + "upspeed": "0", + "ip": "192.168.0.3" + } + ], + "statistics": { + "downspeed": "0", + "online": "347325", + "upspeed": "0" + }, + "icon": "", + "type": 0 + }, + ], + "code": 0 + }, 200) + else: + _LOGGER.debug('UNKNOWN ROUTE') + + +class TestXiaomiDeviceScanner(unittest.TestCase): + """Xiaomi device scanner test class.""" + + def setUp(self): + """Initialize values for this testcase class.""" + self.hass = get_test_home_assistant() + + def tearDown(self): + """Stop everything that was started.""" + self.hass.stop() + + @mock.patch( + 'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner', + return_value=mock.MagicMock()) + def test_config(self, xiaomi_mock): + """Testing minimal configuration.""" + config = { + DOMAIN: xiaomi.PLATFORM_SCHEMA({ + CONF_PLATFORM: xiaomi.DOMAIN, + CONF_HOST: '192.168.0.1', + CONF_PASSWORD: 'passwordTest' + }) + } + xiaomi.get_scanner(self.hass, config) + self.assertEqual(xiaomi_mock.call_count, 1) + self.assertEqual(xiaomi_mock.call_args, mock.call(config[DOMAIN])) + call_arg = xiaomi_mock.call_args[0][0] + self.assertEqual(call_arg['username'], 'admin') + self.assertEqual(call_arg['password'], 'passwordTest') + self.assertEqual(call_arg['host'], '192.168.0.1') + self.assertEqual(call_arg['platform'], 'device_tracker') + + @mock.patch( + 'homeassistant.components.device_tracker.xiaomi.XioamiDeviceScanner', + return_value=mock.MagicMock()) + def test_config_full(self, xiaomi_mock): + """Testing full configuration.""" + config = { + DOMAIN: xiaomi.PLATFORM_SCHEMA({ + CONF_PLATFORM: xiaomi.DOMAIN, + CONF_HOST: '192.168.0.1', + CONF_USERNAME: 'alternativeAdminName', + CONF_PASSWORD: 'passwordTest' + }) + } + xiaomi.get_scanner(self.hass, config) + self.assertEqual(xiaomi_mock.call_count, 1) + self.assertEqual(xiaomi_mock.call_args, mock.call(config[DOMAIN])) + call_arg = xiaomi_mock.call_args[0][0] + self.assertEqual(call_arg['username'], 'alternativeAdminName') + self.assertEqual(call_arg['password'], 'passwordTest') + self.assertEqual(call_arg['host'], '192.168.0.1') + self.assertEqual(call_arg['platform'], 'device_tracker') + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_invalid_credential(self, mock_get, mock_post): + """"Testing invalid credential handling.""" + config = { + DOMAIN: xiaomi.PLATFORM_SCHEMA({ + CONF_PLATFORM: xiaomi.DOMAIN, + CONF_HOST: '192.168.0.1', + CONF_USERNAME: INVALID_USERNAME, + CONF_PASSWORD: 'passwordTest' + }) + } + self.assertIsNone(get_scanner(self.hass, config)) + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_valid_credential(self, mock_get, mock_post): + """"Testing valid refresh.""" + config = { + DOMAIN: xiaomi.PLATFORM_SCHEMA({ + CONF_PLATFORM: xiaomi.DOMAIN, + CONF_HOST: '192.168.0.1', + CONF_USERNAME: 'admin', + CONF_PASSWORD: 'passwordTest' + }) + } + scanner = get_scanner(self.hass, config) + self.assertIsNotNone(scanner) + self.assertEqual(2, len(scanner.scan_devices())) + self.assertEqual("Device1", + scanner.get_device_name("23:83:BF:F6:38:A0")) + self.assertEqual("Device2", + scanner.get_device_name("1D:98:EC:5E:D5:A6"))