core/homeassistant/components/device_tracker/ubus.py

174 lines
5.5 KiB
Python
Raw Normal View History

"""
homeassistant.components.device_tracker.ubus
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Device tracker platform that supports scanning an OpenWrt router for device
presence.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.ubus.html
"""
import logging
import json
from datetime import timedelta
import re
import threading
import requests
from homeassistant.const import CONF_HOST, CONF_USERNAME, CONF_PASSWORD
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle
from homeassistant.components.device_tracker import DOMAIN
# Return cached results if last scan was less than this time ago
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
def get_scanner(hass, config):
    """ Validates config and returns a ubus scanner, or None on failure. """
    # Keys that must be present in the device_tracker section of the config.
    required_keys = {DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]}

    if validate_config(config, required_keys, _LOGGER):
        candidate = UbusDeviceScanner(config[DOMAIN])
        # Only hand back a scanner that managed to initialise itself.
        if candidate.success_init:
            return candidate

    return None
# pylint: disable=too-many-instance-attributes
class UbusDeviceScanner(object):
    """
    This class queries a wireless router running OpenWrt firmware
    for connected devices. Adapted from Tomato scanner.

    Configure your router's ubus ACL based on the following instructions:

    http://wiki.openwrt.org/doc/techref/ubus

    Read only access will be fine.

    To use this class you have to install the rpcd-mod-file package
    on your OpenWrt router:

    opkg install rpcd-mod-file
    """

    def __init__(self, config):
        """ Logs in to the router described by the platform config. """
        host = config[CONF_HOST]
        username, password = config[CONF_USERNAME], config[CONF_PASSWORD]

        # Not referenced anywhere else in this class; kept so the set of
        # instance attributes stays unchanged.
        self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
        self.lock = threading.Lock()
        # A list from the start, matching what _update_info stores later.
        self.last_results = []
        self.url = 'http://{}/ubus'.format(host)

        self.session_id = _get_session_id(self.url, username, password)
        # Names of the hostapd ubus objects, discovered on the first scan.
        self.hostapd = []
        # Path of the DHCP lease file and the MAC -> hostname map read from
        # it; both are fetched lazily by get_device_name.
        self.leasefile = None
        self.mac2name = None
        self.success_init = self.session_id is not None

    def scan_devices(self):
        """
        Scans for new devices and return a list containing found device ids.
        """
        self._update_info()
        return self.last_results

    def get_device_name(self, device):
        """ Returns the name of the given device or None if we don't know. """
        with self.lock:
            if self.leasefile is None:
                result = _req_json_rpc(self.url, self.session_id,
                                       'call', 'uci', 'get',
                                       config="dhcp", type="dnsmasq")
                if not result:
                    # Error, handled in the _req_json_rpc
                    return None

                # Use the first section of the reply; tolerate a reply that
                # has no sections or no leasefile entry instead of raising.
                sections = (result.get("values") or {}).values()
                first_section = next(iter(sections), None)
                leasefile = None
                if first_section:
                    leasefile = first_section.get("leasefile")
                if leasefile is None:
                    return None
                self.leasefile = leasefile

            if self.mac2name is None:
                result = _req_json_rpc(self.url, self.session_id,
                                       'call', 'file', 'read',
                                       path=self.leasefile)
                if not result:
                    # Error, handled in the _req_json_rpc
                    return None

                # Build the map locally and publish it only once complete,
                # so a bad line can never leave a half-filled cache behind.
                mac2name = {}
                for line in (result.get("data") or "").splitlines():
                    hosts = line.split(" ")
                    # Field 1 is used as the MAC and field 3 as the name;
                    # skip blank or truncated lines that lack them.
                    if len(hosts) < 4:
                        continue
                    mac2name[hosts[1].upper()] = hosts[3]
                # The map is filled once and never refreshed afterwards.
                self.mac2name = mac2name

            return self.mac2name.get(device.upper(), None)

    @Throttle(MIN_TIME_BETWEEN_SCANS)
    def _update_info(self):
        """
        Ensures the information from the router is up to date.
        Returns boolean if scanning successful.
        """
        if not self.success_init:
            return False

        with self.lock:
            _LOGGER.info("Checking ARP")

            if not self.hostapd:
                interfaces = _req_json_rpc(self.url, self.session_id,
                                           'list', 'hostapd.*', '')
                if not interfaces:
                    # The request failed or no hostapd object is exposed;
                    # keep the previous results and retry on the next scan.
                    return False
                self.hostapd.extend(interfaces.keys())

            found = []
            results = 0
            for hostapd_name in self.hostapd:
                result = _req_json_rpc(self.url, self.session_id,
                                       'call', hostapd_name, 'get_clients')
                if result:
                    results = results + 1
                    found.extend((result.get('clients') or {}).keys())

            self.last_results = found
            return bool(results)
2015-10-24 09:20:57 +00:00
2015-10-26 10:50:09 +00:00
def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params):
    """
    Perform one JSON RPC operation.

    Posts a JSON-RPC 2.0 request whose params are
    [session_id, subsystem, method, params] to url.

    Returns the second element of the "result" array for rpcmethod "call",
    the whole "result" value for any other rpcmethod, and None when the
    request fails or the reply carries no usable result.
    """
    data = json.dumps({"jsonrpc": "2.0",
                       "id": 1,
                       "method": rpcmethod,
                       "params": [session_id,
                                  subsystem,
                                  method,
                                  params]})

    try:
        res = requests.post(url, data=data, timeout=5)
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as err:
        # An unreachable router must not crash the caller; report and
        # signal failure with None, like the timeout case always did.
        _LOGGER.error("ubus request to %s failed: %s", url, err)
        return None

    if res.status_code != 200:
        _LOGGER.error("ubus request to %s returned HTTP status %s",
                      url, res.status_code)
        return None

    try:
        response = res.json()
    except ValueError:
        _LOGGER.error("ubus reply from %s is not valid JSON", url)
        return None

    # A JSON-RPC error reply has no "result" member.
    result = response.get("result")
    if result is None:
        _LOGGER.error("ubus %s %s.%s failed: %s",
                      rpcmethod, subsystem, method, response.get("error"))
        return None

    if rpcmethod == "call":
        # The payload is the second element of the result array; when the
        # array is shorter there is no payload to return.
        if len(result) < 2:
            return None
        return result[1]

    return result
2015-10-24 09:20:57 +00:00
def _get_session_id(url, username, password):
    """
    Get authentication token for the given host+username+password.

    Returns the "ubus_rpc_session" value of the login reply, or None when
    the login request failed or the reply does not contain a session id.
    """
    # The all-zero session id is the one passed for the login call itself.
    res = _req_json_rpc(url, "00000000000000000000000000000000", 'call',
                        'session', 'login', username=username,
                        password=password)
    if not res:
        # _req_json_rpc yields None on failure; subscripting it would raise
        # TypeError instead of letting the caller see a failed login.
        return None
    return res.get("ubus_rpc_session")