Clean up and test media player

pull/1098/head
Paulus Schoutsen 2016-02-02 00:31:36 -08:00
parent 26efaa91a3
commit e7e540d4bb
11 changed files with 168 additions and 181 deletions

View File

@ -32,7 +32,6 @@ DISCOVERY_PLATFORMS = {
discovery.SERVICE_PLEX: 'plex',
}
SERVICE_YOUTUBE_VIDEO = 'play_youtube_video'
SERVICE_PLAY_MEDIA = 'play_media'
ATTR_MEDIA_VOLUME_LEVEL = 'volume_level'
@ -68,14 +67,12 @@ SUPPORT_VOLUME_SET = 4
SUPPORT_VOLUME_MUTE = 8
SUPPORT_PREVIOUS_TRACK = 16
SUPPORT_NEXT_TRACK = 32
SUPPORT_YOUTUBE = 64
SUPPORT_TURN_ON = 128
SUPPORT_TURN_OFF = 256
SUPPORT_PLAY_MEDIA = 512
SUPPORT_VOLUME_STEP = 1024
YOUTUBE_COVER_URL_FORMAT = 'https://img.youtube.com/vi/{}/1.jpg'
SERVICE_TO_METHOD = {
SERVICE_TURN_ON: 'turn_on',
SERVICE_TURN_OFF: 'turn_off',
@ -200,6 +197,13 @@ def media_previous_track(hass, entity_id=None):
hass.services.call(DOMAIN, SERVICE_MEDIA_PREVIOUS_TRACK, data)
def media_seek(hass, position, entity_id=None):
""" Send the media player the command to seek in current playing media. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data[ATTR_MEDIA_SEEK_POSITION] = position
hass.services.call(DOMAIN, SERVICE_MEDIA_SEEK, data)
def play_media(hass, media_type, media_id, entity_id=None):
""" Send the media player the command for playing media. """
data = {"media_type": media_type, "media_id": media_id}
@ -283,7 +287,7 @@ def setup(hass, config):
position = service.data[ATTR_MEDIA_SEEK_POSITION]
for player in target_players:
player.seek(position)
player.media_seek(position)
if player.should_poll:
player.update_ha_state(True)
@ -291,20 +295,6 @@ def setup(hass, config):
hass.services.register(DOMAIN, SERVICE_MEDIA_SEEK, media_seek_service,
descriptions.get(SERVICE_MEDIA_SEEK))
def play_youtube_video_service(service, media_id=None):
""" Plays specified media_id on the media player. """
if media_id is None:
service.data.get('video')
if media_id is None:
return
for player in component.extract_from_service(service):
player.play_youtube(media_id)
if player.should_poll:
player.update_ha_state(True)
def play_media_service(service):
""" Plays specified media_id on the media player. """
media_type = service.data.get('media_type')
@ -322,20 +312,6 @@ def setup(hass, config):
if player.should_poll:
player.update_ha_state(True)
hass.services.register(
DOMAIN, "start_fireplace",
lambda service: play_youtube_video_service(service, "eyU3bRy2x44"),
descriptions.get('start_fireplace'))
hass.services.register(
DOMAIN, "start_epic_sax",
lambda service: play_youtube_video_service(service, "kxopViU98Xo"),
descriptions.get('start_epic_sax'))
hass.services.register(
DOMAIN, SERVICE_YOUTUBE_VIDEO, play_youtube_video_service,
descriptions.get(SERVICE_YOUTUBE_VIDEO))
hass.services.register(
DOMAIN, SERVICE_PLAY_MEDIA, play_media_service,
descriptions.get(SERVICE_PLAY_MEDIA))
@ -490,10 +466,6 @@ class MediaPlayerDevice(Entity):
""" Send seek command. """
raise NotImplementedError()
def play_youtube(self, media_id):
""" Plays a YouTube media. """
raise NotImplementedError()
def play_media(self, media_type, media_id):
""" Plays a piece of media. """
raise NotImplementedError()
@ -529,11 +501,6 @@ class MediaPlayerDevice(Entity):
""" Boolean if next track command supported. """
return bool(self.supported_media_commands & SUPPORT_NEXT_TRACK)
@property
def support_youtube(self):
""" Boolean if YouTube is supported. """
return bool(self.supported_media_commands & SUPPORT_YOUTUBE)
@property
def support_play_media(self):
""" Boolean if play media command supported. """

View File

@ -16,7 +16,7 @@ from homeassistant.const import (
from homeassistant.components.media_player import (
MediaPlayerDevice,
SUPPORT_PAUSE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE,
SUPPORT_TURN_ON, SUPPORT_TURN_OFF, SUPPORT_YOUTUBE, SUPPORT_PLAY_MEDIA,
SUPPORT_TURN_ON, SUPPORT_TURN_OFF, SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK, SUPPORT_NEXT_TRACK,
MEDIA_TYPE_MUSIC, MEDIA_TYPE_TVSHOW, MEDIA_TYPE_VIDEO)
@ -25,7 +25,7 @@ CONF_IGNORE_CEC = 'ignore_cec'
CAST_SPLASH = 'https://home-assistant.io/images/cast/splash.png'
SUPPORT_CAST = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PREVIOUS_TRACK | \
SUPPORT_NEXT_TRACK | SUPPORT_YOUTUBE | SUPPORT_PLAY_MEDIA
SUPPORT_NEXT_TRACK | SUPPORT_PLAY_MEDIA
KNOWN_HOSTS = []
DEFAULT_PORT = 8009
@ -79,10 +79,7 @@ class CastDevice(MediaPlayerDevice):
def __init__(self, host, port):
import pychromecast
import pychromecast.controllers.youtube as youtube
self.cast = pychromecast.Chromecast(host, port)
self.youtube = youtube.YouTubeController()
self.cast.register_handler(self.youtube)
self.cast.socket_client.receiver_controller.register_status_listener(
self)
@ -266,10 +263,6 @@ class CastDevice(MediaPlayerDevice):
""" Plays media from a URL """
self.cast.media_controller.play_media(media_id, media_type)
def play_youtube(self, media_id):
""" Plays a YouTube media. """
self.youtube.play_video(media_id)
# implementation of chromecast status_listener methods
def new_cast_status(self, status):

View File

@ -7,11 +7,11 @@ from homeassistant.const import (
STATE_PLAYING, STATE_PAUSED, STATE_OFF)
from homeassistant.components.media_player import (
MediaPlayerDevice, YOUTUBE_COVER_URL_FORMAT,
MediaPlayerDevice,
MEDIA_TYPE_VIDEO, MEDIA_TYPE_MUSIC, MEDIA_TYPE_TVSHOW,
SUPPORT_PAUSE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE, SUPPORT_YOUTUBE,
SUPPORT_PAUSE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE,
SUPPORT_TURN_ON, SUPPORT_TURN_OFF, SUPPORT_PREVIOUS_TRACK,
SUPPORT_NEXT_TRACK)
SUPPORT_NEXT_TRACK, SUPPORT_PLAY_MEDIA)
# pylint: disable=unused-argument
@ -26,9 +26,11 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
])
YOUTUBE_COVER_URL_FORMAT = 'https://img.youtube.com/vi/{}/1.jpg'
YOUTUBE_PLAYER_SUPPORT = \
SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_YOUTUBE | SUPPORT_TURN_ON | SUPPORT_TURN_OFF
SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PLAY_MEDIA
MUSIC_PLAYER_SUPPORT = \
SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
@ -150,10 +152,9 @@ class DemoYoutubePlayer(AbstractDemoPlayer):
""" Flags of media commands that are supported. """
return YOUTUBE_PLAYER_SUPPORT
def play_youtube(self, media_id):
""" Plays a YouTube media. """
def play_media(self, media_type, media_id):
""" Plays a piece of media. """
self.youtube_id = media_id
self._media_title = 'some YouTube video'
self.update_ha_state()
@ -234,7 +235,7 @@ class DemoMusicPlayer(AbstractDemoPlayer):
""" Flags of media commands that are supported. """
support = MUSIC_PLAYER_SUPPORT
if self._cur_track > 1:
if self._cur_track > 0:
support |= SUPPORT_PREVIOUS_TRACK
if self._cur_track < len(self.tracks)-1:

View File

@ -24,7 +24,6 @@ SUPPORT_DENON = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_TURN_ON | SUPPORT_TURN_OFF
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the Denon platform. """
if not config.get(CONF_HOST):
@ -48,7 +47,7 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class DenonDevice(MediaPlayerDevice):
""" Represents a Denon device. """
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-public-methods, abstract-method
def __init__(self, name, host):
self._name = name
@ -145,10 +144,6 @@ class DenonDevice(MediaPlayerDevice):
""" mute (true) or unmute (false) media player. """
self.telnet_command("MU" + ("ON" if mute else "OFF"))
def media_play_pause(self):
""" media_play_pause media player. """
raise NotImplementedError()
def media_play(self):
""" media_play media player. """
self.telnet_command("NS9A")
@ -164,9 +159,6 @@ class DenonDevice(MediaPlayerDevice):
def media_previous_track(self):
self.telnet_command("NS9E")
def media_seek(self, position):
raise NotImplementedError()
def turn_on(self):
""" turn the media player on. """
self.telnet_command("PWON")

View File

@ -105,6 +105,8 @@ class FireTV(object):
class FireTVDevice(MediaPlayerDevice):
""" Represents an Amazon Fire TV device on the network. """
# pylint: disable=abstract-method
def __init__(self, host, device, name):
self._firetv = FireTV(host, device)
self._name = name
@ -176,15 +178,3 @@ class FireTVDevice(MediaPlayerDevice):
def media_next_track(self):
""" Send next track command (results in fast-forward). """
self._firetv.action('media_next')
def media_seek(self, position):
raise NotImplementedError()
def mute_volume(self, mute):
raise NotImplementedError()
def play_youtube(self, media_id):
raise NotImplementedError()
def set_volume_level(self, volume):
raise NotImplementedError()

View File

@ -22,7 +22,6 @@ SUPPORT_KODI = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_PREVIOUS_TRACK | SUPPORT_NEXT_TRACK | SUPPORT_SEEK
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the kodi platform. """
@ -47,7 +46,7 @@ def _get_image_url(kodi_url):
class KodiDevice(MediaPlayerDevice):
""" Represents a XBMC/Kodi device. """
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-public-methods, abstract-method
def __init__(self, name, url, auth=None):
import jsonrpc_requests
@ -263,11 +262,3 @@ class KodiDevice(MediaPlayerDevice):
self._server.Player.Seek(players[0]['playerid'], time)
self.update_ha_state()
def turn_on(self):
""" turn the media player on. """
raise NotImplementedError()
def play_youtube(self, media_id):
""" Plays a YouTube media. """
raise NotImplementedError()

View File

@ -59,7 +59,7 @@ def config_from_file(filename, config=None):
return {}
# pylint: disable=abstract-method, unused-argument
# pylint: disable=abstract-method
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
""" Sets up the plex platform. """

View File

@ -27,7 +27,6 @@ SUPPORT_SQUEEZEBOX = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | \
SUPPORT_SEEK | SUPPORT_TURN_ON | SUPPORT_TURN_OFF
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the squeezebox platform. """
if not config.get(CONF_HOST):
@ -138,7 +137,7 @@ class LogitechMediaServer(object):
class SqueezeBoxDevice(MediaPlayerDevice):
""" Represents a SqueezeBox device. """
# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments, abstract-method
def __init__(self, lms, player_id):
super(SqueezeBoxDevice, self).__init__()
self._lms = lms
@ -292,7 +291,3 @@ class SqueezeBoxDevice(MediaPlayerDevice):
""" turn the media player on. """
self._lms.query(self._id, 'power', '1')
self.update_ha_state()
def play_youtube(self, media_id):
""" Plays a YouTube media. """
raise NotImplementedError()

View File

@ -27,7 +27,7 @@ from homeassistant.components.media_player import (
MediaPlayerDevice, DOMAIN,
SUPPORT_VOLUME_STEP, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE,
SUPPORT_TURN_ON, SUPPORT_TURN_OFF,
SERVICE_PLAY_MEDIA, SERVICE_YOUTUBE_VIDEO,
SERVICE_PLAY_MEDIA,
ATTR_SUPPORTED_MEDIA_COMMANDS, ATTR_MEDIA_VOLUME_MUTED,
ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE, ATTR_MEDIA_DURATION,
ATTR_MEDIA_TITLE, ATTR_MEDIA_ARTIST, ATTR_MEDIA_ALBUM_NAME,
@ -397,11 +397,6 @@ class UniversalMediaPlayer(MediaPlayerDevice):
data = {ATTR_MEDIA_SEEK_POSITION: position}
self._call_service(SERVICE_MEDIA_SEEK, data)
def play_youtube(self, media_id):
""" Plays a YouTube media. """
data = {'media_id': media_id}
self._call_service(SERVICE_YOUTUBE_VIDEO, data)
def play_media(self, media_type, media_id):
""" Plays a piece of media. """
data = {'media_type': media_type, 'media_id': media_id}

View File

@ -0,0 +1,141 @@
"""
tests.component.media_player.test_demo
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests demo media_player component.
"""
import unittest
from unittest.mock import patch
from pprint import pprint
import homeassistant.core as ha
from homeassistant.const import (
STATE_OFF, STATE_ON, STATE_UNKNOWN, STATE_PLAYING, STATE_PAUSED)
import homeassistant.components.media_player as mp
entity_id = 'media_player.walkman'
class TestDemoMediaPlayer(unittest.TestCase):
""" Test the media_player module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_volume_services(self):
assert mp.setup(self.hass, {'media_player': {'platform': 'demo'}})
state = self.hass.states.get(entity_id)
assert 1.0 == state.attributes.get('volume_level')
mp.set_volume_level(self.hass, 0.5, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.5 == state.attributes.get('volume_level')
mp.volume_down(self.hass, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.4 == state.attributes.get('volume_level')
mp.volume_up(self.hass, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.5 == state.attributes.get('volume_level')
assert False is state.attributes.get('is_volume_muted')
mp.mute_volume(self.hass, True, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert True is state.attributes.get('is_volume_muted')
def test_turning_off_and_on(self):
assert mp.setup(self.hass, {'media_player': {'platform': 'demo'}})
assert self.hass.states.is_state(entity_id, 'playing')
mp.turn_off(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'off')
assert not mp.is_on(self.hass, entity_id)
mp.turn_on(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'playing')
mp.toggle(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'off')
assert not mp.is_on(self.hass, entity_id)
def test_playing_pausing(self):
assert mp.setup(self.hass, {'media_player': {'platform': 'demo'}})
assert self.hass.states.is_state(entity_id, 'playing')
mp.media_pause(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'paused')
mp.media_play_pause(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'playing')
mp.media_play_pause(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'paused')
mp.media_play(self.hass, entity_id)
self.hass.pool.block_till_done()
assert self.hass.states.is_state(entity_id, 'playing')
def test_prev_next_track(self):
assert mp.setup(self.hass, {'media_player': {'platform': 'demo'}})
state = self.hass.states.get(entity_id)
assert 1 == state.attributes.get('media_track')
assert 0 == (mp.SUPPORT_PREVIOUS_TRACK &
state.attributes.get('supported_media_commands'))
mp.media_next_track(self.hass, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 2 == state.attributes.get('media_track')
assert 0 < (mp.SUPPORT_PREVIOUS_TRACK &
state.attributes.get('supported_media_commands'))
mp.media_next_track(self.hass, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 3 == state.attributes.get('media_track')
assert 0 < (mp.SUPPORT_PREVIOUS_TRACK &
state.attributes.get('supported_media_commands'))
mp.media_previous_track(self.hass, entity_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(entity_id)
assert 2 == state.attributes.get('media_track')
assert 0 < (mp.SUPPORT_PREVIOUS_TRACK &
state.attributes.get('supported_media_commands'))
@patch('homeassistant.components.media_player.demo.DemoYoutubePlayer.media_seek')
def test_play_media(self, mock_seek):
assert mp.setup(self.hass, {'media_player': {'platform': 'demo'}})
ent_id = 'media_player.living_room'
state = self.hass.states.get(ent_id)
assert 0 < (mp.SUPPORT_PLAY_MEDIA &
state.attributes.get('supported_media_commands'))
assert state.attributes.get('media_content_id') is not None
mp.play_media(self.hass, 'youtube', 'some_id', ent_id)
self.hass.pool.block_till_done()
state = self.hass.states.get(ent_id)
assert 0 < (mp.SUPPORT_PLAY_MEDIA &
state.attributes.get('supported_media_commands'))
assert 'some_id' == state.attributes.get('media_content_id')
assert not mock_seek.called
mp.media_seek(self.hass, 100, ent_id)
self.hass.pool.block_till_done()
assert mock_seek.called

View File

@ -1,78 +0,0 @@
"""
tests.test_component_media_player
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests media_player component.
"""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import homeassistant.core as ha
from homeassistant.const import (
STATE_OFF,
SERVICE_TURN_ON, SERVICE_TURN_OFF, SERVICE_VOLUME_UP, SERVICE_VOLUME_DOWN,
SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PLAY, SERVICE_MEDIA_PAUSE,
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREVIOUS_TRACK, SERVICE_TOGGLE,
ATTR_ENTITY_ID)
import homeassistant.components.media_player as media_player
from tests.common import mock_service
class TestMediaPlayer(unittest.TestCase):
""" Test the media_player module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
self.test_entity = media_player.ENTITY_ID_FORMAT.format('living_room')
self.hass.states.set(self.test_entity, STATE_OFF)
self.test_entity2 = media_player.ENTITY_ID_FORMAT.format('bedroom')
self.hass.states.set(self.test_entity2, "YouTube")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_is_on(self):
""" Test is_on method. """
self.assertFalse(media_player.is_on(self.hass, self.test_entity))
self.assertTrue(media_player.is_on(self.hass, self.test_entity2))
def test_services(self):
"""
Test if the call service methods convert to correct service calls.
"""
services = {
SERVICE_TURN_ON: media_player.turn_on,
SERVICE_TURN_OFF: media_player.turn_off,
SERVICE_TOGGLE: media_player.toggle,
SERVICE_VOLUME_UP: media_player.volume_up,
SERVICE_VOLUME_DOWN: media_player.volume_down,
SERVICE_MEDIA_PLAY_PAUSE: media_player.media_play_pause,
SERVICE_MEDIA_PLAY: media_player.media_play,
SERVICE_MEDIA_PAUSE: media_player.media_pause,
SERVICE_MEDIA_NEXT_TRACK: media_player.media_next_track,
SERVICE_MEDIA_PREVIOUS_TRACK: media_player.media_previous_track
}
for service_name, service_method in services.items():
calls = mock_service(self.hass, media_player.DOMAIN, service_name)
service_method(self.hass)
self.hass.pool.block_till_done()
self.assertEqual(1, len(calls))
call = calls[-1]
self.assertEqual(media_player.DOMAIN, call.domain)
self.assertEqual(service_name, call.service)
service_method(self.hass, self.test_entity)
self.hass.pool.block_till_done()
self.assertEqual(2, len(calls))
call = calls[-1]
self.assertEqual(media_player.DOMAIN, call.domain)
self.assertEqual(service_name, call.service)
self.assertEqual(self.test_entity,
call.data.get(ATTR_ENTITY_ID))