By default to use access_token if hass.auth.active (#15212)
* Force to use access_token if hass.auth.active * Not allow Basic auth with api_password if hass.auth.active * Block websocket api_password auth when hass.auth.active * Add legacy_api_password auth provider * lint * lintpull/15330/head
parent
279fd39677
commit
63b28aa39d
|
@ -279,6 +279,18 @@ class AuthManager:
|
|||
"""Return if any auth providers are registered."""
|
||||
return bool(self._providers)
|
||||
|
||||
@property
|
||||
def support_legacy(self):
|
||||
"""
|
||||
Return if legacy_api_password auth providers are registered.
|
||||
|
||||
Should be removed when we removed legacy_api_password auth providers.
|
||||
"""
|
||||
for provider_type, _ in self._providers:
|
||||
if provider_type == 'legacy_api_password':
|
||||
return True
|
||||
return False
|
||||
|
||||
@property
|
||||
def async_auth_providers(self):
|
||||
"""Return a list of available auth providers."""
|
||||
|
@ -565,7 +577,7 @@ class AuthStore:
|
|||
client_id=rt_dict['client_id'],
|
||||
created_at=dt_util.parse_datetime(rt_dict['created_at']),
|
||||
access_token_expiration=timedelta(
|
||||
rt_dict['access_token_expiration']),
|
||||
seconds=rt_dict['access_token_expiration']),
|
||||
token=rt_dict['token'],
|
||||
)
|
||||
refresh_tokens[token.id] = token
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
"""
|
||||
Support Legacy API password auth provider.
|
||||
|
||||
It will be removed when auth system production ready
|
||||
"""
|
||||
from collections import OrderedDict
|
||||
import hmac
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant import auth, data_entry_flow
|
||||
from homeassistant.core import callback
|
||||
|
||||
USER_SCHEMA = vol.Schema({
|
||||
vol.Required('username'): str,
|
||||
})
|
||||
|
||||
|
||||
CONFIG_SCHEMA = auth.AUTH_PROVIDER_SCHEMA.extend({
|
||||
}, extra=vol.PREVENT_EXTRA)
|
||||
|
||||
LEGACY_USER = 'homeassistant'
|
||||
|
||||
|
||||
class InvalidAuthError(HomeAssistantError):
|
||||
"""Raised when submitting invalid authentication."""
|
||||
|
||||
|
||||
@auth.AUTH_PROVIDERS.register('legacy_api_password')
|
||||
class LegacyApiPasswordAuthProvider(auth.AuthProvider):
|
||||
"""Example auth provider based on hardcoded usernames and passwords."""
|
||||
|
||||
DEFAULT_TITLE = 'Legacy API Password'
|
||||
|
||||
async def async_credential_flow(self):
|
||||
"""Return a flow to login."""
|
||||
return LoginFlow(self)
|
||||
|
||||
@callback
|
||||
def async_validate_login(self, password):
|
||||
"""Helper to validate a username and password."""
|
||||
if not hasattr(self.hass, 'http'):
|
||||
raise ValueError('http component is not loaded')
|
||||
|
||||
if self.hass.http.api_password is None:
|
||||
raise ValueError('http component is not configured using'
|
||||
' api_password')
|
||||
|
||||
if not hmac.compare_digest(self.hass.http.api_password.encode('utf-8'),
|
||||
password.encode('utf-8')):
|
||||
raise InvalidAuthError
|
||||
|
||||
async def async_get_or_create_credentials(self, flow_result):
|
||||
"""Return LEGACY_USER always."""
|
||||
for credential in await self.async_credentials():
|
||||
if credential.data['username'] == LEGACY_USER:
|
||||
return credential
|
||||
|
||||
return self.async_create_credentials({
|
||||
'username': LEGACY_USER
|
||||
})
|
||||
|
||||
async def async_user_meta_for_credentials(self, credentials):
|
||||
"""
|
||||
Set name as LEGACY_USER always.
|
||||
|
||||
Will be used to populate info when creating a new user.
|
||||
"""
|
||||
return {'name': LEGACY_USER}
|
||||
|
||||
|
||||
class LoginFlow(data_entry_flow.FlowHandler):
|
||||
"""Handler for the login flow."""
|
||||
|
||||
def __init__(self, auth_provider):
|
||||
"""Initialize the login flow."""
|
||||
self._auth_provider = auth_provider
|
||||
|
||||
async def async_step_init(self, user_input=None):
|
||||
"""Handle the step of the form."""
|
||||
errors = {}
|
||||
|
||||
if user_input is not None:
|
||||
try:
|
||||
self._auth_provider.async_validate_login(
|
||||
user_input['password'])
|
||||
except InvalidAuthError:
|
||||
errors['base'] = 'invalid_auth'
|
||||
|
||||
if not errors:
|
||||
return self.async_create_entry(
|
||||
title=self._auth_provider.name,
|
||||
data={}
|
||||
)
|
||||
|
||||
schema = OrderedDict()
|
||||
schema['password'] = str
|
||||
|
||||
return self.async_show_form(
|
||||
step_id='init',
|
||||
data_schema=vol.Schema(schema),
|
||||
errors=errors,
|
||||
)
|
|
@ -184,7 +184,22 @@ class HomeAssistantHTTP(object):
|
|||
if is_ban_enabled:
|
||||
setup_bans(hass, app, login_threshold)
|
||||
|
||||
setup_auth(app, trusted_networks, api_password)
|
||||
if hass.auth.active:
|
||||
if hass.auth.support_legacy:
|
||||
_LOGGER.warning("Experimental auth api enabled and "
|
||||
"legacy_api_password support enabled. Please "
|
||||
"use access_token instead api_password, "
|
||||
"although you can still use legacy "
|
||||
"api_password")
|
||||
else:
|
||||
_LOGGER.warning("Experimental auth api enabled. Please use "
|
||||
"access_token instead api_password.")
|
||||
elif api_password is None:
|
||||
_LOGGER.warning("You have been advised to set http.api_password.")
|
||||
|
||||
setup_auth(app, trusted_networks, hass.auth.active,
|
||||
support_legacy=hass.auth.support_legacy,
|
||||
api_password=api_password)
|
||||
|
||||
if cors_origins:
|
||||
setup_cors(app, cors_origins)
|
||||
|
|
|
@ -17,37 +17,44 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
|
||||
@callback
|
||||
def setup_auth(app, trusted_networks, api_password):
|
||||
def setup_auth(app, trusted_networks, use_auth,
|
||||
support_legacy=False, api_password=None):
|
||||
"""Create auth middleware for the app."""
|
||||
@middleware
|
||||
async def auth_middleware(request, handler):
|
||||
"""Authenticate as middleware."""
|
||||
# If no password set, just always set authenticated=True
|
||||
if api_password is None:
|
||||
request[KEY_AUTHENTICATED] = True
|
||||
return await handler(request)
|
||||
|
||||
# Check authentication
|
||||
authenticated = False
|
||||
|
||||
if (HTTP_HEADER_HA_AUTH in request.headers and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))):
|
||||
if use_auth and (HTTP_HEADER_HA_AUTH in request.headers or
|
||||
DATA_API_PASSWORD in request.query):
|
||||
_LOGGER.warning('Please use access_token instead api_password.')
|
||||
|
||||
legacy_auth = (not use_auth or support_legacy) and api_password
|
||||
if (hdrs.AUTHORIZATION in request.headers and
|
||||
await async_validate_auth_header(
|
||||
request, api_password if legacy_auth else None)):
|
||||
# it included both use_auth and api_password Basic auth
|
||||
authenticated = True
|
||||
|
||||
elif (legacy_auth and HTTP_HEADER_HA_AUTH in request.headers and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))):
|
||||
# A valid auth header has been set
|
||||
authenticated = True
|
||||
|
||||
elif (DATA_API_PASSWORD in request.query and
|
||||
elif (legacy_auth and DATA_API_PASSWORD in request.query and
|
||||
hmac.compare_digest(
|
||||
api_password.encode('utf-8'),
|
||||
request.query[DATA_API_PASSWORD].encode('utf-8'))):
|
||||
authenticated = True
|
||||
|
||||
elif (hdrs.AUTHORIZATION in request.headers and
|
||||
await async_validate_auth_header(api_password, request)):
|
||||
elif _is_trusted_ip(request, trusted_networks):
|
||||
authenticated = True
|
||||
|
||||
elif _is_trusted_ip(request, trusted_networks):
|
||||
elif not use_auth and api_password is None:
|
||||
# If neither password nor auth_providers set,
|
||||
# just always set authenticated=True
|
||||
authenticated = True
|
||||
|
||||
request[KEY_AUTHENTICATED] = authenticated
|
||||
|
@ -76,8 +83,12 @@ def validate_password(request, api_password):
|
|||
request.app['hass'].http.api_password.encode('utf-8'))
|
||||
|
||||
|
||||
async def async_validate_auth_header(api_password, request):
|
||||
"""Test an authorization header if valid password."""
|
||||
async def async_validate_auth_header(request, api_password=None):
|
||||
"""
|
||||
Test authorization header against access token.
|
||||
|
||||
Basic auth_type is legacy code, should be removed with api_password.
|
||||
"""
|
||||
if hdrs.AUTHORIZATION not in request.headers:
|
||||
return False
|
||||
|
||||
|
@ -88,7 +99,16 @@ async def async_validate_auth_header(api_password, request):
|
|||
# If no space in authorization header
|
||||
return False
|
||||
|
||||
if auth_type == 'Basic':
|
||||
if auth_type == 'Bearer':
|
||||
hass = request.app['hass']
|
||||
access_token = hass.auth.async_get_access_token(auth_val)
|
||||
if access_token is None:
|
||||
return False
|
||||
|
||||
request['hass_user'] = access_token.refresh_token.user
|
||||
return True
|
||||
|
||||
elif auth_type == 'Basic' and api_password is not None:
|
||||
decoded = base64.b64decode(auth_val).decode('utf-8')
|
||||
try:
|
||||
username, password = decoded.split(':', 1)
|
||||
|
@ -102,13 +122,5 @@ async def async_validate_auth_header(api_password, request):
|
|||
return hmac.compare_digest(api_password.encode('utf-8'),
|
||||
password.encode('utf-8'))
|
||||
|
||||
if auth_type != 'Bearer':
|
||||
else:
|
||||
return False
|
||||
|
||||
hass = request.app['hass']
|
||||
access_token = hass.auth.async_get_access_token(auth_val)
|
||||
if access_token is None:
|
||||
return False
|
||||
|
||||
request['hass_user'] = access_token.refresh_token.user
|
||||
return True
|
||||
|
|
|
@ -315,26 +315,32 @@ class ActiveConnection:
|
|||
authenticated = True
|
||||
|
||||
else:
|
||||
self.debug("Request auth")
|
||||
await self.wsock.send_json(auth_required_message())
|
||||
msg = await wsock.receive_json()
|
||||
msg = AUTH_MESSAGE_SCHEMA(msg)
|
||||
|
||||
if 'api_password' in msg:
|
||||
authenticated = validate_password(
|
||||
request, msg['api_password'])
|
||||
|
||||
elif 'access_token' in msg:
|
||||
if self.hass.auth.active and 'access_token' in msg:
|
||||
self.debug("Received access_token")
|
||||
token = self.hass.auth.async_get_access_token(
|
||||
msg['access_token'])
|
||||
authenticated = token is not None
|
||||
|
||||
elif ((not self.hass.auth.active or
|
||||
self.hass.auth.support_legacy) and
|
||||
'api_password' in msg):
|
||||
self.debug("Received api_password")
|
||||
authenticated = validate_password(
|
||||
request, msg['api_password'])
|
||||
|
||||
if not authenticated:
|
||||
self.debug("Invalid password")
|
||||
self.debug("Authorization failed")
|
||||
await self.wsock.send_json(
|
||||
auth_invalid_message('Invalid password'))
|
||||
auth_invalid_message('Invalid access token or password'))
|
||||
await process_wrong_login(request)
|
||||
return wsock
|
||||
|
||||
self.debug("Auth OK")
|
||||
await self.wsock.send_json(auth_ok_message())
|
||||
|
||||
# ---------- AUTH PHASE OVER ----------
|
||||
|
@ -392,7 +398,7 @@ class ActiveConnection:
|
|||
if wsock.closed:
|
||||
self.debug("Connection closed by client")
|
||||
else:
|
||||
_LOGGER.exception("Unexpected TypeError: %s", msg)
|
||||
_LOGGER.exception("Unexpected TypeError: %s", err)
|
||||
|
||||
except ValueError as err:
|
||||
msg = "Received invalid JSON"
|
||||
|
@ -403,7 +409,7 @@ class ActiveConnection:
|
|||
self._writer_task.cancel()
|
||||
|
||||
except CANCELLATION_ERRORS:
|
||||
self.debug("Connection cancelled by server")
|
||||
self.debug("Connection cancelled")
|
||||
|
||||
except asyncio.QueueFull:
|
||||
self.log_error("Client exceeded max pending messages [1]:",
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
"""Tests for the legacy_api_password auth provider."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant import auth
|
||||
from homeassistant.auth_providers import legacy_api_password
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(hass):
|
||||
"""Mock store."""
|
||||
return auth.AuthStore(hass)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def provider(hass, store):
|
||||
"""Mock provider."""
|
||||
return legacy_api_password.LegacyApiPasswordAuthProvider(hass, store, {
|
||||
'type': 'legacy_api_password',
|
||||
})
|
||||
|
||||
|
||||
async def test_create_new_credential(provider):
|
||||
"""Test that we create a new credential."""
|
||||
credentials = await provider.async_get_or_create_credentials({})
|
||||
assert credentials.data["username"] is legacy_api_password.LEGACY_USER
|
||||
assert credentials.is_new is True
|
||||
|
||||
|
||||
async def test_only_one_credentials(store, provider):
|
||||
"""Call create twice will return same credential."""
|
||||
credentials = await provider.async_get_or_create_credentials({})
|
||||
await store.async_get_or_create_user(credentials, provider)
|
||||
credentials2 = await provider.async_get_or_create_credentials({})
|
||||
assert credentials2.data["username"] is legacy_api_password.LEGACY_USER
|
||||
assert credentials2.id is credentials.id
|
||||
assert credentials2.is_new is False
|
||||
|
||||
|
||||
async def test_verify_not_load(hass, provider):
|
||||
"""Test we raise if http module not load."""
|
||||
with pytest.raises(ValueError):
|
||||
provider.async_validate_login('test-password')
|
||||
hass.http = Mock(api_password=None)
|
||||
with pytest.raises(ValueError):
|
||||
provider.async_validate_login('test-password')
|
||||
hass.http = Mock(api_password='test-password')
|
||||
provider.async_validate_login('test-password')
|
||||
|
||||
|
||||
async def test_verify_login(hass, provider):
|
||||
"""Test we raise if http module not load."""
|
||||
hass.http = Mock(api_password='test-password')
|
||||
provider.async_validate_login('test-password')
|
||||
hass.http = Mock(api_password='test-password')
|
||||
with pytest.raises(legacy_api_password.InvalidAuthError):
|
||||
provider.async_validate_login('invalid-password')
|
||||
|
||||
|
||||
async def test_utf_8_username_password(provider):
|
||||
"""Test that we create a new credential."""
|
||||
credentials = await provider.async_get_or_create_credentials({
|
||||
'username': '🎉',
|
||||
'password': '😎',
|
||||
})
|
||||
assert credentials.is_new is True
|
|
@ -1,20 +1,23 @@
|
|||
"""The tests for the Home Assistant HTTP component."""
|
||||
# pylint: disable=protected-access
|
||||
from ipaddress import ip_network
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import patch, Mock
|
||||
|
||||
import pytest
|
||||
from aiohttp import BasicAuth, web
|
||||
from aiohttp.web_exceptions import HTTPUnauthorized
|
||||
import pytest
|
||||
|
||||
from homeassistant.auth import AccessToken, RefreshToken
|
||||
from homeassistant.components.http.auth import setup_auth
|
||||
from homeassistant.components.http.const import KEY_AUTHENTICATED
|
||||
from homeassistant.components.http.real_ip import setup_real_ip
|
||||
from homeassistant.const import HTTP_HEADER_HA_AUTH
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.components.http.auth import setup_auth
|
||||
from homeassistant.components.http.real_ip import setup_real_ip
|
||||
from homeassistant.components.http.const import KEY_AUTHENTICATED
|
||||
|
||||
from . import mock_real_ip
|
||||
|
||||
|
||||
ACCESS_TOKEN = 'tk.1234'
|
||||
|
||||
API_PASSWORD = 'test1234'
|
||||
|
||||
# Don't add 127.0.0.1/::1 as trusted, as it may interfere with other test cases
|
||||
|
@ -36,15 +39,37 @@ async def mock_handler(request):
|
|||
return web.Response(status=200)
|
||||
|
||||
|
||||
def mock_async_get_access_token(token):
|
||||
"""Return if token is valid."""
|
||||
if token == ACCESS_TOKEN:
|
||||
return Mock(spec=AccessToken,
|
||||
token=ACCESS_TOKEN,
|
||||
refresh_token=Mock(spec=RefreshToken))
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
"""Fixture to setup a web.Application."""
|
||||
app = web.Application()
|
||||
mock_auth = Mock(async_get_access_token=mock_async_get_access_token)
|
||||
app['hass'] = Mock(auth=mock_auth)
|
||||
app.router.add_get('/', mock_handler)
|
||||
setup_real_ip(app, False, [])
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app2():
|
||||
"""Fixture to setup a web.Application without real_ip middleware."""
|
||||
app = web.Application()
|
||||
mock_auth = Mock(async_get_access_token=mock_async_get_access_token)
|
||||
app['hass'] = Mock(auth=mock_auth)
|
||||
app.router.add_get('/', mock_handler)
|
||||
return app
|
||||
|
||||
|
||||
async def test_auth_middleware_loaded_by_default(hass):
|
||||
"""Test accessing to server from banned IP when feature is off."""
|
||||
with patch('homeassistant.components.http.setup_auth') as mock_setup:
|
||||
|
@ -57,7 +82,7 @@ async def test_auth_middleware_loaded_by_default(hass):
|
|||
|
||||
async def test_access_without_password(app, aiohttp_client):
|
||||
"""Test access without password."""
|
||||
setup_auth(app, [], None)
|
||||
setup_auth(app, [], False, api_password=None)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
resp = await client.get('/')
|
||||
|
@ -65,8 +90,8 @@ async def test_access_without_password(app, aiohttp_client):
|
|||
|
||||
|
||||
async def test_access_with_password_in_header(app, aiohttp_client):
|
||||
"""Test access with password in URL."""
|
||||
setup_auth(app, [], API_PASSWORD)
|
||||
"""Test access with password in header."""
|
||||
setup_auth(app, [], False, api_password=API_PASSWORD)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get(
|
||||
|
@ -79,8 +104,8 @@ async def test_access_with_password_in_header(app, aiohttp_client):
|
|||
|
||||
|
||||
async def test_access_with_password_in_query(app, aiohttp_client):
|
||||
"""Test access without password."""
|
||||
setup_auth(app, [], API_PASSWORD)
|
||||
"""Test access with password in URL."""
|
||||
setup_auth(app, [], False, api_password=API_PASSWORD)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
resp = await client.get('/', params={
|
||||
|
@ -99,7 +124,7 @@ async def test_access_with_password_in_query(app, aiohttp_client):
|
|||
|
||||
async def test_basic_auth_works(app, aiohttp_client):
|
||||
"""Test access with basic authentication."""
|
||||
setup_auth(app, [], API_PASSWORD)
|
||||
setup_auth(app, [], False, api_password=API_PASSWORD)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get(
|
||||
|
@ -125,15 +150,12 @@ async def test_basic_auth_works(app, aiohttp_client):
|
|||
assert req.status == 401
|
||||
|
||||
|
||||
async def test_access_with_trusted_ip(aiohttp_client):
|
||||
async def test_access_with_trusted_ip(app2, aiohttp_client):
|
||||
"""Test access with an untrusted ip address."""
|
||||
app = web.Application()
|
||||
app.router.add_get('/', mock_handler)
|
||||
setup_auth(app2, TRUSTED_NETWORKS, False, api_password='some-pass')
|
||||
|
||||
setup_auth(app, TRUSTED_NETWORKS, 'some-pass')
|
||||
|
||||
set_mock_ip = mock_real_ip(app)
|
||||
client = await aiohttp_client(app)
|
||||
set_mock_ip = mock_real_ip(app2)
|
||||
client = await aiohttp_client(app2)
|
||||
|
||||
for remote_addr in UNTRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
|
@ -146,3 +168,94 @@ async def test_access_with_trusted_ip(aiohttp_client):
|
|||
resp = await client.get('/')
|
||||
assert resp.status == 200, \
|
||||
"{} should be trusted".format(remote_addr)
|
||||
|
||||
|
||||
async def test_auth_active_access_with_access_token_in_header(
|
||||
app, aiohttp_client):
|
||||
"""Test access with access token in header."""
|
||||
setup_auth(app, [], True, api_password=None)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'Authorization': 'Bearer {}'.format(ACCESS_TOKEN)})
|
||||
assert req.status == 200
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'AUTHORIZATION': 'Bearer {}'.format(ACCESS_TOKEN)})
|
||||
assert req.status == 200
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'authorization': 'Bearer {}'.format(ACCESS_TOKEN)})
|
||||
assert req.status == 200
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'Authorization': ACCESS_TOKEN})
|
||||
assert req.status == 401
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'Authorization': 'BEARER {}'.format(ACCESS_TOKEN)})
|
||||
assert req.status == 401
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={'Authorization': 'Bearer wrong-pass'})
|
||||
assert req.status == 401
|
||||
|
||||
|
||||
async def test_auth_active_access_with_trusted_ip(app2, aiohttp_client):
|
||||
"""Test access with an untrusted ip address."""
|
||||
setup_auth(app2, TRUSTED_NETWORKS, True, api_password=None)
|
||||
|
||||
set_mock_ip = mock_real_ip(app2)
|
||||
client = await aiohttp_client(app2)
|
||||
|
||||
for remote_addr in UNTRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get('/')
|
||||
assert resp.status == 401, \
|
||||
"{} shouldn't be trusted".format(remote_addr)
|
||||
|
||||
for remote_addr in TRUSTED_ADDRESSES:
|
||||
set_mock_ip(remote_addr)
|
||||
resp = await client.get('/')
|
||||
assert resp.status == 200, \
|
||||
"{} should be trusted".format(remote_addr)
|
||||
|
||||
|
||||
async def test_auth_active_blocked_api_password_access(app, aiohttp_client):
|
||||
"""Test access using api_password should be blocked when auth.active."""
|
||||
setup_auth(app, [], True, api_password=API_PASSWORD)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={HTTP_HEADER_HA_AUTH: API_PASSWORD})
|
||||
assert req.status == 401
|
||||
|
||||
resp = await client.get('/', params={
|
||||
'api_password': API_PASSWORD
|
||||
})
|
||||
assert resp.status == 401
|
||||
|
||||
req = await client.get(
|
||||
'/',
|
||||
auth=BasicAuth('homeassistant', API_PASSWORD))
|
||||
assert req.status == 401
|
||||
|
||||
|
||||
async def test_auth_legacy_support_api_password_access(app, aiohttp_client):
|
||||
"""Test access using api_password if auth.support_legacy."""
|
||||
setup_auth(app, [], True, support_legacy=True, api_password=API_PASSWORD)
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
req = await client.get(
|
||||
'/', headers={HTTP_HEADER_HA_AUTH: API_PASSWORD})
|
||||
assert req.status == 200
|
||||
|
||||
resp = await client.get('/', params={
|
||||
'api_password': API_PASSWORD
|
||||
})
|
||||
assert resp.status == 200
|
||||
|
||||
req = await client.get(
|
||||
'/',
|
||||
auth=BasicAuth('homeassistant', API_PASSWORD))
|
||||
assert req.status == 200
|
||||
|
|
|
@ -77,7 +77,7 @@ def test_auth_via_msg_incorrect_pass(no_auth_websocket_client):
|
|||
|
||||
assert mock_process_wrong_login.called
|
||||
assert msg['type'] == wapi.TYPE_AUTH_INVALID
|
||||
assert msg['message'] == 'Invalid password'
|
||||
assert msg['message'] == 'Invalid access token or password'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
|
@ -316,47 +316,103 @@ def test_unknown_command(websocket_client):
|
|||
assert msg['error']['code'] == wapi.ERR_UNKNOWN_COMMAND
|
||||
|
||||
|
||||
async def test_auth_with_token(hass, aiohttp_client, hass_access_token):
|
||||
async def test_auth_active_with_token(hass, aiohttp_client, hass_access_token):
|
||||
"""Test authenticating with a token."""
|
||||
assert await async_setup_component(hass, 'websocket_api', {
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
|
||||
client = await aiohttp_client(hass.http.app)
|
||||
|
||||
async with client.ws_connect(wapi.URL) as ws:
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
with patch('homeassistant.auth.AuthManager.active') as auth_active:
|
||||
auth_active.return_value = True
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'access_token': hass_access_token.token
|
||||
})
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'access_token': hass_access_token.token
|
||||
})
|
||||
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_OK
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_OK
|
||||
|
||||
|
||||
async def test_auth_active_with_password_not_allow(hass, aiohttp_client):
|
||||
"""Test authenticating with a token."""
|
||||
assert await async_setup_component(hass, 'websocket_api', {
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
|
||||
client = await aiohttp_client(hass.http.app)
|
||||
|
||||
async with client.ws_connect(wapi.URL) as ws:
|
||||
with patch('homeassistant.auth.AuthManager.active',
|
||||
return_value=True):
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'api_password': API_PASSWORD
|
||||
})
|
||||
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|
||||
|
||||
|
||||
async def test_auth_legacy_support_with_password(hass, aiohttp_client):
|
||||
"""Test authenticating with a token."""
|
||||
assert await async_setup_component(hass, 'websocket_api', {
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
|
||||
client = await aiohttp_client(hass.http.app)
|
||||
|
||||
async with client.ws_connect(wapi.URL) as ws:
|
||||
with patch('homeassistant.auth.AuthManager.active',
|
||||
return_value=True),\
|
||||
patch('homeassistant.auth.AuthManager.support_legacy',
|
||||
return_value=True):
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'api_password': API_PASSWORD
|
||||
})
|
||||
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_OK
|
||||
|
||||
|
||||
async def test_auth_with_invalid_token(hass, aiohttp_client):
|
||||
"""Test authenticating with a token."""
|
||||
assert await async_setup_component(hass, 'websocket_api', {
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
'http': {
|
||||
'api_password': API_PASSWORD
|
||||
}
|
||||
})
|
||||
|
||||
client = await aiohttp_client(hass.http.app)
|
||||
|
||||
async with client.ws_connect(wapi.URL) as ws:
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
with patch('homeassistant.auth.AuthManager.active') as auth_active:
|
||||
auth_active.return_value = True
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||||
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'access_token': 'incorrect'
|
||||
})
|
||||
await ws.send_json({
|
||||
'type': wapi.TYPE_AUTH,
|
||||
'access_token': 'incorrect'
|
||||
})
|
||||
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|
||||
auth_msg = await ws.receive_json()
|
||||
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|
||||
|
|
Loading…
Reference in New Issue