core/tests/components/auth/test_init.py

416 lines
13 KiB
Python
Raw Normal View History

"""Integration tests for the auth component."""
from datetime import timedelta
2021-01-01 21:31:56 +00:00
from unittest.mock import patch
from homeassistant.auth import InvalidAuthError
from homeassistant.auth.models import Credentials
from homeassistant.components import auth
from homeassistant.components.auth import RESULT_TYPE_USER
2018-07-17 07:24:51 +00:00
from homeassistant.setup import async_setup_component
from homeassistant.util.dt import utcnow
from . import async_setup_auth
from tests.common import CLIENT_ID, CLIENT_REDIRECT_URI, MockUser
async def async_setup_user_refresh_token(hass):
"""Create a testing user with a connected credential."""
user = await hass.auth.async_create_user("Test User")
credential = Credentials(
id="mock-credential-id",
auth_provider_type="insecure_example",
auth_provider_id=None,
data={"username": "test-user"},
is_new=False,
)
user.credentials.append(credential)
return await hass.auth.async_create_refresh_token(
user, CLIENT_ID, credential=credential
)
async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
"""Test logging in with new user and refreshing tokens."""
client = await async_setup_auth(hass, aiohttp_client, setup_api=True)
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/login_flow",
json={
"client_id": CLIENT_ID,
"handler": ["insecure_example", None],
"redirect_uri": CLIENT_REDIRECT_URI,
},
)
assert resp.status == 200
step = await resp.json()
resp = await client.post(
f"/auth/login_flow/{step['flow_id']}",
2019-07-31 19:25:30 +00:00
json={"client_id": CLIENT_ID, "username": "test-user", "password": "test-pass"},
)
assert resp.status == 200
step = await resp.json()
2019-07-31 19:25:30 +00:00
code = step["result"]
# Exchange code for tokens
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={"client_id": CLIENT_ID, "grant_type": "authorization_code", "code": code},
)
assert resp.status == 200
tokens = await resp.json()
assert (
2019-07-31 19:25:30 +00:00
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
)
# Use refresh token to get more tokens.
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": tokens["refresh_token"],
},
)
assert resp.status == 200
tokens = await resp.json()
2019-07-31 19:25:30 +00:00
assert "refresh_token" not in tokens
assert (
2019-07-31 19:25:30 +00:00
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
)
# Test using access token to hit API.
2019-07-31 19:25:30 +00:00
resp = await client.get("/api/")
assert resp.status == 401
2019-07-31 19:25:30 +00:00
resp = await client.get(
"/api/", headers={"authorization": f"Bearer {tokens['access_token']}"}
2019-07-31 19:25:30 +00:00
)
assert resp.status == 200
def test_auth_code_store_expiration():
"""Test that the auth code store will not return expired tokens."""
store, retrieve = auth._create_auth_code_store()
2019-07-31 19:25:30 +00:00
client_id = "bla"
user = MockUser(id="mock_user")
now = utcnow()
2019-07-31 19:25:30 +00:00
with patch("homeassistant.util.dt.utcnow", return_value=now):
code = store(client_id, user)
2019-07-31 19:25:30 +00:00
with patch(
"homeassistant.util.dt.utcnow", return_value=now + timedelta(minutes=10)
):
assert retrieve(client_id, RESULT_TYPE_USER, code) is None
2019-07-31 19:25:30 +00:00
with patch("homeassistant.util.dt.utcnow", return_value=now):
code = store(client_id, user)
2019-07-31 19:25:30 +00:00
with patch(
"homeassistant.util.dt.utcnow",
return_value=now + timedelta(minutes=9, seconds=59),
):
assert retrieve(client_id, RESULT_TYPE_USER, code) == user
2018-07-17 07:24:51 +00:00
async def test_ws_current_user(hass, hass_ws_client, hass_access_token):
"""Test the current user command with Home Assistant creds."""
assert await async_setup_component(hass, "auth", {})
2019-07-31 19:25:30 +00:00
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
user = refresh_token.user
client = await hass_ws_client(hass, hass_access_token)
2018-07-17 07:24:51 +00:00
2019-07-31 19:25:30 +00:00
await client.send_json({"id": 5, "type": auth.WS_TYPE_CURRENT_USER})
2018-07-17 07:24:51 +00:00
result = await client.receive_json()
2019-07-31 19:25:30 +00:00
assert result["success"], result
2018-07-17 07:24:51 +00:00
2019-07-31 19:25:30 +00:00
user_dict = result["result"]
2018-07-17 07:24:51 +00:00
2019-07-31 19:25:30 +00:00
assert user_dict["name"] == user.name
assert user_dict["id"] == user.id
assert user_dict["is_owner"] == user.is_owner
assert len(user_dict["credentials"]) == 1
2019-07-31 19:25:30 +00:00
hass_cred = user_dict["credentials"][0]
assert hass_cred["auth_provider_type"] == "homeassistant"
assert hass_cred["auth_provider_id"] is None
assert "data" not in hass_cred
async def test_cors_on_token(hass, aiohttp_client):
"""Test logging in with new user and refreshing tokens."""
client = await async_setup_auth(hass, aiohttp_client)
2019-07-31 19:25:30 +00:00
resp = await client.options(
"/auth/token",
headers={
"origin": "http://example.com",
"Access-Control-Request-Method": "POST",
},
)
assert resp.headers["Access-Control-Allow-Origin"] == "http://example.com"
assert resp.headers["Access-Control-Allow-Methods"] == "POST"
2019-07-31 19:25:30 +00:00
resp = await client.post("/auth/token", headers={"origin": "http://example.com"})
assert resp.headers["Access-Control-Allow-Origin"] == "http://example.com"
async def test_refresh_token_system_generated(hass, aiohttp_client):
"""Test that we can get access tokens for system generated user."""
client = await async_setup_auth(hass, aiohttp_client)
2019-07-31 19:25:30 +00:00
user = await hass.auth.async_create_system_user("Test System")
refresh_token = await hass.auth.async_create_refresh_token(user, None)
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": "https://this-is-not-allowed-for-system-users.com/",
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 400
result = await resp.json()
2019-07-31 19:25:30 +00:00
assert result["error"] == "invalid_request"
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={"grant_type": "refresh_token", "refresh_token": refresh_token.token},
)
assert resp.status == 200
tokens = await resp.json()
assert (
2019-07-31 19:25:30 +00:00
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
)
async def test_refresh_token_different_client_id(hass, aiohttp_client):
"""Test that we verify client ID."""
client = await async_setup_auth(hass, aiohttp_client)
refresh_token = await async_setup_user_refresh_token(hass)
# No client ID
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={"grant_type": "refresh_token", "refresh_token": refresh_token.token},
)
assert resp.status == 400
result = await resp.json()
2019-07-31 19:25:30 +00:00
assert result["error"] == "invalid_request"
# Different client ID
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": "http://example-different.com",
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 400
result = await resp.json()
2019-07-31 19:25:30 +00:00
assert result["error"] == "invalid_request"
# Correct
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 200
tokens = await resp.json()
assert (
2019-07-31 19:25:30 +00:00
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
)
async def test_refresh_token_provider_rejected(
hass, aiohttp_client, hass_admin_user, hass_admin_credential
):
"""Test that we verify client ID."""
client = await async_setup_auth(hass, aiohttp_client)
refresh_token = await async_setup_user_refresh_token(hass)
# Rejected by provider
with patch(
"homeassistant.auth.providers.insecure_example.ExampleAuthProvider.async_validate_refresh_token",
side_effect=InvalidAuthError("Invalid access"),
):
resp = await client.post(
"/auth/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 403
result = await resp.json()
assert result["error"] == "access_denied"
assert result["error_description"] == "Invalid access"
async def test_revoking_refresh_token(hass, aiohttp_client):
"""Test that we can revoke refresh tokens."""
client = await async_setup_auth(hass, aiohttp_client)
refresh_token = await async_setup_user_refresh_token(hass)
# Test that we can create an access token
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 200
tokens = await resp.json()
assert (
2019-07-31 19:25:30 +00:00
await hass.auth.async_validate_access_token(tokens["access_token"]) is not None
)
# Revoke refresh token
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token", data={"token": refresh_token.token, "action": "revoke"}
)
assert resp.status == 200
# Old access token should be no longer valid
2019-07-31 19:25:30 +00:00
assert await hass.auth.async_validate_access_token(tokens["access_token"]) is None
# Test that we no longer can create an access token
2019-07-31 19:25:30 +00:00
resp = await client.post(
"/auth/token",
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token.token,
},
)
assert resp.status == 400
2019-07-31 19:25:30 +00:00
async def test_ws_long_lived_access_token(hass, hass_ws_client, hass_access_token):
"""Test generate long-lived access token."""
2019-07-31 19:25:30 +00:00
assert await async_setup_component(hass, "auth", {"http": {}})
ws_client = await hass_ws_client(hass, hass_access_token)
# verify create long-lived access token
2019-07-31 19:25:30 +00:00
await ws_client.send_json(
{
"id": 5,
"type": auth.WS_TYPE_LONG_LIVED_ACCESS_TOKEN,
"client_name": "GPS Logger",
"lifespan": 365,
}
)
result = await ws_client.receive_json()
2019-07-31 19:25:30 +00:00
assert result["success"], result
2019-07-31 19:25:30 +00:00
long_lived_access_token = result["result"]
assert long_lived_access_token is not None
2019-07-31 19:25:30 +00:00
refresh_token = await hass.auth.async_validate_access_token(long_lived_access_token)
assert refresh_token.client_id is None
2019-07-31 19:25:30 +00:00
assert refresh_token.client_name == "GPS Logger"
assert refresh_token.client_icon is None
async def test_ws_refresh_tokens(hass, hass_ws_client, hass_access_token):
"""Test fetching refresh token metadata."""
2019-07-31 19:25:30 +00:00
assert await async_setup_component(hass, "auth", {"http": {}})
ws_client = await hass_ws_client(hass, hass_access_token)
2019-07-31 19:25:30 +00:00
await ws_client.send_json({"id": 5, "type": auth.WS_TYPE_REFRESH_TOKENS})
result = await ws_client.receive_json()
2019-07-31 19:25:30 +00:00
assert result["success"], result
assert len(result["result"]) == 1
token = result["result"][0]
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
assert token["id"] == refresh_token.id
assert token["type"] == refresh_token.token_type
assert token["client_id"] == refresh_token.client_id
assert token["client_name"] == refresh_token.client_name
assert token["client_icon"] == refresh_token.client_icon
assert token["created_at"] == refresh_token.created_at.isoformat()
assert token["is_current"] is True
assert token["last_used_at"] == refresh_token.last_used_at.isoformat()
assert token["last_used_ip"] == refresh_token.last_used_ip
async def test_ws_delete_refresh_token(hass, hass_ws_client, hass_access_token):
"""Test deleting a refresh token."""
2019-07-31 19:25:30 +00:00
assert await async_setup_component(hass, "auth", {"http": {}})
2019-07-31 19:25:30 +00:00
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
ws_client = await hass_ws_client(hass, hass_access_token)
# verify create long-lived access token
2019-07-31 19:25:30 +00:00
await ws_client.send_json(
{
"id": 5,
"type": auth.WS_TYPE_DELETE_REFRESH_TOKEN,
"refresh_token_id": refresh_token.id,
}
)
result = await ws_client.receive_json()
2019-07-31 19:25:30 +00:00
assert result["success"], result
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
assert refresh_token is None
async def test_ws_sign_path(hass, hass_ws_client, hass_access_token):
"""Test signing a path."""
2019-07-31 19:25:30 +00:00
assert await async_setup_component(hass, "auth", {"http": {}})
ws_client = await hass_ws_client(hass, hass_access_token)
2019-07-31 19:25:30 +00:00
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
2019-07-31 19:25:30 +00:00
with patch(
"homeassistant.components.auth.async_sign_path", return_value="hello_world"
) as mock_sign:
await ws_client.send_json(
{
"id": 5,
"type": auth.WS_TYPE_SIGN_PATH,
"path": "/api/hello",
"expires": 20,
}
)
result = await ws_client.receive_json()
2019-07-31 19:25:30 +00:00
assert result["success"], result
assert result["result"] == {"path": "hello_world"}
assert len(mock_sign.mock_calls) == 1
hass, p_refresh_token, path, expires = mock_sign.mock_calls[0][1]
assert p_refresh_token == refresh_token.id
2019-07-31 19:25:30 +00:00
assert path == "/api/hello"
assert expires.total_seconds() == 20