Update cloud auth (#9357)
* Update cloud logic * Lint * Update test requirements * Address commments, fix tests * Add credentialspull/8913/head^2
parent
90f9a6bc0a
commit
c9fc3fae6e
|
@ -4,10 +4,11 @@ import logging
|
|||
|
||||
import voluptuous as vol
|
||||
|
||||
from . import http_api, cloud_api
|
||||
from . import http_api, auth_api
|
||||
from .const import DOMAIN
|
||||
|
||||
|
||||
REQUIREMENTS = ['warrant==0.2.0']
|
||||
DEPENDENCIES = ['http']
|
||||
CONF_MODE = 'mode'
|
||||
MODE_DEV = 'development'
|
||||
|
@ -40,10 +41,7 @@ def async_setup(hass, config):
|
|||
'mode': mode
|
||||
}
|
||||
|
||||
cloud = yield from cloud_api.async_load_auth(hass)
|
||||
|
||||
if cloud is not None:
|
||||
data['cloud'] = cloud
|
||||
data['auth'] = yield from hass.async_add_job(auth_api.load_auth, hass)
|
||||
|
||||
yield from http_api.async_setup(hass)
|
||||
return True
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
"""Package to offer tools to authenticate with the cloud."""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from .const import AUTH_FILE, SERVERS
|
||||
from .util import get_mode
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CloudError(Exception):
|
||||
"""Base class for cloud related errors."""
|
||||
|
||||
|
||||
class Unauthenticated(CloudError):
|
||||
"""Raised when authentication failed."""
|
||||
|
||||
|
||||
class UserNotFound(CloudError):
|
||||
"""Raised when a user is not found."""
|
||||
|
||||
|
||||
class UserNotConfirmed(CloudError):
|
||||
"""Raised when a user has not confirmed email yet."""
|
||||
|
||||
|
||||
class ExpiredCode(CloudError):
|
||||
"""Raised when an expired code is encoutered."""
|
||||
|
||||
|
||||
class InvalidCode(CloudError):
|
||||
"""Raised when an invalid code is submitted."""
|
||||
|
||||
|
||||
class PasswordChangeRequired(CloudError):
|
||||
"""Raised when a password change is required."""
|
||||
|
||||
def __init__(self, message='Password change required.'):
|
||||
"""Initialize a password change required error."""
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class UnknownError(CloudError):
|
||||
"""Raised when an unknown error occurrs."""
|
||||
|
||||
|
||||
AWS_EXCEPTIONS = {
|
||||
'UserNotFoundException': UserNotFound,
|
||||
'NotAuthorizedException': Unauthenticated,
|
||||
'ExpiredCodeException': ExpiredCode,
|
||||
'UserNotConfirmedException': UserNotConfirmed,
|
||||
'PasswordResetRequiredException': PasswordChangeRequired,
|
||||
'CodeMismatchException': InvalidCode,
|
||||
}
|
||||
|
||||
|
||||
def _map_aws_exception(err):
|
||||
"""Map AWS exception to our exceptions."""
|
||||
ex = AWS_EXCEPTIONS.get(err.response['Error']['Code'], UnknownError)
|
||||
return ex(err.response['Error']['Message'])
|
||||
|
||||
|
||||
def load_auth(hass):
|
||||
"""Load authentication from disk and verify it."""
|
||||
info = _read_info(hass)
|
||||
|
||||
if info is None:
|
||||
return Auth(hass)
|
||||
|
||||
auth = Auth(hass, _cognito(
|
||||
hass,
|
||||
id_token=info['id_token'],
|
||||
access_token=info['access_token'],
|
||||
refresh_token=info['refresh_token'],
|
||||
))
|
||||
|
||||
if auth.validate_auth():
|
||||
return auth
|
||||
|
||||
return Auth(hass)
|
||||
|
||||
|
||||
def register(hass, email, password):
|
||||
"""Register a new account."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
cognito = _cognito(hass, username=email)
|
||||
try:
|
||||
cognito.register(email, password)
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
|
||||
def confirm_register(hass, confirmation_code, email):
|
||||
"""Confirm confirmation code after registration."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
cognito = _cognito(hass, username=email)
|
||||
try:
|
||||
cognito.confirm_sign_up(confirmation_code, email)
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
|
||||
def forgot_password(hass, email):
|
||||
"""Initiate forgotten password flow."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
cognito = _cognito(hass, username=email)
|
||||
try:
|
||||
cognito.initiate_forgot_password()
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
|
||||
def confirm_forgot_password(hass, confirmation_code, email, new_password):
|
||||
"""Confirm forgotten password code and change password."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
cognito = _cognito(hass, username=email)
|
||||
try:
|
||||
cognito.confirm_forgot_password(confirmation_code, new_password)
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
|
||||
class Auth(object):
|
||||
"""Class that holds Cloud authentication."""
|
||||
|
||||
def __init__(self, hass, cognito=None):
|
||||
"""Initialize Hass cloud info object."""
|
||||
self.hass = hass
|
||||
self.cognito = cognito
|
||||
self.account = None
|
||||
|
||||
@property
|
||||
def is_logged_in(self):
|
||||
"""Return if user is logged in."""
|
||||
return self.account is not None
|
||||
|
||||
def validate_auth(self):
|
||||
"""Validate that the contained auth is valid."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
try:
|
||||
self._refresh_account_info()
|
||||
except ClientError as err:
|
||||
if err.response['Error']['Code'] != 'NotAuthorizedException':
|
||||
_LOGGER.error('Unexpected error verifying auth: %s', err)
|
||||
return False
|
||||
|
||||
try:
|
||||
self.renew_access_token()
|
||||
self._refresh_account_info()
|
||||
except ClientError:
|
||||
_LOGGER.error('Unable to refresh auth token: %s', err)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def login(self, username, password):
|
||||
"""Login using a username and password."""
|
||||
from botocore.exceptions import ClientError
|
||||
from warrant.exceptions import ForceChangePasswordException
|
||||
|
||||
cognito = _cognito(self.hass, username=username)
|
||||
|
||||
try:
|
||||
cognito.authenticate(password=password)
|
||||
self.cognito = cognito
|
||||
self._refresh_account_info()
|
||||
_write_info(self.hass, self)
|
||||
|
||||
except ForceChangePasswordException as err:
|
||||
raise PasswordChangeRequired
|
||||
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
def _refresh_account_info(self):
|
||||
"""Refresh the account info.
|
||||
|
||||
Raises boto3 exceptions.
|
||||
"""
|
||||
self.account = self.cognito.get_user()
|
||||
|
||||
def renew_access_token(self):
|
||||
"""Refresh token."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
try:
|
||||
self.cognito.renew_access_token()
|
||||
_write_info(self.hass, self)
|
||||
return True
|
||||
except ClientError as err:
|
||||
_LOGGER.error('Error refreshing token: %s', err)
|
||||
return False
|
||||
|
||||
def logout(self):
|
||||
"""Invalidate token."""
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
try:
|
||||
self.cognito.logout()
|
||||
self.account = None
|
||||
_write_info(self.hass, self)
|
||||
except ClientError as err:
|
||||
raise _map_aws_exception(err)
|
||||
|
||||
|
||||
def _read_info(hass):
|
||||
"""Read auth file."""
|
||||
path = hass.config.path(AUTH_FILE)
|
||||
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
with open(path) as file:
|
||||
return json.load(file).get(get_mode(hass))
|
||||
|
||||
|
||||
def _write_info(hass, auth):
|
||||
"""Write auth info for specified mode.
|
||||
|
||||
Pass in None for data to remove authentication for that mode.
|
||||
"""
|
||||
path = hass.config.path(AUTH_FILE)
|
||||
mode = get_mode(hass)
|
||||
|
||||
if os.path.isfile(path):
|
||||
with open(path) as file:
|
||||
content = json.load(file)
|
||||
else:
|
||||
content = {}
|
||||
|
||||
if auth.is_logged_in:
|
||||
content[mode] = {
|
||||
'id_token': auth.cognito.id_token,
|
||||
'access_token': auth.cognito.access_token,
|
||||
'refresh_token': auth.cognito.refresh_token,
|
||||
}
|
||||
else:
|
||||
content.pop(mode, None)
|
||||
|
||||
with open(path, 'wt') as file:
|
||||
file.write(json.dumps(content, indent=4, sort_keys=True))
|
||||
|
||||
|
||||
def _cognito(hass, **kwargs):
|
||||
"""Get the client credentials."""
|
||||
from warrant import Cognito
|
||||
|
||||
mode = get_mode(hass)
|
||||
|
||||
info = SERVERS.get(mode)
|
||||
|
||||
if info is None:
|
||||
raise ValueError('Mode {} is not supported.'.format(mode))
|
||||
|
||||
cognito = Cognito(
|
||||
user_pool_id=info['identity_pool_id'],
|
||||
client_id=info['client_id'],
|
||||
user_pool_region=info['region'],
|
||||
access_key=info['access_key_id'],
|
||||
secret_key=info['secret_access_key'],
|
||||
**kwargs
|
||||
)
|
||||
|
||||
return cognito
|
|
@ -1,297 +0,0 @@
|
|||
"""Package to offer tools to communicate with the cloud."""
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import aiohttp
|
||||
import async_timeout
|
||||
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.util.dt import utcnow
|
||||
|
||||
from .const import AUTH_FILE, REQUEST_TIMEOUT, SERVERS
|
||||
from .util import get_mode
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
URL_CREATE_TOKEN = 'o/token/'
|
||||
URL_REVOKE_TOKEN = 'o/revoke_token/'
|
||||
URL_ACCOUNT = 'account.json'
|
||||
|
||||
|
||||
class CloudError(Exception):
|
||||
"""Base class for cloud related errors."""
|
||||
|
||||
def __init__(self, reason=None, status=None):
|
||||
"""Initialize a cloud error."""
|
||||
super().__init__(reason)
|
||||
self.status = status
|
||||
|
||||
|
||||
class Unauthenticated(CloudError):
|
||||
"""Raised when authentication failed."""
|
||||
|
||||
|
||||
class UnknownError(CloudError):
|
||||
"""Raised when an unknown error occurred."""
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_load_auth(hass):
|
||||
"""Load authentication from disk and verify it."""
|
||||
auth = yield from hass.async_add_job(_read_auth, hass)
|
||||
|
||||
if not auth:
|
||||
return None
|
||||
|
||||
cloud = Cloud(hass, auth)
|
||||
|
||||
try:
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
auth_check = yield from cloud.async_refresh_account_info()
|
||||
|
||||
if not auth_check:
|
||||
_LOGGER.error('Unable to validate credentials.')
|
||||
return None
|
||||
|
||||
return cloud
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
_LOGGER.error('Unable to reach server to validate credentials.')
|
||||
return None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_login(hass, username, password, scope=None):
|
||||
"""Get a token using a username and password.
|
||||
|
||||
Returns a coroutine.
|
||||
"""
|
||||
data = {
|
||||
'grant_type': 'password',
|
||||
'username': username,
|
||||
'password': password
|
||||
}
|
||||
if scope is not None:
|
||||
data['scope'] = scope
|
||||
|
||||
auth = yield from _async_get_token(hass, data)
|
||||
|
||||
yield from hass.async_add_job(_write_auth, hass, auth)
|
||||
|
||||
return Cloud(hass, auth)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def _async_get_token(hass, data):
|
||||
"""Get a new token and return it as a dictionary.
|
||||
|
||||
Raises exceptions when errors occur:
|
||||
- Unauthenticated
|
||||
- UnknownError
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
auth = aiohttp.BasicAuth(*_client_credentials(hass))
|
||||
|
||||
try:
|
||||
req = yield from session.post(
|
||||
_url(hass, URL_CREATE_TOKEN),
|
||||
data=data,
|
||||
auth=auth
|
||||
)
|
||||
|
||||
if req.status == 401:
|
||||
_LOGGER.error('Cloud login failed: %d', req.status)
|
||||
raise Unauthenticated(status=req.status)
|
||||
elif req.status != 200:
|
||||
_LOGGER.error('Cloud login failed: %d', req.status)
|
||||
raise UnknownError(status=req.status)
|
||||
|
||||
response = yield from req.json()
|
||||
response['expires_at'] = \
|
||||
(utcnow() + timedelta(seconds=response['expires_in'])).isoformat()
|
||||
|
||||
return response
|
||||
|
||||
except aiohttp.ClientError:
|
||||
raise UnknownError()
|
||||
|
||||
|
||||
class Cloud:
|
||||
"""Store Hass Cloud info."""
|
||||
|
||||
def __init__(self, hass, auth):
|
||||
"""Initialize Hass cloud info object."""
|
||||
self.hass = hass
|
||||
self.auth = auth
|
||||
self.account = None
|
||||
|
||||
@property
|
||||
def access_token(self):
|
||||
"""Return access token."""
|
||||
return self.auth['access_token']
|
||||
|
||||
@property
|
||||
def refresh_token(self):
|
||||
"""Get refresh token."""
|
||||
return self.auth['refresh_token']
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_refresh_account_info(self):
|
||||
"""Refresh the account info."""
|
||||
req = yield from self.async_request('get', URL_ACCOUNT)
|
||||
|
||||
if req.status != 200:
|
||||
return False
|
||||
|
||||
self.account = yield from req.json()
|
||||
return True
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_refresh_access_token(self):
|
||||
"""Get a token using a refresh token."""
|
||||
try:
|
||||
self.auth = yield from _async_get_token(self.hass, {
|
||||
'grant_type': 'refresh_token',
|
||||
'refresh_token': self.refresh_token,
|
||||
})
|
||||
|
||||
yield from self.hass.async_add_job(
|
||||
_write_auth, self.hass, self.auth)
|
||||
|
||||
return True
|
||||
except CloudError:
|
||||
return False
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_revoke_access_token(self):
|
||||
"""Revoke active access token."""
|
||||
session = async_get_clientsession(self.hass)
|
||||
client_id, client_secret = _client_credentials(self.hass)
|
||||
data = {
|
||||
'token': self.access_token,
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret
|
||||
}
|
||||
try:
|
||||
req = yield from session.post(
|
||||
_url(self.hass, URL_REVOKE_TOKEN),
|
||||
data=data,
|
||||
)
|
||||
|
||||
if req.status != 200:
|
||||
_LOGGER.error('Cloud logout failed: %d', req.status)
|
||||
raise UnknownError(status=req.status)
|
||||
|
||||
self.auth = None
|
||||
yield from self.hass.async_add_job(
|
||||
_write_auth, self.hass, None)
|
||||
|
||||
except aiohttp.ClientError:
|
||||
raise UnknownError()
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_request(self, method, path, **kwargs):
|
||||
"""Make a request to Home Assistant cloud.
|
||||
|
||||
Will refresh the token if necessary.
|
||||
"""
|
||||
session = async_get_clientsession(self.hass)
|
||||
url = _url(self.hass, path)
|
||||
|
||||
if 'headers' not in kwargs:
|
||||
kwargs['headers'] = {}
|
||||
|
||||
kwargs['headers']['authorization'] = \
|
||||
'Bearer {}'.format(self.access_token)
|
||||
|
||||
request = yield from session.request(method, url, **kwargs)
|
||||
|
||||
if request.status != 403:
|
||||
return request
|
||||
|
||||
# Maybe token expired. Try refreshing it.
|
||||
reauth = yield from self.async_refresh_access_token()
|
||||
|
||||
if not reauth:
|
||||
return request
|
||||
|
||||
# Release old connection back to the pool.
|
||||
yield from request.release()
|
||||
|
||||
kwargs['headers']['authorization'] = \
|
||||
'Bearer {}'.format(self.access_token)
|
||||
|
||||
# If we are not already fetching the account info,
|
||||
# refresh the account info.
|
||||
|
||||
if path != URL_ACCOUNT:
|
||||
yield from self.async_refresh_account_info()
|
||||
|
||||
request = yield from session.request(method, url, **kwargs)
|
||||
|
||||
return request
|
||||
|
||||
|
||||
def _read_auth(hass):
|
||||
"""Read auth file."""
|
||||
path = hass.config.path(AUTH_FILE)
|
||||
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
with open(path) as file:
|
||||
return json.load(file).get(get_mode(hass))
|
||||
|
||||
|
||||
def _write_auth(hass, data):
|
||||
"""Write auth info for specified mode.
|
||||
|
||||
Pass in None for data to remove authentication for that mode.
|
||||
"""
|
||||
path = hass.config.path(AUTH_FILE)
|
||||
mode = get_mode(hass)
|
||||
|
||||
if os.path.isfile(path):
|
||||
with open(path) as file:
|
||||
content = json.load(file)
|
||||
else:
|
||||
content = {}
|
||||
|
||||
if data is None:
|
||||
content.pop(mode, None)
|
||||
else:
|
||||
content[mode] = data
|
||||
|
||||
with open(path, 'wt') as file:
|
||||
file.write(json.dumps(content, indent=4, sort_keys=True))
|
||||
|
||||
|
||||
def _client_credentials(hass):
|
||||
"""Get the client credentials.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
mode = get_mode(hass)
|
||||
|
||||
if mode not in SERVERS:
|
||||
raise ValueError('Mode {} is not supported.'.format(mode))
|
||||
|
||||
return SERVERS[mode]['client_id'], SERVERS[mode]['client_secret']
|
||||
|
||||
|
||||
def _url(hass, path):
|
||||
"""Generate a url for the cloud.
|
||||
|
||||
Async friendly.
|
||||
"""
|
||||
mode = get_mode(hass)
|
||||
|
||||
if mode not in SERVERS:
|
||||
raise ValueError('Mode {} is not supported.'.format(mode))
|
||||
|
||||
return urljoin(SERVERS[mode]['host'], path)
|
|
@ -5,10 +5,10 @@ AUTH_FILE = '.cloud'
|
|||
|
||||
SERVERS = {
|
||||
'development': {
|
||||
'host': 'http://localhost:8000',
|
||||
'client_id': 'HBhQxeV8H4aFBcs7jrZUeeDud0FjGEJJSZ9G6gNu',
|
||||
'client_secret': ('V1qw2NhB32cSAlP7DOezjgWNgn7ZKgq0jvVZoYSI0KCmg9rg7q4'
|
||||
'BSzoebnQnX6tuHCJiZjm2479mZmmtf2LOUdnSqOqkSpjc3js7Wu'
|
||||
'VBJrRyfgTVd43kbrEQtuOiaUpK')
|
||||
'client_id': '3k755iqfcgv8t12o4pl662mnos',
|
||||
'identity_pool_id': 'us-west-2_vDOfweDJo',
|
||||
'region': 'us-west-2',
|
||||
'access_key_id': 'AKIAJGRK7MILPRJTT2ZQ',
|
||||
'secret_access_key': 'lscdYBApxrLWL0HKuVqVXWv3ou8ZVXgG7rZBu/Sz'
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
"""The HTTP api to control the cloud integration."""
|
||||
import asyncio
|
||||
from functools import wraps
|
||||
import logging
|
||||
|
||||
import voluptuous as vol
|
||||
import async_timeout
|
||||
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.components.http import (
|
||||
HomeAssistantView, RequestDataValidator)
|
||||
|
||||
from . import cloud_api
|
||||
from .const import DOMAIN, REQUEST_TIMEOUT
|
||||
from . import auth_api
|
||||
from .const import REQUEST_TIMEOUT
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -19,6 +21,42 @@ def async_setup(hass):
|
|||
hass.http.register_view(CloudLoginView)
|
||||
hass.http.register_view(CloudLogoutView)
|
||||
hass.http.register_view(CloudAccountView)
|
||||
hass.http.register_view(CloudRegisterView)
|
||||
hass.http.register_view(CloudConfirmRegisterView)
|
||||
hass.http.register_view(CloudForgotPasswordView)
|
||||
hass.http.register_view(CloudConfirmForgotPasswordView)
|
||||
|
||||
|
||||
_CLOUD_ERRORS = {
|
||||
auth_api.UserNotFound: (400, "User does not exist."),
|
||||
auth_api.UserNotConfirmed: (400, 'Email not confirmed.'),
|
||||
auth_api.Unauthenticated: (401, 'Authentication failed.'),
|
||||
auth_api.PasswordChangeRequired: (400, 'Password change required.'),
|
||||
auth_api.ExpiredCode: (400, 'Confirmation code has expired.'),
|
||||
auth_api.InvalidCode: (400, 'Invalid confirmation code.'),
|
||||
asyncio.TimeoutError: (502, 'Unable to reach the Home Assistant cloud.')
|
||||
}
|
||||
|
||||
|
||||
def _handle_cloud_errors(handler):
|
||||
"""Helper method to handle auth errors."""
|
||||
@asyncio.coroutine
|
||||
@wraps(handler)
|
||||
def error_handler(view, request, *args, **kwargs):
|
||||
"""Handle exceptions that raise from the wrapped request handler."""
|
||||
try:
|
||||
result = yield from handler(view, request, *args, **kwargs)
|
||||
return result
|
||||
|
||||
except (auth_api.CloudError, asyncio.TimeoutError) as err:
|
||||
err_info = _CLOUD_ERRORS.get(err.__class__)
|
||||
if err_info is None:
|
||||
err_info = (502, 'Unexpected error: {}'.format(err))
|
||||
status, msg = err_info
|
||||
return view.json_message(msg, status_code=status,
|
||||
message_code=err.__class__.__name__)
|
||||
|
||||
return error_handler
|
||||
|
||||
|
||||
class CloudLoginView(HomeAssistantView):
|
||||
|
@ -26,52 +64,23 @@ class CloudLoginView(HomeAssistantView):
|
|||
|
||||
url = '/api/cloud/login'
|
||||
name = 'api:cloud:login'
|
||||
schema = vol.Schema({
|
||||
vol.Required('username'): str,
|
||||
vol.Required('password'): str,
|
||||
})
|
||||
|
||||
@asyncio.coroutine
|
||||
def post(self, request):
|
||||
"""Validate config and return results."""
|
||||
try:
|
||||
data = yield from request.json()
|
||||
except ValueError:
|
||||
_LOGGER.error('Login with invalid JSON')
|
||||
return self.json_message('Invalid JSON.', 400)
|
||||
|
||||
try:
|
||||
self.schema(data)
|
||||
except vol.Invalid as err:
|
||||
_LOGGER.error('Login with invalid formatted data')
|
||||
return self.json_message(
|
||||
'Message format incorrect: {}'.format(err), 400)
|
||||
|
||||
@_handle_cloud_errors
|
||||
@RequestDataValidator(vol.Schema({
|
||||
vol.Required('email'): str,
|
||||
vol.Required('password'): str,
|
||||
}))
|
||||
def post(self, request, data):
|
||||
"""Handle login request."""
|
||||
hass = request.app['hass']
|
||||
phase = 1
|
||||
try:
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
cloud = yield from cloud_api.async_login(
|
||||
hass, data['username'], data['password'])
|
||||
auth = hass.data['cloud']['auth']
|
||||
|
||||
phase += 1
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(auth.login, data['email'],
|
||||
data['password'])
|
||||
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from cloud.async_refresh_account_info()
|
||||
|
||||
except cloud_api.Unauthenticated:
|
||||
return self.json_message(
|
||||
'Authentication failed (phase {}).'.format(phase), 401)
|
||||
except cloud_api.UnknownError:
|
||||
return self.json_message(
|
||||
'Unknown error occurred (phase {}).'.format(phase), 500)
|
||||
except asyncio.TimeoutError:
|
||||
return self.json_message(
|
||||
'Unable to reach Home Assistant cloud '
|
||||
'(phase {}).'.format(phase), 502)
|
||||
|
||||
hass.data[DOMAIN]['cloud'] = cloud
|
||||
return self.json(cloud.account)
|
||||
return self.json(_auth_data(auth))
|
||||
|
||||
|
||||
class CloudLogoutView(HomeAssistantView):
|
||||
|
@ -81,39 +90,133 @@ class CloudLogoutView(HomeAssistantView):
|
|||
name = 'api:cloud:logout'
|
||||
|
||||
@asyncio.coroutine
|
||||
@_handle_cloud_errors
|
||||
def post(self, request):
|
||||
"""Validate config and return results."""
|
||||
"""Handle logout request."""
|
||||
hass = request.app['hass']
|
||||
try:
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from \
|
||||
hass.data[DOMAIN]['cloud'].async_revoke_access_token()
|
||||
auth = hass.data['cloud']['auth']
|
||||
|
||||
hass.data[DOMAIN].pop('cloud')
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(auth.logout)
|
||||
|
||||
return self.json({
|
||||
'result': 'ok',
|
||||
})
|
||||
except asyncio.TimeoutError:
|
||||
return self.json_message("Could not reach the server.", 502)
|
||||
except cloud_api.UnknownError as err:
|
||||
return self.json_message(
|
||||
"Error communicating with the server ({}).".format(err.status),
|
||||
502)
|
||||
return self.json_message('ok')
|
||||
|
||||
|
||||
class CloudAccountView(HomeAssistantView):
|
||||
"""Log out of the Home Assistant cloud."""
|
||||
"""View to retrieve account info."""
|
||||
|
||||
url = '/api/cloud/account'
|
||||
name = 'api:cloud:account'
|
||||
|
||||
@asyncio.coroutine
|
||||
def get(self, request):
|
||||
"""Validate config and return results."""
|
||||
"""Get account info."""
|
||||
hass = request.app['hass']
|
||||
auth = hass.data['cloud']['auth']
|
||||
|
||||
if 'cloud' not in hass.data[DOMAIN]:
|
||||
if not auth.is_logged_in:
|
||||
return self.json_message('Not logged in', 400)
|
||||
|
||||
return self.json(hass.data[DOMAIN]['cloud'].account)
|
||||
return self.json(_auth_data(auth))
|
||||
|
||||
|
||||
class CloudRegisterView(HomeAssistantView):
|
||||
"""Register on the Home Assistant cloud."""
|
||||
|
||||
url = '/api/cloud/register'
|
||||
name = 'api:cloud:register'
|
||||
|
||||
@asyncio.coroutine
|
||||
@_handle_cloud_errors
|
||||
@RequestDataValidator(vol.Schema({
|
||||
vol.Required('email'): str,
|
||||
vol.Required('password'): vol.All(str, vol.Length(min=6)),
|
||||
}))
|
||||
def post(self, request, data):
|
||||
"""Handle registration request."""
|
||||
hass = request.app['hass']
|
||||
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(
|
||||
auth_api.register, hass, data['email'], data['password'])
|
||||
|
||||
return self.json_message('ok')
|
||||
|
||||
|
||||
class CloudConfirmRegisterView(HomeAssistantView):
|
||||
"""Confirm registration on the Home Assistant cloud."""
|
||||
|
||||
url = '/api/cloud/confirm_register'
|
||||
name = 'api:cloud:confirm_register'
|
||||
|
||||
@asyncio.coroutine
|
||||
@_handle_cloud_errors
|
||||
@RequestDataValidator(vol.Schema({
|
||||
vol.Required('confirmation_code'): str,
|
||||
vol.Required('email'): str,
|
||||
}))
|
||||
def post(self, request, data):
|
||||
"""Handle registration confirmation request."""
|
||||
hass = request.app['hass']
|
||||
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(
|
||||
auth_api.confirm_register, hass, data['confirmation_code'],
|
||||
data['email'])
|
||||
|
||||
return self.json_message('ok')
|
||||
|
||||
|
||||
class CloudForgotPasswordView(HomeAssistantView):
|
||||
"""View to start Forgot Password flow.."""
|
||||
|
||||
url = '/api/cloud/forgot_password'
|
||||
name = 'api:cloud:forgot_password'
|
||||
|
||||
@asyncio.coroutine
|
||||
@_handle_cloud_errors
|
||||
@RequestDataValidator(vol.Schema({
|
||||
vol.Required('email'): str,
|
||||
}))
|
||||
def post(self, request, data):
|
||||
"""Handle forgot password request."""
|
||||
hass = request.app['hass']
|
||||
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(
|
||||
auth_api.forgot_password, hass, data['email'])
|
||||
|
||||
return self.json_message('ok')
|
||||
|
||||
|
||||
class CloudConfirmForgotPasswordView(HomeAssistantView):
|
||||
"""View to finish Forgot Password flow.."""
|
||||
|
||||
url = '/api/cloud/confirm_forgot_password'
|
||||
name = 'api:cloud:confirm_forgot_password'
|
||||
|
||||
@asyncio.coroutine
|
||||
@_handle_cloud_errors
|
||||
@RequestDataValidator(vol.Schema({
|
||||
vol.Required('confirmation_code'): str,
|
||||
vol.Required('email'): str,
|
||||
vol.Required('new_password'): vol.All(str, vol.Length(min=6))
|
||||
}))
|
||||
def post(self, request, data):
|
||||
"""Handle forgot password confirm request."""
|
||||
hass = request.app['hass']
|
||||
|
||||
with async_timeout.timeout(REQUEST_TIMEOUT, loop=hass.loop):
|
||||
yield from hass.async_add_job(
|
||||
auth_api.confirm_forgot_password, hass,
|
||||
data['confirmation_code'], data['email'],
|
||||
data['new_password'])
|
||||
|
||||
return self.json_message('ok')
|
||||
|
||||
|
||||
def _auth_data(auth):
|
||||
"""Generate the auth data JSON response."""
|
||||
return {
|
||||
'email': auth.account.email
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ https://home-assistant.io/components/http/
|
|||
"""
|
||||
import asyncio
|
||||
import json
|
||||
from functools import wraps
|
||||
import logging
|
||||
import ssl
|
||||
from ipaddress import ip_network
|
||||
|
@ -364,9 +365,12 @@ class HomeAssistantView(object):
|
|||
return web.Response(
|
||||
body=msg, content_type=CONTENT_TYPE_JSON, status=status_code)
|
||||
|
||||
def json_message(self, error, status_code=200):
|
||||
def json_message(self, message, status_code=200, message_code=None):
|
||||
"""Return a JSON message response."""
|
||||
return self.json({'message': error}, status_code)
|
||||
data = {'message': message}
|
||||
if message_code is not None:
|
||||
data['code'] = message_code
|
||||
return self.json(data, status_code)
|
||||
|
||||
@asyncio.coroutine
|
||||
# pylint: disable=no-self-use
|
||||
|
@ -443,3 +447,41 @@ def request_handler_factory(view, handler):
|
|||
return web.Response(body=result, status=status_code)
|
||||
|
||||
return handle
|
||||
|
||||
|
||||
class RequestDataValidator:
|
||||
"""Decorator that will validate the incoming data.
|
||||
|
||||
Takes in a voluptuous schema and adds 'post_data' as
|
||||
keyword argument to the function call.
|
||||
|
||||
Will return a 400 if no JSON provided or doesn't match schema.
|
||||
"""
|
||||
|
||||
def __init__(self, schema):
|
||||
"""Initialize the decorator."""
|
||||
self._schema = schema
|
||||
|
||||
def __call__(self, method):
|
||||
"""Decorate a function."""
|
||||
@asyncio.coroutine
|
||||
@wraps(method)
|
||||
def wrapper(view, request, *args, **kwargs):
|
||||
"""Wrap a request handler with data validation."""
|
||||
try:
|
||||
data = yield from request.json()
|
||||
except ValueError:
|
||||
_LOGGER.error('Invalid JSON received.')
|
||||
return view.json_message('Invalid JSON.', 400)
|
||||
|
||||
try:
|
||||
kwargs['data'] = self._schema(data)
|
||||
except vol.Invalid as err:
|
||||
_LOGGER.error('Data does not match schema: %s', err)
|
||||
return view.json_message(
|
||||
'Message format incorrect: {}'.format(err), 400)
|
||||
|
||||
result = yield from method(view, request, *args, **kwargs)
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
|
|
|
@ -999,6 +999,9 @@ wakeonlan==0.2.2
|
|||
# homeassistant.components.sensor.waqi
|
||||
waqiasync==1.0.0
|
||||
|
||||
# homeassistant.components.cloud
|
||||
warrant==0.2.0
|
||||
|
||||
# homeassistant.components.media_player.gpmdp
|
||||
websocket-client==0.37.0
|
||||
|
||||
|
|
|
@ -141,5 +141,8 @@ statsd==3.2.1
|
|||
# homeassistant.components.camera.uvc
|
||||
uvcclient==0.10.0
|
||||
|
||||
# homeassistant.components.cloud
|
||||
warrant==0.2.0
|
||||
|
||||
# homeassistant.components.sensor.yahoo_finance
|
||||
yahoo-finance==1.4.0
|
||||
|
|
|
@ -33,44 +33,45 @@ COMMENT_REQUIREMENTS = (
|
|||
)
|
||||
|
||||
TEST_REQUIREMENTS = (
|
||||
'pydispatch',
|
||||
'influxdb',
|
||||
'nx584',
|
||||
'uvcclient',
|
||||
'somecomfort',
|
||||
'aioautomatic',
|
||||
'SoCo',
|
||||
'libsoundtouch',
|
||||
'libpurecoollink',
|
||||
'rxv',
|
||||
'apns2',
|
||||
'sqlalchemy',
|
||||
'forecastio',
|
||||
'aiohttp_cors',
|
||||
'pilight',
|
||||
'apns2',
|
||||
'dsmr_parser',
|
||||
'ephem',
|
||||
'evohomeclient',
|
||||
'forecastio',
|
||||
'fuzzywuzzy',
|
||||
'gTTS-token',
|
||||
'ha-ffmpeg',
|
||||
'hbmqtt',
|
||||
'holidays',
|
||||
'influxdb',
|
||||
'libpurecoollink',
|
||||
'libsoundtouch',
|
||||
'mficlient',
|
||||
'nx584',
|
||||
'paho',
|
||||
'pexpect',
|
||||
'pilight',
|
||||
'pmsensor',
|
||||
'prometheus_client',
|
||||
'pydispatch',
|
||||
'PyJWT',
|
||||
'pylitejet',
|
||||
'pyunifi',
|
||||
'pywebpush',
|
||||
'restrictedpython',
|
||||
'rflink',
|
||||
'ring_doorbell',
|
||||
'rxv',
|
||||
'sleepyq',
|
||||
'SoCo',
|
||||
'somecomfort',
|
||||
'sqlalchemy',
|
||||
'statsd',
|
||||
'pylitejet',
|
||||
'holidays',
|
||||
'evohomeclient',
|
||||
'pexpect',
|
||||
'hbmqtt',
|
||||
'paho',
|
||||
'dsmr_parser',
|
||||
'mficlient',
|
||||
'pmsensor',
|
||||
'uvcclient',
|
||||
'warrant',
|
||||
'yahoo-finance',
|
||||
'ha-ffmpeg',
|
||||
'gTTS-token',
|
||||
'pywebpush',
|
||||
'PyJWT',
|
||||
'restrictedpython',
|
||||
'pyunifi',
|
||||
'prometheus_client',
|
||||
'ephem'
|
||||
)
|
||||
|
||||
IGNORE_PACKAGES = (
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
"""Tests for the tools to communicate with the cloud."""
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.cloud import DOMAIN, auth_api
|
||||
|
||||
|
||||
MOCK_AUTH = {
|
||||
"id_token": "fake_id_token",
|
||||
"access_token": "fake_access_token",
|
||||
"refresh_token": "fake_refresh_token",
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cloud_hass(hass):
|
||||
"""Fixture to return a hass instance with cloud mode set."""
|
||||
hass.data[DOMAIN] = {'mode': 'development'}
|
||||
return hass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write():
|
||||
"""Mock reading authentication."""
|
||||
with patch.object(auth_api, '_write_info') as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_read():
|
||||
"""Mock writing authentication."""
|
||||
with patch.object(auth_api, '_read_info') as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_cognito():
|
||||
"""Mock warrant."""
|
||||
with patch('homeassistant.components.cloud.auth_api._cognito') as mock_cog:
|
||||
yield mock_cog()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_auth():
|
||||
"""Mock warrant."""
|
||||
with patch('homeassistant.components.cloud.auth_api.Auth') as mock_auth:
|
||||
yield mock_auth()
|
||||
|
||||
|
||||
def aws_error(code, message='Unknown', operation_name='fake_operation_name'):
|
||||
"""Generate AWS error response."""
|
||||
response = {
|
||||
'Error': {
|
||||
'Code': code,
|
||||
'Message': message
|
||||
}
|
||||
}
|
||||
return ClientError(response, operation_name)
|
||||
|
||||
|
||||
def test_load_auth_with_no_stored_auth(cloud_hass, mock_read):
|
||||
"""Test loading authentication with no stored auth."""
|
||||
mock_read.return_value = None
|
||||
auth = auth_api.load_auth(cloud_hass)
|
||||
assert auth.cognito is None
|
||||
|
||||
|
||||
def test_load_auth_with_invalid_auth(cloud_hass, mock_read, mock_cognito):
|
||||
"""Test calling load_auth when auth is no longer valid."""
|
||||
mock_cognito.get_user.side_effect = aws_error('SomeError')
|
||||
auth = auth_api.load_auth(cloud_hass)
|
||||
|
||||
assert auth.cognito is None
|
||||
|
||||
|
||||
def test_load_auth_with_valid_auth(cloud_hass, mock_read, mock_cognito):
|
||||
"""Test calling load_auth when valid auth."""
|
||||
auth = auth_api.load_auth(cloud_hass)
|
||||
|
||||
assert auth.cognito is not None
|
||||
|
||||
|
||||
def test_auth_properties():
|
||||
"""Test Auth class properties."""
|
||||
auth = auth_api.Auth(None, None)
|
||||
assert not auth.is_logged_in
|
||||
auth.account = {}
|
||||
assert auth.is_logged_in
|
||||
|
||||
|
||||
def test_auth_validate_auth_verification_fails(mock_cognito):
|
||||
"""Test validate authentication with verify request failing."""
|
||||
mock_cognito.get_user.side_effect = aws_error('UserNotFoundException')
|
||||
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
assert auth.validate_auth() is False
|
||||
|
||||
|
||||
def test_auth_validate_auth_token_refresh_needed_fails(mock_cognito):
|
||||
"""Test validate authentication with refresh needed which gets 401."""
|
||||
mock_cognito.get_user.side_effect = aws_error('NotAuthorizedException')
|
||||
mock_cognito.renew_access_token.side_effect = \
|
||||
aws_error('NotAuthorizedException')
|
||||
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
assert auth.validate_auth() is False
|
||||
|
||||
|
||||
def test_auth_validate_auth_token_refresh_needed_succeeds(mock_write,
|
||||
mock_cognito):
|
||||
"""Test validate authentication with refresh."""
|
||||
mock_cognito.get_user.side_effect = [
|
||||
aws_error('NotAuthorizedException'),
|
||||
MagicMock(email='hello@home-assistant.io')
|
||||
]
|
||||
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
assert auth.validate_auth() is True
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
|
||||
|
||||
def test_auth_login_invalid_auth(mock_cognito, mock_write):
|
||||
"""Test trying to login with invalid credentials."""
|
||||
mock_cognito.authenticate.side_effect = aws_error('NotAuthorizedException')
|
||||
auth = auth_api.Auth(None, None)
|
||||
with pytest.raises(auth_api.Unauthenticated):
|
||||
auth.login('user', 'pass')
|
||||
|
||||
assert not auth.is_logged_in
|
||||
assert len(mock_cognito.get_user.mock_calls) == 0
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
def test_auth_login_user_not_found(mock_cognito, mock_write):
|
||||
"""Test trying to login with invalid credentials."""
|
||||
mock_cognito.authenticate.side_effect = aws_error('UserNotFoundException')
|
||||
auth = auth_api.Auth(None, None)
|
||||
with pytest.raises(auth_api.UserNotFound):
|
||||
auth.login('user', 'pass')
|
||||
|
||||
assert not auth.is_logged_in
|
||||
assert len(mock_cognito.get_user.mock_calls) == 0
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
def test_auth_login_user_not_confirmed(mock_cognito, mock_write):
|
||||
"""Test trying to login without confirming account."""
|
||||
mock_cognito.authenticate.side_effect = \
|
||||
aws_error('UserNotConfirmedException')
|
||||
auth = auth_api.Auth(None, None)
|
||||
with pytest.raises(auth_api.UserNotConfirmed):
|
||||
auth.login('user', 'pass')
|
||||
|
||||
assert not auth.is_logged_in
|
||||
assert len(mock_cognito.get_user.mock_calls) == 0
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
def test_auth_login(cloud_hass, mock_cognito, mock_write):
|
||||
"""Test trying to login without confirming account."""
|
||||
mock_cognito.get_user.return_value = \
|
||||
MagicMock(email='hello@home-assistant.io')
|
||||
auth = auth_api.Auth(cloud_hass, None)
|
||||
auth.login('user', 'pass')
|
||||
assert auth.is_logged_in
|
||||
assert len(mock_cognito.authenticate.mock_calls) == 1
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
result_hass, result_auth = mock_write.mock_calls[0][1]
|
||||
assert result_hass is cloud_hass
|
||||
assert result_auth is auth
|
||||
|
||||
|
||||
def test_auth_renew_access_token(mock_write, mock_cognito):
|
||||
"""Test renewing an access token."""
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
assert auth.renew_access_token()
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
|
||||
|
||||
def test_auth_renew_access_token_fails(mock_write, mock_cognito):
|
||||
"""Test failing to renew an access token."""
|
||||
mock_cognito.renew_access_token.side_effect = aws_error('SomeError')
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
assert not auth.renew_access_token()
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
def test_auth_logout(mock_write, mock_cognito):
|
||||
"""Test renewing an access token."""
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
auth.account = MagicMock()
|
||||
auth.logout()
|
||||
assert auth.account is None
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
|
||||
|
||||
def test_auth_logout_fails(mock_write, mock_cognito):
|
||||
"""Test error while logging out."""
|
||||
mock_cognito.logout.side_effect = aws_error('SomeError')
|
||||
auth = auth_api.Auth(None, mock_cognito)
|
||||
auth.account = MagicMock()
|
||||
with pytest.raises(auth_api.CloudError):
|
||||
auth.logout()
|
||||
assert auth.account is not None
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
def test_register(mock_cognito):
|
||||
"""Test registering an account."""
|
||||
auth_api.register(None, 'email@home-assistant.io', 'password')
|
||||
assert len(mock_cognito.register.mock_calls) == 1
|
||||
result_email, result_password = mock_cognito.register.mock_calls[0][1]
|
||||
assert result_email == 'email@home-assistant.io'
|
||||
assert result_password == 'password'
|
||||
|
||||
|
||||
def test_register_fails(mock_cognito):
|
||||
"""Test registering an account."""
|
||||
mock_cognito.register.side_effect = aws_error('SomeError')
|
||||
with pytest.raises(auth_api.CloudError):
|
||||
auth_api.register(None, 'email@home-assistant.io', 'password')
|
||||
|
||||
|
||||
def test_confirm_register(mock_cognito):
|
||||
"""Test confirming a registration of an account."""
|
||||
auth_api.confirm_register(None, '123456', 'email@home-assistant.io')
|
||||
assert len(mock_cognito.confirm_sign_up.mock_calls) == 1
|
||||
result_code, result_email = mock_cognito.confirm_sign_up.mock_calls[0][1]
|
||||
assert result_email == 'email@home-assistant.io'
|
||||
assert result_code == '123456'
|
||||
|
||||
|
||||
def test_confirm_register_fails(mock_cognito):
|
||||
"""Test an error during confirmation of an account."""
|
||||
mock_cognito.confirm_sign_up.side_effect = aws_error('SomeError')
|
||||
with pytest.raises(auth_api.CloudError):
|
||||
auth_api.confirm_register(None, '123456', 'email@home-assistant.io')
|
||||
|
||||
|
||||
def test_forgot_password(mock_cognito):
|
||||
"""Test starting forgot password flow."""
|
||||
auth_api.forgot_password(None, 'email@home-assistant.io')
|
||||
assert len(mock_cognito.initiate_forgot_password.mock_calls) == 1
|
||||
|
||||
|
||||
def test_forgot_password_fails(mock_cognito):
|
||||
"""Test failure when starting forgot password flow."""
|
||||
mock_cognito.initiate_forgot_password.side_effect = aws_error('SomeError')
|
||||
with pytest.raises(auth_api.CloudError):
|
||||
auth_api.forgot_password(None, 'email@home-assistant.io')
|
||||
|
||||
|
||||
def test_confirm_forgot_password(mock_cognito):
|
||||
"""Test confirming forgot password."""
|
||||
auth_api.confirm_forgot_password(
|
||||
None, '123456', 'email@home-assistant.io', 'new password')
|
||||
assert len(mock_cognito.confirm_forgot_password.mock_calls) == 1
|
||||
result_code, result_password = \
|
||||
mock_cognito.confirm_forgot_password.mock_calls[0][1]
|
||||
assert result_code == '123456'
|
||||
assert result_password == 'new password'
|
||||
|
||||
|
||||
def test_confirm_forgot_password_fails(mock_cognito):
|
||||
"""Test failure when confirming forgot password."""
|
||||
mock_cognito.confirm_forgot_password.side_effect = aws_error('SomeError')
|
||||
with pytest.raises(auth_api.CloudError):
|
||||
auth_api.confirm_forgot_password(
|
||||
None, '123456', 'email@home-assistant.io', 'new password')
|
|
@ -1,352 +0,0 @@
|
|||
"""Tests for the tools to communicate with the cloud."""
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from unittest.mock import patch
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.cloud import DOMAIN, cloud_api, const
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from tests.common import mock_coro
|
||||
|
||||
|
||||
MOCK_AUTH = {
|
||||
"access_token": "jvCHxpTu2nfORLBRgQY78bIAoK4RPa",
|
||||
"expires_at": "2017-08-29T05:33:28.266048+00:00",
|
||||
"expires_in": 86400,
|
||||
"refresh_token": "C4wR1mgb03cs69EeiFgGOBC8mMQC5Q",
|
||||
"scope": "",
|
||||
"token_type": "Bearer"
|
||||
}
|
||||
|
||||
|
||||
def url(path):
|
||||
"""Create a url."""
|
||||
return urljoin(const.SERVERS['development']['host'], path)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cloud_hass(hass):
|
||||
"""Fixture to return a hass instance with cloud mode set."""
|
||||
hass.data[DOMAIN] = {'mode': 'development'}
|
||||
return hass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write():
|
||||
"""Mock reading authentication."""
|
||||
with patch.object(cloud_api, '_write_auth') as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_read():
|
||||
"""Mock writing authentication."""
|
||||
with patch.object(cloud_api, '_read_auth') as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_async_login_invalid_auth(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test trying to login with invalid credentials."""
|
||||
aioclient_mock.post(url('o/token/'), status=401)
|
||||
with pytest.raises(cloud_api.Unauthenticated):
|
||||
yield from cloud_api.async_login(cloud_hass, 'user', 'pass')
|
||||
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_async_login_cloud_error(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test exception in cloud while logging in."""
|
||||
aioclient_mock.post(url('o/token/'), status=500)
|
||||
with pytest.raises(cloud_api.UnknownError):
|
||||
yield from cloud_api.async_login(cloud_hass, 'user', 'pass')
|
||||
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_async_login_client_error(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test client error while logging in."""
|
||||
aioclient_mock.post(url('o/token/'), exc=aiohttp.ClientError)
|
||||
with pytest.raises(cloud_api.UnknownError):
|
||||
yield from cloud_api.async_login(cloud_hass, 'user', 'pass')
|
||||
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_async_login(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test logging in."""
|
||||
aioclient_mock.post(url('o/token/'), json={
|
||||
'expires_in': 10
|
||||
})
|
||||
now = dt_util.utcnow()
|
||||
with patch('homeassistant.components.cloud.cloud_api.utcnow',
|
||||
return_value=now):
|
||||
yield from cloud_api.async_login(cloud_hass, 'user', 'pass')
|
||||
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
result_hass, result_data = mock_write.mock_calls[0][1]
|
||||
assert result_hass is cloud_hass
|
||||
assert result_data == {
|
||||
'expires_in': 10,
|
||||
'expires_at': (now + timedelta(seconds=10)).isoformat()
|
||||
}
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_with_no_stored_auth(cloud_hass, mock_read):
|
||||
"""Test loading authentication with no stored auth."""
|
||||
mock_read.return_value = None
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_timeout_during_verification(cloud_hass, mock_read):
|
||||
"""Test loading authentication with timeout during verification."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
|
||||
with patch.object(cloud_api.Cloud, 'async_refresh_account_info',
|
||||
side_effect=asyncio.TimeoutError):
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_verification_failed_500(cloud_hass, mock_read,
|
||||
aioclient_mock):
|
||||
"""Test loading authentication with verify request getting 500."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), status=500)
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_token_refresh_needed_401(cloud_hass, mock_read,
|
||||
aioclient_mock):
|
||||
"""Test loading authentication with refresh needed which gets 401."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), status=403)
|
||||
aioclient_mock.post(url('o/token/'), status=401)
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_token_refresh_needed_500(cloud_hass, mock_read,
|
||||
aioclient_mock):
|
||||
"""Test loading authentication with refresh needed which gets 500."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), status=403)
|
||||
aioclient_mock.post(url('o/token/'), status=500)
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_token_refresh_needed_timeout(cloud_hass, mock_read,
|
||||
aioclient_mock):
|
||||
"""Test loading authentication with refresh timing out."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), status=403)
|
||||
aioclient_mock.post(url('o/token/'), exc=asyncio.TimeoutError)
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_token_refresh_needed_succeeds(cloud_hass, mock_read,
|
||||
aioclient_mock):
|
||||
"""Test loading authentication with refresh timing out."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), status=403)
|
||||
|
||||
with patch.object(cloud_api.Cloud, 'async_refresh_access_token',
|
||||
return_value=mock_coro(True)) as mock_refresh:
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is None
|
||||
assert len(mock_refresh.mock_calls) == 1
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_load_auth_token(cloud_hass, mock_read, aioclient_mock):
|
||||
"""Test loading authentication with refresh timing out."""
|
||||
mock_read.return_value = MOCK_AUTH
|
||||
aioclient_mock.get(url('account.json'), json={
|
||||
'first_name': 'Paulus',
|
||||
'last_name': 'Schoutsen'
|
||||
})
|
||||
|
||||
result = yield from cloud_api.async_load_auth(cloud_hass)
|
||||
|
||||
assert result is not None
|
||||
assert result.account == {
|
||||
'first_name': 'Paulus',
|
||||
'last_name': 'Schoutsen'
|
||||
}
|
||||
assert result.auth == MOCK_AUTH
|
||||
|
||||
|
||||
def test_cloud_properties():
|
||||
"""Test Cloud class properties."""
|
||||
cloud = cloud_api.Cloud(None, MOCK_AUTH)
|
||||
assert cloud.access_token == MOCK_AUTH['access_token']
|
||||
assert cloud.refresh_token == MOCK_AUTH['refresh_token']
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_refresh_account_info(cloud_hass, aioclient_mock):
|
||||
"""Test refreshing account info."""
|
||||
aioclient_mock.get(url('account.json'), json={
|
||||
'first_name': 'Paulus',
|
||||
'last_name': 'Schoutsen'
|
||||
})
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
assert cloud.account is None
|
||||
result = yield from cloud.async_refresh_account_info()
|
||||
assert result
|
||||
assert cloud.account == {
|
||||
'first_name': 'Paulus',
|
||||
'last_name': 'Schoutsen'
|
||||
}
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_refresh_account_info_500(cloud_hass, aioclient_mock):
|
||||
"""Test refreshing account info and getting 500."""
|
||||
aioclient_mock.get(url('account.json'), status=500)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
assert cloud.account is None
|
||||
result = yield from cloud.async_refresh_account_info()
|
||||
assert not result
|
||||
assert cloud.account is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_refresh_token(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test refreshing access token."""
|
||||
aioclient_mock.post(url('o/token/'), json={
|
||||
'access_token': 'refreshed',
|
||||
'expires_in': 10
|
||||
})
|
||||
now = dt_util.utcnow()
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
with patch('homeassistant.components.cloud.cloud_api.utcnow',
|
||||
return_value=now):
|
||||
result = yield from cloud.async_refresh_access_token()
|
||||
assert result
|
||||
assert cloud.auth == {
|
||||
'access_token': 'refreshed',
|
||||
'expires_in': 10,
|
||||
'expires_at': (now + timedelta(seconds=10)).isoformat()
|
||||
}
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
write_hass, write_data = mock_write.mock_calls[0][1]
|
||||
assert write_hass is cloud_hass
|
||||
assert write_data == cloud.auth
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_refresh_token_unknown_error(cloud_hass, aioclient_mock,
|
||||
mock_write):
|
||||
"""Test refreshing access token."""
|
||||
aioclient_mock.post(url('o/token/'), status=500)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
result = yield from cloud.async_refresh_access_token()
|
||||
assert not result
|
||||
assert cloud.auth == MOCK_AUTH
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_revoke_token(cloud_hass, aioclient_mock, mock_write):
|
||||
"""Test revoking access token."""
|
||||
aioclient_mock.post(url('o/revoke_token/'))
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
yield from cloud.async_revoke_access_token()
|
||||
assert cloud.auth is None
|
||||
assert len(mock_write.mock_calls) == 1
|
||||
write_hass, write_data = mock_write.mock_calls[0][1]
|
||||
assert write_hass is cloud_hass
|
||||
assert write_data is None
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_revoke_token_invalid_client_creds(cloud_hass, aioclient_mock,
|
||||
mock_write):
|
||||
"""Test revoking access token with invalid client credentials."""
|
||||
aioclient_mock.post(url('o/revoke_token/'), status=401)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
with pytest.raises(cloud_api.UnknownError):
|
||||
yield from cloud.async_revoke_access_token()
|
||||
assert cloud.auth is not None
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_revoke_token_request_error(cloud_hass, aioclient_mock,
|
||||
mock_write):
|
||||
"""Test revoking access token with invalid client credentials."""
|
||||
aioclient_mock.post(url('o/revoke_token/'), exc=aiohttp.ClientError)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
with pytest.raises(cloud_api.UnknownError):
|
||||
yield from cloud.async_revoke_access_token()
|
||||
assert cloud.auth is not None
|
||||
assert len(mock_write.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_request(cloud_hass, aioclient_mock):
|
||||
"""Test making request to the cloud."""
|
||||
aioclient_mock.post(url('some_endpoint'), json={'hello': 'world'})
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
request = yield from cloud.async_request('post', 'some_endpoint')
|
||||
assert request.status == 200
|
||||
data = yield from request.json()
|
||||
assert data == {'hello': 'world'}
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_request_requiring_refresh_fail(cloud_hass, aioclient_mock):
|
||||
"""Test making request to the cloud."""
|
||||
aioclient_mock.post(url('some_endpoint'), status=403)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
with patch.object(cloud_api.Cloud, 'async_refresh_access_token',
|
||||
return_value=mock_coro(False)) as mock_refresh:
|
||||
request = yield from cloud.async_request('post', 'some_endpoint')
|
||||
assert request.status == 403
|
||||
assert len(mock_refresh.mock_calls) == 1
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_cloud_request_requiring_refresh_success(cloud_hass, aioclient_mock):
|
||||
"""Test making request to the cloud."""
|
||||
aioclient_mock.post(url('some_endpoint'), status=403)
|
||||
cloud = cloud_api.Cloud(cloud_hass, MOCK_AUTH)
|
||||
with patch.object(cloud_api.Cloud, 'async_refresh_access_token',
|
||||
return_value=mock_coro(True)) as mock_refresh, \
|
||||
patch.object(cloud_api.Cloud, 'async_refresh_account_info',
|
||||
return_value=mock_coro()) as mock_account_info:
|
||||
request = yield from cloud.async_request('post', 'some_endpoint')
|
||||
assert request.status == 403
|
||||
assert len(mock_refresh.mock_calls) == 1
|
||||
assert len(mock_account_info.mock_calls) == 1
|
|
@ -5,9 +5,7 @@ from unittest.mock import patch, MagicMock
|
|||
import pytest
|
||||
|
||||
from homeassistant.bootstrap import async_setup_component
|
||||
from homeassistant.components.cloud import DOMAIN, cloud_api
|
||||
|
||||
from tests.common import mock_coro
|
||||
from homeassistant.components.cloud import DOMAIN, auth_api
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -21,6 +19,20 @@ def cloud_client(hass, test_client):
|
|||
return hass.loop.run_until_complete(test_client(hass.http.app))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_auth(cloud_client, hass):
|
||||
"""Fixture to mock authentication."""
|
||||
auth = hass.data[DOMAIN]['auth'] = MagicMock()
|
||||
return auth
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_cognito():
|
||||
"""Mock warrant."""
|
||||
with patch('homeassistant.components.cloud.auth_api._cognito') as mock_cog:
|
||||
yield mock_cog()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_account_view_no_account(cloud_client):
|
||||
"""Test fetching account if no account available."""
|
||||
|
@ -29,129 +41,300 @@ def test_account_view_no_account(cloud_client):
|
|||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_account_view(hass, cloud_client):
|
||||
def test_account_view(mock_auth, cloud_client):
|
||||
"""Test fetching account if no account available."""
|
||||
cloud = MagicMock(account={'test': 'account'})
|
||||
hass.data[DOMAIN]['cloud'] = cloud
|
||||
mock_auth.account = MagicMock(email='hello@home-assistant.io')
|
||||
req = yield from cloud_client.get('/api/cloud/account')
|
||||
assert req.status == 200
|
||||
result = yield from req.json()
|
||||
assert result == {'test': 'account'}
|
||||
assert result == {'email': 'hello@home-assistant.io'}
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view(hass, cloud_client):
|
||||
def test_login_view(mock_auth, cloud_client):
|
||||
"""Test logging in."""
|
||||
cloud = MagicMock(account={'test': 'account'})
|
||||
cloud.async_refresh_account_info.return_value = mock_coro(None)
|
||||
|
||||
with patch.object(cloud_api, 'async_login',
|
||||
MagicMock(return_value=mock_coro(cloud))):
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'username': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
mock_auth.account = MagicMock(email='hello@home-assistant.io')
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'email': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
|
||||
assert req.status == 200
|
||||
|
||||
result = yield from req.json()
|
||||
assert result == {'test': 'account'}
|
||||
assert hass.data[DOMAIN]['cloud'] is cloud
|
||||
assert result == {'email': 'hello@home-assistant.io'}
|
||||
assert len(mock_auth.login.mock_calls) == 1
|
||||
result_user, result_pass = mock_auth.login.mock_calls[0][1]
|
||||
assert result_user == 'my_username'
|
||||
assert result_pass == 'my_password'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view_invalid_json(hass, cloud_client):
|
||||
def test_login_view_invalid_json(mock_auth, cloud_client):
|
||||
"""Try logging in with invalid JSON."""
|
||||
req = yield from cloud_client.post('/api/cloud/login', data='Not JSON')
|
||||
assert req.status == 400
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
assert len(mock_auth.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view_invalid_schema(hass, cloud_client):
|
||||
def test_login_view_invalid_schema(mock_auth, cloud_client):
|
||||
"""Try logging in with invalid schema."""
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'invalid': 'schema'
|
||||
})
|
||||
assert req.status == 400
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
assert len(mock_auth.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view_request_timeout(hass, cloud_client):
|
||||
def test_login_view_request_timeout(mock_auth, cloud_client):
|
||||
"""Test request timeout while trying to log in."""
|
||||
with patch.object(cloud_api, 'async_login',
|
||||
MagicMock(side_effect=asyncio.TimeoutError)):
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'username': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
mock_auth.login.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'email': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
|
||||
assert req.status == 502
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view_invalid_credentials(hass, cloud_client):
|
||||
def test_login_view_invalid_credentials(mock_auth, cloud_client):
|
||||
"""Test logging in with invalid credentials."""
|
||||
with patch.object(cloud_api, 'async_login',
|
||||
MagicMock(side_effect=cloud_api.Unauthenticated)):
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'username': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
mock_auth.login.side_effect = auth_api.Unauthenticated
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'email': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
|
||||
assert req.status == 401
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_login_view_unknown_error(hass, cloud_client):
|
||||
def test_login_view_unknown_error(mock_auth, cloud_client):
|
||||
"""Test unknown error while logging in."""
|
||||
with patch.object(cloud_api, 'async_login',
|
||||
MagicMock(side_effect=cloud_api.UnknownError)):
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'username': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
mock_auth.login.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post('/api/cloud/login', json={
|
||||
'email': 'my_username',
|
||||
'password': 'my_password'
|
||||
})
|
||||
|
||||
assert req.status == 500
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_logout_view(hass, cloud_client):
|
||||
def test_logout_view(mock_auth, cloud_client):
|
||||
"""Test logging out."""
|
||||
cloud = MagicMock()
|
||||
cloud.async_revoke_access_token.return_value = mock_coro(None)
|
||||
hass.data[DOMAIN]['cloud'] = cloud
|
||||
|
||||
req = yield from cloud_client.post('/api/cloud/logout')
|
||||
assert req.status == 200
|
||||
data = yield from req.json()
|
||||
assert data == {'result': 'ok'}
|
||||
assert 'cloud' not in hass.data[DOMAIN]
|
||||
assert data == {'message': 'ok'}
|
||||
assert len(mock_auth.logout.mock_calls) == 1
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_logout_view_request_timeout(hass, cloud_client):
|
||||
def test_logout_view_request_timeout(mock_auth, cloud_client):
|
||||
"""Test timeout while logging out."""
|
||||
cloud = MagicMock()
|
||||
cloud.async_revoke_access_token.side_effect = asyncio.TimeoutError
|
||||
hass.data[DOMAIN]['cloud'] = cloud
|
||||
|
||||
mock_auth.logout.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post('/api/cloud/logout')
|
||||
assert req.status == 502
|
||||
assert 'cloud' in hass.data[DOMAIN]
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_logout_view_unknown_error(hass, cloud_client):
|
||||
def test_logout_view_unknown_error(mock_auth, cloud_client):
|
||||
"""Test unknown error while loggin out."""
|
||||
cloud = MagicMock()
|
||||
cloud.async_revoke_access_token.side_effect = cloud_api.UnknownError
|
||||
hass.data[DOMAIN]['cloud'] = cloud
|
||||
|
||||
mock_auth.logout.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post('/api/cloud/logout')
|
||||
assert req.status == 502
|
||||
assert 'cloud' in hass.data[DOMAIN]
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_register_view(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'password': 'falcon42'
|
||||
})
|
||||
assert req.status == 200
|
||||
assert len(mock_cognito.register.mock_calls) == 1
|
||||
result_email, result_pass = mock_cognito.register.mock_calls[0][1]
|
||||
assert result_email == 'hello@bla.com'
|
||||
assert result_pass == 'falcon42'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_register_view_bad_data(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'not_password': 'falcon'
|
||||
})
|
||||
assert req.status == 400
|
||||
assert len(mock_cognito.logout.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_register_view_request_timeout(mock_cognito, cloud_client):
|
||||
"""Test timeout while logging out."""
|
||||
mock_cognito.register.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post('/api/cloud/register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'password': 'falcon42'
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_register_view_unknown_error(mock_cognito, cloud_client):
|
||||
"""Test unknown error while loggin out."""
|
||||
mock_cognito.register.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post('/api/cloud/register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'password': 'falcon42'
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_register_view(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/confirm_register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456'
|
||||
})
|
||||
assert req.status == 200
|
||||
assert len(mock_cognito.confirm_sign_up.mock_calls) == 1
|
||||
result_code, result_email = mock_cognito.confirm_sign_up.mock_calls[0][1]
|
||||
assert result_email == 'hello@bla.com'
|
||||
assert result_code == '123456'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_register_view_bad_data(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/confirm_register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'not_confirmation_code': '123456'
|
||||
})
|
||||
assert req.status == 400
|
||||
assert len(mock_cognito.confirm_sign_up.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_register_view_request_timeout(mock_cognito, cloud_client):
|
||||
"""Test timeout while logging out."""
|
||||
mock_cognito.confirm_sign_up.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post('/api/cloud/confirm_register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456'
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_register_view_unknown_error(mock_cognito, cloud_client):
|
||||
"""Test unknown error while loggin out."""
|
||||
mock_cognito.confirm_sign_up.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post('/api/cloud/confirm_register', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456'
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_forgot_password_view(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
})
|
||||
assert req.status == 200
|
||||
assert len(mock_cognito.initiate_forgot_password.mock_calls) == 1
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_forgot_password_view_bad_data(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post('/api/cloud/forgot_password', json={
|
||||
'not_email': 'hello@bla.com',
|
||||
})
|
||||
assert req.status == 400
|
||||
assert len(mock_cognito.initiate_forgot_password.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_forgot_password_view_request_timeout(mock_cognito, cloud_client):
|
||||
"""Test timeout while logging out."""
|
||||
mock_cognito.initiate_forgot_password.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post('/api/cloud/forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_forgot_password_view_unknown_error(mock_cognito, cloud_client):
|
||||
"""Test unknown error while loggin out."""
|
||||
mock_cognito.initiate_forgot_password.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post('/api/cloud/forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_forgot_password_view(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post(
|
||||
'/api/cloud/confirm_forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456',
|
||||
'new_password': 'hello2',
|
||||
})
|
||||
assert req.status == 200
|
||||
assert len(mock_cognito.confirm_forgot_password.mock_calls) == 1
|
||||
result_code, result_new_password = \
|
||||
mock_cognito.confirm_forgot_password.mock_calls[0][1]
|
||||
assert result_code == '123456'
|
||||
assert result_new_password == 'hello2'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_forgot_password_view_bad_data(mock_cognito, cloud_client):
|
||||
"""Test logging out."""
|
||||
req = yield from cloud_client.post(
|
||||
'/api/cloud/confirm_forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
'not_confirmation_code': '123456',
|
||||
'new_password': 'hello2',
|
||||
})
|
||||
assert req.status == 400
|
||||
assert len(mock_cognito.confirm_forgot_password.mock_calls) == 0
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_forgot_password_view_request_timeout(mock_cognito,
|
||||
cloud_client):
|
||||
"""Test timeout while logging out."""
|
||||
mock_cognito.confirm_forgot_password.side_effect = asyncio.TimeoutError
|
||||
req = yield from cloud_client.post(
|
||||
'/api/cloud/confirm_forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456',
|
||||
'new_password': 'hello2',
|
||||
})
|
||||
assert req.status == 502
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_confirm_forgot_password_view_unknown_error(mock_cognito,
|
||||
cloud_client):
|
||||
"""Test unknown error while loggin out."""
|
||||
mock_cognito.confirm_forgot_password.side_effect = auth_api.UnknownError
|
||||
req = yield from cloud_client.post(
|
||||
'/api/cloud/confirm_forgot_password', json={
|
||||
'email': 'hello@bla.com',
|
||||
'confirmation_code': '123456',
|
||||
'new_password': 'hello2',
|
||||
})
|
||||
assert req.status == 502
|
||||
|
|
Loading…
Reference in New Issue