Add new OpenWRT presence detection routine based on ubus instead of luci
parent
7801489149
commit
29c9c5a7ec
|
@ -0,0 +1,171 @@
|
|||
"""
|
||||
homeassistant.components.device_tracker.ubus
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Device tracker platform that supports scanning a OpenWRT router for device
|
||||
presence.
|
||||
|
||||
For more details about this platform, please refer to the documentation at
|
||||
https://home-assistant.io/components/device_tracker.ubus.html
|
||||
"""
|
||||
import logging
|
||||
import json
|
||||
from datetime import timedelta
|
||||
import re
|
||||
import threading
|
||||
import requests
|
||||
|
||||
from homeassistant.const import CONF_HOST, CONF_USERNAME, CONF_PASSWORD
|
||||
from homeassistant.helpers import validate_config
|
||||
from homeassistant.util import Throttle
|
||||
from homeassistant.components.device_tracker import DOMAIN
|
||||
|
||||
# Return cached results if last scan was less then this time ago
|
||||
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_scanner(hass, config):
|
||||
""" Validates config and returns a Luci scanner. """
|
||||
if not validate_config(config,
|
||||
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
|
||||
_LOGGER):
|
||||
return None
|
||||
|
||||
scanner = UbusDeviceScanner(config[DOMAIN])
|
||||
|
||||
return scanner if scanner.success_init else None
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
class UbusDeviceScanner(object):
|
||||
"""
|
||||
This class queries a wireless router running OpenWrt firmware
|
||||
for connected devices. Adapted from Tomato scanner.
|
||||
|
||||
Configure your routers' ubus ACL based on following instructions:
|
||||
|
||||
http://wiki.openwrt.org/doc/techref/ubus
|
||||
|
||||
Read only access will be fine.
|
||||
|
||||
To use this class you have to install rpcd-mod-file package in your OpenWrt router:
|
||||
|
||||
opkg install rpcd-mod-file
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
host = config[CONF_HOST]
|
||||
username, password = config[CONF_USERNAME], config[CONF_PASSWORD]
|
||||
|
||||
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
|
||||
self.lock = threading.Lock()
|
||||
self.last_results = {}
|
||||
self.url = 'http://{}/ubus'.format(host)
|
||||
|
||||
self.session_id= _get_session_id(self.url, username, password)
|
||||
self.hostapd = []
|
||||
self.leasefile = None
|
||||
self.mac2name = None
|
||||
self.success_init = self.session_id is not None
|
||||
|
||||
def scan_devices(self):
|
||||
"""
|
||||
Scans for new devices and return a list containing found device ids.
|
||||
"""
|
||||
|
||||
self._update_info()
|
||||
|
||||
return self.last_results
|
||||
|
||||
def get_device_name(self, device):
|
||||
""" Returns the name of the given device or None if we don't know. """
|
||||
|
||||
with self.lock:
|
||||
if self.leasefile is None:
|
||||
result = _req_json_rpc(self.url, self.session_id, 'call', 'uci', 'get', config="dhcp", type="dnsmasq")
|
||||
if result:
|
||||
self.leasefile=next (iter (result["values"].values()))["leasefile"]
|
||||
else:
|
||||
return
|
||||
|
||||
if self.mac2name is None:
|
||||
result = _req_json_rpc(self.url, self.session_id, 'call', 'file', 'read', path=self.leasefile)
|
||||
|
||||
if result:
|
||||
self.mac2name = dict()
|
||||
for line in result["data"].splitlines():
|
||||
[time, mac, ip, name, lid] = line.split(" ")
|
||||
self.mac2name[mac.upper()] = name
|
||||
else:
|
||||
# Error, handled in the _req_json_rpc
|
||||
return
|
||||
|
||||
return self.mac2name.get(device.upper(), None)
|
||||
|
||||
@Throttle(MIN_TIME_BETWEEN_SCANS)
|
||||
def _update_info(self):
|
||||
"""
|
||||
Ensures the information from the Luci router is up to date.
|
||||
Returns boolean if scanning successful.
|
||||
"""
|
||||
if not self.success_init:
|
||||
return False
|
||||
|
||||
with self.lock:
|
||||
_LOGGER.info("Checking ARP")
|
||||
|
||||
if not self.hostapd:
|
||||
hostapd = _req_json_rpc(self.url, self.session_id, 'list', 'hostapd.*', '')
|
||||
for key in hostapd.keys():
|
||||
self.hostapd.append(key)
|
||||
|
||||
self.last_results = []
|
||||
results = 0
|
||||
for hostapd in self.hostapd:
|
||||
result = _req_json_rpc(self.url, self.session_id, 'call', hostapd, 'get_clients')
|
||||
|
||||
if result:
|
||||
results = results + 1
|
||||
for key in result["clients"].keys():
|
||||
self.last_results.append(key)
|
||||
|
||||
if results:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params):
|
||||
""" Perform one JSON RPC operation. """
|
||||
|
||||
data = json.dumps({ "jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": rpcmethod,
|
||||
"params": [ session_id,
|
||||
subsystem,
|
||||
method,
|
||||
params]
|
||||
})
|
||||
|
||||
try:
|
||||
res = requests.post(url, data=data, timeout=5)
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
return
|
||||
|
||||
if res.status_code == 200:
|
||||
response = res.json()
|
||||
|
||||
if (rpcmethod == "call"):
|
||||
return response["result"][1]
|
||||
else:
|
||||
return response["result"]
|
||||
|
||||
def _get_session_id(url, username, password):
|
||||
""" Get authentication token for the given host+username+password. """
|
||||
res = _req_json_rpc(url, "00000000000000000000000000000000", 'call', 'session', 'login', username=username, password=password)
|
||||
return res["ubus_rpc_session"]
|
||||
|
||||
|
||||
# root@dom:~# ubus call uci get '{ "config": "dhcp", "type": "dnsmasq" }'
|
Loading…
Reference in New Issue