diff --git a/homeassistant/components/http/security_filter.py b/homeassistant/components/http/security_filter.py index a9b32bd7f4c..e8e3aa4699c 100644 --- a/homeassistant/components/http/security_filter.py +++ b/homeassistant/components/http/security_filter.py @@ -35,6 +35,9 @@ FILTERS: Final = re.compile( ) # fmt: on +# Unsafe bytes to be removed per WHATWG spec +UNSAFE_URL_BYTES = ["\t", "\r", "\n"] + @callback def setup_security_filter(app: Application) -> None: @@ -51,6 +54,21 @@ def setup_security_filter(app: Application) -> None: request: Request, handler: Callable[[Request], Awaitable[StreamResponse]] ) -> StreamResponse: """Process request and block commonly known exploit attempts.""" + for unsafe_byte in UNSAFE_URL_BYTES: + if unsafe_byte in request.path: + _LOGGER.warning( + "Filtered a request with an unsafe byte in path: %s", + request.raw_path, + ) + raise HTTPBadRequest + + if unsafe_byte in request.query_string: + _LOGGER.warning( + "Filtered a request with unsafe byte query string: %s", + request.raw_path, + ) + raise HTTPBadRequest + if FILTERS.search(_recursive_unquote(request.path)): _LOGGER.warning( "Filtered a potential harmful request to: %s", request.raw_path diff --git a/tests/components/http/test_security_filter.py b/tests/components/http/test_security_filter.py index 1c139a59161..5469b7ebfa7 100644 --- a/tests/components/http/test_security_filter.py +++ b/tests/components/http/test_security_filter.py @@ -107,3 +107,54 @@ async def test_bad_requests( if fail_on_query_string: message = "Filtered a request with a potential harmful query string:" assert message in caplog.text + + +@pytest.mark.parametrize( + ("request_path", "request_params", "fail_on_query_string"), + [ + ("/some\thing", {}, False), + ("/new\nline/cinema", {}, False), + ("/return\r/to/sender", {}, False), + ("/", {"some": "\thing"}, True), + ("/", {"\newline": "cinema"}, True), + ("/", {"return": "t\rue"}, True), + ], +) +async def test_bad_requests_with_unsafe_bytes( + request_path, + request_params, + fail_on_query_string, + aiohttp_client: ClientSessionGenerator, + caplog: pytest.LogCaptureFixture, + loop, +) -> None: + """Test request with unsafe bytes in their URLs.""" + app = web.Application() + app.router.add_get("/{all:.*}", mock_handler) + + setup_security_filter(app) + + mock_api_client = await aiohttp_client(app) + + # Manual params handling + if request_params: + raw_params = "&".join(f"{val}={key}" for val, key in request_params.items()) + man_params = f"?{raw_params}" + else: + man_params = "" + + http = urllib3.PoolManager() + resp = await loop.run_in_executor( + None, + http.request, + "GET", + f"http://{mock_api_client.host}:{mock_api_client.port}{request_path}{man_params}", + request_params, + ) + + assert resp.status == HTTPStatus.BAD_REQUEST + + message = "Filtered a request with an unsafe byte in path:" + if fail_on_query_string: + message = "Filtered a request with unsafe byte query string:" + assert message in caplog.text