core/homeassistant/components/ubus/device_tracker.py

185 lines
5.9 KiB
Python
Raw Normal View History

"""Support for OpenWRT (ubus) routers."""
from __future__ import annotations
2016-02-19 05:27:50 +00:00
import logging
import re
2016-02-19 05:27:50 +00:00
from openwrt.ubus import Ubus
2016-09-04 08:06:16 +00:00
import voluptuous as vol
from homeassistant.components.device_tracker import (
2019-07-31 19:25:30 +00:00
DOMAIN,
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
2019-07-31 19:25:30 +00:00
DeviceScanner,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)

# Option naming the DHCP server whose lease data is used to resolve host names.
CONF_DHCP_SOFTWARE = "dhcp_software"
DEFAULT_DHCP_SOFTWARE = "dnsmasq"
# Accepted values for CONF_DHCP_SOFTWARE.
DHCP_SOFTWARES = ["dnsmasq", "odhcpd", "none"]

# Platform configuration: router address and credentials are mandatory,
# the DHCP software falls back to DEFAULT_DHCP_SOFTWARE when omitted.
PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Required(CONF_PASSWORD): cv.string,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Optional(CONF_DHCP_SOFTWARE, default=DEFAULT_DHCP_SOFTWARE): vol.In(
            DHCP_SOFTWARES
        ),
    }
)
2016-09-04 08:06:16 +00:00
def get_scanner(hass: HomeAssistant, config: ConfigType) -> DeviceScanner | None:
    """Validate the configuration and return an ubus scanner.

    The scanner class is picked from the configured DHCP software; any value
    other than "dnsmasq" or "odhcpd" yields the plain scanner. Returns None
    when the scanner reports that its initialization did not succeed.
    """
    platform_config = config[DOMAIN]

    # DHCP software name -> scanner able to resolve host names for it.
    scanner_by_software = {
        "dnsmasq": DnsmasqUbusDeviceScanner,
        "odhcpd": OdhcpdUbusDeviceScanner,
    }
    scanner_cls = scanner_by_software.get(
        platform_config[CONF_DHCP_SOFTWARE], UbusDeviceScanner
    )

    scanner: DeviceScanner = scanner_cls(platform_config)
    if not scanner.success_init:
        return None
    return scanner
def _refresh_on_access_denied(func):
"""If remove rebooted, it lost our session so rebuild one and try again."""
2019-07-31 19:25:30 +00:00
def decorator(self, *args, **kwargs):
"""Wrap the function to refresh session_id on PermissionError."""
try:
return func(self, *args, **kwargs)
except PermissionError:
2019-07-31 19:25:30 +00:00
_LOGGER.warning(
"Invalid session detected."
" Trying to refresh session_id and re-run RPC"
)
self.ubus.connect()
return func(self, *args, **kwargs)
return decorator
class UbusDeviceScanner(DeviceScanner):
    """Query a wireless router running OpenWrt firmware through ubus.

    Adapted from Tomato scanner.
    """

    def __init__(self, config):
        """Initialize the scanner and open the ubus session.

        ``config`` must provide CONF_HOST, CONF_USERNAME and CONF_PASSWORD.
        ``success_init`` is True when ``Ubus.connect()`` returned a value
        other than None.
        """
        host = config[CONF_HOST]
        self.username = config[CONF_USERNAME]
        self.password = config[CONF_PASSWORD]

        # Not referenced by the code visible in this class; kept so that the
        # instance attributes stay unchanged.
        self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
        # Always a list, so scan_devices() has a single return type even
        # before the first successful update.
        self.last_results = []
        self.url = f"http://{host}/ubus"

        self.ubus = Ubus(self.url, self.username, self.password)
        # Names of the hostapd objects to poll; filled on the first update.
        self.hostapd = []
        # None until a MAC -> name table has been generated.
        self.mac2name = None
        self.success_init = self.ubus.connect() is not None

    def scan_devices(self):
        """Scan for new devices and return a list with found device IDs."""
        self._update_info()
        return self.last_results

    def _generate_mac2name(self):
        """Return empty MAC to name dict. Overridden if DHCP server is set."""
        self.mac2name = {}

    @_refresh_on_access_denied
    def get_device_name(self, device):
        """Return the name of the given device or None if we don't know."""
        if self.mac2name is None:
            self._generate_mac2name()
            if self.mac2name is None:
                # Generation of mac2name dictionary failed
                return None
        # Keys of mac2name are stored upper-cased by the generators.
        return self.mac2name.get(device.upper())

    @_refresh_on_access_denied
    def _update_info(self):
        """Ensure the information from the router is up to date.

        Rebuilds ``last_results`` from the authorized clients of every known
        hostapd object.

        Returns True when at least one hostapd object answered, False
        otherwise (including when initialization did not succeed).
        """
        if not self.success_init:
            return False

        _LOGGER.info("Checking hostapd")

        if not self.hostapd:
            # A falsy reply is treated as "no access points" instead of
            # failing on it (the other ubus replies in this file are
            # checked for falsiness as well). The list stays empty, so the
            # lookup is repeated on the next update.
            self.hostapd.extend(self.ubus.get_hostapd() or ())

        self.last_results = []
        answered = 0
        # for each access point
        for access_point in self.hostapd:
            if not (result := self.ubus.get_hostapd_clients(access_point)):
                continue
            answered += 1
            # Keep only the devices that are authorized (valid wpa key)
            self.last_results.extend(
                client_id
                for client_id, client in result["clients"].items()
                if client["authorized"]
            )

        return bool(answered)
