Add usercode support to totalconnect (#39199)
* Add test for invalid usercode * Add usercodes to totalconnect. * Update existing tests for usercodes * Fix tests * Add test for invalid usercode * Add usercodes to totalconnect. * Update existing tests for usercodes * Fix tests * Remove YAML support * Fix conflict * Bump to total_connect_client 0.56 * Change Exception to HomeAssistantError * Fix config_flow.py * Simplify async_setup since no yaml * Remove import from config flow and tests * Add reauth and test for it. Various other fixes. * Fix pylint in __init__ * Show config yaml as deprecated * separate config_flow and init tests * Assert ENTRY_STATE_SETUP_ERROR in init test * Add test for reauth flow * Fix reauth and tests * Fix strings * Restore username and usercode with new passord * Correct the integration name * Update tests/components/totalconnect/test_config_flow.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * Update tests/components/totalconnect/test_init.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * Update .coveragerc * Add test for invalid auth during reauth * Bump total-connect-client to 0.57 * Fix .coveragerc * More tests for usercodes * Fix usercode test * Reload config entry on reauth Co-authored-by: Martin Hjelmare <marhje52@gmail.com>pull/46902/head
parent
d61d39de08
commit
e5aef45bd7
|
@ -978,7 +978,10 @@ omit =
|
|||
homeassistant/components/toon/sensor.py
|
||||
homeassistant/components/toon/switch.py
|
||||
homeassistant/components/torque/sensor.py
|
||||
homeassistant/components/totalconnect/*
|
||||
homeassistant/components/totalconnect/__init__.py
|
||||
homeassistant/components/totalconnect/alarm_control_panel.py
|
||||
homeassistant/components/totalconnect/binary_sensor.py
|
||||
homeassistant/components/totalconnect/const.py
|
||||
homeassistant/components/touchline/climate.py
|
||||
homeassistant/components/tplink/common.py
|
||||
homeassistant/components/tplink/switch.py
|
||||
|
|
|
@ -5,60 +5,79 @@ import logging
|
|||
from total_connect_client import TotalConnectClient
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
||||
from homeassistant.core import HomeAssistant
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import CONF_USERCODES, DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORMS = ["alarm_control_panel", "binary_sensor"]
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
{
|
||||
DOMAIN: vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_USERNAME): cv.string,
|
||||
vol.Required(CONF_PASSWORD): cv.string,
|
||||
}
|
||||
)
|
||||
},
|
||||
vol.All(
|
||||
cv.deprecated(DOMAIN),
|
||||
{
|
||||
DOMAIN: vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_USERNAME): cv.string,
|
||||
vol.Required(CONF_PASSWORD): cv.string,
|
||||
}
|
||||
)
|
||||
},
|
||||
),
|
||||
extra=vol.ALLOW_EXTRA,
|
||||
)
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: dict):
|
||||
"""Set up by configuration file."""
|
||||
if DOMAIN not in config:
|
||||
return True
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data=config[DOMAIN],
|
||||
)
|
||||
)
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
|
||||
"""Set up upon config entry in user interface."""
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
|
||||
conf = entry.data
|
||||
username = conf[CONF_USERNAME]
|
||||
password = conf[CONF_PASSWORD]
|
||||
|
||||
if CONF_USERCODES not in conf:
|
||||
_LOGGER.warning("No usercodes in TotalConnect configuration")
|
||||
# should only happen for those who used UI before we added usercodes
|
||||
await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={
|
||||
"source": SOURCE_REAUTH,
|
||||
},
|
||||
data=conf,
|
||||
)
|
||||
return False
|
||||
|
||||
temp_codes = conf[CONF_USERCODES]
|
||||
usercodes = {}
|
||||
for code in temp_codes:
|
||||
usercodes[int(code)] = temp_codes[code]
|
||||
|
||||
client = await hass.async_add_executor_job(
|
||||
TotalConnectClient.TotalConnectClient, username, password
|
||||
TotalConnectClient.TotalConnectClient, username, password, usercodes
|
||||
)
|
||||
|
||||
if not client.is_valid_credentials():
|
||||
_LOGGER.error("TotalConnect authentication failed")
|
||||
await hass.async_create_task(
|
||||
hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={
|
||||
"source": SOURCE_REAUTH,
|
||||
},
|
||||
data=conf,
|
||||
)
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
hass.data[DOMAIN][entry.entry_id] = client
|
||||
|
|
|
@ -5,7 +5,11 @@ import voluptuous as vol
|
|||
from homeassistant import config_entries
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
||||
|
||||
from .const import DOMAIN # pylint: disable=unused-import
|
||||
from .const import CONF_USERCODES, DOMAIN # pylint: disable=unused-import
|
||||
|
||||
CONF_LOCATION = "location"
|
||||
|
||||
PASSWORD_DATA_SCHEMA = vol.Schema({vol.Required(CONF_PASSWORD): str})
|
||||
|
||||
|
||||
class TotalConnectConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
|
@ -13,6 +17,13 @@ class TotalConnectConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the config flow."""
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.usercodes = {}
|
||||
self.client = None
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
"""Handle a flow initiated by the user."""
|
||||
errors = {}
|
||||
|
@ -25,14 +36,16 @@ class TotalConnectConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
await self.async_set_unique_id(username)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
valid = await self.is_valid(username, password)
|
||||
client = await self.hass.async_add_executor_job(
|
||||
TotalConnectClient.TotalConnectClient, username, password, None
|
||||
)
|
||||
|
||||
if valid:
|
||||
# authentication success / valid
|
||||
return self.async_create_entry(
|
||||
title="Total Connect",
|
||||
data={CONF_USERNAME: username, CONF_PASSWORD: password},
|
||||
)
|
||||
if client.is_valid_credentials():
|
||||
# username/password valid so show user locations
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.client = client
|
||||
return await self.async_step_locations()
|
||||
# authentication failed / invalid
|
||||
errors["base"] = "invalid_auth"
|
||||
|
||||
|
@ -44,13 +57,101 @@ class TotalConnectConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
step_id="user", data_schema=data_schema, errors=errors
|
||||
)
|
||||
|
||||
async def async_step_import(self, user_input):
|
||||
"""Import a config entry."""
|
||||
return await self.async_step_user(user_input)
|
||||
async def async_step_locations(self, user_entry=None):
|
||||
"""Handle the user locations and associated usercodes."""
|
||||
errors = {}
|
||||
if user_entry is not None:
|
||||
for location_id in self.usercodes:
|
||||
if self.usercodes[location_id] is None:
|
||||
valid = await self.hass.async_add_executor_job(
|
||||
self.client.locations[location_id].set_usercode,
|
||||
user_entry[CONF_LOCATION],
|
||||
)
|
||||
if valid:
|
||||
self.usercodes[location_id] = user_entry[CONF_LOCATION]
|
||||
else:
|
||||
errors[CONF_LOCATION] = "usercode"
|
||||
break
|
||||
|
||||
async def is_valid(self, username="", password=""):
|
||||
"""Return true if the given username and password are valid."""
|
||||
client = await self.hass.async_add_executor_job(
|
||||
TotalConnectClient.TotalConnectClient, username, password
|
||||
complete = True
|
||||
for location_id in self.usercodes:
|
||||
if self.usercodes[location_id] is None:
|
||||
complete = False
|
||||
|
||||
if not errors and complete:
|
||||
return self.async_create_entry(
|
||||
title="Total Connect",
|
||||
data={
|
||||
CONF_USERNAME: self.username,
|
||||
CONF_PASSWORD: self.password,
|
||||
CONF_USERCODES: self.usercodes,
|
||||
},
|
||||
)
|
||||
else:
|
||||
for location_id in self.client.locations:
|
||||
self.usercodes[location_id] = None
|
||||
|
||||
# show the next location that needs a usercode
|
||||
location_codes = {}
|
||||
for location_id in self.usercodes:
|
||||
if self.usercodes[location_id] is None:
|
||||
location_codes[
|
||||
vol.Required(
|
||||
CONF_LOCATION,
|
||||
default=location_id,
|
||||
)
|
||||
] = str
|
||||
break
|
||||
|
||||
data_schema = vol.Schema(location_codes)
|
||||
return self.async_show_form(
|
||||
step_id="locations",
|
||||
data_schema=data_schema,
|
||||
errors=errors,
|
||||
description_placeholders={"base": "description"},
|
||||
)
|
||||
return client.is_valid_credentials()
|
||||
|
||||
async def async_step_reauth(self, config):
|
||||
"""Perform reauth upon an authentication error or no usercode."""
|
||||
self.username = config[CONF_USERNAME]
|
||||
self.usercodes = config[CONF_USERCODES]
|
||||
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(self, user_input=None):
|
||||
"""Dialog that informs the user that reauth is required."""
|
||||
errors = {}
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="reauth_confirm",
|
||||
data_schema=PASSWORD_DATA_SCHEMA,
|
||||
)
|
||||
|
||||
client = await self.hass.async_add_executor_job(
|
||||
TotalConnectClient.TotalConnectClient,
|
||||
self.username,
|
||||
user_input[CONF_PASSWORD],
|
||||
self.usercodes,
|
||||
)
|
||||
|
||||
if not client.is_valid_credentials():
|
||||
errors["base"] = "invalid_auth"
|
||||
return self.async_show_form(
|
||||
step_id="reauth_confirm",
|
||||
errors=errors,
|
||||
data_schema=PASSWORD_DATA_SCHEMA,
|
||||
)
|
||||
|
||||
existing_entry = await self.async_set_unique_id(self.username)
|
||||
new_entry = {
|
||||
CONF_USERNAME: self.username,
|
||||
CONF_PASSWORD: user_input[CONF_PASSWORD],
|
||||
CONF_USERCODES: self.usercodes,
|
||||
}
|
||||
self.hass.config_entries.async_update_entry(existing_entry, data=new_entry)
|
||||
|
||||
self.hass.async_create_task(
|
||||
self.hass.config_entries.async_reload(existing_entry.entry_id)
|
||||
)
|
||||
|
||||
return self.async_abort(reason="reauth_successful")
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
"""TotalConnect constants."""
|
||||
|
||||
DOMAIN = "totalconnect"
|
||||
CONF_USERCODES = "usercodes"
|
||||
CONF_LOCATION = "location"
|
||||
|
||||
# Most TotalConnect alarms will work passing '-1' as usercode
|
||||
DEFAULT_USERCODE = "-1"
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
{
|
||||
"domain": "totalconnect",
|
||||
"name": "Honeywell Total Connect Alarm",
|
||||
"name": "Total Connect",
|
||||
"documentation": "https://www.home-assistant.io/integrations/totalconnect",
|
||||
"requirements": ["total_connect_client==0.55"],
|
||||
"requirements": ["total_connect_client==0.57"],
|
||||
"dependencies": [],
|
||||
"codeowners": ["@austinmroczek"],
|
||||
"config_flow": true
|
||||
}
|
||||
|
|
|
@ -7,13 +7,26 @@
|
|||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"password": "[%key:common::config_flow::data::password%]"
|
||||
}
|
||||
},
|
||||
"locations": {
|
||||
"title": "Location Usercodes",
|
||||
"description": "Enter the usercode for this user at this location",
|
||||
"data": {
|
||||
"location": "Location"
|
||||
}
|
||||
},
|
||||
"reauth_confirm": {
|
||||
"title": "[%key:common::config_flow::title::reauth%]",
|
||||
"description": "Total Connect needs to re-authenticate your account"
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]"
|
||||
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
|
||||
"usercode": "Usercode not valid for this user at this location"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
|
||||
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,25 @@
|
|||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"already_configured": "Account is already configured"
|
||||
"already_configured": "Account is already configured",
|
||||
"reauth_successful": "Re-authentication was successful"
|
||||
},
|
||||
"error": {
|
||||
"invalid_auth": "Invalid authentication"
|
||||
"invalid_auth": "Invalid authentication",
|
||||
"usercode": "Usercode not valid for this user at this location"
|
||||
},
|
||||
"step": {
|
||||
"locations": {
|
||||
"data": {
|
||||
"location": "Location"
|
||||
},
|
||||
"description": "Enter the usercode for this user at this location",
|
||||
"title": "Location Usercodes"
|
||||
},
|
||||
"reauth_confirm": {
|
||||
"description": "Total Connect needs to re-authenticate your account",
|
||||
"title": "Reauthenticate Integration"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
"password": "Password",
|
||||
|
|
|
@ -2221,7 +2221,7 @@ todoist-python==8.0.0
|
|||
toonapi==0.2.0
|
||||
|
||||
# homeassistant.components.totalconnect
|
||||
total_connect_client==0.55
|
||||
total_connect_client==0.57
|
||||
|
||||
# homeassistant.components.tplink_lte
|
||||
tp-connected==0.0.4
|
||||
|
|
|
@ -1133,7 +1133,7 @@ teslajsonpy==0.11.5
|
|||
toonapi==0.2.0
|
||||
|
||||
# homeassistant.components.totalconnect
|
||||
total_connect_client==0.55
|
||||
total_connect_client==0.57
|
||||
|
||||
# homeassistant.components.transmission
|
||||
transmissionrpc==0.11
|
||||
|
|
|
@ -3,7 +3,7 @@ from unittest.mock import patch
|
|||
|
||||
from total_connect_client import TotalConnectClient
|
||||
|
||||
from homeassistant.components.totalconnect import DOMAIN
|
||||
from homeassistant.components.totalconnect.const import CONF_USERCODES, DOMAIN
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
@ -29,13 +29,19 @@ USER = {
|
|||
}
|
||||
|
||||
RESPONSE_AUTHENTICATE = {
|
||||
"ResultCode": 0,
|
||||
"ResultCode": TotalConnectClient.TotalConnectClient.SUCCESS,
|
||||
"SessionID": 1,
|
||||
"Locations": LOCATIONS,
|
||||
"ModuleFlags": MODULE_FLAGS,
|
||||
"UserInfo": USER,
|
||||
}
|
||||
|
||||
RESPONSE_AUTHENTICATE_FAILED = {
|
||||
"ResultCode": TotalConnectClient.TotalConnectClient.BAD_USER_OR_PASSWORD,
|
||||
"ResultData": "test bad authentication",
|
||||
}
|
||||
|
||||
|
||||
PARTITION_DISARMED = {
|
||||
"PartitionID": "1",
|
||||
"ArmingState": TotalConnectClient.TotalConnectLocation.DISARMED,
|
||||
|
@ -101,6 +107,32 @@ RESPONSE_DISARM_FAILURE = {
|
|||
"ResultCode": TotalConnectClient.TotalConnectClient.COMMAND_FAILED,
|
||||
"ResultData": "Command Failed",
|
||||
}
|
||||
RESPONSE_USER_CODE_INVALID = {
|
||||
"ResultCode": TotalConnectClient.TotalConnectClient.USER_CODE_INVALID,
|
||||
"ResultData": "testing user code invalid",
|
||||
}
|
||||
RESPONSE_SUCCESS = {"ResultCode": TotalConnectClient.TotalConnectClient.SUCCESS}
|
||||
|
||||
USERNAME = "username@me.com"
|
||||
PASSWORD = "password"
|
||||
USERCODES = {123456: "7890"}
|
||||
CONFIG_DATA = {
|
||||
CONF_USERNAME: USERNAME,
|
||||
CONF_PASSWORD: PASSWORD,
|
||||
CONF_USERCODES: USERCODES,
|
||||
}
|
||||
CONFIG_DATA_NO_USERCODES = {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
|
||||
|
||||
|
||||
USERNAME = "username@me.com"
|
||||
PASSWORD = "password"
|
||||
USERCODES = {123456: "7890"}
|
||||
CONFIG_DATA = {
|
||||
CONF_USERNAME: USERNAME,
|
||||
CONF_PASSWORD: PASSWORD,
|
||||
CONF_USERCODES: USERCODES,
|
||||
}
|
||||
CONFIG_DATA_NO_USERCODES = {CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
|
||||
|
||||
|
||||
async def setup_platform(hass, platform):
|
||||
|
@ -108,7 +140,7 @@ async def setup_platform(hass, platform):
|
|||
# first set up a config entry and add it to hass
|
||||
mock_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_USERNAME: "user@email.com", CONF_PASSWORD: "password"},
|
||||
data=CONFIG_DATA,
|
||||
)
|
||||
mock_entry.add_to_hass(hass)
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ from homeassistant.const import (
|
|||
STATE_ALARM_ARMED_HOME,
|
||||
STATE_ALARM_DISARMED,
|
||||
)
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
from .common import (
|
||||
RESPONSE_ARM_FAILURE,
|
||||
|
@ -23,6 +24,7 @@ from .common import (
|
|||
RESPONSE_DISARM_FAILURE,
|
||||
RESPONSE_DISARM_SUCCESS,
|
||||
RESPONSE_DISARMED,
|
||||
RESPONSE_USER_CODE_INVALID,
|
||||
setup_platform,
|
||||
)
|
||||
|
||||
|
@ -72,12 +74,31 @@ async def test_arm_home_failure(hass):
|
|||
await setup_platform(hass, ALARM_DOMAIN)
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
with pytest.raises(HomeAssistantError) as err:
|
||||
await hass.services.async_call(
|
||||
ALARM_DOMAIN, SERVICE_ALARM_ARM_HOME, DATA, blocking=True
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert f"{e.value}" == "TotalConnect failed to arm home test."
|
||||
assert f"{err.value}" == "TotalConnect failed to arm home test."
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
|
||||
async def test_arm_home_invalid_usercode(hass):
|
||||
"""Test arm home method with invalid usercode."""
|
||||
responses = [RESPONSE_DISARMED, RESPONSE_USER_CODE_INVALID, RESPONSE_DISARMED]
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.TotalConnectClient.TotalConnectClient.request",
|
||||
side_effect=responses,
|
||||
):
|
||||
await setup_platform(hass, ALARM_DOMAIN)
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
with pytest.raises(HomeAssistantError) as err:
|
||||
await hass.services.async_call(
|
||||
ALARM_DOMAIN, SERVICE_ALARM_ARM_HOME, DATA, blocking=True
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert f"{err.value}" == "TotalConnect failed to arm home test."
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
|
||||
|
@ -108,12 +129,12 @@ async def test_arm_away_failure(hass):
|
|||
await setup_platform(hass, ALARM_DOMAIN)
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
with pytest.raises(HomeAssistantError) as err:
|
||||
await hass.services.async_call(
|
||||
ALARM_DOMAIN, SERVICE_ALARM_ARM_AWAY, DATA, blocking=True
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert f"{e.value}" == "TotalConnect failed to arm away test."
|
||||
assert f"{err.value}" == "TotalConnect failed to arm away test."
|
||||
assert STATE_ALARM_DISARMED == hass.states.get(ENTITY_ID).state
|
||||
|
||||
|
||||
|
@ -144,10 +165,29 @@ async def test_disarm_failure(hass):
|
|||
await setup_platform(hass, ALARM_DOMAIN)
|
||||
assert STATE_ALARM_ARMED_AWAY == hass.states.get(ENTITY_ID).state
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
with pytest.raises(HomeAssistantError) as err:
|
||||
await hass.services.async_call(
|
||||
ALARM_DOMAIN, SERVICE_ALARM_DISARM, DATA, blocking=True
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert f"{e.value}" == "TotalConnect failed to disarm test."
|
||||
assert f"{err.value}" == "TotalConnect failed to disarm test."
|
||||
assert STATE_ALARM_ARMED_AWAY == hass.states.get(ENTITY_ID).state
|
||||
|
||||
|
||||
async def test_disarm_invalid_usercode(hass):
|
||||
"""Test disarm method failure."""
|
||||
responses = [RESPONSE_ARMED_AWAY, RESPONSE_USER_CODE_INVALID, RESPONSE_ARMED_AWAY]
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.TotalConnectClient.TotalConnectClient.request",
|
||||
side_effect=responses,
|
||||
):
|
||||
await setup_platform(hass, ALARM_DOMAIN)
|
||||
assert STATE_ALARM_ARMED_AWAY == hass.states.get(ENTITY_ID).state
|
||||
|
||||
with pytest.raises(HomeAssistantError) as err:
|
||||
await hass.services.async_call(
|
||||
ALARM_DOMAIN, SERVICE_ALARM_DISARM, DATA, blocking=True
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert f"{err.value}" == "TotalConnect failed to disarm test."
|
||||
assert STATE_ALARM_ARMED_AWAY == hass.states.get(ENTITY_ID).state
|
||||
|
|
|
@ -1,78 +1,97 @@
|
|||
"""Tests for the iCloud config flow."""
|
||||
"""Tests for the TotalConnect config flow."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant import data_entry_flow
|
||||
from homeassistant.components.totalconnect.const import DOMAIN
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
||||
from homeassistant.components.totalconnect.const import CONF_LOCATION, DOMAIN
|
||||
from homeassistant.config_entries import SOURCE_USER
|
||||
from homeassistant.const import CONF_PASSWORD
|
||||
|
||||
from .common import (
|
||||
CONFIG_DATA,
|
||||
CONFIG_DATA_NO_USERCODES,
|
||||
RESPONSE_AUTHENTICATE,
|
||||
RESPONSE_DISARMED,
|
||||
RESPONSE_SUCCESS,
|
||||
RESPONSE_USER_CODE_INVALID,
|
||||
USERNAME,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
USERNAME = "username@me.com"
|
||||
PASSWORD = "password"
|
||||
|
||||
|
||||
async def test_user(hass):
|
||||
"""Test user config."""
|
||||
# no data provided so show the form
|
||||
"""Test user step."""
|
||||
# user starts with no data entered, so show the user form
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data=None,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
# now data is provided, so check if login is correct and create the entry
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.config_flow.TotalConnectClient.TotalConnectClient"
|
||||
) as client_mock:
|
||||
client_mock.return_value.is_valid_credentials.return_value = True
|
||||
|
||||
async def test_user_show_locations(hass):
|
||||
"""Test user locations form."""
|
||||
# user/pass provided, so check if valid then ask for usercodes on locations form
|
||||
responses = [
|
||||
RESPONSE_AUTHENTICATE,
|
||||
RESPONSE_DISARMED,
|
||||
RESPONSE_USER_CODE_INVALID,
|
||||
RESPONSE_SUCCESS,
|
||||
]
|
||||
|
||||
with patch("zeep.Client", autospec=True), patch(
|
||||
"homeassistant.components.totalconnect.TotalConnectClient.TotalConnectClient.request",
|
||||
side_effect=responses,
|
||||
) as mock_request, patch(
|
||||
"homeassistant.components.totalconnect.TotalConnectClient.TotalConnectClient.get_zone_details",
|
||||
return_value=True,
|
||||
), patch(
|
||||
"homeassistant.components.totalconnect.async_setup_entry", return_value=True
|
||||
):
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
data=CONFIG_DATA_NO_USERCODES,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
# first it should show the locations form
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "locations"
|
||||
# client should have sent two requests, authenticate and get status
|
||||
assert mock_request.call_count == 2
|
||||
|
||||
|
||||
async def test_import(hass):
|
||||
"""Test import step with good username and password."""
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.config_flow.TotalConnectClient.TotalConnectClient"
|
||||
) as client_mock:
|
||||
client_mock.return_value.is_valid_credentials.return_value = True
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
# user enters an invalid usercode
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
user_input={CONF_LOCATION: "bad"},
|
||||
)
|
||||
assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result2["step_id"] == "locations"
|
||||
# client should have sent 3rd request to validate usercode
|
||||
assert mock_request.call_count == 3
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
# user enters a valid usercode
|
||||
result3 = await hass.config_entries.flow.async_configure(
|
||||
result2["flow_id"],
|
||||
user_input={CONF_LOCATION: "7890"},
|
||||
)
|
||||
assert result3["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
# client should have sent another request to validate usercode
|
||||
assert mock_request.call_count == 4
|
||||
|
||||
|
||||
async def test_abort_if_already_setup(hass):
|
||||
"""Test abort if the account is already setup."""
|
||||
MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
data=CONFIG_DATA,
|
||||
unique_id=USERNAME,
|
||||
).add_to_hass(hass)
|
||||
|
||||
# Should fail, same USERNAME (import)
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.config_flow.TotalConnectClient.TotalConnectClient"
|
||||
) as client_mock:
|
||||
client_mock.return_value.is_valid_credentials.return_value = True
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
# Should fail, same USERNAME (flow)
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.config_flow.TotalConnectClient.TotalConnectClient"
|
||||
|
@ -81,7 +100,7 @@ async def test_abort_if_already_setup(hass):
|
|||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
data=CONFIG_DATA,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||
|
@ -97,8 +116,51 @@ async def test_login_failed(hass):
|
|||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
|
||||
data=CONFIG_DATA,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result["errors"] == {"base": "invalid_auth"}
|
||||
|
||||
|
||||
async def test_reauth(hass):
|
||||
"""Test reauth."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=CONFIG_DATA,
|
||||
unique_id=USERNAME,
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": "reauth"}, data=entry.data
|
||||
)
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.config_flow.TotalConnectClient.TotalConnectClient"
|
||||
) as client_mock:
|
||||
# first test with an invalid password
|
||||
client_mock.return_value.is_valid_credentials.return_value = False
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={CONF_PASSWORD: "password"}
|
||||
)
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
assert result["errors"] == {"base": "invalid_auth"}
|
||||
|
||||
# now test with the password valid
|
||||
client_mock.return_value.is_valid_credentials.return_value = True
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={CONF_PASSWORD: "password"}
|
||||
)
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "reauth_successful"
|
||||
|
||||
assert len(hass.config_entries.async_entries()) == 1
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
"""Tests for the TotalConnect init process."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components.totalconnect.const import DOMAIN
|
||||
from homeassistant.config_entries import ENTRY_STATE_SETUP_ERROR
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from .common import CONFIG_DATA
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_reauth_started(hass):
|
||||
"""Test that reauth is started when we have login errors."""
|
||||
mock_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=CONFIG_DATA,
|
||||
)
|
||||
mock_entry.add_to_hass(hass)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.totalconnect.TotalConnectClient.TotalConnectClient",
|
||||
autospec=True,
|
||||
) as mock_client:
|
||||
mock_client.return_value.is_valid_credentials.return_value = False
|
||||
assert await async_setup_component(hass, DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_entry.state == ENTRY_STATE_SETUP_ERROR
|
Loading…
Reference in New Issue