core/homeassistant/components/actiontec/device_tracker.py

123 lines
4.1 KiB
Python
Raw Normal View History

"""Support for Actiontec MI424WR (Verizon FIOS) routers."""
import logging
import re
import telnetlib
from collections import namedtuple
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components.device_tracker import (
2019-07-31 19:25:30 +00:00
DOMAIN,
PLATFORM_SCHEMA,
DeviceScanner,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
_LOGGER = logging.getLogger(__name__)
_LEASES_REGEX = re.compile(
2019-07-31 19:25:30 +00:00
r"(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})"
+ r"\smac:\s(?P<mac>([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))"
+ r"\svalid\sfor:\s(?P<timevalid>(-?\d+))"
+ r"\ssec"
)
2019-07-31 19:25:30 +00:00
# Extend the platform schema imported from the device_tracker component:
# host, password and username are each required string options.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(option): cv.string
        for option in (CONF_HOST, CONF_PASSWORD, CONF_USERNAME)
    }
)
def get_scanner(hass, config):
    """Build an Actiontec scanner from the platform configuration.

    The scanner is constructed from ``config[DOMAIN]``. It is returned only
    when its ``success_init`` flag is truthy; otherwise ``None`` is returned.
    ``hass`` is accepted but not used here.
    """
    candidate = ActiontecDeviceScanner(config[DOMAIN])
    if not candidate.success_init:
        return None
    return candidate
2016-11-19 05:47:59 +00:00
2019-07-31 19:25:30 +00:00
# Record for one tracked client: its MAC address, its IP address and the
# time of the scan that produced it.
Device = namedtuple("Device", "mac ip last_update")
class ActiontecDeviceScanner(DeviceScanner):
    """Query an Actiontec router over telnet for connected devices.

    The most recent scan is kept in ``last_results`` as a list of ``Device``
    tuples. ``success_init`` records whether the probe made during
    construction returned data; when it is false, later scans are skipped.
    """

    def __init__(self, config):
        """Initialize the scanner and probe the router once.

        ``config`` must provide ``CONF_HOST``, ``CONF_USERNAME`` and
        ``CONF_PASSWORD``.
        """
        self.host = config[CONF_HOST]
        self.username = config[CONF_USERNAME]
        self.password = config[CONF_PASSWORD]
        self.last_results = []
        # The probe result itself is discarded; only whether the router
        # answered (anything other than None) is remembered.
        data = self.get_actiontec_data()
        self.success_init = data is not None
        _LOGGER.info("Scanner initialized")

    def scan_devices(self):
        """Refresh the device list and return the MAC addresses found."""
        self._update_info()
        return [client.mac for client in self.last_results]

    def get_device_name(self, device):
        """Return the name for the given MAC, or None if it is unknown.

        The router dump carries no host names, so the client's IP address
        from the last scan is used as its name.
        """
        return next(
            (client.ip for client in self.last_results if client.mac == device),
            None,
        )

    def _update_info(self):
        """Ensure the information from the router is up to date.

        Return True if the scan succeeded, False otherwise. On failure the
        previous ``last_results`` are left untouched.
        """
        _LOGGER.info("Scanning")
        if not self.success_init:
            return False

        now = dt_util.now()
        actiontec_data = self.get_actiontec_data()
        # Only None signals a failed query. An empty dict is a valid answer
        # (no leases) and must clear the previous results instead of leaving
        # stale devices behind.
        if actiontec_data is None:
            return False

        # Entries are keyed by IP address. Leases whose remaining validity is
        # -60 seconds or lower are dropped (presumably: expired for a minute
        # or more - confirm against router output).
        self.last_results = [
            Device(data["mac"], name, now)
            for name, data in actiontec_data.items()
            if data["timevalid"] > -60
        ]
        _LOGGER.info("Scan successful")
        return True

    def get_actiontec_data(self):
        """Retrieve data from Actiontec MI424WR and return parsed result.

        Returns a dict keyed by IP address whose values are dicts with the
        keys ``ip``, ``mac`` (upper-cased) and ``timevalid`` (int seconds).
        Returns None if the connection is refused or the router closes the
        session unexpectedly.
        """
        telnet = None
        try:
            telnet = telnetlib.Telnet(self.host)
            telnet.read_until(b"Username: ")
            telnet.write((self.username + "\n").encode("ascii"))
            telnet.read_until(b"Password: ")
            telnet.write((self.password + "\n").encode("ascii"))
            # Keep only the last line of the login output: the prompt text
            # used below to delimit command output.
            prompt = telnet.read_until(b"Wireless Broadband Router> ").split(b"\n")[-1]
            telnet.write(b"firewall mac_cache_dump\n")
            telnet.write(b"\n")
            # Output up to the first prompt is discarded; the lease lines are
            # taken from the output preceding the second prompt, minus its
            # first and last line. NOTE(review): this read order depends on
            # how the router echoes input - do not reorder.
            telnet.read_until(prompt)
            leases_result = telnet.read_until(prompt).split(b"\n")[1:-1]
            telnet.write(b"exit\n")
        except EOFError:
            _LOGGER.exception("Unexpected response from router")
            return None
        except ConnectionRefusedError:
            _LOGGER.exception("Connection refused by router. Telnet enabled?")
            return None
        finally:
            # Always release the socket, on success and on failure alike.
            if telnet is not None:
                telnet.close()

        devices = {}
        for lease in leases_result:
            match = _LEASES_REGEX.search(lease.decode("utf-8"))
            if match is not None:
                devices[match.group("ip")] = {
                    "ip": match.group("ip"),
                    "mac": match.group("mac").upper(),
                    "timevalid": int(match.group("timevalid")),
                }
        return devices