Merge branch 'pep257-device-tracker' into dev

pull/1511/head
Fabian Affolter 2016-03-08 08:08:35 +01:00
commit 29a651dd92
17 changed files with 156 additions and 246 deletions

View File

@ -1,5 +1,5 @@
"""
Provides functionality to keep track of devices.
Provide functionality to keep track of devices.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/device_tracker/
@ -68,7 +68,7 @@ _LOGGER = logging.getLogger(__name__)
def is_on(hass, entity_id=None):
"""Returns if any or specified device is home."""
"""Return the state if any or a specified device is home."""
entity = entity_id or ENTITY_ID_ALL_DEVICES
return hass.states.is_state(entity, STATE_HOME)
@ -139,7 +139,7 @@ def setup(hass, config):
device_tracker_discovered)
def update_stale(now):
""" Clean up stale devices. """
"""Clean up stale devices."""
tracker.update_stale(now)
track_utc_time_change(hass, update_stale, second=range(0, 60, 5))
@ -161,8 +161,10 @@ def setup(hass, config):
class DeviceTracker(object):
"""Represents a device tracker."""
"""Representation of a device tracker."""
def __init__(self, hass, consider_home, track_new, home_range, devices):
"""Initialize a device tracker."""
self.hass = hass
self.devices = {dev.dev_id: dev for dev in devices}
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
@ -179,7 +181,7 @@ class DeviceTracker(object):
def see(self, mac=None, dev_id=None, host_name=None, location_name=None,
gps=None, gps_accuracy=None, battery=None):
"""Notify device tracker that you see a device."""
"""Notify the device tracker that you see a device."""
with self.lock:
if mac is None and dev_id is None:
raise HomeAssistantError('Neither mac or device id passed in')
@ -218,7 +220,7 @@ class DeviceTracker(object):
update_config(self.hass.config.path(YAML_DEVICES), dev_id, device)
def setup_group(self):
"""Initializes group for all tracked devices."""
"""Initialize group for all tracked devices."""
entity_ids = (dev.entity_id for dev in self.devices.values()
if dev.track)
self.group = group.Group(
@ -234,7 +236,8 @@ class DeviceTracker(object):
class Device(Entity):
"""Represents a tracked device."""
"""Represent a tracked device."""
host_name = None
location_name = None
gps = None
@ -248,6 +251,7 @@ class Device(Entity):
def __init__(self, hass, consider_home, home_range, track, dev_id, mac,
name=None, picture=None, away_hide=False):
"""Initialize a device."""
self.hass = hass
self.entity_id = ENTITY_ID_FORMAT.format(dev_id)
@ -280,7 +284,7 @@ class Device(Entity):
@property
def name(self):
"""Returns the name of the entity."""
"""Return the name of the entity."""
return self.config_name or self.host_name or DEVICE_DEFAULT_NAME
@property

View File

@ -31,7 +31,7 @@ _LEASES_REGEX = re.compile(
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates configuration and returns an Actiontec scanner."""
"""Validate the configuration and return an Actiontec scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -43,11 +43,10 @@ Device = namedtuple("Device", ["mac", "ip", "last_update"])
class ActiontecDeviceScanner(object):
"""
This class queries a an actiontec router for connected devices.
Adapted from DD-WRT scanner.
"""
"""This class queries a an actiontec router for connected devices."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
@ -58,14 +57,12 @@ class ActiontecDeviceScanner(object):
_LOGGER.info("actiontec scanner initialized")
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [client.mac for client in self.last_results]
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
if not self.last_results:
return None
for client in self.last_results:
@ -75,9 +72,9 @@ class ActiontecDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the Actiontec MI424WR router is up
to date. Returns boolean if scanning successful.
"""Ensure the information from the router is up to date.
Return boolean if scanning successful.
"""
_LOGGER.info("Scanning")
if not self.success_init:

View File

