core/homeassistant/components/fortios/device_tracker.py

125 lines
3.9 KiB
Python

"""Support to use FortiOS device like FortiGate as device tracker.
This component is part of the device_tracker platform.
"""
from __future__ import annotations
import logging
from typing import Any
from awesomeversion import AwesomeVersion
from fortiosapi import FortiOSAPI
import voluptuous as vol
from homeassistant.components.device_tracker import (
DOMAIN,
PLATFORM_SCHEMA as BASE_PLATFORM_SCHEMA,
DeviceScanner,
)
from homeassistant.const import CONF_HOST, CONF_TOKEN, CONF_VERIFY_SSL
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
DEFAULT_VERIFY_SSL = False
PLATFORM_SCHEMA = BASE_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_TOKEN): cv.string,
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
}
)
def get_scanner(hass: HomeAssistant, config: ConfigType) -> FortiOSDeviceScanner | None:
"""Validate the configuration and return a FortiOSDeviceScanner."""
host = config[DOMAIN][CONF_HOST]
verify_ssl = config[DOMAIN][CONF_VERIFY_SSL]
token = config[DOMAIN][CONF_TOKEN]
fgt = FortiOSAPI()
try:
fgt.tokenlogin(host, token, verify_ssl, None, 12, "root")
except ConnectionError as ex:
_LOGGER.error("ConnectionError to FortiOS API: %s", ex)
return None
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error("Failed to login to FortiOS API: %s", ex)
return None
status_json = fgt.monitor("system/status", "")
current_version = AwesomeVersion(status_json["version"])
minimum_version = AwesomeVersion("6.4.3")
if current_version < minimum_version:
_LOGGER.error(
"Unsupported FortiOS version: %s. Version %s and newer are supported",
current_version,
minimum_version,
)
return None
return FortiOSDeviceScanner(fgt)
class FortiOSDeviceScanner(DeviceScanner):
"""Class which queries a FortiOS unit for connected devices."""
def __init__(self, fgt) -> None:
"""Initialize the scanner."""
self._clients: list[str] = []
self._clients_json: dict[str, Any] = {}
self._fgt = fgt
def update(self):
"""Update clients from the device."""
clients_json = self._fgt.monitor(
"user/device/query",
"",
parameters={"filter": "format=master_mac|hostname|is_online"},
)
self._clients_json = clients_json
self._clients = []
if clients_json:
try:
for client in clients_json["results"]:
if (
"is_online" in client
and "master_mac" in client
and client["is_online"]
):
self._clients.append(client["master_mac"].upper())
except KeyError as kex:
_LOGGER.error("Key not found in clients: %s", kex)
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self.update()
return self._clients
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
_LOGGER.debug("Getting name of device %s", device)
device = device.lower()
if (data := self._clients_json) == 0:
_LOGGER.error("No json results to get device names")
return None
for client in data["results"]:
if "master_mac" in client and client["master_mac"] == device:
if "hostname" in client:
name = client["hostname"]
else:
name = client["master_mac"].replace(":", "_")
return name
return None