"""iCloud account.""" from datetime import timedelta import logging import operator from typing import Dict from pyicloud import PyiCloudService from pyicloud.exceptions import ( PyiCloudFailedLoginException, PyiCloudNoDevicesException, PyiCloudServiceNotActivatedException, ) from pyicloud.services.findmyiphone import AppleDevice from homeassistant.components.zone import async_active_zone from homeassistant.const import ATTR_ATTRIBUTION from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.helpers.dispatcher import dispatcher_send from homeassistant.helpers.event import track_point_in_utc_time from homeassistant.helpers.storage import Store from homeassistant.helpers.typing import HomeAssistantType from homeassistant.util import slugify from homeassistant.util.async_ import run_callback_threadsafe from homeassistant.util.dt import utcnow from homeassistant.util.location import distance from .const import ( DEVICE_BATTERY_LEVEL, DEVICE_BATTERY_STATUS, DEVICE_CLASS, DEVICE_DISPLAY_NAME, DEVICE_ID, DEVICE_LOCATION, DEVICE_LOCATION_HORIZONTAL_ACCURACY, DEVICE_LOCATION_LATITUDE, DEVICE_LOCATION_LONGITUDE, DEVICE_LOST_MODE_CAPABLE, DEVICE_LOW_POWER_MODE, DEVICE_NAME, DEVICE_PERSON_ID, DEVICE_RAW_DEVICE_MODEL, DEVICE_STATUS, DEVICE_STATUS_CODES, DEVICE_STATUS_SET, DOMAIN, ) ATTRIBUTION = "Data provided by Apple iCloud" # entity attributes ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval" ATTR_BATTERY = "battery" ATTR_BATTERY_STATUS = "battery_status" ATTR_DEVICE_NAME = "device_name" ATTR_DEVICE_STATUS = "device_status" ATTR_LOW_POWER_MODE = "low_power_mode" ATTR_OWNER_NAME = "owner_fullname" # services SERVICE_ICLOUD_PLAY_SOUND = "play_sound" SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message" SERVICE_ICLOUD_LOST_DEVICE = "lost_device" SERVICE_ICLOUD_UPDATE = "update" ATTR_ACCOUNT = "account" ATTR_LOST_DEVICE_MESSAGE = "message" ATTR_LOST_DEVICE_NUMBER = "number" ATTR_LOST_DEVICE_SOUND = "sound" _LOGGER = logging.getLogger(__name__) class IcloudAccount: """Representation of an iCloud account.""" def __init__( self, hass: HomeAssistantType, username: str, password: str, icloud_dir: Store, with_family: bool, max_interval: int, gps_accuracy_threshold: int, ): """Initialize an iCloud account.""" self.hass = hass self._username = username self._password = password self._with_family = with_family self._fetch_interval = max_interval self._max_interval = max_interval self._gps_accuracy_threshold = gps_accuracy_threshold self._icloud_dir = icloud_dir self.api: PyiCloudService = None self._owner_fullname = None self._family_members_fullname = {} self._devices = {} self._retried_fetch = False self.listeners = [] def setup(self) -> None: """Set up an iCloud account.""" try: self.api = PyiCloudService( self._username, self._password, self._icloud_dir.path, with_family=self._with_family, ) except PyiCloudFailedLoginException as error: self.api = None _LOGGER.error("Error logging into iCloud Service: %s", error) return try: api_devices = self.api.devices # Gets device owners infos user_info = api_devices.response["userInfo"] except (PyiCloudServiceNotActivatedException, PyiCloudNoDevicesException): _LOGGER.error("No iCloud device found") raise ConfigEntryNotReady self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}" self._family_members_fullname = {} if user_info.get("membersInfo") is not None: for prs_id, member in user_info["membersInfo"].items(): self._family_members_fullname[ prs_id ] = f"{member['firstName']} {member['lastName']}" self._devices = {} self.update_devices() def update_devices(self) -> None: """Update iCloud devices.""" if self.api is None: return api_devices = {} try: api_devices = self.api.devices except Exception as err: # pylint: disable=broad-except _LOGGER.error("Unknown iCloud error: %s", err) self._fetch_interval = 2 dispatcher_send(self.hass, self.signal_device_update) track_point_in_utc_time( self.hass, self.keep_alive, utcnow() + timedelta(minutes=self._fetch_interval), ) return # Gets devices infos new_device = False for device in api_devices: status = device.status(DEVICE_STATUS_SET) device_id = status[DEVICE_ID] device_name = status[DEVICE_NAME] if ( status[DEVICE_BATTERY_STATUS] == "Unknown" or status.get(DEVICE_BATTERY_LEVEL) is None ): continue if self._devices.get(device_id) is not None: # Seen device -> updating _LOGGER.debug("Updating iCloud device: %s", device_name) self._devices[device_id].update(status) else: # New device, should be unique _LOGGER.debug( "Adding iCloud device: %s [model: %s]", device_name, status[DEVICE_RAW_DEVICE_MODEL], ) self._devices[device_id] = IcloudDevice(self, device, status) self._devices[device_id].update(status) new_device = True if ( DEVICE_STATUS_CODES.get(list(api_devices)[0][DEVICE_STATUS]) == "pending" and not self._retried_fetch ): _LOGGER.debug("Pending devices, trying again in 15s") self._fetch_interval = 0.25 self._retried_fetch = True else: self._fetch_interval = self._determine_interval() self._retried_fetch = False dispatcher_send(self.hass, self.signal_device_update) if new_device: dispatcher_send(self.hass, self.signal_device_new) track_point_in_utc_time( self.hass, self.keep_alive, utcnow() + timedelta(minutes=self._fetch_interval), ) def _determine_interval(self) -> int: """Calculate new interval between two API fetch (in minutes).""" intervals = {"default": self._max_interval} for device in self._devices.values(): # Max interval if no location if device.location is None: continue current_zone = run_callback_threadsafe( self.hass.loop, async_active_zone, self.hass, device.location[DEVICE_LOCATION_LATITUDE], device.location[DEVICE_LOCATION_LONGITUDE], device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY], ).result() # Max interval if in zone if current_zone is not None: continue zones = ( self.hass.states.get(entity_id) for entity_id in sorted(self.hass.states.entity_ids("zone")) ) distances = [] for zone_state in zones: zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE] zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE] zone_distance = distance( device.location[DEVICE_LOCATION_LATITUDE], device.location[DEVICE_LOCATION_LONGITUDE], zone_state_lat, zone_state_long, ) distances.append(round(zone_distance / 1000, 1)) # Max interval if no zone if not distances: continue mindistance = min(distances) # Calculate out how long it would take for the device to drive # to the nearest zone at 120 km/h: interval = round(mindistance / 2, 0) # Never poll more than once per minute interval = max(interval, 1) if interval > 180: # Three hour drive? # This is far enough that they might be flying interval = self._max_interval if ( device.battery_level is not None and device.battery_level <= 33 and mindistance > 3 ): # Low battery - let's check half as often interval = interval * 2 intervals[device.name] = interval return max( int(min(intervals.items(), key=operator.itemgetter(1))[1]), self._max_interval, ) def keep_alive(self, now=None) -> None: """Keep the API alive.""" if self.api is None: self.setup() if self.api is None: return self.api.authenticate() self.update_devices() def get_devices_with_name(self, name: str) -> [any]: """Get devices by name.""" result = [] name_slug = slugify(name.replace(" ", "", 99)) for device in self.devices.values(): if slugify(device.name.replace(" ", "", 99)) == name_slug: result.append(device) if not result: raise Exception(f"No device with name {name}") return result @property def username(self) -> str: """Return the account username.""" return self._username @property def owner_fullname(self) -> str: """Return the account owner fullname.""" return self._owner_fullname @property def family_members_fullname(self) -> Dict[str, str]: """Return the account family members fullname.""" return self._family_members_fullname @property def fetch_interval(self) -> int: """Return the account fetch interval.""" return self._fetch_interval @property def devices(self) -> Dict[str, any]: """Return the account devices.""" return self._devices @property def signal_device_new(self) -> str: """Event specific per Freebox entry to signal new device.""" return f"{DOMAIN}-{self._username}-device-new" @property def signal_device_update(self) -> str: """Event specific per Freebox entry to signal updates in devices.""" return f"{DOMAIN}-{self._username}-device-update" class IcloudDevice: """Representation of a iCloud device.""" def __init__(self, account: IcloudAccount, device: AppleDevice, status): """Initialize the iCloud device.""" self._account = account self._device = device self._status = status self._name = self._status[DEVICE_NAME] self._device_id = self._status[DEVICE_ID] self._device_class = self._status[DEVICE_CLASS] self._device_model = self._status[DEVICE_DISPLAY_NAME] if self._status[DEVICE_PERSON_ID]: owner_fullname = account.family_members_fullname[ self._status[DEVICE_PERSON_ID] ] else: owner_fullname = account.owner_fullname self._battery_level = None self._battery_status = None self._location = None self._attrs = { ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval, ATTR_DEVICE_NAME: self._device_model, ATTR_DEVICE_STATUS: None, ATTR_OWNER_NAME: owner_fullname, } def update(self, status) -> None: """Update the iCloud device.""" self._status = status self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error") self._attrs[ATTR_DEVICE_STATUS] = device_status self._battery_status = self._status[DEVICE_BATTERY_STATUS] self._attrs[ATTR_BATTERY_STATUS] = self._battery_status device_battery_level = self._status.get(DEVICE_BATTERY_LEVEL, 0) if self._battery_status != "Unknown" and device_battery_level is not None: self._battery_level = int(device_battery_level * 100) self._attrs[ATTR_BATTERY] = self._battery_level self._attrs[ATTR_LOW_POWER_MODE] = self._status[DEVICE_LOW_POWER_MODE] if ( self._status[DEVICE_LOCATION] and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE] ): location = self._status[DEVICE_LOCATION] if self._location is None: dispatcher_send(self._account.hass, self._account.signal_device_new) self._location = location def play_sound(self) -> None: """Play sound on the device.""" if self._account.api is None: return self._account.api.authenticate() _LOGGER.debug("Playing sound for %s", self.name) self.device.play_sound() def display_message(self, message: str, sound: bool = False) -> None: """Display a message on the device.""" if self._account.api is None: return self._account.api.authenticate() _LOGGER.debug("Displaying message for %s", self.name) self.device.display_message("Subject not working", message, sound) def lost_device(self, number: str, message: str) -> None: """Make the device in lost state.""" if self._account.api is None: return self._account.api.authenticate() if self._status[DEVICE_LOST_MODE_CAPABLE]: _LOGGER.debug("Make device lost for %s", self.name) self.device.lost_device(number, message, None) else: _LOGGER.error("Cannot make device lost for %s", self.name) @property def unique_id(self) -> str: """Return a unique ID.""" return self._device_id @property def name(self) -> str: """Return the Apple device name.""" return self._name @property def device(self) -> AppleDevice: """Return the Apple device.""" return self._device @property def device_class(self) -> str: """Return the Apple device class.""" return self._device_class @property def device_model(self) -> str: """Return the Apple device model.""" return self._device_model @property def battery_level(self) -> int: """Return the Apple device battery level.""" return self._battery_level @property def battery_status(self) -> str: """Return the Apple device battery status.""" return self._battery_status @property def location(self) -> Dict[str, any]: """Return the Apple device location.""" return self._location @property def state_attributes(self) -> Dict[str, any]: """Return the attributes.""" return self._attrs