@ -28,7 +28,7 @@ _DEVICES_REGEX = re.compile(
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates configuration and returns a Aruba scanner."""
"""Validate the configuration and return a Aruba scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -41,7 +41,9 @@ def get_scanner(hass, config):
class ArubaDeviceScanner(object):
"""This class queries a Aruba Access Point for connected devices."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
@ -50,19 +52,17 @@ class ArubaDeviceScanner(object):
self.last_results = {}
# Test the router is accessible
# Test the router is accessible.
data = self.get_aruba_data()
self.success_init = data is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [client['mac'] for client in self.last_results]
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
if not self.last_results:
return None
for client in self.last_results:
@ -72,9 +72,9 @@ class ArubaDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the Aruba Access Point is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the Aruba Access Point is up to date.
Return boolean if scanning successful.
"""
if not self.success_init:
return False

View File

@ -36,7 +36,7 @@ _IP_NEIGH_REGEX = re.compile(
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates configuration and returns an ASUS-WRT scanner."""
"""Validate the configuration and return an ASUS-WRT scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -48,11 +48,10 @@ def get_scanner(hass, config):
class AsusWrtDeviceScanner(object):
"""
This class queries a router running ASUSWRT firmware
for connected devices. Adapted from DD-WRT scanner.
"""
"""This class queries a router running ASUSWRT firmware."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = str(config[CONF_USERNAME])
self.password = str(config[CONF_PASSWORD])
@ -61,19 +60,17 @@ class AsusWrtDeviceScanner(object):
self.last_results = {}
# Test the router is accessible
# Test the router is accessible.
data = self.get_asuswrt_data()
self.success_init = data is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [client['mac'] for client in self.last_results]
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
if not self.last_results:
return None
for client in self.last_results:
@ -83,9 +80,9 @@ class AsusWrtDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the ASUSWRT router is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the ASUSWRT router is up to date.
Return boolean if scanning successful.
"""
if not self.success_init:
return False

View File

@ -27,7 +27,7 @@ _MAC_REGEX = re.compile(r'(([0-9A-Fa-f]{1,2}\:){5}[0-9A-Fa-f]{1,2})')
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates config and returns a DD-WRT scanner."""
"""Validate the configuration and return a DD-WRT scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -40,11 +40,10 @@ def get_scanner(hass, config):
# pylint: disable=too-many-instance-attributes
class DdWrtDeviceScanner(object):
"""
This class queries a wireless router running DD-WRT firmware
for connected devices. Adapted from Tomato scanner.
"""
"""This class queries a wireless router running DD-WRT firmware."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
@ -61,15 +60,13 @@ class DdWrtDeviceScanner(object):
self.success_init = data is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
with self.lock:
# If not initialised and not already scanned and not found.
if device not in self.mac2name:
@ -102,9 +99,9 @@ class DdWrtDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the DD-WRT router is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the DD-WRT router is up to date.
Return boolean if scanning successful.
"""
if not self.success_init:
return False

View File

@ -1,6 +1,4 @@
"""
Demo platform for the device tracker.
"""
"""Demo platform for the device tracker."""
import random
from homeassistant.components.device_tracker import DOMAIN

View File

@ -21,7 +21,7 @@ _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
"""Validates configuration and returns FritzBoxScanner."""
"""Validate the configuration and return FritzBoxScanner."""
if not validate_config(config,
{DOMAIN: []},
_LOGGER):
@ -33,20 +33,10 @@ def get_scanner(hass, config):
# pylint: disable=too-many-instance-attributes
class FritzBoxScanner(object):
"""
This class queries a FRITZ!Box router. It is using the
fritzconnection library for communication with the router.
"""This class queries a FRITZ!Box router."""
The API description can be found under:
https://pypi.python.org/pypi/fritzconnection/0.4.6
This scanner retrieves the list of known hosts and checks their
corresponding states (on, or off).
Due to a bug of the fritzbox api (router side) it is not possible
to track more than 16 hosts.
"""
def __init__(self, config):
"""Initialize the scanner."""
self.last_results = []
self.host = '169.254.1.1' # This IP is valid for all FRITZ!Box router.
self.username = 'admin'
@ -95,7 +85,7 @@ class FritzBoxScanner(object):
return active_hosts
def get_device_name(self, mac):
"""Returns the name of the given device or None if is not known."""
"""Return the name of the given device or None if is not known."""
ret = self.fritz_box.get_specific_host_entry(mac)["NewHostName"]
if ret == {}:
return None
@ -103,7 +93,7 @@ class FritzBoxScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Retrieves latest information from the FRITZ!Box."""
"""Retrieve latest information from the FRITZ!Box."""
if not self.success_init:
return False

View File

@ -43,7 +43,7 @@ def setup_scanner(hass, config, see):
return False
def keep_alive(now):
"""Keeps authenticating iCloud connection."""
"""Keep authenticating iCloud connection."""
api.authenticate()
_LOGGER.info("Authenticate against iCloud")

View File

@ -1,5 +1,5 @@
"""
Support for OpenWRT routers.
Support for OpenWRT (luci) routers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.luci/
@ -24,7 +24,7 @@ _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
"""Validates configuration and returns a Luci scanner."""
"""Validate the configuration and return a Luci scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -37,19 +37,13 @@ def get_scanner(hass, config):
# pylint: disable=too-many-instance-attributes
class LuciDeviceScanner(object):
"""This class queries a wireless router running OpenWrt firmware.
Adapted from Tomato scanner.
"""
This class queries a wireless router running OpenWrt firmware
for connected devices. Adapted from Tomato scanner.
# opkg install luci-mod-rpc
for this to work on the router.
The API is described here:
http://luci.subsignal.org/trac/wiki/Documentation/JsonRpcHowTo
(Currently, we do only wifi iwscan, and no DHCP lease access.)
"""
def __init__(self, config):
"""Initialize the scanner."""
host = config[CONF_HOST]
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
@ -66,15 +60,12 @@ class LuciDeviceScanner(object):
self.success_init = self.token is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.mac2name is None:
url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host)
@ -94,8 +85,8 @@ class LuciDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the Luci router is up to date.
"""Ensure the information from the Luci router is up to date.
Returns boolean if scanning successful.
"""
if not self.success_init:

View File

@ -20,7 +20,7 @@ REQUIREMENTS = ['pynetgear==0.3.2']
def get_scanner(hass, config):
"""Validates configuration and returns a Netgear scanner."""
"""Validate the configuration and returns a Netgear scanner."""
info = config[DOMAIN]
host = info.get(CONF_HOST)
username = info.get(CONF_USERNAME)
@ -37,7 +37,9 @@ def get_scanner(hass, config):
class NetgearDeviceScanner(object):
"""Queries a Netgear wireless router using the SOAP-API."""
def __init__(self, host, username, password):
"""Initialize the scanner."""
import pynetgear
self.last_results = []
@ -62,15 +64,13 @@ class NetgearDeviceScanner(object):
_LOGGER.error("Failed to Login")
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return (device.mac for device in self.last_results)
def get_device_name(self, mac):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
try:
return next(device.name for device in self.last_results
if device.mac == mac)
@ -79,8 +79,8 @@ class NetgearDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Retrieves latest information from the Netgear router.
"""Retrieve latest information from the Netgear router.
Returns boolean if scanning successful.
"""
if not self.success_init:

View File

@ -28,7 +28,7 @@ REQUIREMENTS = ['python-nmap==0.4.3']
def get_scanner(hass, config):
"""Validates configuration and returns a Nmap scanner."""
"""Validate the configuration and return a Nmap scanner."""
if not validate_config(config, {DOMAIN: [CONF_HOSTS]},
_LOGGER):
return None
@ -54,7 +54,9 @@ def _arp(ip_address):
class NmapDeviceScanner(object):
"""This class scans for devices using nmap."""
def __init__(self, config):
"""Initialize the scanner."""
self.last_results = []
self.hosts = config[CONF_HOSTS]
@ -65,15 +67,13 @@ class NmapDeviceScanner(object):
_LOGGER.info("nmap scanner initialized")
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [device.mac for device in self.last_results]
def get_device_name(self, mac):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
filter_named = [device.name for device in self.last_results
if device.mac == mac]
@ -84,8 +84,8 @@ class NmapDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Scans the network for devices.
"""Scan the network for devices.
Returns boolean if scanning successful.
"""
_LOGGER.info("Scanning")

View File

@ -26,7 +26,7 @@ CONF_BASEOID = "baseoid"
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates configuration and returns an snmp scanner."""
"""Validate the configuration and return an snmp scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_COMMUNITY, CONF_BASEOID]},
_LOGGER):
@ -38,8 +38,10 @@ def get_scanner(hass, config):
class SnmpScanner(object):
"""Queries any SNMP capable Acces Point for connected devices."""
"""Queries any SNMP capable Access Point for connected devices."""
def __init__(self, config):
"""Initialize the scanner."""
from pysnmp.entity.rfc3413.oneliner import cmdgen
self.snmp = cmdgen.CommandGenerator()
@ -56,9 +58,7 @@ class SnmpScanner(object):
self.success_init = data is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [client['mac'] for client in self.last_results
if client.get('mac')]
@ -66,15 +66,15 @@ class SnmpScanner(object):
# Supressing no-self-use warning
# pylint: disable=R0201
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
# We have no names
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the WAP is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the WAP is up to date.
Return boolean if scanning successful.
"""
if not self.success_init:
return False

View File

@ -32,7 +32,7 @@ _DEVICES_REGEX = re.compile(
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validates configuration and returns a THOMSON scanner."""
"""Validate the configuration and return a THOMSON scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -44,11 +44,10 @@ def get_scanner(hass, config):
class ThomsonDeviceScanner(object):
"""
This class queries a router running THOMSON firmware for connected devices.
Adapted from ASUSWRT scanner.
"""
"""This class queries a router running THOMSON firmware."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
@ -57,19 +56,17 @@ class ThomsonDeviceScanner(object):
self.last_results = {}
# Test the router is accessible
# Test the router is accessible.
data = self.get_thomson_data()
self.success_init = data is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [client['mac'] for client in self.last_results]
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
if not self.last_results:
return None
for client in self.last_results:
@ -79,9 +76,9 @@ class ThomsonDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the THOMSON router is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the THOMSON router is up to date.
Return boolean if scanning successful.
"""
if not self.success_init:
return False
@ -92,7 +89,7 @@ class ThomsonDeviceScanner(object):
if not data:
return False
# flag C stands for CONNECTED
# Flag C stands for CONNECTED
active_clients = [client for client in data.values() if
client['status'].find('C') != -1]
self.last_results = active_clients

View File

@ -26,7 +26,7 @@ _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
"""Validates configuration and returns a Tomato scanner."""
"""Validate the configuration and returns a Tomato scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME,
CONF_PASSWORD, CONF_HTTP_ID]},
@ -37,13 +37,10 @@ def get_scanner(hass, config):
class TomatoDeviceScanner(object):
"""This class queries a wireless router running Tomato firmware
for connected devices.
"""This class queries a wireless router running Tomato firmware."""
A description of the Tomato API can be found on
http://paulusschoutsen.nl/blog/2013/10/tomato-api-documentation/
"""
def __init__(self, config):
"""Initialize the scanner."""
host, http_id = config[CONF_HOST], config[CONF_HTTP_ID]
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
@ -64,15 +61,13 @@ class TomatoDeviceScanner(object):
self.success_init = self._update_tomato_info()
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_tomato_info()
return [item[1] for item in self.last_results['wldev']]
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
filter_named = [item[0] for item in self.last_results['dhcpd_lease']
if item[2] == device]
@ -83,8 +78,9 @@ class TomatoDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_tomato_info(self):
"""Ensures the information from the Tomato router is up to date.
Returns boolean if scanning successful.
"""Ensure the information from the Tomato router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
self.logger.info("Scanning")

