core/homeassistant/components/hdmi_cec.py

123 lines
3.4 KiB
Python

"""
CEC component.
Requires libcec + Python bindings.
"""
import logging
import voluptuous as vol
from homeassistant.const import EVENT_HOMEASSISTANT_START
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
_CEC = None
DOMAIN = 'hdmi_cec'
SERVICE_SELECT_DEVICE = 'select_device'
SERVICE_POWER_ON = 'power_on'
SERVICE_STANDBY = 'standby'
CONF_DEVICES = 'devices'
ATTR_DEVICE = 'device'
MAX_DEPTH = 4
# pylint: disable=unnecessary-lambda
DEVICE_SCHEMA = vol.Schema({
vol.All(cv.positive_int): vol.Any(lambda devices: DEVICE_SCHEMA(devices),
cv.string)
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_DEVICES): DEVICE_SCHEMA
})
}, extra=vol.ALLOW_EXTRA)
def parse_mapping(mapping, parents=None):
"""Parse configuration device mapping."""
if parents is None:
parents = []
for addr, val in mapping.items():
cur = parents + [str(addr)]
if isinstance(val, dict):
yield from parse_mapping(val, cur)
elif isinstance(val, str):
yield (val, cur)
def pad_physical_address(addr):
"""Right-pad a physical address."""
return addr + ['0'] * (MAX_DEPTH - len(addr))
def setup(hass, config):
"""Setup CEC capability."""
global _CEC
# cec is only available if libcec is properly installed
# and the Python bindings are accessible.
try:
import cec
except ImportError:
_LOGGER.error("libcec must be installed")
return False
# Parse configuration into a dict of device name
# to physical address represented as a list of
# four elements.
flat = {}
for pair in parse_mapping(config[DOMAIN].get(CONF_DEVICES, {})):
flat[pair[0]] = pad_physical_address(pair[1])
# Configure libcec.
cfg = cec.libcec_configuration()
cfg.strDeviceName = 'HASS'
cfg.bActivateSource = 0
cfg.bMonitorOnly = 1
cfg.clientVersion = cec.LIBCEC_VERSION_CURRENT
# Set up CEC adapter.
_CEC = cec.ICECAdapter.Create(cfg)
def _power_on(call):
"""Power on all devices."""
_CEC.PowerOnDevices()
def _standby(call):
"""Standby all devices."""
_CEC.StandbyDevices()
def _select_device(call):
"""Select the active device."""
path = flat.get(call.data[ATTR_DEVICE])
if not path:
_LOGGER.error("Device not found: %s", call.data[ATTR_DEVICE])
cmds = []
for i in range(1, MAX_DEPTH - 1):
addr = pad_physical_address(path[:i])
cmds.append('1f:82:{}{}:{}{}'.format(*addr))
cmds.append('1f:86:{}{}:{}{}'.format(*addr))
for cmd in cmds:
_CEC.Transmit(_CEC.CommandFromString(cmd))
_LOGGER.info("Selected %s", call.data[ATTR_DEVICE])
def _start_cec(event):
"""Open CEC adapter."""
adapters = _CEC.DetectAdapters()
if len(adapters) == 0:
_LOGGER.error("No CEC adapter found")
return
if _CEC.Open(adapters[0].strComName):
hass.services.register(DOMAIN, SERVICE_POWER_ON, _power_on)
hass.services.register(DOMAIN, SERVICE_STANDBY, _standby)
hass.services.register(DOMAIN, SERVICE_SELECT_DEVICE,
_select_device)
else:
_LOGGER.error("Failed to open adapter")
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_cec)
return True