core/homeassistant/components/http/auth.py

139 lines
3.7 KiB
Python

"""Authentication for HTTP component."""
import logging
from aiohttp import hdrs
from aiohttp.web import middleware
import jwt
from homeassistant.auth.util import generate_secret
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .const import KEY_AUTHENTICATED, KEY_HASS_USER, KEY_REAL_IP
# mypy: allow-untyped-defs, no-check-untyped-defs
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# NOTE(review): not referenced in the visible part of this module; presumably
# a legacy hass.data key for the API password - confirm before removing.
DATA_API_PASSWORD = "api_password"
# Key in hass.data under which the secret used to sign paths is stored.
DATA_SIGN_SECRET = "http.auth.sign_secret"
# Name of the query parameter that carries the signature of a signed path.
SIGN_QUERY_PARAM = "authSig"
@callback
def async_sign_path(hass, refresh_token_id, path, expiration):
    """Sign a path for temporary access without auth header.

    The signing secret is read from ``hass.data[DATA_SIGN_SECRET]``; if no
    secret is stored yet, a new one is generated and stored there.

    Args:
        hass: Object exposing a ``data`` mapping that holds the secret.
        refresh_token_id: Value stored in the ``iss`` claim of the signature.
        path: Path to sign; stored in the ``path`` claim and used as the
            prefix of the returned string.
        expiration: Added to the current time (``dt_util.utcnow()``) to build
            the ``exp`` claim. Presumably a ``datetime.timedelta`` - confirm
            against callers.

    Returns:
        ``path`` followed by ``?authSig=<token>``, where the token is an
        HS256-signed JWT.
    """
    secret = hass.data.get(DATA_SIGN_SECRET)
    if secret is None:
        secret = hass.data[DATA_SIGN_SECRET] = generate_secret()

    now = dt_util.utcnow()
    encoded = jwt.encode(
        {
            "iss": refresh_token_id,
            "path": path,
            "iat": now,
            "exp": now + expiration,
        },
        secret,
        algorithm="HS256",
    )
    # PyJWT < 2.0 returns bytes from encode(), PyJWT >= 2.0 returns str.
    # Calling .decode() unconditionally raises AttributeError on the latter.
    if isinstance(encoded, bytes):
        encoded = encoded.decode()

    return "{}?{}={}".format(path, SIGN_QUERY_PARAM, encoded)
@callback
def setup_auth(hass, app):
    """Register the authentication middleware on the given app.

    The middleware marks every request with ``KEY_AUTHENTICATED`` and, when
    authentication succeeds, stores the matching user under
    ``KEY_HASS_USER`` before handing the request to the next handler.
    """

    async def async_validate_auth_header(request):
        """Check the Authorization header for a valid Bearer access token.

        Returns True and stores the user on the request on success,
        False otherwise.
        """
        scheme, separator, credentials = request.headers.get(
            hdrs.AUTHORIZATION
        ).partition(" ")

        # An empty separator means the header contained no space at all.
        if not separator or scheme != "Bearer":
            return False

        token = await hass.auth.async_validate_access_token(credentials)
        if token is None:
            return False

        request[KEY_HASS_USER] = token.user
        return True

    async def async_validate_signed_request(request):
        """Check the signature query parameter of a signed request.

        Returns True and stores the user on the request on success,
        False otherwise.
        """
        signing_secret = hass.data.get(DATA_SIGN_SECRET)
        if signing_secret is None:
            # Nothing has been signed yet, so no signature can be valid.
            return False

        signed_value = request.query.get(SIGN_QUERY_PARAM)
        if signed_value is None:
            return False

        try:
            payload = jwt.decode(
                signed_value,
                signing_secret,
                algorithms=["HS256"],
                options={"verify_iss": False},
            )
        except jwt.InvalidTokenError:
            return False

        # The signature is only valid for the exact path it was issued for.
        if payload["path"] == request.path:
            token = await hass.auth.async_get_refresh_token(payload["iss"])
            if token is not None:
                request[KEY_HASS_USER] = token.user
                return True

        return False

    @middleware
    async def auth_middleware(request, handler):
        """Authenticate the request, then pass it on to the handler."""
        auth_method = None

        if (
            hdrs.AUTHORIZATION in request.headers
            and await async_validate_auth_header(request)
        ):
            auth_method = "bearer token"
        # Signed requests are only considered for GET requests that carry
        # the signature query parameter.
        elif (
            request.method == "GET"
            and SIGN_QUERY_PARAM in request.query
            and await async_validate_signed_request(request)
        ):
            auth_method = "signed request"

        is_authenticated = auth_method is not None
        if is_authenticated:
            _LOGGER.debug(
                "Authenticated %s for %s using %s",
                request[KEY_REAL_IP],
                request.path,
                auth_method,
            )

        request[KEY_AUTHENTICATED] = is_authenticated
        return await handler(request)

    app.middlewares.append(auth_middleware)