135 lines
4.1 KiB
Python
135 lines
4.1 KiB
Python
"""Support for scanning a network with nmap."""
|
|
from collections import namedtuple
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
from getmac import get_mac_address
|
|
from nmap import PortScanner, PortScannerError
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.device_tracker import (
|
|
DOMAIN,
|
|
PLATFORM_SCHEMA,
|
|
DeviceScanner,
|
|
)
|
|
from homeassistant.const import CONF_HOSTS
|
|
import homeassistant.helpers.config_validation as cv
|
|
import homeassistant.util.dt as dt_util
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
CONF_EXCLUDE = "exclude"
|
|
# Interval in minutes to exclude devices from a scan while they are home
|
|
CONF_HOME_INTERVAL = "home_interval"
|
|
CONF_OPTIONS = "scan_options"
|
|
DEFAULT_OPTIONS = "-F --host-timeout 5s"
|
|
|
|
|
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|
{
|
|
vol.Required(CONF_HOSTS): cv.ensure_list,
|
|
vol.Required(CONF_HOME_INTERVAL, default=0): cv.positive_int,
|
|
vol.Optional(CONF_EXCLUDE, default=[]): vol.All(cv.ensure_list, [cv.string]),
|
|
vol.Optional(CONF_OPTIONS, default=DEFAULT_OPTIONS): cv.string,
|
|
}
|
|
)
|
|
|
|
|
|
def get_scanner(hass, config):
|
|
"""Validate the configuration and return a Nmap scanner."""
|
|
return NmapDeviceScanner(config[DOMAIN])
|
|
|
|
|
|
Device = namedtuple("Device", ["mac", "name", "ip", "last_update"])
|
|
|
|
|
|
class NmapDeviceScanner(DeviceScanner):
|
|
"""This class scans for devices using nmap."""
|
|
|
|
exclude = []
|
|
|
|
def __init__(self, config):
|
|
"""Initialize the scanner."""
|
|
self.last_results = []
|
|
|
|
self.hosts = config[CONF_HOSTS]
|
|
self.exclude = config[CONF_EXCLUDE]
|
|
minutes = config[CONF_HOME_INTERVAL]
|
|
self._options = config[CONF_OPTIONS]
|
|
self.home_interval = timedelta(minutes=minutes)
|
|
|
|
_LOGGER.debug("Scanner initialized")
|
|
|
|
def scan_devices(self):
|
|
"""Scan for new devices and return a list with found device IDs."""
|
|
self._update_info()
|
|
|
|
_LOGGER.debug("Nmap last results %s", self.last_results)
|
|
|
|
return [device.mac for device in self.last_results]
|
|
|
|
def get_device_name(self, device):
|
|
"""Return the name of the given device or None if we don't know."""
|
|
filter_named = [
|
|
result.name for result in self.last_results if result.mac == device
|
|
]
|
|
|
|
if filter_named:
|
|
return filter_named[0]
|
|
return None
|
|
|
|
def get_extra_attributes(self, device):
|
|
"""Return the IP of the given device."""
|
|
filter_ip = next(
|
|
(result.ip for result in self.last_results if result.mac == device), None
|
|
)
|
|
return {"ip": filter_ip}
|
|
|
|
def _update_info(self):
|
|
"""Scan the network for devices.
|
|
|
|
Returns boolean if scanning successful.
|
|
"""
|
|
_LOGGER.debug("Scanning...")
|
|
|
|
scanner = PortScanner()
|
|
|
|
options = self._options
|
|
|
|
if self.home_interval:
|
|
boundary = dt_util.now() - self.home_interval
|
|
last_results = [
|
|
device for device in self.last_results if device.last_update > boundary
|
|
]
|
|
if last_results:
|
|
exclude_hosts = self.exclude + [device.ip for device in last_results]
|
|
else:
|
|
exclude_hosts = self.exclude
|
|
else:
|
|
last_results = []
|
|
exclude_hosts = self.exclude
|
|
if exclude_hosts:
|
|
options += f" --exclude {','.join(exclude_hosts)}"
|
|
|
|
try:
|
|
result = scanner.scan(hosts=" ".join(self.hosts), arguments=options)
|
|
except PortScannerError:
|
|
return False
|
|
|
|
now = dt_util.now()
|
|
for ipv4, info in result["scan"].items():
|
|
if info["status"]["state"] != "up":
|
|
continue
|
|
name = info["hostnames"][0]["name"] if info["hostnames"] else ipv4
|
|
# Mac address only returned if nmap ran as root
|
|
mac = info["addresses"].get("mac") or get_mac_address(ip=ipv4)
|
|
if mac is None:
|
|
_LOGGER.info("No MAC address found for %s", ipv4)
|
|
continue
|
|
last_results.append(Device(mac.upper(), name, ipv4, now))
|
|
|
|
self.last_results = last_results
|
|
|
|
_LOGGER.debug("nmap scan successful")
|
|
return True
|