core/homeassistant/components/nmap_tracker/device_tracker.py

135 lines
4.1 KiB
Python
Raw Normal View History

"""Support for scanning a network with nmap."""
2016-02-19 05:27:50 +00:00
from collections import namedtuple
from datetime import timedelta
import logging
2014-12-15 05:29:36 +00:00
from getmac import get_mac_address
from nmap import PortScanner, PortScannerError
import voluptuous as vol
from homeassistant.components.device_tracker import (
2019-07-31 19:25:30 +00:00
DOMAIN,
PLATFORM_SCHEMA,
DeviceScanner,
)
2014-12-15 05:29:36 +00:00
from homeassistant.const import CONF_HOSTS
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
2014-12-15 05:29:36 +00:00
_LOGGER = logging.getLogger(__name__)
2019-07-31 19:25:30 +00:00
CONF_EXCLUDE = "exclude"
2016-07-31 20:47:46 +00:00
# Interval in minutes to exclude devices from a scan while they are home
2019-07-31 19:25:30 +00:00
CONF_HOME_INTERVAL = "home_interval"
CONF_OPTIONS = "scan_options"
DEFAULT_OPTIONS = "-F --host-timeout 5s"
2015-08-09 04:22:34 +00:00
2019-07-31 19:25:30 +00:00
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOSTS): cv.ensure_list,
vol.Required(CONF_HOME_INTERVAL, default=0): cv.positive_int,
vol.Optional(CONF_EXCLUDE, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_OPTIONS, default=DEFAULT_OPTIONS): cv.string,
}
)
2014-12-15 05:29:36 +00:00
def get_scanner(hass, config):
2016-03-07 20:18:53 +00:00
"""Validate the configuration and return a Nmap scanner."""
return NmapDeviceScanner(config[DOMAIN])
2014-12-15 05:29:36 +00:00
2016-11-19 05:47:59 +00:00
2019-07-31 19:25:30 +00:00
Device = namedtuple("Device", ["mac", "name", "ip", "last_update"])
2014-12-15 05:29:36 +00:00
class NmapDeviceScanner(DeviceScanner):
2016-03-07 17:12:06 +00:00
"""This class scans for devices using nmap."""
2016-03-07 20:18:53 +00:00
exclude = []
2014-12-15 05:29:36 +00:00
def __init__(self, config):
2016-03-07 20:18:53 +00:00
"""Initialize the scanner."""
2014-12-15 05:29:36 +00:00
self.last_results = []
self.hosts = config[CONF_HOSTS]
self.exclude = config[CONF_EXCLUDE]
minutes = config[CONF_HOME_INTERVAL]
self._options = config[CONF_OPTIONS]
self.home_interval = timedelta(minutes=minutes)
2014-12-15 05:29:36 +00:00
_LOGGER.debug("Scanner initialized")
2014-12-15 05:29:36 +00:00
def scan_devices(self):
2016-03-07 20:18:53 +00:00
"""Scan for new devices and return a list with found device IDs."""
2014-12-15 05:29:36 +00:00
self._update_info()
_LOGGER.debug("Nmap last results %s", self.last_results)
2014-12-15 05:29:36 +00:00
return [device.mac for device in self.last_results]
def get_device_name(self, device):
2016-03-07 20:18:53 +00:00
"""Return the name of the given device or None if we don't know."""
2019-07-31 19:25:30 +00:00
filter_named = [
result.name for result in self.last_results if result.mac == device
]
2014-12-15 05:29:36 +00:00
if filter_named:
return filter_named[0]
return None
2014-12-15 05:29:36 +00:00
def get_extra_attributes(self, device):
2018-04-18 10:43:55 +00:00
"""Return the IP of the given device."""
2019-07-31 19:25:30 +00:00
filter_ip = next(
(result.ip for result in self.last_results if result.mac == device), None
)
return {"ip": filter_ip}
2014-12-15 05:29:36 +00:00
def _update_info(self):
2016-03-07 20:18:53 +00:00
"""Scan the network for devices.
Returns boolean if scanning successful.
"""
_LOGGER.debug("Scanning...")
scanner = PortScanner()
options = self._options
2015-10-09 04:45:51 +00:00
if self.home_interval:
2015-10-09 04:45:51 +00:00
boundary = dt_util.now() - self.home_interval
2019-07-31 19:25:30 +00:00
last_results = [
device for device in self.last_results if device.last_update > boundary
]
2015-10-09 04:45:51 +00:00
if last_results:
2019-07-31 19:25:30 +00:00
exclude_hosts = self.exclude + [device.ip for device in last_results]
else:
exclude_hosts = self.exclude
2015-10-09 04:45:51 +00:00
else:
last_results = []
exclude_hosts = self.exclude
if exclude_hosts:
2019-07-31 19:25:30 +00:00
options += " --exclude {}".format(",".join(exclude_hosts))
try:
2019-07-31 19:25:30 +00:00
result = scanner.scan(hosts=" ".join(self.hosts), arguments=options)
except PortScannerError:
return False
now = dt_util.now()
2019-07-31 19:25:30 +00:00
for ipv4, info in result["scan"].items():
if info["status"]["state"] != "up":
continue
2019-07-31 19:25:30 +00:00
name = info["hostnames"][0]["name"] if info["hostnames"] else ipv4
# Mac address only returned if nmap ran as root
2019-07-31 19:25:30 +00:00
mac = info["addresses"].get("mac") or get_mac_address(ip=ipv4)
if mac is None:
2019-07-31 19:25:30 +00:00
_LOGGER.info("No MAC address found for %s", ipv4)
continue
2015-10-09 04:45:51 +00:00
last_results.append(Device(mac.upper(), name, ipv4, now))
self.last_results = last_results
_LOGGER.debug("nmap scan successful")
return True