2015-10-24 09:20:57 +00:00
2015-10-26 10:50:09 +00:00
class DnsmasqUbusDeviceScanner(UbusDeviceScanner):
    """Implement the Ubus device scanning for the dnsmasq DHCP server."""

    def __init__(self, config):
        """Initialize the scanner."""
        super().__init__(config)
        # Path of the dnsmasq lease file; looked up on first use.
        self.leasefile = None

    def _generate_mac2name(self):
        """Build the MAC to host name table from the dnsmasq lease file.

        The lease file path is read once from the "dhcp"/"dnsmasq" UCI
        configuration and cached in ``leasefile``. ``mac2name`` is left
        untouched when the configuration or the lease file yields no result,
        so a later call tries again.
        """
        if self.leasefile is None:
            if not (result := self.ubus.get_uci_config("dhcp", "dnsmasq")):
                return
            values = result["values"].values()
            self.leasefile = next(iter(values))["leasefile"]

        result = self.ubus.file_read(self.leasefile)
        if not result:
            # Error, handled in the ubus.file_read()
            return

        # Fill a local dict and publish it only once parsing finished, so a
        # failure cannot leave a half-filled table behind.
        mac2name = {}
        for line in result["data"].splitlines():
            hosts = line.split(" ")
            # The MAC is the 2nd field and the host name the 4th; skip blank
            # lines and records that are too short to be a lease entry.
            if len(hosts) < 4:
                continue
            mac2name[hosts[1].upper()] = hosts[3]
        self.mac2name = mac2name
class OdhcpdUbusDeviceScanner(UbusDeviceScanner):
    """Implement the Ubus device scanning for the odhcp DHCP server."""

    def _generate_mac2name(self):
        """Build the MAC to host name table from the "ipv4leases" DHCP call.

        ``mac2name`` is left untouched when the call yields no result.
        """
        leases_by_device = self.ubus.get_dhcp_method("ipv4leases")
        if not leases_by_device:
            # Error, handled in the ubus.get_dhcp_method()
            return

        self.mac2name = {}
        for entry in leases_by_device["device"].values():
            for lease in entry["leases"]:
                raw_mac = lease["mac"]  # raw_mac = aabbccddeeff
                # Convert it to expected format with colon
                octets = [
                    raw_mac[pos : pos + 2] for pos in range(0, len(raw_mac), 2)
                ]
                self.mac2name[":".join(octets).upper()] = lease["hostname"]