Add filtering
parent
3800a4feee
commit
f047d04882
|
@ -30,6 +30,7 @@ from .const import KEY_AUTHENTICATED, KEY_HASS, KEY_HASS_USER # noqa: F401
|
|||
from .cors import setup_cors
|
||||
from .forwarded import async_setup_forwarded
|
||||
from .request_context import setup_request_context
|
||||
from .security_filter import setup_security_filter
|
||||
from .static import CACHE_HEADERS, CachingStaticResource
|
||||
from .view import HomeAssistantView # noqa: F401
|
||||
from .web_runner import HomeAssistantTCPSite
|
||||
|
@ -296,7 +297,10 @@ class HomeAssistantHTTP:
|
|||
)
|
||||
app[KEY_HASS] = hass
|
||||
|
||||
# Order matters, forwarded middleware needs to go first.
|
||||
# Order matters, security filters middle ware needs to go first,
|
||||
# forwarded middleware needs to go second.
|
||||
setup_security_filter(app)
|
||||
|
||||
# Only register middleware if `use_x_forwarded_for` is enabled
|
||||
# and trusted proxies are provided
|
||||
if use_x_forwarded_for and trusted_proxies:
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
"""Middleware to add some basic security filtering to requests."""
|
||||
import logging
|
||||
import re
|
||||
|
||||
from aiohttp.web import HTTPBadRequest, middleware
|
||||
|
||||
from homeassistant.core import callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# mypy: allow-untyped-defs
|
||||
|
||||
# fmt: off
|
||||
FILTERS = re.compile(
|
||||
r"(?:"
|
||||
|
||||
# Common exploits
|
||||
r"proc/self/environ"
|
||||
r"|(<|%3C).*script.*(>|%3E)"
|
||||
|
||||
# File Injections
|
||||
r"|(\.\.//?)+" # ../../anywhere
|
||||
r"|[a-zA-Z0-9_]=/([a-z0-9_.]//?)+" # .html?v=/.//test
|
||||
|
||||
# SQL Injections
|
||||
r"|union.*select.*\("
|
||||
r"|union.*all.*select.*"
|
||||
r"|concat.*\("
|
||||
|
||||
r")",
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
@callback
|
||||
def setup_security_filter(app):
|
||||
"""Create security filter middleware for the app."""
|
||||
|
||||
@middleware
|
||||
async def security_filter_middleware(request, handler):
|
||||
"""Process request and block commonly known exploit attempts."""
|
||||
if FILTERS.search(request.raw_path):
|
||||
_LOGGER.warning(
|
||||
"Filtered a potential harmful request to: %s", request.raw_path
|
||||
)
|
||||
raise HTTPBadRequest
|
||||
|
||||
return await handler(request)
|
||||
|
||||
app.middlewares.append(security_filter_middleware)
|
|
@ -0,0 +1,58 @@
|
|||
"""Test security filter middleware."""
|
||||
from aiohttp import web
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.http.security_filter import setup_security_filter
|
||||
|
||||
|
||||
async def mock_handler(request):
|
||||
"""Return OK."""
|
||||
return web.Response(text="OK")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"request_path,request_params",
|
||||
[
|
||||
("/", {}),
|
||||
("/lovelace/dashboard", {}),
|
||||
("/frontend_latest/chunk.4c9e2d8dc10f77b885b0.js", {}),
|
||||
("/static/translations/en-f96a262a5a6eede29234dc45dc63abf2.json", {}),
|
||||
("/", {"test": "123"}),
|
||||
],
|
||||
)
|
||||
async def test_ok_requests(request_path, request_params, aiohttp_client):
|
||||
"""Test request paths that should not be filtered."""
|
||||
app = web.Application()
|
||||
app.router.add_get("/{all:.*}", mock_handler)
|
||||
|
||||
setup_security_filter(app)
|
||||
|
||||
mock_api_client = await aiohttp_client(app)
|
||||
resp = await mock_api_client.get(request_path, params=request_params)
|
||||
|
||||
assert resp.status == 200
|
||||
assert await resp.text() == "OK"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"request_path,request_params",
|
||||
[
|
||||
("/proc/self/environ", {}),
|
||||
("/", {"test": "/test/../../api"}),
|
||||
("/", {"test": "test/../../api"}),
|
||||
("/", {"sql": ";UNION SELECT (a, b"}),
|
||||
("/", {"sql": "concat(..."}),
|
||||
("/", {"xss": "<script >"}),
|
||||
],
|
||||
)
|
||||
async def test_bad_requests(request_path, request_params, aiohttp_client):
|
||||
"""Test request paths that should be filtered."""
|
||||
app = web.Application()
|
||||
app.router.add_get("/{all:.*}", mock_handler)
|
||||
|
||||
setup_security_filter(app)
|
||||
|
||||
mock_api_client = await aiohttp_client(app)
|
||||
resp = await mock_api_client.get(request_path, params=request_params)
|
||||
|
||||
assert resp.status == 400
|
Loading…
Reference in New Issue