Revert nmap_tracker to 2021.6 version (#52573)

* Revert nmap_tracker to 2021.6 version

- Its unlikely we will be able to solve #52565 before release

* hassfest
pull/52627/head
J. Nick Koston 2021-07-06 11:28:23 -05:00 committed by Franck Nijhof
parent 746a52bb27
commit 40d9541d9b
No known key found for this signature in database
GPG Key ID: D62583BA8AB11CA3
11 changed files with 106 additions and 1129 deletions

View File

@ -691,8 +691,7 @@ omit =
homeassistant/components/niko_home_control/light.py
homeassistant/components/nilu/air_quality.py
homeassistant/components/nissan_leaf/*
homeassistant/components/nmap_tracker/__init__.py
homeassistant/components/nmap_tracker/device_tracker.py
homeassistant/components/nmap_tracker/*
homeassistant/components/nmbs/sensor.py
homeassistant/components/notion/__init__.py
homeassistant/components/notion/binary_sensor.py

View File

@ -332,7 +332,6 @@ homeassistant/components/nextcloud/* @meichthys
homeassistant/components/nightscout/* @marciogranzotto
homeassistant/components/nilu/* @hfurubotten
homeassistant/components/nissan_leaf/* @filcole
homeassistant/components/nmap_tracker/* @bdraco
homeassistant/components/nmbs/* @thibmaek
homeassistant/components/no_ip/* @fabaff
homeassistant/components/noaa_tides/* @jdelaney72

View File

@ -1,395 +1 @@
"""The Nmap Tracker integration."""
from __future__ import annotations
import asyncio
import contextlib
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import aiohttp
from getmac import get_mac_address
from mac_vendor_lookup import AsyncMacLookup
from nmap import PortScanner, PortScannerError
from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import CoreState, HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
import homeassistant.util.dt as dt_util
from .const import (
CONF_HOME_INTERVAL,
CONF_OPTIONS,
DEFAULT_TRACK_NEW_DEVICES,
DOMAIN,
NMAP_TRACKED_DEVICES,
PLATFORMS,
TRACKER_SCAN_INTERVAL,
)
# Some version of nmap will fail with 'Assertion failed: htn.toclock_running == true (Target.cc: stopTimeOutClock: 503)\n'
NMAP_TRANSIENT_FAILURE = "Assertion failed: htn.toclock_running == true"
MAX_SCAN_ATTEMPTS = 16
OFFLINE_SCANS_TO_MARK_UNAVAILABLE = 3
def short_hostname(hostname):
"""Return the first part of the hostname."""
if hostname is None:
return None
return hostname.split(".")[0]
def human_readable_name(hostname, vendor, mac_address):
"""Generate a human readable name."""
if hostname:
return short_hostname(hostname)
if vendor:
return f"{vendor} {mac_address[-8:]}"
return f"Nmap Tracker {mac_address}"
@dataclass
class NmapDevice:
"""Class for keeping track of an nmap tracked device."""
mac_address: str
hostname: str
name: str
ipv4: str
manufacturer: str
reason: str
last_update: datetime.datetime
offline_scans: int
class NmapTrackedDevices:
"""Storage class for all nmap trackers."""
def __init__(self) -> None:
"""Initialize the data."""
self.tracked: dict = {}
self.ipv4_last_mac: dict = {}
self.config_entry_owner: dict = {}
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Nmap Tracker from a config entry."""
domain_data = hass.data.setdefault(DOMAIN, {})
devices = domain_data.setdefault(NMAP_TRACKED_DEVICES, NmapTrackedDevices())
scanner = domain_data[entry.entry_id] = NmapDeviceScanner(hass, entry, devices)
await scanner.async_setup()
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry):
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
_async_untrack_devices(hass, entry)
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
@callback
def _async_untrack_devices(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Remove tracking for devices owned by this config entry."""
devices = hass.data[DOMAIN][NMAP_TRACKED_DEVICES]
remove_mac_addresses = [
mac_address
for mac_address, entry_id in devices.config_entry_owner.items()
if entry_id == entry.entry_id
]
for mac_address in remove_mac_addresses:
if device := devices.tracked.pop(mac_address, None):
devices.ipv4_last_mac.pop(device.ipv4, None)
del devices.config_entry_owner[mac_address]
def signal_device_update(mac_address) -> str:
"""Signal specific per nmap tracker entry to signal updates in device."""
return f"{DOMAIN}-device-update-{mac_address}"
class NmapDeviceScanner:
"""This class scans for devices using nmap."""
def __init__(self, hass, entry, devices):
"""Initialize the scanner."""
self.devices = devices
self.home_interval = None
self._hass = hass
self._entry = entry
self._scan_lock = None
self._stopping = False
self._scanner = None
self._entry_id = entry.entry_id
self._hosts = None
self._options = None
self._exclude = None
self._scan_interval = None
self._track_new_devices = None
self._known_mac_addresses = {}
self._finished_first_scan = False
self._last_results = []
self._mac_vendor_lookup = None
async def async_setup(self):
"""Set up the tracker."""
config = self._entry.options
self._track_new_devices = config.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW_DEVICES)
self._scan_interval = timedelta(
seconds=config.get(CONF_SCAN_INTERVAL, TRACKER_SCAN_INTERVAL)
)
hosts_list = cv.ensure_list_csv(config[CONF_HOSTS])
self._hosts = [host for host in hosts_list if host != ""]
excludes_list = cv.ensure_list_csv(config[CONF_EXCLUDE])
self._exclude = [exclude for exclude in excludes_list if exclude != ""]
self._options = config[CONF_OPTIONS]
self.home_interval = timedelta(
minutes=cv.positive_int(config[CONF_HOME_INTERVAL])
)
self._scan_lock = asyncio.Lock()
if self._hass.state == CoreState.running:
await self._async_start_scanner()
return
self._entry.async_on_unload(
self._hass.bus.async_listen(
EVENT_HOMEASSISTANT_STARTED, self._async_start_scanner
)
)
registry = er.async_get(self._hass)
self._known_mac_addresses = {
entry.unique_id: entry.original_name
for entry in registry.entities.values()
if entry.config_entry_id == self._entry_id
}
@property
def signal_device_new(self) -> str:
"""Signal specific per nmap tracker entry to signal new device."""
return f"{DOMAIN}-device-new-{self._entry_id}"
@property
def signal_device_missing(self) -> str:
"""Signal specific per nmap tracker entry to signal a missing device."""
return f"{DOMAIN}-device-missing-{self._entry_id}"
@callback
def _async_get_vendor(self, mac_address):
"""Lookup the vendor."""
oui = self._mac_vendor_lookup.sanitise(mac_address)[:6]
return self._mac_vendor_lookup.prefixes.get(oui)
@callback
def _async_stop(self):
"""Stop the scanner."""
self._stopping = True
async def _async_start_scanner(self, *_):
"""Start the scanner."""
self._entry.async_on_unload(self._async_stop)
self._entry.async_on_unload(
async_track_time_interval(
self._hass,
self._async_scan_devices,
self._scan_interval,
)
)
self._mac_vendor_lookup = AsyncMacLookup()
with contextlib.suppress((asyncio.TimeoutError, aiohttp.ClientError)):
# We don't care of this fails since its only
# improves the data when we don't have it from nmap
await self._mac_vendor_lookup.load_vendors()
self._hass.async_create_task(self._async_scan_devices())
def _build_options(self):
"""Build the command line and strip out last results that do not need to be updated."""
options = self._options
if self.home_interval:
boundary = dt_util.now() - self.home_interval
last_results = [
device for device in self._last_results if device.last_update > boundary
]
if last_results:
exclude_hosts = self._exclude + [device.ipv4 for device in last_results]
else:
exclude_hosts = self._exclude
else:
last_results = []
exclude_hosts = self._exclude
if exclude_hosts:
options += f" --exclude {','.join(exclude_hosts)}"
# Report reason
if "--reason" not in options:
options += " --reason"
# Report down hosts
if "-v" not in options:
options += " -v"
self._last_results = last_results
return options
async def _async_scan_devices(self, *_):
"""Scan devices and dispatch."""
if self._scan_lock.locked():
_LOGGER.debug(
"Nmap scanning is taking longer than the scheduled interval: %s",
TRACKER_SCAN_INTERVAL,
)
return
async with self._scan_lock:
try:
await self._async_run_nmap_scan()
except PortScannerError as ex:
_LOGGER.error("Nmap scanning failed: %s", ex)
if not self._finished_first_scan:
self._finished_first_scan = True
await self._async_mark_missing_devices_as_not_home()
async def _async_mark_missing_devices_as_not_home(self):
# After all config entries have finished their first
# scan we mark devices that were not found as not_home
# from unavailable
now = dt_util.now()
for mac_address, original_name in self._known_mac_addresses.items():
if mac_address in self.devices.tracked:
continue
self.devices.config_entry_owner[mac_address] = self._entry_id
self.devices.tracked[mac_address] = NmapDevice(
mac_address,
None,
original_name,
None,
self._async_get_vendor(mac_address),
"Device not found in initial scan",
now,
1,
)
async_dispatcher_send(self._hass, self.signal_device_missing, mac_address)
def _run_nmap_scan(self):
"""Run nmap and return the result."""
options = self._build_options()
if not self._scanner:
self._scanner = PortScanner()
_LOGGER.debug("Scanning %s with args: %s", self._hosts, options)
for attempt in range(MAX_SCAN_ATTEMPTS):
try:
result = self._scanner.scan(
hosts=" ".join(self._hosts),
arguments=options,
timeout=TRACKER_SCAN_INTERVAL * 10,
)
break
except PortScannerError as ex:
if attempt < (MAX_SCAN_ATTEMPTS - 1) and NMAP_TRANSIENT_FAILURE in str(
ex
):
_LOGGER.debug("Nmap saw transient error %s", NMAP_TRANSIENT_FAILURE)
continue
raise
_LOGGER.debug(
"Finished scanning %s with args: %s",
self._hosts,
options,
)
return result
@callback
def _async_increment_device_offline(self, ipv4, reason):
"""Mark an IP offline."""
if not (formatted_mac := self.devices.ipv4_last_mac.get(ipv4)):
return
if not (device := self.devices.tracked.get(formatted_mac)):
# Device was unloaded
return
device.offline_scans += 1
if device.offline_scans < OFFLINE_SCANS_TO_MARK_UNAVAILABLE:
return
device.reason = reason
async_dispatcher_send(self._hass, signal_device_update(formatted_mac), False)
del self.devices.ipv4_last_mac[ipv4]
async def _async_run_nmap_scan(self):
"""Scan the network for devices and dispatch events."""
result = await self._hass.async_add_executor_job(self._run_nmap_scan)
if self._stopping:
return
devices = self.devices
entry_id = self._entry_id
now = dt_util.now()
for ipv4, info in result["scan"].items():
status = info["status"]
reason = status["reason"]
if status["state"] != "up":
self._async_increment_device_offline(ipv4, reason)
continue
# Mac address only returned if nmap ran as root
mac = info["addresses"].get("mac") or get_mac_address(ip=ipv4)
if mac is None:
self._async_increment_device_offline(ipv4, "No MAC address found")
_LOGGER.info("No MAC address found for %s", ipv4)
continue
formatted_mac = format_mac(mac)
new = formatted_mac not in devices.tracked
if (
new
and not self._track_new_devices
and formatted_mac not in devices.tracked
and formatted_mac not in self._known_mac_addresses
):
continue
if (
devices.config_entry_owner.setdefault(formatted_mac, entry_id)
!= entry_id
):
continue
hostname = info["hostnames"][0]["name"] if info["hostnames"] else ipv4
vendor = info.get("vendor", {}).get(mac) or self._async_get_vendor(mac)
name = human_readable_name(hostname, vendor, mac)
device = NmapDevice(
formatted_mac, hostname, name, ipv4, vendor, reason, now, 0
)
devices.tracked[formatted_mac] = device
devices.ipv4_last_mac[ipv4] = formatted_mac
self._last_results.append(device)
if new:
async_dispatcher_send(self._hass, self.signal_device_new, formatted_mac)
else:
async_dispatcher_send(
self._hass, signal_device_update(formatted_mac), True
)
"""The nmap_tracker component."""

View File

@ -1,223 +0,0 @@
"""Config flow for Nmap Tracker integration."""
from __future__ import annotations
from ipaddress import ip_address, ip_network, summarize_address_range
from typing import Any
import ifaddr
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW,
)
from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
import homeassistant.helpers.config_validation as cv
from homeassistant.util import get_local_ip
from .const import (
CONF_HOME_INTERVAL,
CONF_OPTIONS,
DEFAULT_OPTIONS,
DEFAULT_TRACK_NEW_DEVICES,
DOMAIN,
TRACKER_SCAN_INTERVAL,
)
DEFAULT_NETWORK_PREFIX = 24
def get_network():
"""Search adapters for the network."""
adapters = ifaddr.get_adapters()
local_ip = get_local_ip()
network_prefix = (
get_ip_prefix_from_adapters(local_ip, adapters) or DEFAULT_NETWORK_PREFIX
)
return str(ip_network(f"{local_ip}/{network_prefix}", False))
def get_ip_prefix_from_adapters(local_ip, adapters):
"""Find the network prefix for an adapter."""
for adapter in adapters:
for ip_cfg in adapter.ips:
if local_ip == ip_cfg.ip:
return ip_cfg.network_prefix
def _normalize_ips_and_network(hosts_str):
"""Check if a list of hosts are all ips or ip networks."""
normalized_hosts = []
hosts = [host for host in cv.ensure_list_csv(hosts_str) if host != ""]
for host in sorted(hosts):
try:
start, end = host.split("-", 1)
if "." not in end:
ip_1, ip_2, ip_3, _ = start.split(".", 3)
end = ".".join([ip_1, ip_2, ip_3, end])
summarize_address_range(ip_address(start), ip_address(end))
except ValueError:
pass
else:
normalized_hosts.append(host)
continue
try:
ip_addr = ip_address(host)
except ValueError:
pass
else:
normalized_hosts.append(str(ip_addr))
continue
try:
network = ip_network(host)
except ValueError:
return None
else:
normalized_hosts.append(str(network))
return normalized_hosts
def normalize_input(user_input):
"""Validate hosts and exclude are valid."""
errors = {}
normalized_hosts = _normalize_ips_and_network(user_input[CONF_HOSTS])
if not normalized_hosts:
errors[CONF_HOSTS] = "invalid_hosts"
else:
user_input[CONF_HOSTS] = ",".join(normalized_hosts)
normalized_exclude = _normalize_ips_and_network(user_input[CONF_EXCLUDE])
if normalized_exclude is None:
errors[CONF_EXCLUDE] = "invalid_hosts"
else:
user_input[CONF_EXCLUDE] = ",".join(normalized_exclude)
return errors
async def _async_build_schema_with_user_input(hass, user_input, include_options):
hosts = user_input.get(CONF_HOSTS, await hass.async_add_executor_job(get_network))
exclude = user_input.get(
CONF_EXCLUDE, await hass.async_add_executor_job(get_local_ip)
)
schema = {
vol.Required(CONF_HOSTS, default=hosts): str,
vol.Required(
CONF_HOME_INTERVAL, default=user_input.get(CONF_HOME_INTERVAL, 0)
): int,
vol.Optional(CONF_EXCLUDE, default=exclude): str,
vol.Optional(
CONF_OPTIONS, default=user_input.get(CONF_OPTIONS, DEFAULT_OPTIONS)
): str,
}
if include_options:
schema.update(
{
vol.Optional(
CONF_TRACK_NEW,
default=user_input.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW_DEVICES),
): bool,
vol.Optional(
CONF_SCAN_INTERVAL,
default=user_input.get(CONF_SCAN_INTERVAL, TRACKER_SCAN_INTERVAL),
): vol.All(vol.Coerce(int), vol.Range(min=10, max=3600)),
}
)
return vol.Schema(schema)
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a option flow for homekit."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize options flow."""
self.options = dict(config_entry.options)
async def async_step_init(self, user_input=None):
"""Handle options flow."""
errors = {}
if user_input is not None:
errors = normalize_input(user_input)
self.options.update(user_input)
if not errors:
return self.async_create_entry(
title=f"Nmap Tracker {self.options[CONF_HOSTS]}", data=self.options
)
return self.async_show_form(
step_id="init",
data_schema=await _async_build_schema_with_user_input(
self.hass, self.options, True
),
errors=errors,
)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Nmap Tracker."""
VERSION = 1
def __init__(self):
"""Initialize config flow."""
self.options = {}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
errors = {}
if user_input is not None:
if not self._async_is_unique_host_list(user_input):
return self.async_abort(reason="already_configured")
errors = normalize_input(user_input)
self.options.update(user_input)
if not errors:
return self.async_create_entry(
title=f"Nmap Tracker {user_input[CONF_HOSTS]}",
data={},
options=user_input,
)
return self.async_show_form(
step_id="user",
data_schema=await _async_build_schema_with_user_input(
self.hass, self.options, False
),
errors=errors,
)
def _async_is_unique_host_list(self, user_input):
hosts = _normalize_ips_and_network(user_input[CONF_HOSTS])
for entry in self._async_current_entries():
if _normalize_ips_and_network(entry.options[CONF_HOSTS]) == hosts:
return False
return True
async def async_step_import(self, user_input=None):
"""Handle import from yaml."""
if not self._async_is_unique_host_list(user_input):
return self.async_abort(reason="already_configured")
normalize_input(user_input)
return self.async_create_entry(
title=f"Nmap Tracker {user_input[CONF_HOSTS]}", data={}, options=user_input
)
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return OptionsFlowHandler(config_entry)

View File

@ -1,40 +1,29 @@
"""Support for scanning a network with nmap."""
from collections import namedtuple
from datetime import timedelta
import logging
from typing import Callable
from getmac import get_mac_address
from nmap import PortScanner, PortScannerError
import voluptuous as vol
from homeassistant.components.device_tracker import (
DOMAIN as DEVICE_TRACKER_DOMAIN,
PLATFORM_SCHEMA,
SOURCE_TYPE_ROUTER,
)
from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import (
CONF_NEW_DEVICE_DEFAULTS,
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from . import NmapDeviceScanner, short_hostname, signal_device_update
from .const import (
CONF_HOME_INTERVAL,
CONF_OPTIONS,
DEFAULT_OPTIONS,
DEFAULT_TRACK_NEW_DEVICES,
DOMAIN,
TRACKER_SCAN_INTERVAL,
PLATFORM_SCHEMA,
DeviceScanner,
)
from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
# Interval in minutes to exclude devices from a scan while they are home
CONF_HOME_INTERVAL = "home_interval"
CONF_OPTIONS = "scan_options"
DEFAULT_OPTIONS = "-F --host-timeout 5s"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOSTS): cv.ensure_list,
@ -45,164 +34,100 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
async def async_get_scanner(hass, config):
def get_scanner(hass, config):
"""Validate the configuration and return a Nmap scanner."""
validated_config = config[DEVICE_TRACKER_DOMAIN]
return NmapDeviceScanner(config[DOMAIN])
if CONF_SCAN_INTERVAL in validated_config:
scan_interval = validated_config[CONF_SCAN_INTERVAL].total_seconds()
else:
scan_interval = TRACKER_SCAN_INTERVAL
import_config = {
CONF_HOSTS: ",".join(validated_config[CONF_HOSTS]),
CONF_HOME_INTERVAL: validated_config[CONF_HOME_INTERVAL],
CONF_EXCLUDE: ",".join(validated_config[CONF_EXCLUDE]),
CONF_OPTIONS: validated_config[CONF_OPTIONS],
CONF_SCAN_INTERVAL: scan_interval,
CONF_TRACK_NEW: validated_config.get(CONF_NEW_DEVICE_DEFAULTS, {}).get(
CONF_TRACK_NEW, DEFAULT_TRACK_NEW_DEVICES
),
}
Device = namedtuple("Device", ["mac", "name", "ip", "last_update"])
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=import_config,
class NmapDeviceScanner(DeviceScanner):
"""This class scans for devices using nmap."""
exclude = []
def __init__(self, config):
"""Initialize the scanner."""
self.last_results = []
self.hosts = config[CONF_HOSTS]
self.exclude = config[CONF_EXCLUDE]
minutes = config[CONF_HOME_INTERVAL]
self._options = config[CONF_OPTIONS]
self.home_interval = timedelta(minutes=minutes)
_LOGGER.debug("Scanner initialized")
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
_LOGGER.debug("Nmap last results %s", self.last_results)
return [device.mac for device in self.last_results]
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
filter_named = [
result.name for result in self.last_results if result.mac == device
]
if filter_named:
return filter_named[0]
return None
def get_extra_attributes(self, device):
"""Return the IP of the given device."""
filter_ip = next(
(result.ip for result in self.last_results if result.mac == device), None
)
)
return {"ip": filter_ip}
_LOGGER.warning(
"Your Nmap Tracker configuration has been imported into the UI, "
"please remove it from configuration.yaml. "
)
def _update_info(self):
"""Scan the network for devices.
Returns boolean if scanning successful.
"""
_LOGGER.debug("Scanning")
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: Callable
) -> None:
"""Set up device tracker for Nmap Tracker component."""
nmap_tracker = hass.data[DOMAIN][entry.entry_id]
scanner = PortScanner()
@callback
def device_new(mac_address):
"""Signal a new device."""
async_add_entities([NmapTrackerEntity(nmap_tracker, mac_address, True)])
options = self._options
@callback
def device_missing(mac_address):
"""Signal a missing device."""
async_add_entities([NmapTrackerEntity(nmap_tracker, mac_address, False)])
if self.home_interval:
boundary = dt_util.now() - self.home_interval
last_results = [
device for device in self.last_results if device.last_update > boundary
]
if last_results:
exclude_hosts = self.exclude + [device.ip for device in last_results]
else:
exclude_hosts = self.exclude
else:
last_results = []
exclude_hosts = self.exclude
if exclude_hosts:
options += f" --exclude {','.join(exclude_hosts)}"
entry.async_on_unload(
async_dispatcher_connect(hass, nmap_tracker.signal_device_new, device_new)
)
entry.async_on_unload(
async_dispatcher_connect(
hass, nmap_tracker.signal_device_missing, device_missing
)
)
try:
result = scanner.scan(hosts=" ".join(self.hosts), arguments=options)
except PortScannerError:
return False
now = dt_util.now()
for ipv4, info in result["scan"].items():
if info["status"]["state"] != "up":
continue
name = info["hostnames"][0]["name"] if info["hostnames"] else ipv4
# Mac address only returned if nmap ran as root
mac = info["addresses"].get("mac") or get_mac_address(ip=ipv4)
if mac is None:
_LOGGER.info("No MAC address found for %s", ipv4)
continue
last_results.append(Device(mac.upper(), name, ipv4, now))
class NmapTrackerEntity(ScannerEntity):
"""An Nmap Tracker entity."""
self.last_results = last_results
def __init__(
self, nmap_tracker: NmapDeviceScanner, mac_address: str, active: bool
) -> None:
"""Initialize an nmap tracker entity."""
self._mac_address = mac_address
self._nmap_tracker = nmap_tracker
self._tracked = self._nmap_tracker.devices.tracked
self._active = active
@property
def _device(self) -> bool:
"""Get latest device state."""
return self._tracked[self._mac_address]
@property
def is_connected(self) -> bool:
"""Return device status."""
return self._active
@property
def name(self) -> str:
"""Return device name."""
return self._device.name
@property
def unique_id(self) -> str:
"""Return device unique id."""
return self._mac_address
@property
def ip_address(self) -> str:
"""Return the primary ip address of the device."""
return self._device.ipv4
@property
def mac_address(self) -> str:
"""Return the mac address of the device."""
return self._mac_address
@property
def hostname(self) -> str:
"""Return hostname of the device."""
return short_hostname(self._device.hostname)
@property
def source_type(self) -> str:
"""Return tracker source type."""
return SOURCE_TYPE_ROUTER
@property
def device_info(self):
"""Return the device information."""
return {
"connections": {(CONNECTION_NETWORK_MAC, self._mac_address)},
"default_manufacturer": self._device.manufacturer,
"default_name": self.name,
}
@property
def should_poll(self) -> bool:
"""No polling needed."""
return False
@property
def icon(self):
"""Return device icon."""
return "mdi:lan-connect" if self._active else "mdi:lan-disconnect"
@callback
def async_process_update(self, online: bool) -> None:
"""Update device."""
self._active = online
@property
def extra_state_attributes(self):
"""Return the attributes."""
return {
"last_time_reachable": self._device.last_update.isoformat(
timespec="seconds"
),
"reason": self._device.reason,
}
@callback
def async_on_demand_update(self, online: bool):
"""Update state."""
self.async_process_update(online)
self.async_write_ha_state()
async def async_added_to_hass(self):
"""Register state update callback."""
self.async_on_remove(
async_dispatcher_connect(
self.hass,
signal_device_update(self._mac_address),
self.async_on_demand_update,
)
)
_LOGGER.debug("nmap scan successful")
return True

View File

@ -2,13 +2,7 @@
"domain": "nmap_tracker",
"name": "Nmap Tracker",
"documentation": "https://www.home-assistant.io/integrations/nmap_tracker",
"requirements": [
"netmap==0.7.0.2",
"getmac==0.8.2",
"ifaddr==0.1.7",
"mac-vendor-lookup==0.1.11"
],
"codeowners": ["@bdraco"],
"iot_class": "local_polling",
"config_flow": true
"requirements": ["python-nmap==0.6.1", "getmac==0.8.2"],
"codeowners": [],
"iot_class": "local_polling"
}

View File

@ -176,7 +176,6 @@ FLOWS = [
"netatmo",
"nexia",
"nightscout",
"nmap_tracker",
"notion",
"nuheat",
"nuki",

View File

@ -834,7 +834,6 @@ ibmiotf==0.3.4
icmplib==3.0
# homeassistant.components.network
# homeassistant.components.nmap_tracker
ifaddr==0.1.7
# homeassistant.components.iglo
@ -936,9 +935,6 @@ lw12==0.9.2
# homeassistant.components.lyft
lyft_rides==0.2
# homeassistant.components.nmap_tracker
mac-vendor-lookup==0.1.11
# homeassistant.components.magicseaweed
magicseaweed==1.0.3
@ -1017,9 +1013,6 @@ netdata==0.2.0
# homeassistant.components.discovery
netdisco==2.9.0
# homeassistant.components.nmap_tracker
netmap==0.7.0.2
# homeassistant.components.nam
nettigo-air-monitor==1.0.0
@ -1869,6 +1862,9 @@ python-mystrom==1.1.2
# homeassistant.components.nest
python-nest==4.1.0
# homeassistant.components.nmap_tracker
python-nmap==0.6.1
# homeassistant.components.ozw
python-openzwave-mqtt[mqtt-client]==1.4.0

View File

@ -481,7 +481,6 @@ iaqualink==0.3.90
icmplib==3.0
# homeassistant.components.network
# homeassistant.components.nmap_tracker
ifaddr==0.1.7
# homeassistant.components.influxdb
@ -523,9 +522,6 @@ logi_circle==0.2.2
# homeassistant.components.luftdaten
luftdaten==0.6.5
# homeassistant.components.nmap_tracker
mac-vendor-lookup==0.1.11
# homeassistant.components.maxcube
maxcube-api==0.4.3
@ -574,9 +570,6 @@ nessclient==0.9.15
# homeassistant.components.discovery
netdisco==2.9.0
# homeassistant.components.nmap_tracker
netmap==0.7.0.2
# homeassistant.components.nam
nettigo-air-monitor==1.0.0

View File

@ -1 +0,0 @@
"""Tests for the Nmap Tracker integration."""

View File

@ -1,310 +0,0 @@
"""Test the Nmap Tracker config flow."""
from unittest.mock import patch
import pytest
from homeassistant import config_entries, data_entry_flow, setup
from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW,
)
from homeassistant.components.nmap_tracker.const import (
CONF_HOME_INTERVAL,
CONF_OPTIONS,
DEFAULT_OPTIONS,
DOMAIN,
)
from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS
from homeassistant.core import CoreState, HomeAssistant
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
"hosts", ["1.1.1.1", "192.168.1.0/24", "192.168.1.0/24,192.168.2.0/24"]
)
async def test_form(hass: HomeAssistant, hosts: str) -> None:
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
schema_defaults = result["data_schema"]({})
assert CONF_TRACK_NEW not in schema_defaults
assert CONF_SCAN_INTERVAL not in schema_defaults
with patch(
"homeassistant.components.nmap_tracker.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOSTS: hosts,
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
},
)
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["title"] == f"Nmap Tracker {hosts}"
assert result2["data"] == {}
assert result2["options"] == {
CONF_HOSTS: hosts,
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_range(hass: HomeAssistant) -> None:
"""Test we get the form and can take an ip range."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
with patch(
"homeassistant.components.nmap_tracker.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOSTS: "192.168.0.5-12",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
},
)
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["title"] == "Nmap Tracker 192.168.0.5-12"
assert result2["data"] == {}
assert result2["options"] == {
CONF_HOSTS: "192.168.0.5-12",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_invalid_hosts(hass: HomeAssistant) -> None:
"""Test invalid hosts passed in."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOSTS: "not an ip block",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "",
},
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["errors"] == {CONF_HOSTS: "invalid_hosts"}
async def test_form_already_configured(hass: HomeAssistant) -> None:
"""Test duplicate host list."""
await setup.async_setup_component(hass, "persistent_notification", {})
config_entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
CONF_HOSTS: "192.168.0.0/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
},
)
config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOSTS: "192.168.0.0/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "",
},
)
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "already_configured"
async def test_form_invalid_excludes(hass: HomeAssistant) -> None:
"""Test invalid excludes passed in."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOSTS: "3.3.3.3",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "not an exclude",
},
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["errors"] == {CONF_EXCLUDE: "invalid_hosts"}
async def test_options_flow(hass: HomeAssistant) -> None:
"""Test we can edit options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
CONF_HOSTS: "192.168.1.0/24",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
},
)
config_entry.add_to_hass(hass)
hass.state = CoreState.stopped
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(config_entry.entry_id)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
assert result["data_schema"]({}) == {
CONF_EXCLUDE: "4.4.4.4",
CONF_HOME_INTERVAL: 3,
CONF_HOSTS: "192.168.1.0/24",
CONF_SCAN_INTERVAL: 120,
CONF_OPTIONS: "-F --host-timeout 5s",
CONF_TRACK_NEW: True,
}
with patch(
"homeassistant.components.nmap_tracker.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_HOSTS: "192.168.1.0/24, 192.168.2.0/24",
CONF_HOME_INTERVAL: 5,
CONF_OPTIONS: "-sn",
CONF_EXCLUDE: "4.4.4.4, 5.5.5.5",
CONF_SCAN_INTERVAL: 10,
CONF_TRACK_NEW: False,
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {
CONF_HOSTS: "192.168.1.0/24,192.168.2.0/24",
CONF_HOME_INTERVAL: 5,
CONF_OPTIONS: "-sn",
CONF_EXCLUDE: "4.4.4.4,5.5.5.5",
CONF_SCAN_INTERVAL: 10,
CONF_TRACK_NEW: False,
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_import(hass: HomeAssistant) -> None:
"""Test we can import from yaml."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.nmap_tracker.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_HOSTS: "1.2.3.4/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4, 6.4.3.2",
CONF_SCAN_INTERVAL: 2000,
CONF_TRACK_NEW: False,
},
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == "Nmap Tracker 1.2.3.4/20"
assert result["data"] == {}
assert result["options"] == {
CONF_HOSTS: "1.2.3.4/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4,6.4.3.2",
CONF_SCAN_INTERVAL: 2000,
CONF_TRACK_NEW: False,
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_import_aborts_if_matching(hass: HomeAssistant) -> None:
"""Test we can import from yaml."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
CONF_HOSTS: "192.168.0.0/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4",
},
)
config_entry.add_to_hass(hass)
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_HOSTS: "192.168.0.0/20",
CONF_HOME_INTERVAL: 3,
CONF_OPTIONS: DEFAULT_OPTIONS,
CONF_EXCLUDE: "4.4.4.4, 6.4.3.2",
},
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "already_configured"