# core/homeassistant/components/huawei_router/device_tracker.py
#
# 157 lines
# 5.0 KiB
# Python

"""Support for HUAWEI routers."""
import base64
from collections import namedtuple
import logging
import re
import requests
import voluptuous as vol
from homeassistant.components.device_tracker import (
DOMAIN,
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
DeviceScanner,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)

# Extend the parent device tracker schema: host, password and username are
# all required string options for this platform.
PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
    {
        vol.Required(option): cv.string
        for option in (CONF_HOST, CONF_PASSWORD, CONF_USERNAME)
    }
)
def get_scanner(hass, config):
    """Return a HUAWEI scanner built from the ``config[DOMAIN]`` section.

    ``hass`` is accepted as part of the platform entry-point signature but is
    not used here.
    """
    return HuaweiDeviceScanner(config[DOMAIN])
# Lightweight record for one LAN client parsed from the router's device list.
Device = namedtuple("Device", "name ip mac state")
class HuaweiDeviceScanner(DeviceScanner):
    """Query a router running HUAWEI firmware for its LAN devices.

    The scanner logs in to the router's web interface over plain HTTP,
    downloads the ``GetLanUserDevInfo.asp`` page and parses the JavaScript
    ``UserDevinfo`` array found in it.
    """

    # Captures the argument list of the ``UserDevinfo`` array literal.
    ARRAY_REGEX = re.compile(r"var UserDevinfo = new Array\((.*)null\);")
    # Captures the arguments of every ``new USERDevice(...)`` entry.
    DEVICE_REGEX = re.compile(r"new USERDevice\((.*?)\),")
    # Splits one entry's quoted, comma-separated arguments into named groups.
    DEVICE_ATTR_REGEX = re.compile(
        '"(?P<Domain>.*?)","(?P<IpAddr>.*?)",'
        '"(?P<MacAddr>.*?)","(?P<Port>.*?)",'
        '"(?P<IpType>.*?)","(?P<DevType>.*?)",'
        '"(?P<DevStatus>.*?)","(?P<PortType>.*?)",'
        '"(?P<Time>.*?)","(?P<HostName>.*?)",'
        '"(?P<IPv4Enabled>.*?)","(?P<IPv6Enabled>.*?)",'
        '"(?P<DeviceType>.*?)"'
    )

    # Cookie sent along with the login request.
    LOGIN_COOKIE = {"Cookie": "body:Language:portuguese:id=-1"}

    # Seconds to wait on each HTTP request; without a timeout ``requests``
    # may block forever when the router stops answering.
    REQUEST_TIMEOUT = 5

    def __init__(self, config):
        """Initialize the scanner.

        ``config`` must provide the host, username and password options.
        The password is stored base64-encoded, which is the form posted to
        the router's login endpoint.
        """
        self.host = config[CONF_HOST]
        self.username = config[CONF_USERNAME]
        self.password = base64.b64encode(bytes(config[CONF_PASSWORD], "utf-8"))
        self.last_results = []

    def scan_devices(self):
        """Scan for new devices and return a list with found device IDs.

        The IDs are the MAC addresses of the clients seen as online in the
        most recent successful update.
        """
        self._update_info()
        return [client.mac for client in self.last_results]

    def get_device_name(self, device):
        """Return the name of the given device or None if we don't know.

        ``device`` is a MAC address as returned by ``scan_devices``.
        """
        for client in self.last_results:
            if client.mac == device:
                return client.name
        return None

    def _update_info(self):
        """Ensure the information from the router is up to date.

        Return True if new data was stored, False if the router returned no
        devices (in which case the previous results are kept).
        """
        data = self._get_data()
        if not data:
            return False

        active_clients = [client for client in data if client.state]
        self.last_results = active_clients

        _LOGGER.debug(
            "Active clients: %s",
            "\n".join(f"{client.mac} {client.name}" for client in active_clients),
        )
        return True

    def _get_data(self):
        """Get the devices' data from the router.

        Returns a list of ``Device`` tuples, one per parsable entry of the
        router's device array. An empty list is returned when the router
        cannot be reached or the array is not found in the response.
        """
        try:
            response = self._get_devices_response()
        except requests.exceptions.RequestException as err:
            # Includes timeouts and connection errors: treat as a failed scan
            # instead of letting the exception escape into the caller.
            _LOGGER.error("Error communicating with router %s: %s", self.host, err)
            return []

        array_match = self.ARRAY_REGEX.search(response)
        if array_match is None:
            return []

        devices = []
        for entry in self.DEVICE_REGEX.findall(array_match.group(1)):
            attrs = self.DEVICE_ATTR_REGEX.search(entry)
            if attrs is None:
                # An entry with an unexpected number of fields must not abort
                # the whole scan.
                _LOGGER.debug("Skipping unparsable device entry: %s", entry)
                continue
            devices.append(
                Device(
                    attrs.group("HostName"),
                    attrs.group("IpAddr"),
                    attrs.group("MacAddr"),
                    attrs.group("DevStatus") == "Online",
                )
            )
        return devices

    @staticmethod
    def _decode_escapes(text):
        """Return ``text`` with literal backslash escapes turned into characters.

        Characters that are already non-ASCII are preserved unchanged.
        """
        # The router sends e.g. a literal backslash + "x2d" for "-".
        # Encoding with latin-1/backslashreplace first keeps real non-ASCII
        # characters intact through the unicode_escape decoder (a plain
        # UTF-8 encode would turn them into mojibake).
        return text.encode("latin-1", "backslashreplace").decode("unicode_escape")

    def _get_devices_response(self):
        """Get the raw string with the devices from the router.

        Performs the login handshake (token request, then login POST) and
        returns the decoded body of the device-info page.

        Raises ``requests.exceptions.RequestException`` on network failure.
        """
        timeout = self.REQUEST_TIMEOUT

        cnt = requests.post(
            f"http://{self.host}/asp/GetRandCount.asp", timeout=timeout
        )
        # apparent_encoding may be None when detection fails; fall back to
        # UTF-8 rather than passing None to the decoder.
        cnt_str = str(cnt.content, cnt.apparent_encoding or "utf-8", errors="replace")

        _LOGGER.debug("Logging in")
        cookie = requests.post(
            f"http://{self.host}/login.cgi",
            data=[
                ("UserName", self.username),
                ("PassWord", self.password),
                ("x.X_HW_Token", cnt_str),
            ],
            cookies=self.LOGIN_COOKIE,
            timeout=timeout,
        )

        _LOGGER.debug("Requesting lan user info update")
        # this request is needed or else some devices' state won't be updated
        requests.get(
            f"http://{self.host}/html/bbsp/common/lanuserinfo.asp",
            cookies=cookie.cookies,
            timeout=timeout,
        )

        _LOGGER.debug("Requesting lan user info data")
        devices = requests.get(
            f"http://{self.host}/html/bbsp/common/GetLanUserDevInfo.asp",
            cookies=cookie.cookies,
            timeout=timeout,
        )

        body = devices.content.decode(
            devices.apparent_encoding or "utf-8", errors="replace"
        )
        return self._decode_escapes(body)