185 lines
6.0 KiB
Python
185 lines
6.0 KiB
Python
"""Device tracker helpers."""
|
|
import asyncio
|
|
from typing import Dict, Any, Callable, Optional
|
|
from types import ModuleType
|
|
|
|
import attr
|
|
|
|
from homeassistant.core import callback
|
|
from homeassistant.setup import async_prepare_setup_platform
|
|
from homeassistant.helpers import config_per_platform
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
|
|
from homeassistant.helpers.event import async_track_time_interval
|
|
from homeassistant.util import dt as dt_util
|
|
from homeassistant.const import (
|
|
ATTR_LATITUDE,
|
|
ATTR_LONGITUDE,
|
|
)
|
|
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
PLATFORM_TYPE_LEGACY,
|
|
CONF_SCAN_INTERVAL,
|
|
SCAN_INTERVAL,
|
|
SOURCE_TYPE_ROUTER,
|
|
LOGGER,
|
|
)
|
|
|
|
|
|
@attr.s
class DeviceTrackerPlatform:
    """Class to hold platform information."""

    # Entry points whose presence on the platform module marks it as legacy.
    LEGACY_SETUP = (
        'async_get_scanner',
        'get_scanner',
        'async_setup_scanner',
        'setup_scanner',
    )

    # Name of the platform; used to identify it in log messages.
    name = attr.ib(type=str)
    # Module object that implements the platform.
    platform = attr.ib(type=ModuleType)
    # Configuration belonging to this platform.
    config = attr.ib(type=Dict)

    @property
    def type(self):
        """Return platform type.

        Returns PLATFORM_TYPE_LEGACY when the platform module exposes at
        least one of the LEGACY_SETUP entry points, otherwise None.
        """
        for methods, platform_type in (
                (self.LEGACY_SETUP, PLATFORM_TYPE_LEGACY),
        ):
            if any(hasattr(self.platform, meth) for meth in methods):
                return platform_type

        return None

    async def async_setup_legacy(self, hass, tracker, discovery_info=None):
        """Set up a legacy platform.

        The platform module is probed for its entry point in this order:
        async_get_scanner, get_scanner, async_setup_scanner, setup_scanner.
        The synchronous variants are scheduled through hass.async_add_job.

        A truthy scanner returned by a *get_scanner entry point is handed to
        async_setup_scanner_platform. Otherwise a falsy setup result is
        logged as an error. Any exception raised during setup is logged and
        swallowed so it does not propagate to the caller.
        """
        # Log the platform name: the type is identical for every legacy
        # platform and would not tell the reader which one is involved.
        LOGGER.info("Setting up %s.%s", DOMAIN, self.name)
        try:
            scanner = None
            setup = None
            if hasattr(self.platform, 'async_get_scanner'):
                scanner = await self.platform.async_get_scanner(
                    hass, {DOMAIN: self.config})
            elif hasattr(self.platform, 'get_scanner'):
                scanner = await hass.async_add_job(
                    self.platform.get_scanner, hass, {DOMAIN: self.config})
            elif hasattr(self.platform, 'async_setup_scanner'):
                setup = await self.platform.async_setup_scanner(
                    hass, self.config, tracker.async_see, discovery_info)
            elif hasattr(self.platform, 'setup_scanner'):
                setup = await hass.async_add_job(
                    self.platform.setup_scanner, hass, self.config,
                    tracker.see, discovery_info)
            else:
                raise HomeAssistantError(
                    "Invalid legacy device_tracker platform.")

            if scanner:
                async_setup_scanner_platform(
                    hass, self.config, scanner, tracker.async_see, self.type)
                return

            if not setup:
                LOGGER.error("Error setting up platform %s", self.name)
                return

        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Error setting up platform %s", self.name)
|
|
|
|
|
|
async def async_extract_config(hass, config):
    """Collect the legacy device tracker platforms found in the config.

    Every platform entry under DOMAIN is resolved concurrently. Entries
    that could not be resolved (None) are skipped; an entry whose type is
    not PLATFORM_TYPE_LEGACY raises ValueError.
    """
    pending = [
        async_create_platform_type(hass, config, p_type, p_config)
        for p_type, p_config in config_per_platform(config, DOMAIN)
    ]
    resolved = await asyncio.gather(*pending)

    legacy = []
    for candidate in resolved:
        if candidate is None:
            continue

        if candidate.type != PLATFORM_TYPE_LEGACY:
            raise ValueError("Unable to determine type for {}: {}".format(
                candidate.name, candidate.type))

        legacy.append(candidate)

    return legacy
|
|
|
|
|
|
async def async_create_platform_type(hass, config, p_type, p_config) \
        -> Optional[DeviceTrackerPlatform]:
    """Wrap a prepared platform module in a DeviceTrackerPlatform.

    Returns None when async_prepare_setup_platform yields no module.
    """
    module = await async_prepare_setup_platform(hass, config, DOMAIN, p_type)

    if module is not None:
        return DeviceTrackerPlatform(p_type, module, p_config)

    return None
|
|
|
|
|
|
@callback
def async_setup_scanner_platform(hass: HomeAssistantType, config: ConfigType,
                                 scanner: Any, async_see_device: Callable,
                                 platform: str):
    """Connect a scanner-based platform to the device tracker.

    Registers a scan that repeats at the configured interval (falling back
    to SCAN_INTERVAL) and schedules a first scan right away. Each device
    found is reported through async_see_device.

    This method must be run in the event loop.
    """
    interval = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
    scan_lock = asyncio.Lock()
    scanner.hass = hass

    # MACs already reported once; the host name is only looked up the
    # first time a MAC shows up.
    known_macs = set()  # type: Any

    async def async_device_tracker_scan(now: dt_util.dt.datetime):
        """Run one scan and report every device that was found."""
        if update_in_progress():
            LOGGER.warning(
                "Updating device list from %s took longer than the scheduled "
                "scan interval %s", platform, interval)
            return

        # Only the scan itself is guarded by the lock.
        async with scan_lock:
            macs = await scanner.async_scan_devices()

        for mac in macs:
            host_name = None
            if mac not in known_macs:
                host_name = await scanner.async_get_device_name(mac)
                known_macs.add(mac)

            try:
                extra_attributes = \
                    await scanner.async_get_extra_attributes(mac)
            except NotImplementedError:
                extra_attributes = dict()

            attributes = {
                'scanner': scanner.__class__.__name__,
                **extra_attributes
            }
            kwargs = {
                'mac': mac,
                'host_name': host_name,
                'source_type': SOURCE_TYPE_ROUTER,
                'attributes': attributes,
            }

            # When a home zone state exists, place the device at its
            # coordinates.
            zone_home = hass.states.get(hass.components.zone.ENTITY_ID_HOME)
            if zone_home:
                home = zone_home.attributes
                kwargs['gps'] = [home[ATTR_LATITUDE], home[ATTR_LONGITUDE]]
                kwargs['gps_accuracy'] = 0

            hass.async_create_task(async_see_device(**kwargs))

    def update_in_progress():
        """Return True while a previous scan still holds the lock."""
        return scan_lock.locked()

    async_track_time_interval(hass, async_device_tracker_scan, interval)
    hass.async_create_task(async_device_tracker_scan(None))
|