Incorporate various improvements for the ws66i integration (#71717)

* Improve readability and remove unused code

* Remove ws66i custom services. Scenes can be used instead.

* Unmute WS66i Zone when volume changes

* Raise CannotConnect instead of ConnectionError in validation method

* Move _verify_connection() method to module level
pull/70973/head
Shawn Saenger 2022-05-29 10:33:33 -06:00 committed by GitHub
parent 5031c3c8b4
commit 1d57626ff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 251 additions and 409 deletions

View File

@ -94,8 +94,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
zones=zones,
)
@callback
def shutdown(event):
"""Close the WS66i connection to the amplifier and save snapshots."""
"""Close the WS66i connection to the amplifier."""
ws66i.close()
entry.async_on_unload(entry.add_update_listener(_update_listener))
@ -119,6 +120,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry):
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)

View File

@ -1,5 +1,6 @@
"""Config flow for WS66i 6-Zone Amplifier integration."""
import logging
from typing import Any
from pyws66i import WS66i, get_ws66i
import voluptuous as vol
@ -50,22 +51,34 @@ def _sources_from_config(data):
}
async def validate_input(hass: core.HomeAssistant, input_data):
"""Validate the user input allows us to connect.
def _verify_connection(ws66i: WS66i) -> bool:
"""Verify a connection can be made to the WS66i."""
try:
ws66i.open()
except ConnectionError as err:
raise CannotConnect from err
# Connection successful. Verify correct port was opened
# Test on FIRST_ZONE because this zone will always be valid
ret_val = ws66i.zone_status(FIRST_ZONE)
ws66i.close()
return bool(ret_val)
async def validate_input(
hass: core.HomeAssistant, input_data: dict[str, Any]
) -> dict[str, Any]:
"""Validate the user input.
Data has the keys from DATA_SCHEMA with values provided by the user.
"""
ws66i: WS66i = get_ws66i(input_data[CONF_IP_ADDRESS])
await hass.async_add_executor_job(ws66i.open)
# No exception. run a simple test to make sure we opened correct port
# Test on FIRST_ZONE because this zone will always be valid
ret_val = await hass.async_add_executor_job(ws66i.zone_status, FIRST_ZONE)
if ret_val is None:
ws66i.close()
raise ConnectionError("Not a valid WS66i connection")
# Validation done. No issues. Close the connection
ws66i.close()
is_valid: bool = await hass.async_add_executor_job(_verify_connection, ws66i)
if not is_valid:
raise CannotConnect("Not a valid WS66i connection")
# Return info that you want to store in the config entry.
return {CONF_IP_ADDRESS: input_data[CONF_IP_ADDRESS]}
@ -82,17 +95,18 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
# Data is valid. Add default values for options flow.
except CannotConnect:
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
# Data is valid. Create a config entry.
return self.async_create_entry(
title="WS66i Amp",
data=info,
options={CONF_SOURCES: INIT_OPTIONS_DEFAULT},
)
except ConnectionError:
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors

View File

@ -1,4 +1,5 @@
"""Constants for the Soundavo WS66i 6-Zone Amplifier Media Player component."""
from datetime import timedelta
DOMAIN = "ws66i"
@ -20,5 +21,6 @@ INIT_OPTIONS_DEFAULT = {
"6": "Source 6",
}
SERVICE_SNAPSHOT = "snapshot"
SERVICE_RESTORE = "restore"
POLL_INTERVAL = timedelta(seconds=30)
MAX_VOL = 38

View File

@ -1,7 +1,6 @@
"""Coordinator for WS66i."""
from __future__ import annotations
from datetime import timedelta
import logging
from pyws66i import WS66i, ZoneStatus
@ -9,12 +8,12 @@ from pyws66i import WS66i, ZoneStatus
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import POLL_INTERVAL
_LOGGER = logging.getLogger(__name__)
POLL_INTERVAL = timedelta(seconds=30)
class Ws66iDataUpdateCoordinator(DataUpdateCoordinator):
class Ws66iDataUpdateCoordinator(DataUpdateCoordinator[list[ZoneStatus]]):
"""DataUpdateCoordinator to gather data for WS66i Zones."""
def __init__(
@ -43,11 +42,9 @@ class Ws66iDataUpdateCoordinator(DataUpdateCoordinator):
data.append(data_zone)
# HA will call my entity's _handle_coordinator_update()
return data
async def _async_update_data(self) -> list[ZoneStatus]:
"""Fetch data for each of the zones."""
# HA will call my entity's _handle_coordinator_update()
# The data I pass back here can be accessed through coordinator.data.
# The data that is returned here can be accessed through coordinator.data.
return await self.hass.async_add_executor_job(self._update_all_zones)

View File

@ -1,6 +1,4 @@
"""Support for interfacing with WS66i 6 zone home audio controller."""
from copy import deepcopy
from pyws66i import WS66i, ZoneStatus
from homeassistant.components.media_player import (
@ -10,22 +8,16 @@ from homeassistant.components.media_player import (
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import (
AddEntitiesCallback,
async_get_current_platform,
)
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, SERVICE_RESTORE, SERVICE_SNAPSHOT
from .const import DOMAIN, MAX_VOL
from .coordinator import Ws66iDataUpdateCoordinator
from .models import Ws66iData
PARALLEL_UPDATES = 1
MAX_VOL = 38
async def async_setup_entry(
hass: HomeAssistant,
@ -48,23 +40,8 @@ async def async_setup_entry(
for idx, zone_id in enumerate(ws66i_data.zones)
)
# Set up services
platform = async_get_current_platform()
platform.async_register_entity_service(
SERVICE_SNAPSHOT,
{},
"snapshot",
)
platform.async_register_entity_service(
SERVICE_RESTORE,
{},
"async_restore",
)
class Ws66iZone(CoordinatorEntity, MediaPlayerEntity):
class Ws66iZone(CoordinatorEntity[Ws66iDataUpdateCoordinator], MediaPlayerEntity):
"""Representation of a WS66i amplifier zone."""
def __init__(
@ -82,8 +59,6 @@ class Ws66iZone(CoordinatorEntity, MediaPlayerEntity):
self._ws66i_data: Ws66iData = ws66i_data
self._zone_id: int = zone_id
self._zone_id_idx: int = data_idx
self._coordinator = coordinator
self._snapshot: ZoneStatus = None
self._status: ZoneStatus = coordinator.data[data_idx]
self._attr_source_list = ws66i_data.sources.name_list
self._attr_unique_id = f"{entry_id}_{self._zone_id}"
@ -131,20 +106,6 @@ class Ws66iZone(CoordinatorEntity, MediaPlayerEntity):
self._set_attrs_from_status()
self.async_write_ha_state()
@callback
def snapshot(self):
"""Save zone's current state."""
self._snapshot = deepcopy(self._status)
async def async_restore(self):
"""Restore saved state."""
if not self._snapshot:
raise HomeAssistantError("There is no snapshot to restore")
await self.hass.async_add_executor_job(self._ws66i.restore_zone, self._snapshot)
self._status = self._snapshot
self._async_update_attrs_write_ha_state()
async def async_select_source(self, source):
"""Set input source."""
idx = self._ws66i_data.sources.name_id[source]
@ -180,24 +141,30 @@ class Ws66iZone(CoordinatorEntity, MediaPlayerEntity):
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, int(volume * MAX_VOL)
)
self._status.volume = int(volume * MAX_VOL)
await self.hass.async_add_executor_job(self._set_volume, int(volume * MAX_VOL))
self._async_update_attrs_write_ha_state()
async def async_volume_up(self):
"""Volume up the media player."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, min(self._status.volume + 1, MAX_VOL)
self._set_volume, min(self._status.volume + 1, MAX_VOL)
)
self._status.volume = min(self._status.volume + 1, MAX_VOL)
self._async_update_attrs_write_ha_state()
async def async_volume_down(self):
"""Volume down media player."""
await self.hass.async_add_executor_job(
self._ws66i.set_volume, self._zone_id, max(self._status.volume - 1, 0)
self._set_volume, max(self._status.volume - 1, 0)
)
self._status.volume = max(self._status.volume - 1, 0)
self._async_update_attrs_write_ha_state()
def _set_volume(self, volume: int) -> None:
"""Set the volume of the media player."""
# Can't set a new volume level when this zone is muted.
# Follow behavior of keypads, where zone is unmuted when volume changes.
if self._status.mute:
self._ws66i.set_mute(self._zone_id, False)
self._status.mute = False
self._ws66i.set_volume(self._zone_id, volume)
self._status.volume = volume

View File

@ -7,8 +7,6 @@ from pyws66i import WS66i
from .coordinator import Ws66iDataUpdateCoordinator
# A dataclass is basically a struct in C/C++
@dataclass
class SourceRep:

View File

@ -1,15 +0,0 @@
snapshot:
name: Snapshot
description: Take a snapshot of the media player zone.
target:
entity:
integration: ws66i
domain: media_player
restore:
name: Restore
description: Restore a snapshot of the media player zone.
target:
entity:
integration: ws66i
domain: media_player

View File

@ -11,9 +11,6 @@
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"options": {

View File

@ -1,8 +1,5 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
},
"error": {
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"

View File

@ -1,7 +1,7 @@
"""Test the WS66i 6-Zone Amplifier config flow."""
from unittest.mock import patch
from homeassistant import config_entries, data_entry_flow, setup
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.ws66i.const import (
CONF_SOURCE_1,
CONF_SOURCE_2,
@ -15,15 +15,15 @@ from homeassistant.components.ws66i.const import (
)
from homeassistant.const import CONF_IP_ADDRESS
from .test_media_player import AttrDict
from tests.common import MockConfigEntry
from tests.components.ws66i.test_media_player import AttrDict
CONFIG = {CONF_IP_ADDRESS: "1.1.1.1"}
async def test_form(hass):
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)

View File

@ -0,0 +1,80 @@
"""Test the WS66i 6-Zone Amplifier init file."""
from unittest.mock import patch
from homeassistant.components.ws66i.const import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from .test_media_player import (
MOCK_CONFIG,
MOCK_DEFAULT_OPTIONS,
MOCK_OPTIONS,
MockWs66i,
)
from tests.common import MockConfigEntry
ZONE_1_ID = "media_player.zone_11"
async def test_cannot_connect(hass):
"""Test connection error."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(fail_open=True),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.SETUP_RETRY
assert hass.states.get(ZONE_1_ID) is None
async def test_cannot_connect_2(hass):
"""Test connection error pt 2."""
# Another way to test same case as test_cannot_connect
ws66i = MockWs66i()
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_DEFAULT_OPTIONS
)
config_entry.add_to_hass(hass)
with patch.object(MockWs66i, "open", side_effect=ConnectionError):
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: ws66i,
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.SETUP_RETRY
assert hass.states.get(ZONE_1_ID) is None
async def test_unload_config_entry(hass):
"""Test unloading config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN][config_entry.entry_id]
with patch.object(MockWs66i, "close") as method_call:
await config_entry.async_unload(hass)
await hass.async_block_till_done()
assert method_call.called
assert not hass.data[DOMAIN]

View File

@ -2,28 +2,22 @@
from collections import defaultdict
from unittest.mock import patch
import pytest
from homeassistant.components.media_player import MediaPlayerEntityFeature
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_INPUT_SOURCE_LIST,
ATTR_MEDIA_VOLUME_LEVEL,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_SELECT_SOURCE,
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.components.ws66i.const import (
CONF_SOURCES,
DOMAIN,
INIT_OPTIONS_DEFAULT,
SERVICE_RESTORE,
SERVICE_SNAPSHOT,
MAX_VOL,
POLL_INTERVAL,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import (
CONF_IP_ADDRESS,
SERVICE_TURN_OFF,
@ -35,10 +29,10 @@ from homeassistant.const import (
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from homeassistant.util.dt import utcnow
from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, async_fire_time_changed
MOCK_SOURCE_DIC = {
"1": "one",
@ -125,47 +119,52 @@ class MockWs66i:
async def test_setup_success(hass):
"""Test connection success."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(),
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get(ZONE_1_ID) is not None
assert config_entry.state is ConfigEntryState.LOADED
assert hass.states.get(ZONE_1_ID) is not None
async def _setup_ws66i(hass, ws66i) -> MockConfigEntry:
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_DEFAULT_OPTIONS
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: ws66i,
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_DEFAULT_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
return config_entry
async def _setup_ws66i_with_options(hass, ws66i) -> MockConfigEntry:
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: ws66i,
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
return config_entry
async def _call_media_player_service(hass, name, data):
@ -174,172 +173,10 @@ async def _call_media_player_service(hass, name, data):
)
async def _call_ws66i_service(hass, name, data):
await hass.services.async_call(DOMAIN, name, service_data=data, blocking=True)
async def test_cannot_connect(hass):
"""Test connection error."""
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(fail_open=True),
):
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get(ZONE_1_ID) is None
async def test_cannot_connect_2(hass):
"""Test connection error pt 2."""
# Another way to test same case as test_cannot_connect
ws66i = MockWs66i()
with patch.object(MockWs66i, "open", side_effect=ConnectionError):
await _setup_ws66i(hass, ws66i)
assert hass.states.get(ZONE_1_ID) is None
async def test_service_calls_with_entity_id(hass):
"""Test snapshot save/restore service calls."""
_ = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": ZONE_1_ID})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
await hass.async_block_till_done()
# Restoring other media player to its previous state
# The zone should not be restored
with pytest.raises(HomeAssistantError):
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_2_ID})
await hass.async_block_till_done()
# Checking that values were not (!) restored
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
async def test_service_calls_with_all_entities(hass):
"""Test snapshot save/restore service calls with entity id all."""
_ = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
# await coordinator.async_refresh()
# await hass.async_block_till_done()
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": "all"})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
async def test_service_calls_without_relevant_entities(hass):
"""Test snapshot save/restore service calls with bad entity id."""
config_entry = await _setup_ws66i_with_options(hass, MockWs66i())
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
await hass.async_block_till_done()
# Saving existing values
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": "all"})
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "three"}
)
await coordinator.async_refresh()
await hass.async_block_till_done()
# Restoring media player to its previous state
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": "light.demo"})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 1.0
assert state.attributes[ATTR_INPUT_SOURCE] == "three"
async def test_restore_without_snapshot(hass):
"""Test restore when snapshot wasn't called."""
await _setup_ws66i(hass, MockWs66i())
with patch.object(MockWs66i, "restore_zone") as method_call:
with pytest.raises(HomeAssistantError):
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
assert not method_call.called
async def test_update(hass):
"""Test updating values from ws66i."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
_ = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
@ -350,13 +187,10 @@ async def test_update(hass):
)
ws66i.set_source(11, 3)
ws66i.set_volume(11, 38)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
ws66i.set_volume(11, MAX_VOL)
with patch.object(MockWs66i, "open") as method_call:
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
assert not method_call.called
@ -371,7 +205,7 @@ async def test_update(hass):
async def test_failed_update(hass):
"""Test updating failure from ws66i."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
_ = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
@ -382,26 +216,25 @@ async def test_failed_update(hass):
)
ws66i.set_source(11, 3)
ws66i.set_volume(11, 38)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
ws66i.set_volume(11, MAX_VOL)
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
# Failed update, close called
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
assert hass.states.is_state(ZONE_1_ID, STATE_UNAVAILABLE)
# A connection re-attempt fails
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
# A connection re-attempt succeeds
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
# confirm entity is back on
@ -418,12 +251,12 @@ async def test_supported_features(hass):
state = hass.states.get(ZONE_1_ID)
assert (
SUPPORT_VOLUME_MUTE
| SUPPORT_VOLUME_SET
| SUPPORT_VOLUME_STEP
| SUPPORT_TURN_ON
| SUPPORT_TURN_OFF
| SUPPORT_SELECT_SOURCE
MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.VOLUME_STEP
| MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.SELECT_SOURCE
== state.attributes["supported_features"]
)
@ -462,15 +295,13 @@ async def test_select_source(hass):
async def test_source_select(hass):
"""Test behavior when device has unknown source."""
"""Test source selection simulated from keypad."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
_ = await _setup_ws66i_with_options(hass, ws66i)
ws66i.set_source(11, 5)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
@ -512,10 +343,7 @@ async def test_mute_volume(hass):
async def test_volume_up_down(hass):
"""Test increasing volume by one."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i(hass, ws66i)
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
_ = await _setup_ws66i(hass, ws66i)
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
@ -525,34 +353,89 @@ async def test_volume_up_down(hass):
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
# should not go below zero
assert ws66i.zones[11].volume == 0
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
assert ws66i.zones[11].volume == 1
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
assert ws66i.zones[11].volume == 38
assert ws66i.zones[11].volume == MAX_VOL
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
await coordinator.async_refresh()
async_fire_time_changed(hass, utcnow() + POLL_INTERVAL)
await hass.async_block_till_done()
# should not go above 38
assert ws66i.zones[11].volume == 38
# should not go above 38 (MAX_VOL)
assert ws66i.zones[11].volume == MAX_VOL
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
assert ws66i.zones[11].volume == 37
assert ws66i.zones[11].volume == MAX_VOL - 1
async def test_volume_while_mute(hass):
"""Test increasing volume by one."""
ws66i = MockWs66i()
_ = await _setup_ws66i(hass, ws66i)
# Set vol to a known value
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
assert ws66i.zones[11].volume == 0
# Set mute to a known value, False
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": False}
)
assert not ws66i.zones[11].mute
# Mute the zone
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": True}
)
assert ws66i.zones[11].mute
# Increase volume. Mute state should go back to unmutted
await _call_media_player_service(hass, SERVICE_VOLUME_UP, {"entity_id": ZONE_1_ID})
assert ws66i.zones[11].volume == 1
assert not ws66i.zones[11].mute
# Mute the zone again
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": True}
)
assert ws66i.zones[11].mute
# Decrease volume. Mute state should go back to unmutted
await _call_media_player_service(
hass, SERVICE_VOLUME_DOWN, {"entity_id": ZONE_1_ID}
)
assert ws66i.zones[11].volume == 0
assert not ws66i.zones[11].mute
# Mute the zone again
await _call_media_player_service(
hass, SERVICE_VOLUME_MUTE, {"entity_id": ZONE_1_ID, "is_volume_muted": True}
)
assert ws66i.zones[11].mute
# Set to max volume. Mute state should go back to unmutted
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
assert ws66i.zones[11].volume == MAX_VOL
assert not ws66i.zones[11].mute
async def test_first_run_with_available_zones(hass):
@ -611,82 +494,3 @@ async def test_register_entities_in_1_amp_only(hass):
entry = registry.async_get(ZONE_7_ID)
assert entry is None
async def test_unload_config_entry(hass):
"""Test unloading config entry."""
with patch(
"homeassistant.components.ws66i.get_ws66i",
new=lambda *a: MockWs66i(),
):
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_CONFIG, options=MOCK_OPTIONS
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN][config_entry.entry_id]
with patch.object(MockWs66i, "close") as method_call:
await config_entry.async_unload(hass)
await hass.async_block_till_done()
assert method_call.called
assert not hass.data[DOMAIN]
async def test_restore_snapshot_on_reconnect(hass):
"""Test restoring a saved snapshot when reconnecting to amp."""
ws66i = MockWs66i()
config_entry = await _setup_ws66i_with_options(hass, ws66i)
# Changing media player to new state
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 0.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "one"}
)
# Save a snapshot
await _call_ws66i_service(hass, SERVICE_SNAPSHOT, {"entity_id": ZONE_1_ID})
ws66i_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = ws66i_data.coordinator
# Failed update,
with patch.object(MockWs66i, "zone_status", return_value=None):
await coordinator.async_refresh()
await hass.async_block_till_done()
assert hass.states.is_state(ZONE_1_ID, STATE_UNAVAILABLE)
# A connection re-attempt succeeds
await coordinator.async_refresh()
await hass.async_block_till_done()
# confirm entity is back on
state = hass.states.get(ZONE_1_ID)
assert hass.states.is_state(ZONE_1_ID, STATE_ON)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"
# Change states
await _call_media_player_service(
hass, SERVICE_VOLUME_SET, {"entity_id": ZONE_1_ID, "volume_level": 1.0}
)
await _call_media_player_service(
hass, SERVICE_SELECT_SOURCE, {"entity_id": ZONE_1_ID, "source": "six"}
)
# Now confirm that the snapshot before the disconnect works
await _call_ws66i_service(hass, SERVICE_RESTORE, {"entity_id": ZONE_1_ID})
await hass.async_block_till_done()
state = hass.states.get(ZONE_1_ID)
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.0
assert state.attributes[ATTR_INPUT_SOURCE] == "one"