Use voluptuous for nmap, tplink, thomson device trackers (#3124)

* Use Voluptuous for nmap, tplink, thomson device_trackers

* Fix logic
pull/3424/merge
Johann Kellerman 2016-09-18 08:44:15 +02:00 committed by Paulus Schoutsen
parent 9184773f8f
commit 169f054c6c
3 changed files with 57 additions and 54 deletions

View File

@ -10,11 +10,13 @@ import subprocess
from collections import namedtuple from collections import namedtuple
from datetime import timedelta from datetime import timedelta
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.components.device_tracker import DOMAIN from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.const import CONF_HOSTS from homeassistant.const import CONF_HOSTS
from homeassistant.helpers import validate_config from homeassistant.util import Throttle
from homeassistant.util import Throttle, convert
# Return cached results if last scan was less then this time ago # Return cached results if last scan was less then this time ago
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5) MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
@ -27,18 +29,21 @@ CONF_EXCLUDE = 'exclude'
REQUIREMENTS = ['python-nmap==0.6.1'] REQUIREMENTS = ['python-nmap==0.6.1']
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOSTS): vol.All(cv.ensure_list, [cv.string]),
vol.Required(CONF_HOME_INTERVAL, default=0): cv.positive_int,
vol.Optional(CONF_EXCLUDE, default=[]):
vol.All(cv.ensure_list, vol.Length(min=1))
})
def get_scanner(hass, config): def get_scanner(hass, config):
"""Validate the configuration and return a Nmap scanner.""" """Validate the configuration and return a Nmap scanner."""
if not validate_config(config, {DOMAIN: [CONF_HOSTS]},
_LOGGER):
return None
scanner = NmapDeviceScanner(config[DOMAIN]) scanner = NmapDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
Device = namedtuple("Device", ["mac", "name", "ip", "last_update"]) Device = namedtuple('Device', ['mac', 'name', 'ip', 'last_update'])
def _arp(ip_address): def _arp(ip_address):
@ -49,24 +54,26 @@ def _arp(ip_address):
match = re.search(r'(([0-9A-Fa-f]{1,2}\:){5}[0-9A-Fa-f]{1,2})', str(out)) match = re.search(r'(([0-9A-Fa-f]{1,2}\:){5}[0-9A-Fa-f]{1,2})', str(out))
if match: if match:
return match.group(0) return match.group(0)
_LOGGER.info("No MAC address found for %s", ip_address) _LOGGER.info('No MAC address found for %s', ip_address)
return None return None
class NmapDeviceScanner(object): class NmapDeviceScanner(object):
"""This class scans for devices using nmap.""" """This class scans for devices using nmap."""
exclude = []
def __init__(self, config): def __init__(self, config):
"""Initialize the scanner.""" """Initialize the scanner."""
self.last_results = [] self.last_results = []
self.hosts = config[CONF_HOSTS] self.hosts = config[CONF_HOSTS]
self.exclude = config.get(CONF_EXCLUDE, []) self.exclude = config.get(CONF_EXCLUDE, [])
minutes = convert(config.get(CONF_HOME_INTERVAL), int, 0) minutes = config[CONF_HOME_INTERVAL]
self.home_interval = timedelta(minutes=minutes) self.home_interval = timedelta(minutes=minutes)
self.success_init = self._update_info() self.success_init = self._update_info()
_LOGGER.info("nmap scanner initialized") _LOGGER.info('nmap scanner initialized')
def scan_devices(self): def scan_devices(self):
"""Scan for new devices and return a list with found device IDs.""" """Scan for new devices and return a list with found device IDs."""
@ -90,21 +97,18 @@ class NmapDeviceScanner(object):
Returns boolean if scanning successful. Returns boolean if scanning successful.
""" """
_LOGGER.info("Scanning") _LOGGER.info('Scanning')
from nmap import PortScanner, PortScannerError from nmap import PortScanner, PortScannerError
scanner = PortScanner() scanner = PortScanner()
options = "-F --host-timeout 5s " options = '-F --host-timeout 5s '
exclude = "--exclude "
if self.home_interval: if self.home_interval:
boundary = dt_util.now() - self.home_interval boundary = dt_util.now() - self.home_interval
last_results = [device for device in self.last_results last_results = [device for device in self.last_results
if device.last_update > boundary] if device.last_update > boundary]
if last_results: if last_results:
# Pylint is confused here.
# pylint: disable=no-member
exclude_hosts = self.exclude + [device.ip for device exclude_hosts = self.exclude + [device.ip for device
in last_results] in last_results]
else: else:
@ -113,8 +117,7 @@ class NmapDeviceScanner(object):
last_results = [] last_results = []
exclude_hosts = self.exclude exclude_hosts = self.exclude
if exclude_hosts: if exclude_hosts:
exclude = " --exclude {}".format(",".join(exclude_hosts)) options += ' --exclude {}'.format(','.join(exclude_hosts))
options += exclude
try: try:
result = scanner.scan(hosts=self.hosts, arguments=options) result = scanner.scan(hosts=self.hosts, arguments=options)
@ -134,5 +137,5 @@ class NmapDeviceScanner(object):
self.last_results = last_results self.last_results = last_results
_LOGGER.info("nmap scan successful") _LOGGER.info('nmap scan successful')
return True return True

