core/homeassistant/components/xiaomi_miio/vacuum.py

427 lines
14 KiB
Python

"""Support for the Xiaomi vacuum cleaner robot."""
from __future__ import annotations
from functools import partial
import logging
from miio import DeviceException
import voluptuous as vol
from homeassistant.components.vacuum import (
STATE_CLEANING,
STATE_DOCKED,
STATE_ERROR,
STATE_IDLE,
STATE_PAUSED,
STATE_RETURNING,
SUPPORT_BATTERY,
SUPPORT_CLEAN_SPOT,
SUPPORT_FAN_SPEED,
SUPPORT_LOCATE,
SUPPORT_PAUSE,
SUPPORT_RETURN_HOME,
SUPPORT_SEND_COMMAND,
SUPPORT_START,
SUPPORT_STATE,
SUPPORT_STOP,
StateVacuumEntity,
)
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv, entity_platform
from homeassistant.util.dt import as_utc
from . import VacuumCoordinatorData
from ...helpers.update_coordinator import DataUpdateCoordinator
from .const import (
CONF_DEVICE,
CONF_FLOW_TYPE,
DOMAIN,
KEY_COORDINATOR,
KEY_DEVICE,
SERVICE_CLEAN_SEGMENT,
SERVICE_CLEAN_ZONE,
SERVICE_GOTO,
SERVICE_MOVE_REMOTE_CONTROL,
SERVICE_MOVE_REMOTE_CONTROL_STEP,
SERVICE_START_REMOTE_CONTROL,
SERVICE_STOP_REMOTE_CONTROL,
)
from .device import XiaomiCoordinatedMiioEntity
_LOGGER = logging.getLogger(__name__)
ATTR_ERROR = "error"
ATTR_RC_DURATION = "duration"
ATTR_RC_ROTATION = "rotation"
ATTR_RC_VELOCITY = "velocity"
ATTR_STATUS = "status"
ATTR_ZONE_ARRAY = "zone"
ATTR_ZONE_REPEATER = "repeats"
ATTR_TIMERS = "timers"
SUPPORT_XIAOMI = (
SUPPORT_STATE
| SUPPORT_PAUSE
| SUPPORT_STOP
| SUPPORT_RETURN_HOME
| SUPPORT_FAN_SPEED
| SUPPORT_SEND_COMMAND
| SUPPORT_LOCATE
| SUPPORT_BATTERY
| SUPPORT_CLEAN_SPOT
| SUPPORT_START
)
STATE_CODE_TO_STATE = {
1: STATE_IDLE, # "Starting"
2: STATE_IDLE, # "Charger disconnected"
3: STATE_IDLE, # "Idle"
4: STATE_CLEANING, # "Remote control active"
5: STATE_CLEANING, # "Cleaning"
6: STATE_RETURNING, # "Returning home"
7: STATE_CLEANING, # "Manual mode"
8: STATE_DOCKED, # "Charging"
9: STATE_ERROR, # "Charging problem"
10: STATE_PAUSED, # "Paused"
11: STATE_CLEANING, # "Spot cleaning"
12: STATE_ERROR, # "Error"
13: STATE_IDLE, # "Shutting down"
14: STATE_DOCKED, # "Updating"
15: STATE_RETURNING, # "Docking"
16: STATE_CLEANING, # "Going to target"
17: STATE_CLEANING, # "Zoned cleaning"
18: STATE_CLEANING, # "Segment cleaning"
100: STATE_DOCKED, # "Charging complete"
101: STATE_ERROR, # "Device offline"
}
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Xiaomi vacuum cleaner robot from a config entry."""
entities = []
if config_entry.data[CONF_FLOW_TYPE] == CONF_DEVICE:
name = config_entry.title
unique_id = config_entry.unique_id
mirobo = MiroboVacuum(
name,
hass.data[DOMAIN][config_entry.entry_id][KEY_DEVICE],
config_entry,
unique_id,
hass.data[DOMAIN][config_entry.entry_id][KEY_COORDINATOR],
)
entities.append(mirobo)
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(
SERVICE_START_REMOTE_CONTROL,
{},
MiroboVacuum.async_remote_control_start.__name__,
)
platform.async_register_entity_service(
SERVICE_STOP_REMOTE_CONTROL,
{},
MiroboVacuum.async_remote_control_stop.__name__,
)
platform.async_register_entity_service(
SERVICE_MOVE_REMOTE_CONTROL,
{
vol.Optional(ATTR_RC_VELOCITY): vol.All(
vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
),
vol.Optional(ATTR_RC_ROTATION): vol.All(
vol.Coerce(int), vol.Clamp(min=-179, max=179)
),
vol.Optional(ATTR_RC_DURATION): cv.positive_int,
},
MiroboVacuum.async_remote_control_move.__name__,
)
platform.async_register_entity_service(
SERVICE_MOVE_REMOTE_CONTROL_STEP,
{
vol.Optional(ATTR_RC_VELOCITY): vol.All(
vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
),
vol.Optional(ATTR_RC_ROTATION): vol.All(
vol.Coerce(int), vol.Clamp(min=-179, max=179)
),
vol.Optional(ATTR_RC_DURATION): cv.positive_int,
},
MiroboVacuum.async_remote_control_move_step.__name__,
)
platform.async_register_entity_service(
SERVICE_CLEAN_ZONE,
{
vol.Required(ATTR_ZONE_ARRAY): vol.All(
list,
[
vol.ExactSequence(
[
vol.Coerce(int),
vol.Coerce(int),
vol.Coerce(int),
vol.Coerce(int),
]
)
],
),
vol.Required(ATTR_ZONE_REPEATER): vol.All(
vol.Coerce(int), vol.Clamp(min=1, max=3)
),
},
MiroboVacuum.async_clean_zone.__name__,
)
platform.async_register_entity_service(
SERVICE_GOTO,
{
vol.Required("x_coord"): vol.Coerce(int),
vol.Required("y_coord"): vol.Coerce(int),
},
MiroboVacuum.async_goto.__name__,
)
platform.async_register_entity_service(
SERVICE_CLEAN_SEGMENT,
{vol.Required("segments"): vol.Any(vol.Coerce(int), [vol.Coerce(int)])},
MiroboVacuum.async_clean_segment.__name__,
)
async_add_entities(entities, update_before_add=True)
class MiroboVacuum(XiaomiCoordinatedMiioEntity, StateVacuumEntity):
"""Representation of a Xiaomi Vacuum cleaner robot."""
coordinator: DataUpdateCoordinator[VacuumCoordinatorData]
def __init__(
self, name, device, entry, unique_id, coordinator: DataUpdateCoordinator
):
"""Initialize the Xiaomi vacuum cleaner robot handler."""
super().__init__(name, device, entry, unique_id, coordinator)
self._state: str | None = None
async def async_added_to_hass(self) -> None:
"""Run when entity is about to be added to hass."""
await super().async_added_to_hass()
self._handle_coordinator_update()
@property
def state(self):
"""Return the status of the vacuum cleaner."""
# The vacuum reverts back to an idle state after erroring out.
# We want to keep returning an error until it has been cleared.
if self.coordinator.data.status.got_error:
return STATE_ERROR
return self._state
@property
def battery_level(self):
"""Return the battery level of the vacuum cleaner."""
return self.coordinator.data.status.battery
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
speed = self.coordinator.data.status.fanspeed
if speed in self.coordinator.data.fan_speeds_reverse:
return self.coordinator.data.fan_speeds_reverse[speed]
_LOGGER.debug("Unable to find reverse for %s", speed)
return speed
@property
def fan_speed_list(self):
"""Get the list of available fan speed steps of the vacuum cleaner."""
return (
list(self.coordinator.data.fan_speeds)
if self.coordinator.data.fan_speeds
else []
)
@property
def timers(self):
"""Get the list of added timers of the vacuum cleaner."""
return [
{
"enabled": timer.enabled,
"cron": timer.cron,
"next_schedule": as_utc(timer.next_schedule),
}
for timer in self.coordinator.data.timers
]
@property
def extra_state_attributes(self):
"""Return the specific state attributes of this vacuum cleaner."""
attrs = {}
attrs[ATTR_STATUS] = str(self.coordinator.data.status.state)
if self.coordinator.data.status.got_error:
attrs[ATTR_ERROR] = self.coordinator.data.status.error
if self.timers:
attrs[ATTR_TIMERS] = self.timers
return attrs
@property
def supported_features(self):
"""Flag vacuum cleaner robot features that are supported."""
return SUPPORT_XIAOMI
async def _try_command(self, mask_error, func, *args, **kwargs):
"""Call a vacuum command handling error messages."""
try:
await self.hass.async_add_executor_job(partial(func, *args, **kwargs))
await self.coordinator.async_refresh()
return True
except DeviceException as exc:
_LOGGER.error(mask_error, exc)
return False
async def async_start(self):
"""Start or resume the cleaning task."""
await self._try_command(
"Unable to start the vacuum: %s", self._device.resume_or_start
)
async def async_pause(self):
"""Pause the cleaning task."""
await self._try_command("Unable to set start/pause: %s", self._device.pause)
async def async_stop(self, **kwargs):
"""Stop the vacuum cleaner."""
await self._try_command("Unable to stop: %s", self._device.stop)
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
if fan_speed in self.coordinator.data.fan_speeds:
fan_speed = self.coordinator.data.fan_speeds[fan_speed]
else:
try:
fan_speed = int(fan_speed)
except ValueError as exc:
_LOGGER.error(
"Fan speed step not recognized (%s). Valid speeds are: %s",
exc,
self.fan_speed_list,
)
return
await self._try_command(
"Unable to set fan speed: %s", self._device.set_fan_speed, fan_speed
)
async def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
await self._try_command("Unable to return home: %s", self._device.home)
async def async_clean_spot(self, **kwargs):
"""Perform a spot clean-up."""
await self._try_command(
"Unable to start the vacuum for a spot clean-up: %s", self._device.spot
)
async def async_locate(self, **kwargs):
"""Locate the vacuum cleaner."""
await self._try_command("Unable to locate the botvac: %s", self._device.find)
async def async_send_command(self, command, params=None, **kwargs):
"""Send raw command."""
await self._try_command(
"Unable to send command to the vacuum: %s",
self._device.raw_command,
command,
params,
)
async def async_remote_control_start(self):
"""Start remote control mode."""
await self._try_command(
"Unable to start remote control the vacuum: %s", self._device.manual_start
)
async def async_remote_control_stop(self):
"""Stop remote control mode."""
await self._try_command(
"Unable to stop remote control the vacuum: %s", self._device.manual_stop
)
async def async_remote_control_move(
self, rotation: int = 0, velocity: float = 0.3, duration: int = 1500
):
"""Move vacuum with remote control mode."""
await self._try_command(
"Unable to move with remote control the vacuum: %s",
self._device.manual_control,
velocity=velocity,
rotation=rotation,
duration=duration,
)
async def async_remote_control_move_step(
self, rotation: int = 0, velocity: float = 0.2, duration: int = 1500
):
"""Move vacuum one step with remote control mode."""
await self._try_command(
"Unable to remote control the vacuum: %s",
self._device.manual_control_once,
velocity=velocity,
rotation=rotation,
duration=duration,
)
async def async_goto(self, x_coord: int, y_coord: int):
"""Goto the specified coordinates."""
await self._try_command(
"Unable to send the vacuum cleaner to the specified coordinates: %s",
self._device.goto,
x_coord=x_coord,
y_coord=y_coord,
)
async def async_clean_segment(self, segments):
"""Clean the specified segments(s)."""
if isinstance(segments, int):
segments = [segments]
await self._try_command(
"Unable to start cleaning of the specified segments: %s",
self._device.segment_clean,
segments=segments,
)
async def async_clean_zone(self, zone, repeats=1):
"""Clean selected area for the number of repeats indicated."""
for _zone in zone:
_zone.append(repeats)
_LOGGER.debug("Zone with repeats: %s", zone)
try:
await self.hass.async_add_executor_job(self._device.zoned_clean, zone)
await self.coordinator.async_refresh()
except (OSError, DeviceException) as exc:
_LOGGER.error("Unable to send zoned_clean command to the vacuum: %s", exc)
@callback
def _handle_coordinator_update(self) -> None:
state_code = int(self.coordinator.data.status.state_code)
if state_code not in STATE_CODE_TO_STATE:
_LOGGER.error(
"STATE not supported: %s, state_code: %s",
self.coordinator.data.status.state,
self.coordinator.data.status.state_code,
)
self._state = None
else:
self._state = STATE_CODE_TO_STATE[state_code]
super()._handle_coordinator_update()