"""Tests for the PS4 media player platform.""" from asynctest import MagicMock, patch from pyps4_2ndscreen.credential import get_ddp_message from homeassistant.components import ps4 from homeassistant.components.media_player.const import ( ATTR_INPUT_SOURCE, ATTR_INPUT_SOURCE_LIST, ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE, ATTR_MEDIA_TITLE, MEDIA_TYPE_GAME, ) from homeassistant.components.ps4.const import ( ATTR_MEDIA_IMAGE_URL, CONFIG_ENTRY_VERSION as VERSION, DEFAULT_REGION, DOMAIN, GAMES_FILE, PS4_DATA, ) from homeassistant.const import ( ATTR_COMMAND, ATTR_ENTITY_ID, ATTR_LOCKED, CONF_HOST, CONF_NAME, CONF_REGION, CONF_TOKEN, STATE_IDLE, STATE_PLAYING, STATE_STANDBY, STATE_UNKNOWN, ) from homeassistant.setup import async_setup_component from tests.common import MockConfigEntry, mock_device_registry, mock_registry MOCK_CREDS = "123412341234abcd12341234abcd12341234abcd12341234abcd12341234abcd" MOCK_NAME = "ha_ps4_name" MOCK_REGION = DEFAULT_REGION MOCK_GAMES_FILE = GAMES_FILE MOCK_HOST = "192.168.0.2" MOCK_HOST_NAME = "Fake PS4" MOCK_HOST_ID = "A0000A0AA000" MOCK_HOST_VERSION = "09879011" MOCK_HOST_TYPE = "PS4" MOCK_STATUS_REST = "Server Standby" MOCK_STATUS_ON = "Ok" MOCK_STANDBY_CODE = 620 MOCK_ON_CODE = 200 MOCK_TCP_PORT = 997 MOCK_DDP_PORT = 987 MOCK_DDP_VERSION = "00020020" MOCK_RANDOM_PORT = "1234" MOCK_TITLE_ID = "CUSA00000" MOCK_TITLE_NAME = "Random Game" MOCK_TITLE_TYPE = MEDIA_TYPE_GAME MOCK_TITLE_ART_URL = "https://somecoverurl" MOCK_GAMES_DATA = { ATTR_LOCKED: False, ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_GAME, ATTR_MEDIA_IMAGE_URL: MOCK_TITLE_ART_URL, ATTR_MEDIA_TITLE: MOCK_TITLE_NAME, } MOCK_GAMES_DATA_LOCKED = { ATTR_LOCKED: True, ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_GAME, ATTR_MEDIA_IMAGE_URL: MOCK_TITLE_ART_URL, ATTR_MEDIA_TITLE: MOCK_TITLE_NAME, } MOCK_STATUS_PLAYING = { "host-type": MOCK_HOST_TYPE, "host-ip": MOCK_HOST, "host-request-port": MOCK_TCP_PORT, "host-id": MOCK_HOST_ID, "host-name": MOCK_HOST_NAME, "running-app-titleid": MOCK_TITLE_ID, "running-app-name": MOCK_TITLE_NAME, "status": MOCK_STATUS_ON, "status_code": MOCK_ON_CODE, "device-discovery-protocol-version": MOCK_DDP_VERSION, "system-version": MOCK_HOST_VERSION, } MOCK_STATUS_IDLE = { "host-type": MOCK_HOST_TYPE, "host-ip": MOCK_HOST, "host-request-port": MOCK_TCP_PORT, "host-id": MOCK_HOST_ID, "host-name": MOCK_HOST_NAME, "status": MOCK_STATUS_ON, "status_code": MOCK_ON_CODE, "device-discovery-protocol-version": MOCK_DDP_VERSION, "system-version": MOCK_HOST_VERSION, } MOCK_STATUS_STANDBY = { "host-type": MOCK_HOST_TYPE, "host-ip": MOCK_HOST, "host-request-port": MOCK_TCP_PORT, "host-id": MOCK_HOST_ID, "host-name": MOCK_HOST_NAME, "status": MOCK_STATUS_REST, "status_code": MOCK_STANDBY_CODE, "device-discovery-protocol-version": MOCK_DDP_VERSION, "system-version": MOCK_HOST_VERSION, } MOCK_DEVICE = {CONF_HOST: MOCK_HOST, CONF_NAME: MOCK_NAME, CONF_REGION: MOCK_REGION} MOCK_ENTRY_ID = "SomeID" MOCK_DEVICE_MODEL = "PlayStation 4" MOCK_DATA = {CONF_TOKEN: MOCK_CREDS, "devices": [MOCK_DEVICE]} MOCK_CONFIG = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA, entry_id=MOCK_ENTRY_ID) MOCK_LOAD = "homeassistant.components.ps4.media_player.load_games" async def setup_mock_component(hass, entry=None): """Set up Mock Media Player.""" if entry is None: mock_entry = MockConfigEntry( domain=ps4.DOMAIN, data=MOCK_DATA, version=VERSION, entry_id=MOCK_ENTRY_ID ) else: mock_entry = entry mock_entry.add_to_hass(hass) await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) await hass.async_block_till_done() mock_entities = hass.states.async_entity_ids() mock_entity_id = mock_entities[0] return mock_entity_id async def mock_ddp_response(hass, mock_status_data): """Mock raw UDP response from device.""" mock_protocol = hass.data[PS4_DATA].protocol mock_code = mock_status_data.get("status_code") mock_status = mock_status_data.get("status") mock_status_header = f"{mock_code} {mock_status}" mock_response = get_ddp_message(mock_status_header, mock_status_data).encode() mock_protocol.datagram_received(mock_response, (MOCK_HOST, MOCK_RANDOM_PORT)) await hass.async_block_till_done() async def test_media_player_is_setup_correctly_with_entry(hass): """Test entity is setup correctly with entry correctly.""" mock_entity_id = await setup_mock_component(hass) mock_state = hass.states.get(mock_entity_id).state # Assert status updated callback is added to protocol. assert len(hass.data[PS4_DATA].protocol.callbacks) == 1 # Test that entity is added to hass. assert hass.data[PS4_DATA].protocol is not None assert mock_entity_id == f"media_player.{MOCK_NAME}" assert mock_state == STATE_UNKNOWN async def test_state_standby_is_set(hass): """Test that state is set to standby.""" mock_entity_id = await setup_mock_component(hass) await mock_ddp_response(hass, MOCK_STATUS_STANDBY) assert hass.states.get(mock_entity_id).state == STATE_STANDBY async def test_state_playing_is_set(hass): """Test that state is set to playing.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.async_get_ps_store_data", ) with patch(mock_func, return_value=None): await mock_ddp_response(hass, MOCK_STATUS_PLAYING) assert hass.states.get(mock_entity_id).state == STATE_PLAYING async def test_state_idle_is_set(hass): """Test that state is set to idle.""" mock_entity_id = await setup_mock_component(hass) await mock_ddp_response(hass, MOCK_STATUS_IDLE) assert hass.states.get(mock_entity_id).state == STATE_IDLE async def test_state_none_is_set(hass): """Test that state is set to None.""" mock_entity_id = await setup_mock_component(hass) assert hass.states.get(mock_entity_id).state == STATE_UNKNOWN async def test_media_attributes_are_fetched(hass): """Test that media attributes are fetched.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.async_get_ps_store_data", ) # Mock result from fetching data. mock_result = MagicMock() mock_result.name = MOCK_TITLE_NAME mock_result.cover_art = MOCK_TITLE_ART_URL mock_result.game_type = "game" with patch(mock_func, return_value=mock_result) as mock_fetch: await mock_ddp_response(hass, MOCK_STATUS_PLAYING) mock_state = hass.states.get(mock_entity_id) mock_attrs = dict(mock_state.attributes) assert len(mock_fetch.mock_calls) == 1 assert mock_state.state == STATE_PLAYING assert len(mock_attrs.get(ATTR_INPUT_SOURCE_LIST)) == 1 assert mock_attrs.get(ATTR_INPUT_SOURCE_LIST)[0] == MOCK_TITLE_NAME assert mock_attrs.get(ATTR_MEDIA_CONTENT_ID) == MOCK_TITLE_ID assert mock_attrs.get(ATTR_MEDIA_TITLE) == MOCK_TITLE_NAME assert mock_attrs.get(ATTR_MEDIA_CONTENT_TYPE) == MOCK_TITLE_TYPE async def test_media_attributes_are_loaded(hass, patch_load_json): """Test that media attributes are loaded.""" mock_entity_id = await setup_mock_component(hass) patch_load_json.return_value = {MOCK_TITLE_ID: MOCK_GAMES_DATA_LOCKED} with patch( "homeassistant.components.ps4.media_player." "pyps4.Ps4Async.async_get_ps_store_data", return_value=None, ) as mock_fetch: await mock_ddp_response(hass, MOCK_STATUS_PLAYING) mock_state = hass.states.get(mock_entity_id) mock_attrs = dict(mock_state.attributes) # Ensure that data is not fetched. assert not mock_fetch.mock_calls assert mock_state.state == STATE_PLAYING assert len(mock_attrs.get(ATTR_INPUT_SOURCE_LIST)) == 1 assert mock_attrs.get(ATTR_INPUT_SOURCE_LIST)[0] == MOCK_TITLE_NAME assert mock_attrs.get(ATTR_MEDIA_CONTENT_ID) == MOCK_TITLE_ID assert mock_attrs.get(ATTR_MEDIA_TITLE) == MOCK_TITLE_NAME assert mock_attrs.get(ATTR_MEDIA_CONTENT_TYPE) == MOCK_TITLE_TYPE async def test_device_info_is_set_from_status_correctly(hass): """Test that device info is set correctly from status update.""" mock_d_registry = mock_device_registry(hass) with patch("pyps4_2ndscreen.ps4.get_status", return_value=MOCK_STATUS_STANDBY): mock_entity_id = await setup_mock_component(hass) await hass.async_block_till_done() # Reformat mock status-sw_version for assertion. mock_version = MOCK_STATUS_STANDBY["system-version"] mock_version = mock_version[1:4] mock_version = "{}.{}".format(mock_version[0], mock_version[1:]) mock_state = hass.states.get(mock_entity_id).state mock_d_entries = mock_d_registry.devices mock_entry = mock_d_registry.async_get_device( identifiers={(DOMAIN, MOCK_HOST_ID)}, connections={()} ) assert mock_state == STATE_STANDBY assert len(mock_d_entries) == 1 assert mock_entry.name == MOCK_HOST_NAME assert mock_entry.model == MOCK_DEVICE_MODEL assert mock_entry.sw_version == mock_version assert mock_entry.identifiers == {(DOMAIN, MOCK_HOST_ID)} async def test_device_info_is_assummed(hass): """Test that device info is assumed if device is unavailable.""" # Create a device registry entry with device info. mock_d_registry = mock_device_registry(hass) mock_d_registry.async_get_or_create( config_entry_id=MOCK_ENTRY_ID, name=MOCK_HOST_NAME, model=MOCK_DEVICE_MODEL, identifiers={(DOMAIN, MOCK_HOST_ID)}, sw_version=MOCK_HOST_VERSION, ) mock_d_entries = mock_d_registry.devices assert len(mock_d_entries) == 1 # Create a entity_registry entry which is using identifiers from device. mock_unique_id = ps4.format_unique_id(MOCK_CREDS, MOCK_HOST_ID) mock_e_registry = mock_registry(hass) mock_e_registry.async_get_or_create( "media_player", DOMAIN, mock_unique_id, config_entry=MOCK_CONFIG ) mock_entity_id = mock_e_registry.async_get_entity_id( "media_player", DOMAIN, mock_unique_id ) mock_entity_id = await setup_mock_component(hass) mock_state = hass.states.get(mock_entity_id).state # Ensure that state is not set. assert mock_state == STATE_UNKNOWN # Ensure that entity_id is the same as the existing. mock_entities = hass.states.async_entity_ids() assert len(mock_entities) == 1 assert mock_entities[0] == mock_entity_id async def test_device_info_assummed_works(hass): """Reverse test that device info assumption works.""" mock_d_registry = mock_device_registry(hass) mock_entity_id = await setup_mock_component(hass) mock_state = hass.states.get(mock_entity_id).state mock_d_entries = mock_d_registry.devices # Ensure that state is not set. assert mock_state == STATE_UNKNOWN # With no state/status and no existing entries, registry should be empty. assert not mock_d_entries async def test_turn_on(hass): """Test that turn on service calls function.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.wakeup" ) with patch(mock_func) as mock_call: await hass.services.async_call( "media_player", "turn_on", {ATTR_ENTITY_ID: mock_entity_id} ) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 async def test_turn_off(hass): """Test that turn off service calls function.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.standby" ) with patch(mock_func) as mock_call: await hass.services.async_call( "media_player", "turn_off", {ATTR_ENTITY_ID: mock_entity_id} ) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 async def test_media_pause(hass): """Test that media pause service calls function.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.remote_control" ) with patch(mock_func) as mock_call: await hass.services.async_call( "media_player", "media_pause", {ATTR_ENTITY_ID: mock_entity_id} ) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 async def test_media_stop(hass): """Test that media stop service calls function.""" mock_entity_id = await setup_mock_component(hass) mock_func = "{}{}".format( "homeassistant.components.ps4.media_player.", "pyps4.Ps4Async.remote_control" ) with patch(mock_func) as mock_call: await hass.services.async_call( "media_player", "media_stop", {ATTR_ENTITY_ID: mock_entity_id} ) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 async def test_select_source(hass, patch_load_json): """Test that select source service calls function with title.""" patch_load_json.return_value = {MOCK_TITLE_ID: MOCK_GAMES_DATA} with patch("pyps4_2ndscreen.ps4.get_status", return_value=MOCK_STATUS_IDLE): mock_entity_id = await setup_mock_component(hass) with patch("pyps4_2ndscreen.ps4.Ps4Async.start_title") as mock_call, patch( "homeassistant.components.ps4.media_player.PS4Device.async_update" ): # Test with title name. await hass.services.async_call( "media_player", "select_source", {ATTR_ENTITY_ID: mock_entity_id, ATTR_INPUT_SOURCE: MOCK_TITLE_NAME}, blocking=True, ) assert len(mock_call.mock_calls) == 1 async def test_select_source_caps(hass, patch_load_json): """Test that select source service calls function with upper case title.""" patch_load_json.return_value = {MOCK_TITLE_ID: MOCK_GAMES_DATA} with patch("pyps4_2ndscreen.ps4.get_status", return_value=MOCK_STATUS_IDLE): mock_entity_id = await setup_mock_component(hass) with patch("pyps4_2ndscreen.ps4.Ps4Async.start_title") as mock_call, patch( "homeassistant.components.ps4.media_player.PS4Device.async_update" ): # Test with title name in caps. await hass.services.async_call( "media_player", "select_source", { ATTR_ENTITY_ID: mock_entity_id, ATTR_INPUT_SOURCE: MOCK_TITLE_NAME.upper(), }, blocking=True, ) assert len(mock_call.mock_calls) == 1 async def test_select_source_id(hass, patch_load_json): """Test that select source service calls function with Title ID.""" patch_load_json.return_value = {MOCK_TITLE_ID: MOCK_GAMES_DATA} with patch("pyps4_2ndscreen.ps4.get_status", return_value=MOCK_STATUS_IDLE): mock_entity_id = await setup_mock_component(hass) with patch("pyps4_2ndscreen.ps4.Ps4Async.start_title") as mock_call, patch( "homeassistant.components.ps4.media_player.PS4Device.async_update" ): # Test with title ID. await hass.services.async_call( "media_player", "select_source", {ATTR_ENTITY_ID: mock_entity_id, ATTR_INPUT_SOURCE: MOCK_TITLE_ID}, blocking=True, ) assert len(mock_call.mock_calls) == 1 async def test_ps4_send_command(hass): """Test that ps4 send command service calls function.""" mock_entity_id = await setup_mock_component(hass) with patch("pyps4_2ndscreen.ps4.Ps4Async.remote_control") as mock_call: await hass.services.async_call( DOMAIN, "send_command", {ATTR_ENTITY_ID: mock_entity_id, ATTR_COMMAND: "ps"}, blocking=True, ) assert len(mock_call.mock_calls) == 1 async def test_entry_is_unloaded(hass): """Test that entry is unloaded.""" mock_entry = MockConfigEntry( domain=ps4.DOMAIN, data=MOCK_DATA, version=VERSION, entry_id=MOCK_ENTRY_ID ) mock_entity_id = await setup_mock_component(hass, mock_entry) mock_unload = await ps4.async_unload_entry(hass, mock_entry) assert mock_unload is True assert not hass.data[PS4_DATA].devices # Test that callback listener for entity is removed from protocol. assert not hass.data[PS4_DATA].protocol.callbacks assert hass.states.get(mock_entity_id) is None