Add emulated_roku component (#17596)
* Add emulated_roku component * Add emulated_roku config tests * Fix emulated_roku test dependencies * Remove emulated_roku yaml support, add tests * Add yaml support, simplify config flow * Improve emulated_roku code quality * Fix emulated_roku translation, improve code quality * Fix emulated_roku translation * Bump emulated_roku to 0.1.6 to fix SSDP discovery * Bump emulated roku to 0.1.7, refactor component start/stop methodspull/19956/head
parent
cee51ecb2b
commit
31d92683f7
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"name_exists": "Name already exists"
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"data": {
|
||||
"advertise_ip": "Advertise IP",
|
||||
"advertise_port": "Advertise port",
|
||||
"host_ip": "Host IP",
|
||||
"listen_port": "Listen port",
|
||||
"name": "Name",
|
||||
"upnp_bind_multicast": "Bind multicast (True/False)"
|
||||
},
|
||||
"title": "Define server configuration"
|
||||
}
|
||||
},
|
||||
"title": "EmulatedRoku"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
"""
|
||||
Support for Roku API emulation.
|
||||
|
||||
For more details about this component, please refer to the documentation at
|
||||
https://home-assistant.io/components/emulated_roku/
|
||||
"""
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries, util
|
||||
from homeassistant.const import CONF_NAME
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
|
||||
from .binding import EmulatedRoku
|
||||
from .config_flow import configured_servers
|
||||
from .const import (
|
||||
CONF_ADVERTISE_IP, CONF_ADVERTISE_PORT, CONF_HOST_IP, CONF_LISTEN_PORT,
|
||||
CONF_SERVERS, CONF_UPNP_BIND_MULTICAST, DOMAIN)
|
||||
|
||||
REQUIREMENTS = ['emulated_roku==0.1.7']
|
||||
|
||||
SERVER_CONFIG_SCHEMA = vol.Schema({
|
||||
vol.Required(CONF_NAME): cv.string,
|
||||
vol.Required(CONF_LISTEN_PORT): cv.port,
|
||||
vol.Optional(CONF_HOST_IP): cv.string,
|
||||
vol.Optional(CONF_ADVERTISE_IP): cv.string,
|
||||
vol.Optional(CONF_ADVERTISE_PORT): cv.port,
|
||||
vol.Optional(CONF_UPNP_BIND_MULTICAST): cv.boolean
|
||||
})
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema({
|
||||
DOMAIN: vol.Schema({
|
||||
vol.Required(CONF_SERVERS):
|
||||
vol.All(cv.ensure_list, [SERVER_CONFIG_SCHEMA]),
|
||||
}),
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
"""Set up the emulated roku component."""
|
||||
conf = config.get(DOMAIN)
|
||||
|
||||
if conf is None:
|
||||
return True
|
||||
|
||||
existing_servers = configured_servers(hass)
|
||||
|
||||
for entry in conf[CONF_SERVERS]:
|
||||
if entry[CONF_NAME] not in existing_servers:
|
||||
hass.async_create_task(hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={'source': config_entries.SOURCE_IMPORT},
|
||||
data=entry
|
||||
))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass, config_entry):
|
||||
"""Set up an emulated roku server from a config entry."""
|
||||
config = config_entry.data
|
||||
|
||||
if DOMAIN not in hass.data:
|
||||
hass.data[DOMAIN] = {}
|
||||
|
||||
name = config[CONF_NAME]
|
||||
listen_port = config[CONF_LISTEN_PORT]
|
||||
host_ip = config.get(CONF_HOST_IP) or util.get_local_ip()
|
||||
advertise_ip = config.get(CONF_ADVERTISE_IP)
|
||||
advertise_port = config.get(CONF_ADVERTISE_PORT)
|
||||
upnp_bind_multicast = config.get(CONF_UPNP_BIND_MULTICAST)
|
||||
|
||||
server = EmulatedRoku(hass, name, host_ip, listen_port,
|
||||
advertise_ip, advertise_port, upnp_bind_multicast)
|
||||
|
||||
hass.data[DOMAIN][name] = server
|
||||
|
||||
return await server.setup()
|
||||
|
||||
|
||||
async def async_unload_entry(hass, entry):
|
||||
"""Unload a config entry."""
|
||||
name = entry.data[CONF_NAME]
|
||||
server = hass.data[DOMAIN].pop(name)
|
||||
return await server.unload()
|
|
@ -0,0 +1,147 @@
|
|||
"""Bridge between emulated_roku and Home Assistant."""
|
||||
import logging
|
||||
|
||||
from homeassistant.const import (
|
||||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
|
||||
from homeassistant.core import CoreState, EventOrigin
|
||||
|
||||
LOGGER = logging.getLogger('homeassistant.components.emulated_roku')
|
||||
|
||||
EVENT_ROKU_COMMAND = 'roku_command'
|
||||
|
||||
ATTR_COMMAND_TYPE = 'type'
|
||||
ATTR_SOURCE_NAME = 'source_name'
|
||||
ATTR_KEY = 'key'
|
||||
ATTR_APP_ID = 'app_id'
|
||||
|
||||
ROKU_COMMAND_KEYDOWN = 'keydown'
|
||||
ROKU_COMMAND_KEYUP = 'keyup'
|
||||
ROKU_COMMAND_KEYPRESS = 'keypress'
|
||||
ROKU_COMMAND_LAUNCH = 'launch'
|
||||
|
||||
|
||||
class EmulatedRoku:
|
||||
"""Manages an emulated_roku server."""
|
||||
|
||||
def __init__(self, hass, name, host_ip, listen_port,
|
||||
advertise_ip, advertise_port, upnp_bind_multicast):
|
||||
"""Initialize the properties."""
|
||||
self.hass = hass
|
||||
|
||||
self.roku_usn = name
|
||||
self.host_ip = host_ip
|
||||
self.listen_port = listen_port
|
||||
|
||||
self.advertise_port = advertise_port
|
||||
self.advertise_ip = advertise_ip
|
||||
|
||||
self.bind_multicast = upnp_bind_multicast
|
||||
|
||||
self._api_server = None
|
||||
|
||||
self._unsub_start_listener = None
|
||||
self._unsub_stop_listener = None
|
||||
|
||||
async def setup(self):
|
||||
"""Start the emulated_roku server."""
|
||||
from emulated_roku import EmulatedRokuServer, \
|
||||
EmulatedRokuCommandHandler
|
||||
|
||||
class EventCommandHandler(EmulatedRokuCommandHandler):
|
||||
"""emulated_roku command handler to turn commands into events."""
|
||||
|
||||
def __init__(self, hass):
|
||||
self.hass = hass
|
||||
|
||||
def on_keydown(self, roku_usn, key):
|
||||
"""Handle keydown event."""
|
||||
self.hass.bus.async_fire(EVENT_ROKU_COMMAND, {
|
||||
ATTR_SOURCE_NAME: roku_usn,
|
||||
ATTR_COMMAND_TYPE: ROKU_COMMAND_KEYDOWN,
|
||||
ATTR_KEY: key
|
||||
}, EventOrigin.local)
|
||||
|
||||
def on_keyup(self, roku_usn, key):
|
||||
"""Handle keyup event."""
|
||||
self.hass.bus.async_fire(EVENT_ROKU_COMMAND, {
|
||||
ATTR_SOURCE_NAME: roku_usn,
|
||||
ATTR_COMMAND_TYPE: ROKU_COMMAND_KEYUP,
|
||||
ATTR_KEY: key
|
||||
}, EventOrigin.local)
|
||||
|
||||
def on_keypress(self, roku_usn, key):
|
||||
"""Handle keypress event."""
|
||||
self.hass.bus.async_fire(EVENT_ROKU_COMMAND, {
|
||||
ATTR_SOURCE_NAME: roku_usn,
|
||||
ATTR_COMMAND_TYPE: ROKU_COMMAND_KEYPRESS,
|
||||
ATTR_KEY: key
|
||||
}, EventOrigin.local)
|
||||
|
||||
def launch(self, roku_usn, app_id):
|
||||
"""Handle launch event."""
|
||||
self.hass.bus.async_fire(EVENT_ROKU_COMMAND, {
|
||||
ATTR_SOURCE_NAME: roku_usn,
|
||||
ATTR_COMMAND_TYPE: ROKU_COMMAND_LAUNCH,
|
||||
ATTR_APP_ID: app_id
|
||||
}, EventOrigin.local)
|
||||
|
||||
LOGGER.debug("Intializing emulated_roku %s on %s:%s",
|
||||
self.roku_usn, self.host_ip, self.listen_port)
|
||||
|
||||
handler = EventCommandHandler(self.hass)
|
||||
|
||||
self._api_server = EmulatedRokuServer(
|
||||
self.hass.loop, handler,
|
||||
self.roku_usn, self.host_ip, self.listen_port,
|
||||
advertise_ip=self.advertise_ip,
|
||||
advertise_port=self.advertise_port,
|
||||
bind_multicast=self.bind_multicast
|
||||
)
|
||||
|
||||
async def emulated_roku_stop(event):
|
||||
"""Wrap the call to emulated_roku.close."""
|
||||
LOGGER.debug("Stopping emulated_roku %s", self.roku_usn)
|
||||
self._unsub_stop_listener = None
|
||||
await self._api_server.close()
|
||||
|
||||
async def emulated_roku_start(event):
|
||||
"""Wrap the call to emulated_roku.start."""
|
||||
try:
|
||||
LOGGER.debug("Starting emulated_roku %s", self.roku_usn)
|
||||
self._unsub_start_listener = None
|
||||
await self._api_server.start()
|
||||
except OSError:
|
||||
LOGGER.exception("Failed to start Emulated Roku %s on %s:%s",
|
||||
self.roku_usn, self.host_ip, self.listen_port)
|
||||
# clean up inconsistent state on errors
|
||||
await emulated_roku_stop(None)
|
||||
else:
|
||||
self._unsub_stop_listener = self.hass.bus.async_listen_once(
|
||||
EVENT_HOMEASSISTANT_STOP,
|
||||
emulated_roku_stop)
|
||||
|
||||
# start immediately if already running
|
||||
if self.hass.state == CoreState.running:
|
||||
await emulated_roku_start(None)
|
||||
else:
|
||||
self._unsub_start_listener = self.hass.bus.async_listen_once(
|
||||
EVENT_HOMEASSISTANT_START,
|
||||
emulated_roku_start)
|
||||
|
||||
return True
|
||||
|
||||
async def unload(self):
|
||||
"""Unload the emulated_roku server."""
|
||||
LOGGER.debug("Unloading emulated_roku %s", self.roku_usn)
|
||||
|
||||
if self._unsub_start_listener:
|
||||
self._unsub_start_listener()
|
||||
self._unsub_start_listener = None
|
||||
|
||||
if self._unsub_stop_listener:
|
||||
self._unsub_stop_listener()
|
||||
self._unsub_stop_listener = None
|
||||
|
||||
await self._api_server.close()
|
||||
|
||||
return True
|
|
@ -0,0 +1,63 @@
|
|||
"""Config flow to configure emulated_roku component."""
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.const import CONF_NAME
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .const import CONF_LISTEN_PORT, DEFAULT_NAME, DEFAULT_PORT, DOMAIN
|
||||
|
||||
|
||||
@callback
|
||||
def configured_servers(hass):
|
||||
"""Return a set of the configured servers."""
|
||||
return set(entry.data[CONF_NAME] for entry
|
||||
in hass.config_entries.async_entries(DOMAIN))
|
||||
|
||||
|
||||
@config_entries.HANDLERS.register(DOMAIN)
|
||||
class EmulatedRokuFlowHandler(config_entries.ConfigFlow):
|
||||
"""Handle an emulated_roku config flow."""
|
||||
|
||||
VERSION = 1
|
||||
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
"""Handle a flow initialized by the user."""
|
||||
errors = {}
|
||||
|
||||
if user_input is not None:
|
||||
name = user_input[CONF_NAME]
|
||||
|
||||
if name in configured_servers(self.hass):
|
||||
return self.async_abort(reason='name_exists')
|
||||
|
||||
return self.async_create_entry(
|
||||
title=name,
|
||||
data=user_input
|
||||
)
|
||||
|
||||
servers_num = len(configured_servers(self.hass))
|
||||
|
||||
if servers_num:
|
||||
default_name = "{} {}".format(DEFAULT_NAME, servers_num + 1)
|
||||
default_port = DEFAULT_PORT + servers_num
|
||||
else:
|
||||
default_name = DEFAULT_NAME
|
||||
default_port = DEFAULT_PORT
|
||||
|
||||
return self.async_show_form(
|
||||
step_id='user',
|
||||
data_schema=vol.Schema({
|
||||
vol.Required(CONF_NAME,
|
||||
default=default_name): str,
|
||||
vol.Required(CONF_LISTEN_PORT,
|
||||
default=default_port): vol.Coerce(int)
|
||||
}),
|
||||
errors=errors
|
||||
)
|
||||
|
||||
async def async_step_import(self, import_config):
|
||||
"""Handle a flow import."""
|
||||
return await self.async_step_user(import_config)
|
|
@ -0,0 +1,13 @@
|
|||
"""Constants for the emulated_roku component."""
|
||||
|
||||
DOMAIN = 'emulated_roku'
|
||||
|
||||
CONF_SERVERS = 'servers'
|
||||
CONF_LISTEN_PORT = 'listen_port'
|
||||
CONF_HOST_IP = 'host_ip'
|
||||
CONF_ADVERTISE_IP = 'advertise_ip'
|
||||
CONF_ADVERTISE_PORT = 'advertise_port'
|
||||
CONF_UPNP_BIND_MULTICAST = 'upnp_bind_multicast'
|
||||
|
||||
DEFAULT_NAME = "Home Assistant"
|
||||
DEFAULT_PORT = 8060
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"name_exists": "Name already exists"
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"data": {
|
||||
"advertise_ip": "Advertise IP",
|
||||
"advertise_port": "Advertise port",
|
||||
"host_ip": "Host IP",
|
||||
"listen_port": "Listen port",
|
||||
"name": "Name",
|
||||
"upnp_bind_multicast": "Bind multicast (True/False)"
|
||||
},
|
||||
"title": "Define server configuration"
|
||||
}
|
||||
},
|
||||
"title": "EmulatedRoku"
|
||||
}
|
||||
}
|
|
@ -140,6 +140,7 @@ FLOWS = [
|
|||
'deconz',
|
||||
'dialogflow',
|
||||
'esphome',
|
||||
'emulated_roku',
|
||||
'geofency',
|
||||
'hangouts',
|
||||
'homematicip_cloud',
|
||||
|
|
|
@ -351,6 +351,9 @@ eliqonline==1.2.2
|
|||
# homeassistant.components.elkm1
|
||||
elkm1-lib==0.7.13
|
||||
|
||||
# homeassistant.components.emulated_roku
|
||||
emulated_roku==0.1.7
|
||||
|
||||
# homeassistant.components.enocean
|
||||
enocean==0.40
|
||||
|
||||
|
|
|
@ -61,6 +61,9 @@ defusedxml==0.5.0
|
|||
# homeassistant.components.sensor.dsmr
|
||||
dsmr_parser==0.12
|
||||
|
||||
# homeassistant.components.emulated_roku
|
||||
emulated_roku==0.1.7
|
||||
|
||||
# homeassistant.components.sensor.entur_public_transport
|
||||
enturclient==0.1.3
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ TEST_REQUIREMENTS = (
|
|||
'coinmarketcap',
|
||||
'defusedxml',
|
||||
'dsmr_parser',
|
||||
'emulated_roku',
|
||||
'enturclient',
|
||||
'ephem',
|
||||
'evohomeclient',
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
"""Tests for emulated_roku."""
|
|
@ -0,0 +1,68 @@
|
|||
"""Tests for emulated_roku library bindings."""
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from homeassistant.components.emulated_roku.binding import EmulatedRoku, \
|
||||
EVENT_ROKU_COMMAND, \
|
||||
ATTR_SOURCE_NAME, ATTR_COMMAND_TYPE, ATTR_KEY, ATTR_APP_ID, \
|
||||
ROKU_COMMAND_KEYPRESS, ROKU_COMMAND_KEYDOWN, \
|
||||
ROKU_COMMAND_KEYUP, ROKU_COMMAND_LAUNCH
|
||||
|
||||
from tests.common import mock_coro_func
|
||||
|
||||
|
||||
async def test_events_fired_properly(hass):
|
||||
"""Test that events are fired correctly."""
|
||||
binding = EmulatedRoku(hass, 'Test Emulated Roku',
|
||||
'1.2.3.4', 8060,
|
||||
None, None, None)
|
||||
|
||||
events = []
|
||||
roku_event_handler = None
|
||||
|
||||
def instantiate(loop, handler,
|
||||
roku_usn, host_ip, listen_port,
|
||||
advertise_ip=None, advertise_port=None,
|
||||
bind_multicast=None):
|
||||
nonlocal roku_event_handler
|
||||
roku_event_handler = handler
|
||||
|
||||
return Mock(start=mock_coro_func(), close=mock_coro_func())
|
||||
|
||||
def listener(event):
|
||||
events.append(event)
|
||||
|
||||
with patch('emulated_roku.EmulatedRokuServer', instantiate):
|
||||
hass.bus.async_listen(EVENT_ROKU_COMMAND, listener)
|
||||
|
||||
assert await binding.setup() is True
|
||||
|
||||
assert roku_event_handler is not None
|
||||
|
||||
roku_event_handler.on_keydown('Test Emulated Roku', 'A')
|
||||
roku_event_handler.on_keyup('Test Emulated Roku', 'A')
|
||||
roku_event_handler.on_keypress('Test Emulated Roku', 'C')
|
||||
roku_event_handler.launch('Test Emulated Roku', '1')
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(events) == 4
|
||||
|
||||
assert events[0].event_type == EVENT_ROKU_COMMAND
|
||||
assert events[0].data[ATTR_COMMAND_TYPE] == ROKU_COMMAND_KEYDOWN
|
||||
assert events[0].data[ATTR_SOURCE_NAME] == 'Test Emulated Roku'
|
||||
assert events[0].data[ATTR_KEY] == 'A'
|
||||
|
||||
assert events[1].event_type == EVENT_ROKU_COMMAND
|
||||
assert events[1].data[ATTR_COMMAND_TYPE] == ROKU_COMMAND_KEYUP
|
||||
assert events[1].data[ATTR_SOURCE_NAME] == 'Test Emulated Roku'
|
||||
assert events[1].data[ATTR_KEY] == 'A'
|
||||
|
||||
assert events[2].event_type == EVENT_ROKU_COMMAND
|
||||
assert events[2].data[ATTR_COMMAND_TYPE] == ROKU_COMMAND_KEYPRESS
|
||||
assert events[2].data[ATTR_SOURCE_NAME] == 'Test Emulated Roku'
|
||||
assert events[2].data[ATTR_KEY] == 'C'
|
||||
|
||||
assert events[3].event_type == EVENT_ROKU_COMMAND
|
||||
assert events[3].data[ATTR_COMMAND_TYPE] == ROKU_COMMAND_LAUNCH
|
||||
assert events[3].data[ATTR_SOURCE_NAME] == 'Test Emulated Roku'
|
||||
assert events[3].data[ATTR_APP_ID] == '1'
|
|
@ -0,0 +1,36 @@
|
|||
"""Tests for emulated_roku config flow."""
|
||||
from homeassistant.components.emulated_roku import config_flow
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_flow_works(hass):
|
||||
"""Test that config flow works."""
|
||||
flow = config_flow.EmulatedRokuFlowHandler()
|
||||
flow.hass = hass
|
||||
result = await flow.async_step_user(user_input={
|
||||
'name': 'Emulated Roku Test',
|
||||
'listen_port': 8060
|
||||
})
|
||||
|
||||
assert result['type'] == 'create_entry'
|
||||
assert result['title'] == 'Emulated Roku Test'
|
||||
assert result['data'] == {
|
||||
'name': 'Emulated Roku Test',
|
||||
'listen_port': 8060
|
||||
}
|
||||
|
||||
|
||||
async def test_flow_already_registered_entry(hass):
|
||||
"""Test that config flow doesn't allow existing names."""
|
||||
MockConfigEntry(domain='emulated_roku', data={
|
||||
'name': 'Emulated Roku Test',
|
||||
'listen_port': 8062
|
||||
}).add_to_hass(hass)
|
||||
flow = config_flow.EmulatedRokuFlowHandler()
|
||||
flow.hass = hass
|
||||
|
||||
result = await flow.async_step_user(user_input={
|
||||
'name': 'Emulated Roku Test',
|
||||
'listen_port': 8062
|
||||
})
|
||||
assert result['type'] == 'abort'
|
|
@ -0,0 +1,91 @@
|
|||
"""Test emulated_roku component setup process."""
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.components import emulated_roku
|
||||
|
||||
from tests.common import mock_coro_func
|
||||
|
||||
|
||||
async def test_config_required_fields(hass):
|
||||
"""Test that configuration is successful with required fields."""
|
||||
with patch.object(emulated_roku, 'configured_servers', return_value=[]), \
|
||||
patch('emulated_roku.EmulatedRokuServer',
|
||||
return_value=Mock(start=mock_coro_func(),
|
||||
close=mock_coro_func())):
|
||||
assert await async_setup_component(hass, emulated_roku.DOMAIN, {
|
||||
emulated_roku.DOMAIN: {
|
||||
emulated_roku.CONF_SERVERS: [{
|
||||
emulated_roku.CONF_NAME: 'Emulated Roku Test',
|
||||
emulated_roku.CONF_LISTEN_PORT: 8060
|
||||
}]
|
||||
}
|
||||
}) is True
|
||||
|
||||
|
||||
async def test_config_already_registered_not_configured(hass):
|
||||
"""Test that an already registered name causes the entry to be ignored."""
|
||||
with patch('emulated_roku.EmulatedRokuServer',
|
||||
return_value=Mock(start=mock_coro_func(),
|
||||
close=mock_coro_func())) as instantiate, \
|
||||
patch.object(emulated_roku, 'configured_servers',
|
||||
return_value=['Emulated Roku Test']):
|
||||
assert await async_setup_component(hass, emulated_roku.DOMAIN, {
|
||||
emulated_roku.DOMAIN: {
|
||||
emulated_roku.CONF_SERVERS: [{
|
||||
emulated_roku.CONF_NAME: 'Emulated Roku Test',
|
||||
emulated_roku.CONF_LISTEN_PORT: 8060
|
||||
}]
|
||||
}
|
||||
}) is True
|
||||
|
||||
assert len(instantiate.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_setup_entry_successful(hass):
|
||||
"""Test setup entry is successful."""
|
||||
entry = Mock()
|
||||
entry.data = {
|
||||
emulated_roku.CONF_NAME: 'Emulated Roku Test',
|
||||
emulated_roku.CONF_LISTEN_PORT: 8060,
|
||||
emulated_roku.CONF_HOST_IP: '1.2.3.5',
|
||||
emulated_roku.CONF_ADVERTISE_IP: '1.2.3.4',
|
||||
emulated_roku.CONF_ADVERTISE_PORT: 8071,
|
||||
emulated_roku.CONF_UPNP_BIND_MULTICAST: False
|
||||
}
|
||||
|
||||
with patch('emulated_roku.EmulatedRokuServer',
|
||||
return_value=Mock(start=mock_coro_func(),
|
||||
close=mock_coro_func())) as instantiate:
|
||||
assert await emulated_roku.async_setup_entry(hass, entry) is True
|
||||
|
||||
assert len(instantiate.mock_calls) == 1
|
||||
assert hass.data[emulated_roku.DOMAIN]
|
||||
|
||||
roku_instance = hass.data[emulated_roku.DOMAIN]['Emulated Roku Test']
|
||||
|
||||
assert roku_instance.roku_usn == 'Emulated Roku Test'
|
||||
assert roku_instance.host_ip == '1.2.3.5'
|
||||
assert roku_instance.listen_port == 8060
|
||||
assert roku_instance.advertise_ip == '1.2.3.4'
|
||||
assert roku_instance.advertise_port == 8071
|
||||
assert roku_instance.bind_multicast is False
|
||||
|
||||
|
||||
async def test_unload_entry(hass):
|
||||
"""Test being able to unload an entry."""
|
||||
entry = Mock()
|
||||
entry.data = {'name': 'Emulated Roku Test', 'listen_port': 8060}
|
||||
|
||||
with patch('emulated_roku.EmulatedRokuServer',
|
||||
return_value=Mock(start=mock_coro_func(),
|
||||
close=mock_coro_func())):
|
||||
assert await emulated_roku.async_setup_entry(hass, entry) is True
|
||||
|
||||
assert emulated_roku.DOMAIN in hass.data
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert await emulated_roku.async_unload_entry(hass, entry)
|
||||
|
||||
assert len(hass.data[emulated_roku.DOMAIN]) == 0
|
Loading…
Reference in New Issue