187 lines
6.1 KiB
Python
187 lines
6.1 KiB
Python
|
"""Test auth of websocket API."""
|
||
|
from unittest.mock import patch
|
||
|
|
||
|
from homeassistant.components import websocket_api as wapi
|
||
|
from homeassistant.components.websocket_api import commands
|
||
|
from homeassistant.setup import async_setup_component
|
||
|
|
||
|
from tests.common import mock_coro
|
||
|
|
||
|
from . import API_PASSWORD
|
||
|
|
||
|
|
||
|
async def test_auth_via_msg(no_auth_websocket_client):
|
||
|
"""Test authenticating."""
|
||
|
await no_auth_websocket_client.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'api_password': API_PASSWORD
|
||
|
})
|
||
|
|
||
|
msg = await no_auth_websocket_client.receive_json()
|
||
|
|
||
|
assert msg['type'] == wapi.TYPE_AUTH_OK
|
||
|
|
||
|
|
||
|
async def test_auth_via_msg_incorrect_pass(no_auth_websocket_client):
|
||
|
"""Test authenticating."""
|
||
|
with patch('homeassistant.components.websocket_api.process_wrong_login',
|
||
|
return_value=mock_coro()) as mock_process_wrong_login:
|
||
|
await no_auth_websocket_client.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'api_password': API_PASSWORD + 'wrong'
|
||
|
})
|
||
|
|
||
|
msg = await no_auth_websocket_client.receive_json()
|
||
|
|
||
|
assert mock_process_wrong_login.called
|
||
|
assert msg['type'] == wapi.TYPE_AUTH_INVALID
|
||
|
assert msg['message'] == 'Invalid access token or password'
|
||
|
|
||
|
|
||
|
async def test_pre_auth_only_auth_allowed(no_auth_websocket_client):
|
||
|
"""Verify that before authentication, only auth messages are allowed."""
|
||
|
await no_auth_websocket_client.send_json({
|
||
|
'type': commands.TYPE_CALL_SERVICE,
|
||
|
'domain': 'domain_test',
|
||
|
'service': 'test_service',
|
||
|
'service_data': {
|
||
|
'hello': 'world'
|
||
|
}
|
||
|
})
|
||
|
|
||
|
msg = await no_auth_websocket_client.receive_json()
|
||
|
|
||
|
assert msg['type'] == wapi.TYPE_AUTH_INVALID
|
||
|
assert msg['message'].startswith('Message incorrectly formatted')
|
||
|
|
||
|
|
||
|
async def test_auth_active_with_token(hass, aiohttp_client, hass_access_token):
|
||
|
"""Test authenticating with a token."""
|
||
|
assert await async_setup_component(hass, 'websocket_api', {
|
||
|
'http': {
|
||
|
'api_password': API_PASSWORD
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client = await aiohttp_client(hass.http.app)
|
||
|
|
||
|
async with client.ws_connect(wapi.URL) as ws:
|
||
|
with patch('homeassistant.auth.AuthManager.active') as auth_active:
|
||
|
auth_active.return_value = True
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||
|
|
||
|
await ws.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'access_token': hass_access_token
|
||
|
})
|
||
|
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_OK
|
||
|
|
||
|
|
||
|
async def test_auth_active_user_inactive(hass, aiohttp_client,
|
||
|
hass_access_token):
|
||
|
"""Test authenticating with a token."""
|
||
|
refresh_token = await hass.auth.async_validate_access_token(
|
||
|
hass_access_token)
|
||
|
refresh_token.user.is_active = False
|
||
|
assert await async_setup_component(hass, 'websocket_api', {
|
||
|
'http': {
|
||
|
'api_password': API_PASSWORD
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client = await aiohttp_client(hass.http.app)
|
||
|
|
||
|
async with client.ws_connect(wapi.URL) as ws:
|
||
|
with patch('homeassistant.auth.AuthManager.active') as auth_active:
|
||
|
auth_active.return_value = True
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||
|
|
||
|
await ws.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'access_token': hass_access_token
|
||
|
})
|
||
|
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|
||
|
|
||
|
|
||
|
async def test_auth_active_with_password_not_allow(hass, aiohttp_client):
|
||
|
"""Test authenticating with a token."""
|
||
|
assert await async_setup_component(hass, 'websocket_api', {
|
||
|
'http': {
|
||
|
'api_password': API_PASSWORD
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client = await aiohttp_client(hass.http.app)
|
||
|
|
||
|
async with client.ws_connect(wapi.URL) as ws:
|
||
|
with patch('homeassistant.auth.AuthManager.active',
|
||
|
return_value=True):
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||
|
|
||
|
await ws.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'api_password': API_PASSWORD
|
||
|
})
|
||
|
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|
||
|
|
||
|
|
||
|
async def test_auth_legacy_support_with_password(hass, aiohttp_client):
|
||
|
"""Test authenticating with a token."""
|
||
|
assert await async_setup_component(hass, 'websocket_api', {
|
||
|
'http': {
|
||
|
'api_password': API_PASSWORD
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client = await aiohttp_client(hass.http.app)
|
||
|
|
||
|
async with client.ws_connect(wapi.URL) as ws:
|
||
|
with patch('homeassistant.auth.AuthManager.active',
|
||
|
return_value=True),\
|
||
|
patch('homeassistant.auth.AuthManager.support_legacy',
|
||
|
return_value=True):
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||
|
|
||
|
await ws.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'api_password': API_PASSWORD
|
||
|
})
|
||
|
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_OK
|
||
|
|
||
|
|
||
|
async def test_auth_with_invalid_token(hass, aiohttp_client):
|
||
|
"""Test authenticating with a token."""
|
||
|
assert await async_setup_component(hass, 'websocket_api', {
|
||
|
'http': {
|
||
|
'api_password': API_PASSWORD
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client = await aiohttp_client(hass.http.app)
|
||
|
|
||
|
async with client.ws_connect(wapi.URL) as ws:
|
||
|
with patch('homeassistant.auth.AuthManager.active') as auth_active:
|
||
|
auth_active.return_value = True
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED
|
||
|
|
||
|
await ws.send_json({
|
||
|
'type': wapi.TYPE_AUTH,
|
||
|
'access_token': 'incorrect'
|
||
|
})
|
||
|
|
||
|
auth_msg = await ws.receive_json()
|
||
|
assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID
|