From b9e4855e05d32e1cd53e94c83ccbd794fb33e66f Mon Sep 17 00:00:00 2001 From: G Johansson Date: Tue, 3 Dec 2024 21:18:54 +0100 Subject: [PATCH] Refactor roomba to set vacuums in vacuum file (#132102) --- homeassistant/components/roomba/braava.py | 128 ------- homeassistant/components/roomba/entity.py | 187 +---------- homeassistant/components/roomba/roomba.py | 89 ----- homeassistant/components/roomba/vacuum.py | 388 +++++++++++++++++++++- 4 files changed, 387 insertions(+), 405 deletions(-) delete mode 100644 homeassistant/components/roomba/braava.py delete mode 100644 homeassistant/components/roomba/roomba.py diff --git a/homeassistant/components/roomba/braava.py b/homeassistant/components/roomba/braava.py deleted file mode 100644 index 8744561b2c5..00000000000 --- a/homeassistant/components/roomba/braava.py +++ /dev/null @@ -1,128 +0,0 @@ -"""Class for Braava devices.""" - -import logging - -from homeassistant.components.vacuum import VacuumEntityFeature - -from .entity import SUPPORT_IROBOT, IRobotVacuum - -_LOGGER = logging.getLogger(__name__) - -ATTR_DETECTED_PAD = "detected_pad" -ATTR_LID_CLOSED = "lid_closed" -ATTR_TANK_PRESENT = "tank_present" -ATTR_TANK_LEVEL = "tank_level" -ATTR_PAD_WETNESS = "spray_amount" - -OVERLAP_STANDARD = 67 -OVERLAP_DEEP = 85 -OVERLAP_EXTENDED = 25 -MOP_STANDARD = "Standard" -MOP_DEEP = "Deep" -MOP_EXTENDED = "Extended" -BRAAVA_MOP_BEHAVIORS = [MOP_STANDARD, MOP_DEEP, MOP_EXTENDED] -BRAAVA_SPRAY_AMOUNT = [1, 2, 3] - -# Braava Jets can set mopping behavior through fanspeed -SUPPORT_BRAAVA = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED - - -class BraavaJet(IRobotVacuum): # pylint: disable=hass-enforce-class-module - """Braava Jet.""" - - _attr_supported_features = SUPPORT_BRAAVA - - def __init__(self, roomba, blid): - """Initialize the Roomba handler.""" - super().__init__(roomba, blid) - - # Initialize fan speed list - self._attr_fan_speed_list = [ - f"{behavior}-{spray}" - for behavior in BRAAVA_MOP_BEHAVIORS - for spray in BRAAVA_SPRAY_AMOUNT - ] - - @property - def fan_speed(self): - """Return the fan speed of the vacuum cleaner.""" - # Mopping behavior and spray amount as fan speed - rank_overlap = self.vacuum_state.get("rankOverlap", {}) - behavior = None - if rank_overlap == OVERLAP_STANDARD: - behavior = MOP_STANDARD - elif rank_overlap == OVERLAP_DEEP: - behavior = MOP_DEEP - elif rank_overlap == OVERLAP_EXTENDED: - behavior = MOP_EXTENDED - pad_wetness = self.vacuum_state.get("padWetness", {}) - # "disposable" and "reusable" values are always the same - pad_wetness_value = pad_wetness.get("disposable") - return f"{behavior}-{pad_wetness_value}" - - async def async_set_fan_speed(self, fan_speed, **kwargs): - """Set fan speed.""" - try: - split = fan_speed.split("-", 1) - behavior = split[0] - spray = int(split[1]) - if behavior.capitalize() in BRAAVA_MOP_BEHAVIORS: - behavior = behavior.capitalize() - except IndexError: - _LOGGER.error( - "Fan speed error: expected {behavior}-{spray_amount}, got '%s'", - fan_speed, - ) - return - except ValueError: - _LOGGER.error("Spray amount error: expected integer, got '%s'", split[1]) - return - if behavior not in BRAAVA_MOP_BEHAVIORS: - _LOGGER.error( - "Mop behavior error: expected one of %s, got '%s'", - str(BRAAVA_MOP_BEHAVIORS), - behavior, - ) - return - if spray not in BRAAVA_SPRAY_AMOUNT: - _LOGGER.error( - "Spray amount error: expected one of %s, got '%d'", - str(BRAAVA_SPRAY_AMOUNT), - spray, - ) - return - - overlap = 0 - if behavior == MOP_STANDARD: - overlap = OVERLAP_STANDARD - elif behavior == MOP_DEEP: - overlap = OVERLAP_DEEP - else: - overlap = OVERLAP_EXTENDED - await self.hass.async_add_executor_job( - self.vacuum.set_preference, "rankOverlap", overlap - ) - await self.hass.async_add_executor_job( - self.vacuum.set_preference, - "padWetness", - {"disposable": spray, "reusable": spray}, - ) - - @property - def extra_state_attributes(self): - """Return the state attributes of the device.""" - state_attrs = super().extra_state_attributes - - # Get Braava state - state = self.vacuum_state - detected_pad = state.get("detectedPad") - mop_ready = state.get("mopReady", {}) - lid_closed = mop_ready.get("lidClosed") - tank_present = mop_ready.get("tankPresent") - tank_level = state.get("tankLvl") - state_attrs[ATTR_DETECTED_PAD] = detected_pad - state_attrs[ATTR_LID_CLOSED] = lid_closed - state_attrs[ATTR_TANK_PRESENT] = tank_present - state_attrs[ATTR_TANK_LEVEL] = tank_level - - return state_attrs diff --git a/homeassistant/components/roomba/entity.py b/homeassistant/components/roomba/entity.py index 10c3d36de12..d55a260e53a 100644 --- a/homeassistant/components/roomba/entity.py +++ b/homeassistant/components/roomba/entity.py @@ -2,62 +2,15 @@ from __future__ import annotations -import asyncio -import logging - -from homeassistant.components.vacuum import ( - ATTR_STATUS, - STATE_CLEANING, - STATE_DOCKED, - STATE_ERROR, - STATE_RETURNING, - StateVacuumEntity, - VacuumEntityFeature, -) -from homeassistant.const import ATTR_CONNECTIONS, STATE_IDLE, STATE_PAUSED +from homeassistant.const import ATTR_CONNECTIONS import homeassistant.helpers.device_registry as dr from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.entity import Entity import homeassistant.util.dt as dt_util -from homeassistant.util.unit_system import METRIC_SYSTEM from . import roomba_reported_state from .const import DOMAIN -_LOGGER = logging.getLogger(__name__) - -ATTR_CLEANING_TIME = "cleaning_time" -ATTR_CLEANED_AREA = "cleaned_area" -ATTR_ERROR = "error" -ATTR_ERROR_CODE = "error_code" -ATTR_POSITION = "position" -ATTR_SOFTWARE_VERSION = "software_version" - -# Commonly supported features -SUPPORT_IROBOT = ( - VacuumEntityFeature.BATTERY - | VacuumEntityFeature.PAUSE - | VacuumEntityFeature.RETURN_HOME - | VacuumEntityFeature.SEND_COMMAND - | VacuumEntityFeature.START - | VacuumEntityFeature.STATE - | VacuumEntityFeature.STOP - | VacuumEntityFeature.LOCATE -) - -STATE_MAP = { - "": STATE_IDLE, - "charge": STATE_DOCKED, - "evac": STATE_RETURNING, # Emptying at cleanbase - "hmMidMsn": STATE_CLEANING, # Recharging at the middle of a cycle - "hmPostMsn": STATE_RETURNING, # Cycle finished - "hmUsrDock": STATE_RETURNING, - "pause": STATE_PAUSED, - "run": STATE_CLEANING, - "stop": STATE_IDLE, - "stuck": STATE_ERROR, -} - class IRobotEntity(Entity): """Base class for iRobot Entities.""" @@ -65,7 +18,7 @@ class IRobotEntity(Entity): _attr_should_poll = False _attr_has_entity_name = True - def __init__(self, roomba, blid): + def __init__(self, roomba, blid) -> None: """Initialize the iRobot handler.""" self.vacuum = roomba self._blid = blid @@ -127,20 +80,6 @@ class IRobotEntity(Entity): return None return dt_util.utc_from_timestamp(ts) - @property - def _robot_state(self): - """Return the state of the vacuum cleaner.""" - clean_mission_status = self.vacuum_state.get("cleanMissionStatus", {}) - cycle = clean_mission_status.get("cycle") - phase = clean_mission_status.get("phase") - try: - state = STATE_MAP[phase] - except KeyError: - return STATE_ERROR - if cycle != "none" and state in (STATE_IDLE, STATE_DOCKED): - state = STATE_PAUSED - return state - async def async_added_to_hass(self): """Register callback function.""" self.vacuum.register_on_message_callback(self.on_message) @@ -154,125 +93,3 @@ class IRobotEntity(Entity): state = json_data.get("state", {}).get("reported", {}) if self.new_state_filter(state): self.schedule_update_ha_state() - - -class IRobotVacuum(IRobotEntity, StateVacuumEntity): # pylint: disable=hass-enforce-class-module - """Base class for iRobot robots.""" - - _attr_name = None - _attr_supported_features = SUPPORT_IROBOT - _attr_available = True # Always available, otherwise setup will fail - - def __init__(self, roomba, blid): - """Initialize the iRobot handler.""" - super().__init__(roomba, blid) - self._cap_position = self.vacuum_state.get("cap", {}).get("pose") == 1 - - @property - def state(self): - """Return the state of the vacuum cleaner.""" - return self._robot_state - - @property - def extra_state_attributes(self): - """Return the state attributes of the device.""" - state = self.vacuum_state - - # Roomba software version - software_version = state.get("softwareVer") - - # Set properties that are to appear in the GUI - state_attrs = {ATTR_SOFTWARE_VERSION: software_version} - - # Set legacy status to avoid break changes - state_attrs[ATTR_STATUS] = self.vacuum.current_state - - # Only add cleaning time and cleaned area attrs when the vacuum is - # currently on - if self.state == STATE_CLEANING: - # Get clean mission status - ( - state_attrs[ATTR_CLEANING_TIME], - state_attrs[ATTR_CLEANED_AREA], - ) = self.get_cleaning_status(state) - - # Error - if self.vacuum.error_code != 0: - state_attrs[ATTR_ERROR] = self.vacuum.error_message - state_attrs[ATTR_ERROR_CODE] = self.vacuum.error_code - - # Not all Roombas expose position data - # https://github.com/koalazak/dorita980/issues/48 - if self._cap_position: - pos_state = state.get("pose", {}) - position = None - pos_x = pos_state.get("point", {}).get("x") - pos_y = pos_state.get("point", {}).get("y") - theta = pos_state.get("theta") - if all(item is not None for item in (pos_x, pos_y, theta)): - position = f"({pos_x}, {pos_y}, {theta})" - state_attrs[ATTR_POSITION] = position - - return state_attrs - - def get_cleaning_status(self, state) -> tuple[int, int]: - """Return the cleaning time and cleaned area from the device.""" - if not (mission_state := state.get("cleanMissionStatus")): - return (0, 0) - - if cleaning_time := mission_state.get("mssnM", 0): - pass - elif start_time := mission_state.get("mssnStrtTm"): - now = dt_util.as_timestamp(dt_util.utcnow()) - if now > start_time: - cleaning_time = (now - start_time) // 60 - - if cleaned_area := mission_state.get("sqft", 0): # Imperial - # Convert to m2 if the unit_system is set to metric - if self.hass.config.units is METRIC_SYSTEM: - cleaned_area = round(cleaned_area * 0.0929) - - return (cleaning_time, cleaned_area) - - def on_message(self, json_data): - """Update state on message change.""" - state = json_data.get("state", {}).get("reported", {}) - if self.new_state_filter(state): - _LOGGER.debug("Got new state from the vacuum: %s", json_data) - self.schedule_update_ha_state() - - async def async_start(self): - """Start or resume the cleaning task.""" - if self.state == STATE_PAUSED: - await self.hass.async_add_executor_job(self.vacuum.send_command, "resume") - else: - await self.hass.async_add_executor_job(self.vacuum.send_command, "start") - - async def async_stop(self, **kwargs): - """Stop the vacuum cleaner.""" - await self.hass.async_add_executor_job(self.vacuum.send_command, "stop") - - async def async_pause(self): - """Pause the cleaning cycle.""" - await self.hass.async_add_executor_job(self.vacuum.send_command, "pause") - - async def async_return_to_base(self, **kwargs): - """Set the vacuum cleaner to return to the dock.""" - if self.state == STATE_CLEANING: - await self.async_pause() - for _ in range(10): - if self.state == STATE_PAUSED: - break - await asyncio.sleep(1) - await self.hass.async_add_executor_job(self.vacuum.send_command, "dock") - - async def async_locate(self, **kwargs): - """Located vacuum.""" - await self.hass.async_add_executor_job(self.vacuum.send_command, "find") - - async def async_send_command(self, command, params=None, **kwargs): - """Send raw command.""" - _LOGGER.debug("async_send_command %s (%s), %s", command, params, kwargs) - await self.hass.async_add_executor_job( - self.vacuum.send_command, command, params - ) diff --git a/homeassistant/components/roomba/roomba.py b/homeassistant/components/roomba/roomba.py deleted file mode 100644 index 917fbb2bfff..00000000000 --- a/homeassistant/components/roomba/roomba.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Class for Roomba devices.""" - -import logging - -from homeassistant.components.vacuum import VacuumEntityFeature - -from .entity import SUPPORT_IROBOT, IRobotVacuum - -_LOGGER = logging.getLogger(__name__) - -ATTR_BIN_FULL = "bin_full" -ATTR_BIN_PRESENT = "bin_present" - -FAN_SPEED_AUTOMATIC = "Automatic" -FAN_SPEED_ECO = "Eco" -FAN_SPEED_PERFORMANCE = "Performance" -FAN_SPEEDS = [FAN_SPEED_AUTOMATIC, FAN_SPEED_ECO, FAN_SPEED_PERFORMANCE] - -# Only Roombas with CarpetBost can set their fanspeed -SUPPORT_ROOMBA_CARPET_BOOST = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED - - -class RoombaVacuum(IRobotVacuum): # pylint: disable=hass-enforce-class-module - """Basic Roomba robot (without carpet boost).""" - - @property - def extra_state_attributes(self): - """Return the state attributes of the device.""" - state_attrs = super().extra_state_attributes - - # Get bin state - bin_raw_state = self.vacuum_state.get("bin", {}) - bin_state = {} - if bin_raw_state.get("present") is not None: - bin_state[ATTR_BIN_PRESENT] = bin_raw_state.get("present") - if bin_raw_state.get("full") is not None: - bin_state[ATTR_BIN_FULL] = bin_raw_state.get("full") - state_attrs.update(bin_state) - - return state_attrs - - -class RoombaVacuumCarpetBoost(RoombaVacuum): # pylint: disable=hass-enforce-class-module - """Roomba robot with carpet boost.""" - - _attr_fan_speed_list = FAN_SPEEDS - _attr_supported_features = SUPPORT_ROOMBA_CARPET_BOOST - - @property - def fan_speed(self): - """Return the fan speed of the vacuum cleaner.""" - fan_speed = None - carpet_boost = self.vacuum_state.get("carpetBoost") - high_perf = self.vacuum_state.get("vacHigh") - if carpet_boost is not None and high_perf is not None: - if carpet_boost: - fan_speed = FAN_SPEED_AUTOMATIC - elif high_perf: - fan_speed = FAN_SPEED_PERFORMANCE - else: # carpet_boost and high_perf are False - fan_speed = FAN_SPEED_ECO - return fan_speed - - async def async_set_fan_speed(self, fan_speed, **kwargs): - """Set fan speed.""" - if fan_speed.capitalize() in FAN_SPEEDS: - fan_speed = fan_speed.capitalize() - _LOGGER.debug("Set fan speed to: %s", fan_speed) - high_perf = None - carpet_boost = None - if fan_speed == FAN_SPEED_AUTOMATIC: - high_perf = False - carpet_boost = True - elif fan_speed == FAN_SPEED_ECO: - high_perf = False - carpet_boost = False - elif fan_speed == FAN_SPEED_PERFORMANCE: - high_perf = True - carpet_boost = False - else: - _LOGGER.error("No such fan speed available: %s", fan_speed) - return - # The set_preference method does only accept string values - await self.hass.async_add_executor_job( - self.vacuum.set_preference, "carpetBoost", str(carpet_boost) - ) - await self.hass.async_add_executor_job( - self.vacuum.set_preference, "vacHigh", str(high_perf) - ) diff --git a/homeassistant/components/roomba/vacuum.py b/homeassistant/components/roomba/vacuum.py index a45b8eea632..9024e54087d 100644 --- a/homeassistant/components/roomba/vacuum.py +++ b/homeassistant/components/roomba/vacuum.py @@ -2,16 +2,92 @@ from __future__ import annotations +import asyncio +import logging +from typing import Any + +from homeassistant.components.vacuum import ( + ATTR_STATUS, + STATE_CLEANING, + STATE_DOCKED, + STATE_ERROR, + STATE_RETURNING, + StateVacuumEntity, + VacuumEntityFeature, +) from homeassistant.config_entries import ConfigEntry +from homeassistant.const import STATE_IDLE, STATE_PAUSED from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.util import dt as dt_util +from homeassistant.util.unit_system import METRIC_SYSTEM from . import roomba_reported_state -from .braava import BraavaJet from .const import DOMAIN -from .entity import IRobotVacuum +from .entity import IRobotEntity from .models import RoombaData -from .roomba import RoombaVacuum, RoombaVacuumCarpetBoost + +SUPPORT_IROBOT = ( + VacuumEntityFeature.BATTERY + | VacuumEntityFeature.PAUSE + | VacuumEntityFeature.RETURN_HOME + | VacuumEntityFeature.SEND_COMMAND + | VacuumEntityFeature.START + | VacuumEntityFeature.STATE + | VacuumEntityFeature.STOP + | VacuumEntityFeature.LOCATE +) + +STATE_MAP = { + "": STATE_IDLE, + "charge": STATE_DOCKED, + "evac": STATE_RETURNING, # Emptying at cleanbase + "hmMidMsn": STATE_CLEANING, # Recharging at the middle of a cycle + "hmPostMsn": STATE_RETURNING, # Cycle finished + "hmUsrDock": STATE_RETURNING, + "pause": STATE_PAUSED, + "run": STATE_CLEANING, + "stop": STATE_IDLE, + "stuck": STATE_ERROR, +} + +_LOGGER = logging.getLogger(__name__) +ATTR_SOFTWARE_VERSION = "software_version" +ATTR_CLEANING_TIME = "cleaning_time" +ATTR_CLEANED_AREA = "cleaned_area" +ATTR_ERROR = "error" +ATTR_ERROR_CODE = "error_code" +ATTR_POSITION = "position" +ATTR_SOFTWARE_VERSION = "software_version" + +ATTR_BIN_FULL = "bin_full" +ATTR_BIN_PRESENT = "bin_present" + +FAN_SPEED_AUTOMATIC = "Automatic" +FAN_SPEED_ECO = "Eco" +FAN_SPEED_PERFORMANCE = "Performance" +FAN_SPEEDS = [FAN_SPEED_AUTOMATIC, FAN_SPEED_ECO, FAN_SPEED_PERFORMANCE] + +# Only Roombas with CarpetBost can set their fanspeed +SUPPORT_ROOMBA_CARPET_BOOST = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED + +ATTR_DETECTED_PAD = "detected_pad" +ATTR_LID_CLOSED = "lid_closed" +ATTR_TANK_PRESENT = "tank_present" +ATTR_TANK_LEVEL = "tank_level" +ATTR_PAD_WETNESS = "spray_amount" + +OVERLAP_STANDARD = 67 +OVERLAP_DEEP = 85 +OVERLAP_EXTENDED = 25 +MOP_STANDARD = "Standard" +MOP_DEEP = "Deep" +MOP_EXTENDED = "Extended" +BRAAVA_MOP_BEHAVIORS = [MOP_STANDARD, MOP_DEEP, MOP_EXTENDED] +BRAAVA_SPRAY_AMOUNT = [1, 2, 3] + +# Braava Jets can set mopping behavior through fanspeed +SUPPORT_BRAAVA = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED async def async_setup_entry( @@ -39,3 +115,309 @@ async def async_setup_entry( roomba_vac = constructor(roomba, blid) async_add_entities([roomba_vac]) + + +class IRobotVacuum(IRobotEntity, StateVacuumEntity): + """Base class for iRobot robots.""" + + _attr_name = None + _attr_supported_features = SUPPORT_IROBOT + _attr_available = True # Always available, otherwise setup will fail + + def __init__(self, roomba, blid) -> None: + """Initialize the iRobot handler.""" + super().__init__(roomba, blid) + self._cap_position = self.vacuum_state.get("cap", {}).get("pose") == 1 + + @property + def _robot_state(self): + """Return the state of the vacuum cleaner.""" + clean_mission_status = self.vacuum_state.get("cleanMissionStatus", {}) + cycle = clean_mission_status.get("cycle") + phase = clean_mission_status.get("phase") + try: + state = STATE_MAP[phase] + except KeyError: + return STATE_ERROR + if cycle != "none" and state in (STATE_IDLE, STATE_DOCKED): + state = STATE_PAUSED + return state + + @property + def state(self) -> str: + """Return the state of the vacuum cleaner.""" + return self._robot_state + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return the state attributes of the device.""" + state = self.vacuum_state + + # Roomba software version + software_version = state.get("softwareVer") + + # Set properties that are to appear in the GUI + state_attrs = {ATTR_SOFTWARE_VERSION: software_version} + + # Set legacy status to avoid break changes + state_attrs[ATTR_STATUS] = self.vacuum.current_state + + # Only add cleaning time and cleaned area attrs when the vacuum is + # currently on + if self.state == STATE_CLEANING: + # Get clean mission status + ( + state_attrs[ATTR_CLEANING_TIME], + state_attrs[ATTR_CLEANED_AREA], + ) = self.get_cleaning_status(state) + + # Error + if self.vacuum.error_code != 0: + state_attrs[ATTR_ERROR] = self.vacuum.error_message + state_attrs[ATTR_ERROR_CODE] = self.vacuum.error_code + + # Not all Roombas expose position data + # https://github.com/koalazak/dorita980/issues/48 + if self._cap_position: + pos_state = state.get("pose", {}) + position = None + pos_x = pos_state.get("point", {}).get("x") + pos_y = pos_state.get("point", {}).get("y") + theta = pos_state.get("theta") + if all(item is not None for item in (pos_x, pos_y, theta)): + position = f"({pos_x}, {pos_y}, {theta})" + state_attrs[ATTR_POSITION] = position + + return state_attrs + + def get_cleaning_status(self, state) -> tuple[int, int]: + """Return the cleaning time and cleaned area from the device.""" + if not (mission_state := state.get("cleanMissionStatus")): + return (0, 0) + + if cleaning_time := mission_state.get("mssnM", 0): + pass + elif start_time := mission_state.get("mssnStrtTm"): + now = dt_util.as_timestamp(dt_util.utcnow()) + if now > start_time: + cleaning_time = (now - start_time) // 60 + + if cleaned_area := mission_state.get("sqft", 0): # Imperial + # Convert to m2 if the unit_system is set to metric + if self.hass.config.units is METRIC_SYSTEM: + cleaned_area = round(cleaned_area * 0.0929) + + return (cleaning_time, cleaned_area) + + def on_message(self, json_data): + """Update state on message change.""" + state = json_data.get("state", {}).get("reported", {}) + if self.new_state_filter(state): + _LOGGER.debug("Got new state from the vacuum: %s", json_data) + self.schedule_update_ha_state() + + async def async_start(self) -> None: + """Start or resume the cleaning task.""" + if self.state == STATE_PAUSED: + await self.hass.async_add_executor_job(self.vacuum.send_command, "resume") + else: + await self.hass.async_add_executor_job(self.vacuum.send_command, "start") + + async def async_stop(self, **kwargs): + """Stop the vacuum cleaner.""" + await self.hass.async_add_executor_job(self.vacuum.send_command, "stop") + + async def async_pause(self) -> None: + """Pause the cleaning cycle.""" + await self.hass.async_add_executor_job(self.vacuum.send_command, "pause") + + async def async_return_to_base(self, **kwargs): + """Set the vacuum cleaner to return to the dock.""" + if self.state == STATE_CLEANING: + await self.async_pause() + for _ in range(10): + if self.state == STATE_PAUSED: + break + await asyncio.sleep(1) + await self.hass.async_add_executor_job(self.vacuum.send_command, "dock") + + async def async_locate(self, **kwargs): + """Located vacuum.""" + await self.hass.async_add_executor_job(self.vacuum.send_command, "find") + + async def async_send_command(self, command, params=None, **kwargs): + """Send raw command.""" + _LOGGER.debug("async_send_command %s (%s), %s", command, params, kwargs) + await self.hass.async_add_executor_job( + self.vacuum.send_command, command, params + ) + + +class RoombaVacuum(IRobotVacuum): + """Basic Roomba robot (without carpet boost).""" + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return the state attributes of the device.""" + state_attrs = super().extra_state_attributes + + # Get bin state + bin_raw_state = self.vacuum_state.get("bin", {}) + bin_state = {} + if bin_raw_state.get("present") is not None: + bin_state[ATTR_BIN_PRESENT] = bin_raw_state.get("present") + if bin_raw_state.get("full") is not None: + bin_state[ATTR_BIN_FULL] = bin_raw_state.get("full") + state_attrs.update(bin_state) + + return state_attrs + + +class RoombaVacuumCarpetBoost(RoombaVacuum): + """Roomba robot with carpet boost.""" + + _attr_fan_speed_list = FAN_SPEEDS + _attr_supported_features = SUPPORT_ROOMBA_CARPET_BOOST + + @property + def fan_speed(self): + """Return the fan speed of the vacuum cleaner.""" + fan_speed = None + carpet_boost = self.vacuum_state.get("carpetBoost") + high_perf = self.vacuum_state.get("vacHigh") + if carpet_boost is not None and high_perf is not None: + if carpet_boost: + fan_speed = FAN_SPEED_AUTOMATIC + elif high_perf: + fan_speed = FAN_SPEED_PERFORMANCE + else: # carpet_boost and high_perf are False + fan_speed = FAN_SPEED_ECO + return fan_speed + + async def async_set_fan_speed(self, fan_speed, **kwargs): + """Set fan speed.""" + if fan_speed.capitalize() in FAN_SPEEDS: + fan_speed = fan_speed.capitalize() + _LOGGER.debug("Set fan speed to: %s", fan_speed) + high_perf = None + carpet_boost = None + if fan_speed == FAN_SPEED_AUTOMATIC: + high_perf = False + carpet_boost = True + elif fan_speed == FAN_SPEED_ECO: + high_perf = False + carpet_boost = False + elif fan_speed == FAN_SPEED_PERFORMANCE: + high_perf = True + carpet_boost = False + else: + _LOGGER.error("No such fan speed available: %s", fan_speed) + return + # The set_preference method does only accept string values + await self.hass.async_add_executor_job( + self.vacuum.set_preference, "carpetBoost", str(carpet_boost) + ) + await self.hass.async_add_executor_job( + self.vacuum.set_preference, "vacHigh", str(high_perf) + ) + + +class BraavaJet(IRobotVacuum): + """Braava Jet.""" + + _attr_supported_features = SUPPORT_BRAAVA + + def __init__(self, roomba, blid) -> None: + """Initialize the Roomba handler.""" + super().__init__(roomba, blid) + + # Initialize fan speed list + self._attr_fan_speed_list = [ + f"{behavior}-{spray}" + for behavior in BRAAVA_MOP_BEHAVIORS + for spray in BRAAVA_SPRAY_AMOUNT + ] + + @property + def fan_speed(self): + """Return the fan speed of the vacuum cleaner.""" + # Mopping behavior and spray amount as fan speed + rank_overlap = self.vacuum_state.get("rankOverlap", {}) + behavior = None + if rank_overlap == OVERLAP_STANDARD: + behavior = MOP_STANDARD + elif rank_overlap == OVERLAP_DEEP: + behavior = MOP_DEEP + elif rank_overlap == OVERLAP_EXTENDED: + behavior = MOP_EXTENDED + pad_wetness = self.vacuum_state.get("padWetness", {}) + # "disposable" and "reusable" values are always the same + pad_wetness_value = pad_wetness.get("disposable") + return f"{behavior}-{pad_wetness_value}" + + async def async_set_fan_speed(self, fan_speed, **kwargs): + """Set fan speed.""" + try: + split = fan_speed.split("-", 1) + behavior = split[0] + spray = int(split[1]) + if behavior.capitalize() in BRAAVA_MOP_BEHAVIORS: + behavior = behavior.capitalize() + except IndexError: + _LOGGER.error( + "Fan speed error: expected {behavior}-{spray_amount}, got '%s'", + fan_speed, + ) + return + except ValueError: + _LOGGER.error("Spray amount error: expected integer, got '%s'", split[1]) + return + if behavior not in BRAAVA_MOP_BEHAVIORS: + _LOGGER.error( + "Mop behavior error: expected one of %s, got '%s'", + str(BRAAVA_MOP_BEHAVIORS), + behavior, + ) + return + if spray not in BRAAVA_SPRAY_AMOUNT: + _LOGGER.error( + "Spray amount error: expected one of %s, got '%d'", + str(BRAAVA_SPRAY_AMOUNT), + spray, + ) + return + + overlap = 0 + if behavior == MOP_STANDARD: + overlap = OVERLAP_STANDARD + elif behavior == MOP_DEEP: + overlap = OVERLAP_DEEP + else: + overlap = OVERLAP_EXTENDED + await self.hass.async_add_executor_job( + self.vacuum.set_preference, "rankOverlap", overlap + ) + await self.hass.async_add_executor_job( + self.vacuum.set_preference, + "padWetness", + {"disposable": spray, "reusable": spray}, + ) + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return the state attributes of the device.""" + state_attrs = super().extra_state_attributes + + # Get Braava state + state = self.vacuum_state + detected_pad = state.get("detectedPad") + mop_ready = state.get("mopReady", {}) + lid_closed = mop_ready.get("lidClosed") + tank_present = mop_ready.get("tankPresent") + tank_level = state.get("tankLvl") + state_attrs[ATTR_DETECTED_PAD] = detected_pad + state_attrs[ATTR_LID_CLOSED] = lid_closed + state_attrs[ATTR_TANK_PRESENT] = tank_present + state_attrs[ATTR_TANK_LEVEL] = tank_level + + return state_attrs