core/homeassistant/components/mikrotik/__init__.py

202 lines
6.3 KiB
Python
Raw Normal View History

"""The mikrotik component."""
import logging
import ssl
import voluptuous as vol
import librouteros
from librouteros.login import login_plain, login_token
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
CONF_PORT,
CONF_SSL,
CONF_METHOD,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
from .const import (
NAME,
DOMAIN,
HOSTS,
MTK_LOGIN_PLAIN,
MTK_LOGIN_TOKEN,
DEFAULT_ENCODING,
IDENTITY,
CONF_TRACK_DEVICES,
CONF_ENCODING,
CONF_ARP_PING,
CONF_LOGIN_METHOD,
MIKROTIK_SERVICES,
)
_LOGGER = logging.getLogger(__name__)
MTK_DEFAULT_API_PORT = "8728"
MTK_DEFAULT_API_SSL_PORT = "8729"
MIKROTIK_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_METHOD): cv.string,
vol.Optional(CONF_LOGIN_METHOD): vol.Any(MTK_LOGIN_PLAIN, MTK_LOGIN_TOKEN),
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): cv.string,
vol.Optional(CONF_TRACK_DEVICES, default=True): cv.boolean,
vol.Optional(CONF_ARP_PING, default=False): cv.boolean,
}
)
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.All(cv.ensure_list, [MIKROTIK_SCHEMA])}, extra=vol.ALLOW_EXTRA
)
def setup(hass, config):
"""Set up the Mikrotik component."""
hass.data[DOMAIN] = {HOSTS: {}}
for device in config[DOMAIN]:
host = device[CONF_HOST]
use_ssl = device.get(CONF_SSL)
user = device.get(CONF_USERNAME)
password = device.get(CONF_PASSWORD, "")
login = device.get(CONF_LOGIN_METHOD)
encoding = device.get(CONF_ENCODING)
track_devices = device.get(CONF_TRACK_DEVICES)
if CONF_PORT in device:
port = device.get(CONF_PORT)
else:
if use_ssl:
port = MTK_DEFAULT_API_SSL_PORT
else:
port = MTK_DEFAULT_API_PORT
if login == MTK_LOGIN_PLAIN:
login_method = (login_plain,)
elif login == MTK_LOGIN_TOKEN:
login_method = (login_token,)
else:
login_method = (login_plain, login_token)
try:
api = MikrotikClient(
host, use_ssl, port, user, password, login_method, encoding
)
api.connect_to_device()
hass.data[DOMAIN][HOSTS][host] = {"config": device, "api": api}
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Mikrotik %s error %s", host, api_error)
continue
if track_devices:
hass.data[DOMAIN][HOSTS][host][DEVICE_TRACKER] = True
load_platform(hass, DEVICE_TRACKER, DOMAIN, None, config)
if not hass.data[DOMAIN][HOSTS]:
return False
return True
class MikrotikClient:
"""Handle all communication with the Mikrotik API."""
def __init__(self, host, use_ssl, port, user, password, login_method, encoding):
"""Initialize the Mikrotik Client."""
self._host = host
self._use_ssl = use_ssl
self._port = port
self._user = user
self._password = password
self._login_method = login_method
self._encoding = encoding
self._ssl_wrapper = None
self.hostname = None
self._client = None
self._connected = False
def connect_to_device(self):
"""Connect to Mikrotik device."""
self._connected = False
_LOGGER.debug("[%s] Connecting to Mikrotik device", self._host)
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
if self._use_ssl:
if self._ssl_wrapper is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = self._ssl_wrapper
try:
self._client = librouteros.connect(
self._host, self._user, self._password, **kwargs
)
self._connected = True
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionError,
) as api_error:
_LOGGER.error("Mikrotik %s: %s", self._host, api_error)
self._client = None
return False
self.hostname = self.get_hostname()
_LOGGER.info("Mikrotik Connected to %s (%s)", self.hostname, self._host)
return self._connected
def get_hostname(self):
"""Return device host name."""
data = self.command(MIKROTIK_SERVICES[IDENTITY])
return data[0][NAME] if data else None
def connected(self):
"""Return connected boolean."""
return self._connected
def command(self, cmd, params=None):
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._client:
if not self.connect_to_device():
return None
try:
if params:
response = self._client(cmd=cmd, **params)
else:
response = self._client(cmd=cmd)
except (librouteros.exceptions.ConnectionError,) as api_error:
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
self.connect_to_device()
return None
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
) as api_error:
_LOGGER.error(
"Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s",
self._host,
cmd,
api_error,
)
return None
return response if response else None