View File

@ -10,9 +10,11 @@ import telnetlib
import threading import threading
from datetime import timedelta from datetime import timedelta
from homeassistant.components.device_tracker import DOMAIN import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle from homeassistant.util import Throttle
# Return cached results if last scan was less then this time ago. # Return cached results if last scan was less then this time ago.
@ -21,23 +23,24 @@ MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_DEVICES_REGEX = re.compile( _DEVICES_REGEX = re.compile(
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s' + r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s'
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+' + r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+'
r'(?P<status>([^\s]+))\s+' + r'(?P<status>([^\s]+))\s+'
r'(?P<type>([^\s]+))\s+' + r'(?P<type>([^\s]+))\s+'
r'(?P<intf>([^\s]+))\s+' + r'(?P<intf>([^\s]+))\s+'
r'(?P<hwintf>([^\s]+))\s+' + r'(?P<hwintf>([^\s]+))\s+'
r'(?P<host>([^\s]+))') r'(?P<host>([^\s]+))')
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string
})
# pylint: disable=unused-argument # pylint: disable=unused-argument
def get_scanner(hass, config): def get_scanner(hass, config):
"""Validate the configuration and return a THOMSON scanner.""" """Validate the configuration and return a THOMSON scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
_LOGGER):
return None
scanner = ThomsonDeviceScanner(config[DOMAIN]) scanner = ThomsonDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
@ -84,7 +87,7 @@ class ThomsonDeviceScanner(object):
return False return False
with self.lock: with self.lock:
_LOGGER.info("Checking ARP") _LOGGER.info('Checking ARP')
data = self.get_thomson_data() data = self.get_thomson_data()
if not data: if not data:
return False return False
@ -108,11 +111,11 @@ class ThomsonDeviceScanner(object):
devices_result = telnet.read_until(b'=>').split(b'\r\n') devices_result = telnet.read_until(b'=>').split(b'\r\n')
telnet.write('exit\r\n'.encode('ascii')) telnet.write('exit\r\n'.encode('ascii'))
except EOFError: except EOFError:
_LOGGER.exception("Unexpected response from router") _LOGGER.exception('Unexpected response from router')
return return
except ConnectionRefusedError: except ConnectionRefusedError:
_LOGGER.exception("Connection refused by router," + _LOGGER.exception('Connection refused by router,'
" is telnet enabled?") ' is telnet enabled?')
return return
devices = {} devices = {}

View File

@ -12,10 +12,11 @@ import threading
from datetime import timedelta from datetime import timedelta
import requests import requests
import voluptuous as vol
from homeassistant.components.device_tracker import DOMAIN import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle from homeassistant.util import Throttle
# Return cached results if last scan was less then this time ago # Return cached results if last scan was less then this time ago
@ -23,27 +24,23 @@ MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string
})
def get_scanner(hass, config): def get_scanner(hass, config):
"""Validate the configuration and return a TP-Link scanner.""" """Validate the configuration and return a TP-Link scanner."""
if not validate_config(config, for cls in [Tplink4DeviceScanner, Tplink3DeviceScanner,
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]}, Tplink2DeviceScanner, TplinkDeviceScanner]:
_LOGGER): scanner = cls(config[DOMAIN])
if scanner.success_init:
return scanner
return None return None
scanner = Tplink4DeviceScanner(config[DOMAIN])
if not scanner.success_init:
scanner = Tplink3DeviceScanner(config[DOMAIN])
if not scanner.success_init:
scanner = Tplink2DeviceScanner(config[DOMAIN])
if not scanner.success_init:
scanner = TplinkDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None
class TplinkDeviceScanner(object): class TplinkDeviceScanner(object):
"""This class queries a wireless router running TP-Link firmware.""" """This class queries a wireless router running TP-Link firmware."""