From f2176e54ba3a1f31abdb15600d6f19db2e3e9e09 Mon Sep 17 00:00:00 2001 From: Kyle Hendricks Date: Tue, 3 May 2016 01:09:27 -0400 Subject: [PATCH] Add Pioneer AVR media_player support (#1968) --- .coveragerc | 1 + .../components/media_player/pioneer.py | 194 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 homeassistant/components/media_player/pioneer.py diff --git a/.coveragerc b/.coveragerc index babb263d730..173d6ab4865 100644 --- a/.coveragerc +++ b/.coveragerc @@ -110,6 +110,7 @@ omit = homeassistant/components/media_player/mpd.py homeassistant/components/media_player/onkyo.py homeassistant/components/media_player/panasonic_viera.py + homeassistant/components/media_player/pioneer.py homeassistant/components/media_player/plex.py homeassistant/components/media_player/samsungtv.py homeassistant/components/media_player/snapcast.py diff --git a/homeassistant/components/media_player/pioneer.py b/homeassistant/components/media_player/pioneer.py new file mode 100644 index 00000000000..9e987711b76 --- /dev/null +++ b/homeassistant/components/media_player/pioneer.py @@ -0,0 +1,194 @@ +""" +Support for Pioneer Network Receivers. + +For more details about this platform, please refer to the documentation at +https://home-assistant.io/components/media_player.pioneer/ +""" +import logging +import telnetlib + +from homeassistant.components.media_player import ( + DOMAIN, SUPPORT_PAUSE, SUPPORT_SELECT_SOURCE, + SUPPORT_TURN_OFF, SUPPORT_TURN_ON, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, + MediaPlayerDevice) +from homeassistant.const import ( + CONF_HOST, STATE_OFF, STATE_ON, STATE_UNKNOWN, + CONF_NAME) + +_LOGGER = logging.getLogger(__name__) + +SUPPORT_PIONEER = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \ + SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_SELECT_SOURCE + +MAX_VOLUME = 185 +MAX_SOURCE_NUMBERS = 60 + + +def setup_platform(hass, config, add_devices, discovery_info=None): + """Setup the Pioneer platform.""" + if not config.get(CONF_HOST): + _LOGGER.error( + "Missing required configuration items in %s: %s", + DOMAIN, + CONF_HOST) + return False + + pioneer = PioneerDevice( + config.get(CONF_NAME, "Pioneer AVR"), + config.get(CONF_HOST) + ) + if pioneer.update(): + add_devices([pioneer]) + return True + else: + return False + + +class PioneerDevice(MediaPlayerDevice): + """Representation of a Pioneer device.""" + + # pylint: disable=too-many-public-methods, abstract-method + # pylint: disable=too-many-instance-attributes + def __init__(self, name, host): + """Initialize the Pioneer device.""" + self._name = name + self._host = host + self._pwstate = "PWR1" + self._volume = 0 + self._muted = False + self._selected_source = '' + self._source_name_to_number = {} + self._source_number_to_name = {} + + @classmethod + def telnet_request(cls, telnet, command, expected_prefix): + """Execute `command` and return the response.""" + telnet.write(command.encode("ASCII") + b"\r") + + # The receiver will randomly send state change updates, make sure + # we get the response we are looking for + for _ in range(3): + result = telnet.read_until(b"\r\n", timeout=0.2).decode("ASCII") \ + .strip() + if result.startswith(expected_prefix): + return result + + return None + + def telnet_command(self, command): + """Establish a telnet connection and sends `command`.""" + telnet = telnetlib.Telnet(self._host) + telnet.write(command.encode("ASCII") + b"\r") + telnet.read_very_eager() # skip response + telnet.close() + + def update(self): + """Get the latest details from the device.""" + try: + telnet = telnetlib.Telnet(self._host) + except ConnectionRefusedError: + return False + + self._pwstate = self.telnet_request(telnet, "?P", "PWR") + + volume_str = self.telnet_request(telnet, "?V", "VOL") + self._volume = int(volume_str[3:]) / MAX_VOLUME if volume_str else None + + muted_value = self.telnet_request(telnet, "?M", "MUT") + self._muted = (muted_value == "MUT0") if muted_value else None + + # Build the source name dictionaries if necessary + if not self._source_name_to_number: + for i in range(MAX_SOURCE_NUMBERS): + result = self.telnet_request(telnet, + "?RGB" + str(i).zfill(2), + "RGB") + + if not result: + continue + + source_name = result[6:] + source_number = str(i).zfill(2) + + self._source_name_to_number[source_name] = source_number + self._source_number_to_name[source_number] = source_name + + source_number = self.telnet_request(telnet, "?F", "FN") + + if source_number: + self._selected_source = self._source_number_to_name \ + .get(source_number[2:]) + else: + self._selected_source = None + + telnet.close() + return True + + @property + def name(self): + """Return the name of the device.""" + return self._name + + @property + def state(self): + """Return the state of the device.""" + if self._pwstate == "PWR1": + return STATE_OFF + if self._pwstate == "PWR0": + return STATE_ON + + return STATE_UNKNOWN + + @property + def volume_level(self): + """Volume level of the media player (0..1).""" + return self._volume + + @property + def is_volume_muted(self): + """Boolean if volume is currently muted.""" + return self._muted + + @property + def supported_media_commands(self): + """Flag of media commands that are supported.""" + return SUPPORT_PIONEER + + @property + def source(self): + """Return the current input source.""" + return self._selected_source + + @property + def source_list(self): + """List of available input sources.""" + return list(self._source_name_to_number.keys()) + + def turn_off(self): + """Turn off media player.""" + self.telnet_command("PF") + + def volume_up(self): + """Volume up media player.""" + self.telnet_command("VU") + + def volume_down(self): + """Volume down media player.""" + self.telnet_command("VD") + + def set_volume_level(self, volume): + """Set volume level, range 0..1.""" + # 60dB max + self.telnet_command(str(round(volume * MAX_VOLUME)).zfill(3) + "VL") + + def mute_volume(self, mute): + """Mute (true) or unmute (false) media player.""" + self.telnet_command("MO" if mute else "MF") + + def turn_on(self): + """Turn the media player on.""" + self.telnet_command("PO") + + def select_source(self, source): + """Select input source.""" + self.telnet_command(self._source_name_to_number.get(source) + "FN")