core/homeassistant/components/unifi_direct/device_tracker.py

130 lines
4.0 KiB
Python

"""Support for Unifi AP direct access."""
import logging
import json
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME,
CONF_PORT)
_LOGGER = logging.getLogger(__name__)
DEFAULT_SSH_PORT = 22
UNIFI_COMMAND = 'mca-dump | tr -d "\n"'
UNIFI_SSID_TABLE = "vap_table"
UNIFI_CLIENT_TABLE = "sta_table"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_SSH_PORT): cv.port
})
def get_scanner(hass, config):
"""Validate the configuration and return a Unifi direct scanner."""
scanner = UnifiDeviceScanner(config[DOMAIN])
if not scanner.connected:
return False
return scanner
class UnifiDeviceScanner(DeviceScanner):
"""This class queries Unifi wireless access point."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.port = config[CONF_PORT]
self.ssh = None
self.connected = False
self.last_results = {}
self._connect()
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
result = _response_to_json(self._get_update())
if result:
self.last_results = result
return self.last_results.keys()
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
hostname = next((
value.get('hostname') for key, value in self.last_results.items()
if key.upper() == device.upper()), None)
if hostname is not None:
hostname = str(hostname)
return hostname
def _connect(self):
"""Connect to the Unifi AP SSH server."""
from pexpect import pxssh, exceptions
self.ssh = pxssh.pxssh()
try:
self.ssh.login(self.host, self.username,
password=self.password, port=self.port)
self.connected = True
except exceptions.EOF:
_LOGGER.error("Connection refused. SSH enabled?")
self._disconnect()
def _disconnect(self):
"""Disconnect the current SSH connection."""
try:
self.ssh.logout()
except Exception: # pylint: disable=broad-except
pass
finally:
self.ssh = None
self.connected = False
def _get_update(self):
from pexpect import pxssh, exceptions
try:
if not self.connected:
self._connect()
# If we still aren't connected at this point
# don't try to send anything to the AP.
if not self.connected:
return None
self.ssh.sendline(UNIFI_COMMAND)
self.ssh.prompt()
return self.ssh.before
except pxssh.ExceptionPxssh as err:
_LOGGER.error("Unexpected SSH error: %s", str(err))
self._disconnect()
return None
except (AssertionError, exceptions.EOF) as err:
_LOGGER.error("Connection to AP unavailable: %s", str(err))
self._disconnect()
return None
def _response_to_json(response):
try:
json_response = json.loads(str(response)[31:-1].replace("\\", ""))
_LOGGER.debug(str(json_response))
ssid_table = json_response.get(UNIFI_SSID_TABLE)
active_clients = {}
for ssid in ssid_table:
client_table = ssid.get(UNIFI_CLIENT_TABLE)
for client in client_table:
active_clients[client.get("mac")] = client
return active_clients
except (ValueError, TypeError):
_LOGGER.error("Failed to decode response from AP.")
return {}