core/homeassistant/components/icloud/account.py

493 lines
16 KiB
Python
Raw Normal View History

"""iCloud account."""
from datetime import timedelta
import logging
import operator
from typing import Dict, Optional
from pyicloud import PyiCloudService
from pyicloud.exceptions import (
PyiCloudFailedLoginException,
PyiCloudNoDevicesException,
PyiCloudServiceNotActivatedException,
)
from pyicloud.services.findmyiphone import AppleDevice
from homeassistant.components.zone import async_active_zone
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry
from homeassistant.const import ATTR_ATTRIBUTION, CONF_USERNAME
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import track_point_in_utc_time
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import slugify
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.util.dt import utcnow
from homeassistant.util.location import distance
from .const import (
DEVICE_BATTERY_LEVEL,
DEVICE_BATTERY_STATUS,
DEVICE_CLASS,
DEVICE_DISPLAY_NAME,
DEVICE_ID,
DEVICE_LOCATION,
DEVICE_LOCATION_HORIZONTAL_ACCURACY,
DEVICE_LOCATION_LATITUDE,
DEVICE_LOCATION_LONGITUDE,
DEVICE_LOST_MODE_CAPABLE,
DEVICE_LOW_POWER_MODE,
DEVICE_NAME,
DEVICE_PERSON_ID,
DEVICE_RAW_DEVICE_MODEL,
DEVICE_STATUS,
DEVICE_STATUS_CODES,
DEVICE_STATUS_SET,
DOMAIN,
)
ATTRIBUTION = "Data provided by Apple iCloud"
# entity attributes
ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval"
ATTR_BATTERY = "battery"
ATTR_BATTERY_STATUS = "battery_status"
ATTR_DEVICE_NAME = "device_name"
ATTR_DEVICE_STATUS = "device_status"
ATTR_LOW_POWER_MODE = "low_power_mode"
ATTR_OWNER_NAME = "owner_fullname"
# services
SERVICE_ICLOUD_PLAY_SOUND = "play_sound"
SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message"
SERVICE_ICLOUD_LOST_DEVICE = "lost_device"
SERVICE_ICLOUD_UPDATE = "update"
ATTR_ACCOUNT = "account"
ATTR_LOST_DEVICE_MESSAGE = "message"
ATTR_LOST_DEVICE_NUMBER = "number"
ATTR_LOST_DEVICE_SOUND = "sound"
_LOGGER = logging.getLogger(__name__)
class IcloudAccount:
"""Representation of an iCloud account."""
def __init__(
self,
hass: HomeAssistantType,
username: str,
password: str,
icloud_dir: Store,
with_family: bool,
max_interval: int,
gps_accuracy_threshold: int,
config_entry: ConfigEntry,
):
"""Initialize an iCloud account."""
self.hass = hass
self._username = username
self._password = password
self._with_family = with_family
self._fetch_interval = max_interval
self._max_interval = max_interval
self._gps_accuracy_threshold = gps_accuracy_threshold
self._icloud_dir = icloud_dir
self.api: Optional[PyiCloudService] = None
self._owner_fullname = None
self._family_members_fullname = {}
self._devices = {}
self._retried_fetch = False
self._config_entry = config_entry
self.listeners = []
def setup(self) -> None:
"""Set up an iCloud account."""
try:
self.api = PyiCloudService(
self._username,
self._password,
self._icloud_dir.path,
with_family=self._with_family,
)
except PyiCloudFailedLoginException:
self.api = None
# Login failed which means credentials need to be updated.
_LOGGER.error(
(
"Your password for '%s' is no longer working. Go to the "
"Integrations menu and click on Configure on the discovered Apple "
"iCloud card to login again."
),
self._config_entry.data[CONF_USERNAME],
)
self.hass.add_job(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_REAUTH},
data={
**self._config_entry.data,
"unique_id": self._config_entry.unique_id,
},
)
)
return
try:
api_devices = self.api.devices
# Gets device owners infos
user_info = api_devices.response["userInfo"]
except (
PyiCloudServiceNotActivatedException,
PyiCloudNoDevicesException,
) as err:
_LOGGER.error("No iCloud device found")
raise ConfigEntryNotReady from err
self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}"
self._family_members_fullname = {}
if user_info.get("membersInfo") is not None:
for prs_id, member in user_info["membersInfo"].items():
self._family_members_fullname[
prs_id
] = f"{member['firstName']} {member['lastName']}"
self._devices = {}
self.update_devices()
def update_devices(self) -> None:
"""Update iCloud devices."""
if self.api is None:
return
api_devices = {}
try:
api_devices = self.api.devices
except Exception as err: # pylint: disable=broad-except
_LOGGER.error("Unknown iCloud error: %s", err)
self._fetch_interval = 2
dispatcher_send(self.hass, self.signal_device_update)
track_point_in_utc_time(
self.hass,
self.keep_alive,
utcnow() + timedelta(minutes=self._fetch_interval),
)
return
# Gets devices infos
new_device = False
for device in api_devices:
status = device.status(DEVICE_STATUS_SET)
device_id = status[DEVICE_ID]
device_name = status[DEVICE_NAME]
if (
status[DEVICE_BATTERY_STATUS] == "Unknown"
or status.get(DEVICE_BATTERY_LEVEL) is None
):
continue
if self._devices.get(device_id) is not None:
# Seen device -> updating
_LOGGER.debug("Updating iCloud device: %s", device_name)
self._devices[device_id].update(status)
else:
# New device, should be unique
_LOGGER.debug(
"Adding iCloud device: %s [model: %s]",
device_name,
status[DEVICE_RAW_DEVICE_MODEL],
)
self._devices[device_id] = IcloudDevice(self, device, status)
self._devices[device_id].update(status)
new_device = True
if (
DEVICE_STATUS_CODES.get(list(api_devices)[0][DEVICE_STATUS]) == "pending"
and not self._retried_fetch
):
_LOGGER.debug("Pending devices, trying again in 15s")
self._fetch_interval = 0.25
self._retried_fetch = True
else:
self._fetch_interval = self._determine_interval()
self._retried_fetch = False
dispatcher_send(self.hass, self.signal_device_update)
if new_device:
dispatcher_send(self.hass, self.signal_device_new)
track_point_in_utc_time(
self.hass,
self.keep_alive,
utcnow() + timedelta(minutes=self._fetch_interval),
)
def _determine_interval(self) -> int:
"""Calculate new interval between two API fetch (in minutes)."""
intervals = {"default": self._max_interval}
for device in self._devices.values():
# Max interval if no location
if device.location is None:
continue
current_zone = run_callback_threadsafe(
self.hass.loop,
async_active_zone,
self.hass,
device.location[DEVICE_LOCATION_LATITUDE],
device.location[DEVICE_LOCATION_LONGITUDE],
device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY],
).result()
# Max interval if in zone
if current_zone is not None:
continue
zones = (
self.hass.states.get(entity_id)
for entity_id in sorted(self.hass.states.entity_ids("zone"))
)
distances = []
for zone_state in zones:
zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE]
zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE]
zone_distance = distance(
device.location[DEVICE_LOCATION_LATITUDE],
device.location[DEVICE_LOCATION_LONGITUDE],
zone_state_lat,
zone_state_long,
)
distances.append(round(zone_distance / 1000, 1))
# Max interval if no zone
if not distances:
continue
mindistance = min(distances)
# Calculate out how long it would take for the device to drive
# to the nearest zone at 120 km/h:
interval = round(mindistance / 2, 0)
# Never poll more than once per minute
interval = max(interval, 1)
if interval > 180:
# Three hour drive?
# This is far enough that they might be flying
interval = self._max_interval
if (
device.battery_level is not None
and device.battery_level <= 33
and mindistance > 3
):
# Low battery - let's check half as often
interval = interval * 2
intervals[device.name] = interval
return max(
int(min(intervals.items(), key=operator.itemgetter(1))[1]),
self._max_interval,
)
def keep_alive(self, now=None) -> None:
"""Keep the API alive."""
if self.api is None:
self.setup()
if self.api is None:
return
self.api.authenticate()
self.update_devices()
def get_devices_with_name(self, name: str) -> [any]:
"""Get devices by name."""
result = []
name_slug = slugify(name.replace(" ", "", 99))
for device in self.devices.values():
if slugify(device.name.replace(" ", "", 99)) == name_slug:
result.append(device)
if not result:
raise Exception(f"No device with name {name}")
return result
@property
def username(self) -> str:
"""Return the account username."""
return self._username
@property
def owner_fullname(self) -> str:
"""Return the account owner fullname."""
return self._owner_fullname
@property
def family_members_fullname(self) -> Dict[str, str]:
"""Return the account family members fullname."""
return self._family_members_fullname
@property
def fetch_interval(self) -> int:
"""Return the account fetch interval."""
return self._fetch_interval
@property
def devices(self) -> Dict[str, any]:
"""Return the account devices."""
return self._devices
@property
def signal_device_new(self) -> str:
"""Event specific per Freebox entry to signal new device."""
return f"{DOMAIN}-{self._username}-device-new"
@property
def signal_device_update(self) -> str:
"""Event specific per Freebox entry to signal updates in devices."""
return f"{DOMAIN}-{self._username}-device-update"
class IcloudDevice:
"""Representation of a iCloud device."""
def __init__(self, account: IcloudAccount, device: AppleDevice, status):
"""Initialize the iCloud device."""
self._account = account
self._device = device
self._status = status
self._name = self._status[DEVICE_NAME]
self._device_id = self._status[DEVICE_ID]
self._device_class = self._status[DEVICE_CLASS]
self._device_model = self._status[DEVICE_DISPLAY_NAME]
if self._status[DEVICE_PERSON_ID]:
owner_fullname = account.family_members_fullname[
self._status[DEVICE_PERSON_ID]
]
else:
owner_fullname = account.owner_fullname
self._battery_level = None
self._battery_status = None
self._location = None
self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION,
ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval,
ATTR_DEVICE_NAME: self._device_model,
ATTR_DEVICE_STATUS: None,
ATTR_OWNER_NAME: owner_fullname,
}
def update(self, status) -> None:
"""Update the iCloud device."""
self._status = status
self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval
device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error")
self._attrs[ATTR_DEVICE_STATUS] = device_status
self._battery_status = self._status[DEVICE_BATTERY_STATUS]
self._attrs[ATTR_BATTERY_STATUS] = self._battery_status
device_battery_level = self._status.get(DEVICE_BATTERY_LEVEL, 0)
if self._battery_status != "Unknown" and device_battery_level is not None:
self._battery_level = int(device_battery_level * 100)
self._attrs[ATTR_BATTERY] = self._battery_level
self._attrs[ATTR_LOW_POWER_MODE] = self._status[DEVICE_LOW_POWER_MODE]
if (
self._status[DEVICE_LOCATION]
and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE]
):
location = self._status[DEVICE_LOCATION]
if self._location is None:
dispatcher_send(self._account.hass, self._account.signal_device_new)
self._location = location
def play_sound(self) -> None:
"""Play sound on the device."""
if self._account.api is None:
return
self._account.api.authenticate()
_LOGGER.debug("Playing sound for %s", self.name)
self.device.play_sound()
def display_message(self, message: str, sound: bool = False) -> None:
"""Display a message on the device."""
if self._account.api is None:
return
self._account.api.authenticate()
_LOGGER.debug("Displaying message for %s", self.name)
self.device.display_message("Subject not working", message, sound)
def lost_device(self, number: str, message: str) -> None:
"""Make the device in lost state."""
if self._account.api is None:
return
self._account.api.authenticate()
if self._status[DEVICE_LOST_MODE_CAPABLE]:
_LOGGER.debug("Make device lost for %s", self.name)
self.device.lost_device(number, message, None)
else:
_LOGGER.error("Cannot make device lost for %s", self.name)
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._device_id
@property
def name(self) -> str:
"""Return the Apple device name."""
return self._name
@property
def device(self) -> AppleDevice:
"""Return the Apple device."""
return self._device
@property
def device_class(self) -> str:
"""Return the Apple device class."""
return self._device_class
@property
def device_model(self) -> str:
"""Return the Apple device model."""
return self._device_model
@property
def battery_level(self) -> int:
"""Return the Apple device battery level."""
return self._battery_level
@property
def battery_status(self) -> str:
"""Return the Apple device battery status."""
return self._battery_status
@property
def location(self) -> Dict[str, any]:
"""Return the Apple device location."""
return self._location
@property
def state_attributes(self) -> Dict[str, any]:
"""Return the attributes."""
return self._attrs