View File

@ -1,8 +1,5 @@
"""
homeassistant.components.device_tracker.tplink
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Device tracker platform that supports scanning a TP-Link router for device
presence.
Support for TP-Link routers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.tplink/
@ -28,7 +25,7 @@ _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
""" Validates config and returns a TP-Link scanner. """
"""Validate the configuration and return a TP-Link scanner. """
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -49,12 +46,10 @@ def get_scanner(hass, config):
class TplinkDeviceScanner(object):
"""
This class queries a wireless router running TP-Link firmware
for connected devices.
"""
"""This class queries a wireless router running TP-Link firmware."""
def __init__(self, config):
"""Initialize the scanner."""
host = config[CONF_HOST]
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
@ -70,29 +65,21 @@ class TplinkDeviceScanner(object):
self.success_init = self._update_info()
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results
# pylint: disable=no-self-use
def get_device_name(self, device):
"""
The TP-Link firmware doesn't save the name of the wireless device.
"""
"""The firmware doesn't save the name of the wireless device."""
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the TP-Link router is up to date.
Returns boolean if scanning successful.
"""
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Loading wireless clients...")
@ -111,34 +98,24 @@ class TplinkDeviceScanner(object):
class Tplink2DeviceScanner(TplinkDeviceScanner):
"""
This class queries a wireless router running newer version of TP-Link
firmware for connected devices.
"""
"""This class queries a router with newer version of TP-Link firmware."""
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results.keys()
# pylint: disable=no-self-use
def get_device_name(self, device):
"""
The TP-Link firmware doesn't save the name of the wireless device.
"""
"""The firmware doesn't save the name of the wireless device."""
return self.last_results.get(device)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the TP-Link router is up to date.
Returns boolean if scanning successful.
"""
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Loading wireless clients...")
@ -176,46 +153,36 @@ class Tplink2DeviceScanner(TplinkDeviceScanner):
class Tplink3DeviceScanner(TplinkDeviceScanner):
"""
This class queries the Archer C9 router running version 150811 or higher
of TP-Link firmware for connected devices.
"""
"""This class queries the Archer C9 router with version 150811 or high."""
def __init__(self, config):
"""Initialize the scanner."""
self.stok = ''
self.sysauth = ''
super(Tplink3DeviceScanner, self).__init__(config)
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results.keys()
# pylint: disable=no-self-use
def get_device_name(self, device):
"""
The TP-Link firmware doesn't save the name of the wireless device.
"""The firmware doesn't save the name of the wireless device.
We are forced to use the MAC address as name here.
"""
return self.last_results.get(device)
def _get_auth_tokens(self):
"""
Retrieves auth tokens from the router.
"""
"""Retrieve auth tokens from the router."""
_LOGGER.info("Retrieving auth tokens...")
url = 'http://{}/cgi-bin/luci/;stok=/login?form=login' \
.format(self.host)
referer = 'http://{}/webpages/login.html'.format(self.host)
# if possible implement rsa encryption of password here
# If possible implement rsa encryption of password here.
response = requests.post(url,
params={'operation': 'login',
'username': self.username,
@ -236,11 +203,10 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the TP-Link router is up to date.
Returns boolean if scanning successful.
"""
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
if (self.stok == '') or (self.sysauth == ''):
self._get_auth_tokens()
@ -288,50 +254,37 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
class Tplink4DeviceScanner(TplinkDeviceScanner):
"""
This class queries an Archer C7 router running TP-Link firmware 150427
or newer.
"""
"""This class queries an Archer C7 router with TP-Link firmware 150427."""
def __init__(self, config):
"""Initialize the scanner."""
self.credentials = ''
self.token = ''
super(Tplink4DeviceScanner, self).__init__(config)
def scan_devices(self):
"""
Scans for new devices and return a list containing found device ids.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results
# pylint: disable=no-self-use
def get_device_name(self, device):
"""
The TP-Link firmware doesn't save the name of the wireless device.
"""
"""The firmware doesn't save the name of the wireless device."""
return None
def _get_auth_tokens(self):
"""
Retrieves auth tokens from the router.
"""
"""Retrieve auth tokens from the router."""
_LOGGER.info("Retrieving auth tokens...")
url = 'http://{}/userRpm/LoginRpm.htm?Save=Save'.format(self.host)
# Generate md5 hash of password
password = hashlib.md5(self.password.encode('utf')).hexdigest()
credentials = '{}:{}'.format(self.username, password).encode('utf')
# Encode the credentials to be sent as a cookie
# Encode the credentials to be sent as a cookie.
self.credentials = base64.b64encode(credentials).decode('utf')
# Create the authorization cookie
# Create the authorization cookie.
cookie = 'Authorization=Basic {}'.format(self.credentials)
response = requests.get(url, headers={'cookie': cookie})
@ -350,11 +303,10 @@ class Tplink4DeviceScanner(TplinkDeviceScanner):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the TP-Link router is up to date.
Returns boolean if scanning successful.
"""
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
if (self.credentials == '') or (self.token == ''):
self._get_auth_tokens()

View File

@ -24,7 +24,7 @@ _LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
"""Validates configuration and returns a Luci scanner."""
"""Validate the configuration and return an ubus scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
@ -38,23 +38,13 @@ def get_scanner(hass, config):
# pylint: disable=too-many-instance-attributes
class UbusDeviceScanner(object):
"""
This class queries a wireless router running OpenWrt firmware
for connected devices. Adapted from Tomato scanner.
Configure your routers' ubus ACL based on following instructions:
http://wiki.openwrt.org/doc/techref/ubus
Read only access will be fine.
To use this class you have to install rpcd-mod-file package
in your OpenWrt router:
opkg install rpcd-mod-file
This class queries a wireless router running OpenWrt firmware.
Adapted from Tomato scanner.
"""
def __init__(self, config):
"""Initialize the scanner."""
host = config[CONF_HOST]
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
@ -70,14 +60,12 @@ class UbusDeviceScanner(object):
self.success_init = self.session_id is not None
def scan_devices(self):
"""
Scans for new devices and return a list containing found device IDs.
"""
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results
def get_device_name(self, device):
"""Returns the name of the given device or None if we don't know."""
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.leasefile is None:
result = _req_json_rpc(self.url, self.session_id,
@ -106,8 +94,8 @@ class UbusDeviceScanner(object):
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensures the information from the Luci router is up to date.
"""Ensure the information from the Luci router is up to date.
Returns boolean if scanning successful.
"""
if not self.success_init:

View File

@ -49,11 +49,14 @@ def get_scanner(hass, config):
class UnifiScanner(object):
"""Provide device_tracker support from Unifi WAP client data."""
def __init__(self, controller):
"""Initialize the scanner."""
self._controller = controller
self._update()
def _update(self):
"""Get the clients from the device."""
try:
clients = self._controller.get_clients()
except urllib.error.HTTPError as ex:
@ -63,12 +66,12 @@ class UnifiScanner(object):
self._clients = {client['mac']: client for client in clients}
def scan_devices(self):
"""Scans for devices."""
"""Scan for devices."""
self._update()
return self._clients.keys()
def get_device_name(self, mac):
"""Returns the name (if known) of the device.
"""Return the name (if known) of the device.
If a name has been set in Unifi, then return that, else
return the hostname if it has been detected.