"""Test real forwarded middleware."""
from http import HTTPStatus
from ipaddress import ip_network
from unittest.mock import Mock, patch

from aiohttp import web
from aiohttp.hdrs import X_FORWARDED_FOR, X_FORWARDED_HOST, X_FORWARDED_PROTO
import pytest

from homeassistant.components.http.forwarded import async_setup_forwarded


async def mock_handler(request):
    """Respond with the request's remote address as the body text."""
    remote_address = request.remote
    return web.Response(text=remote_address)


async def test_x_forwarded_for_without_trusted_proxy(aiohttp_client, caplog):
    """Expect HTTP 400 and a log entry when no trusted proxies are configured."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert (
        "Received X-Forwarded-For header from an untrusted proxy 127.0.0.1"
        in caplog.text
    )


@pytest.mark.parametrize(
    ("trusted_proxies", "x_forwarded_for", "remote"),
    [
        (
            ["127.0.0.0/24", "1.1.1.1", "10.10.10.0/24"],
            "10.10.10.10, 1.1.1.1",
            "10.10.10.10",
        ),
        (["127.0.0.0/24", "1.1.1.1"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "2.2.2.2"),
        (["127.0.0.0/24", "1.1.1.1"], "123.123.123.123,2.2.2.2,1.1.1.1", "2.2.2.2"),
        (["127.0.0.0/24"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "1.1.1.1"),
        (["127.0.0.0/24"], "127.0.0.1", "127.0.0.1"),
        (["127.0.0.1", "1.1.1.1"], "123.123.123.123, 1.1.1.1", "123.123.123.123"),
        (["127.0.0.1", "1.1.1.1"], "123.123.123.123, 2.2.2.2, 1.1.1.1", "2.2.2.2"),
        (["127.0.0.1"], "255.255.255.255", "255.255.255.255"),
    ],
)
async def test_x_forwarded_for_with_trusted_proxy(
    trusted_proxies, x_forwarded_for, remote, aiohttp_client
):
    """Check the remote address seen by the handler for each proxy/header pair."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == remote

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)

    # The parameters are plain strings; the setup call receives network objects.
    trusted_networks = list(map(ip_network, trusted_proxies))
    async_setup_forwarded(application, True, trusted_networks)

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: x_forwarded_for}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_for_disabled_with_proxy(aiohttp_client, caplog):
    """Expect HTTP 400 and a warning when forwarding is off but the header is sent."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, False, [])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert (
        "A request from a reverse proxy was received from 127.0.0.1, but your HTTP "
        "integration is not set-up for reverse proxies" in caplog.text
    )


async def test_x_forwarded_for_with_spoofed_header(aiohttp_client):
    """Check that the last address in the header is the one reported as remote."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222, 255.255.255.255"}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK
@pytest.mark.parametrize(
    "x_forwarded_for",
    [
        "This value is invalid",
        "1.1.1.1, , 1.2.3.4",
        "1.1.1.1,,1.2.3.4",
        "1.1.1.1, batman, 1.2.3.4",
        "192.168.0.0/24",
        "192.168.0.0/24, 1.1.1.1",
        ",",
        "",
    ],
)
async def test_x_forwarded_for_with_malformed_header(
    x_forwarded_for, aiohttp_client, caplog
):
    """Expect HTTP 400 and a log entry for each malformed X-Forwarded-For value."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: x_forwarded_for}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Invalid IP address in X-Forwarded-For" in caplog.text


async def test_x_forwarded_for_with_multiple_headers(aiohttp_client, caplog):
    """Expect HTTP 400 when X-Forwarded-For is sent more than once."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    # A list of pairs (not a dict) so the same header name can appear twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_FOR, "123.123.123.123"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Too many headers for X-Forwarded-For" in caplog.text


@pytest.mark.parametrize(
    ("x_forwarded_for", "remote", "x_forwarded_proto", "secure"),
    [
        ("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https, http, http", True),
        ("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https,http,http", True),
        ("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "http", False),
        (
            "10.10.10.10, 127.0.0.1, 127.0.0.2",
            "10.10.10.10",
            "http, https, https",
            False,
        ),
        ("10.10.10.10, 127.0.0.1, 127.0.0.2", "10.10.10.10", "https", True),
        (
            "255.255.255.255, 10.10.10.10, 127.0.0.1",
            "10.10.10.10",
            "http, https, http",
            True,
        ),
        (
            "255.255.255.255, 10.10.10.10, 127.0.0.1",
            "10.10.10.10",
            "https, http, https",
            False,
        ),
        ("255.255.255.255, 10.10.10.10, 127.0.0.1", "10.10.10.10", "https", True),
    ],
)
async def test_x_forwarded_proto_with_trusted_proxy(
    x_forwarded_for, remote, x_forwarded_proto, secure, aiohttp_client
):
    """Check the scheme and secure flag seen by the handler for each header pair."""

    async def handler(request):
        expected_scheme = "https" if secure else "http"
        assert request.remote == remote
        assert request.scheme == expected_scheme
        assert request.secure == secure

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.0/24")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: x_forwarded_for,
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_proto_with_trusted_proxy_multiple_for(aiohttp_client):
    """Check a single proto value is accepted alongside several forwarded addresses."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "https"
        assert request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.0/24")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "255.255.255.255, 127.0.0.1, 127.0.0.2",
        X_FORWARDED_PROTO: "https",
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_proto_not_processed_without_for(aiohttp_client):
    """Check X-Forwarded-Proto alone leaves the scheme and remote unchanged."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    proto_only_headers = {X_FORWARDED_PROTO: "https"}
    response = await client.get("/", headers=proto_only_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_proto_with_multiple_headers(aiohttp_client, caplog):
    """Expect HTTP 400 when X-Forwarded-Proto is sent more than once."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    # A list of pairs (not a dict) so the same header name can appear twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_PROTO, "https"),
        (X_FORWARDED_PROTO, "http"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Too many headers for X-Forward-Proto" in caplog.text


@pytest.mark.parametrize(
    "x_forwarded_proto",
    ["", ",", "https, , https", "https, https, "],
)
async def test_x_forwarded_proto_empty_element(
    x_forwarded_proto, aiohttp_client, caplog
):
    """Expect HTTP 400 when X-Forwarded-Proto contains an empty item."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "1.1.1.1",
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Empty item received in X-Forward-Proto header" in caplog.text


@pytest.mark.parametrize(
    ("x_forwarded_for", "x_forwarded_proto", "expected", "got"),
    [
        ("1.1.1.1, 2.2.2.2", "https, https, https", 2, 3),
        ("1.1.1.1, 2.2.2.2, 3.3.3.3, 4.4.4.4", "https, https, https", 4, 3),
    ],
)
async def test_x_forwarded_proto_incorrect_number_of_elements(
    x_forwarded_for, x_forwarded_proto, expected, got, aiohttp_client, caplog
):
    """Expect HTTP 400 when the proto item count does not match the for header."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: x_forwarded_for,
        X_FORWARDED_PROTO: x_forwarded_proto,
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    expected_message = f"Incorrect number of elements in X-Forward-Proto. Expected 1 or {expected}, got {got}"
    assert expected_message in caplog.text


async def test_x_forwarded_host_with_trusted_proxy(aiohttp_client):
    """Check the handler sees the X-Forwarded-Host value as the request host."""

    async def handler(request):
        assert request.host == "example.com"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "255.255.255.255"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {
        X_FORWARDED_FOR: "255.255.255.255",
        X_FORWARDED_HOST: "example.com",
    }
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_host_not_processed_without_for(aiohttp_client):
    """Check X-Forwarded-Host alone leaves the request host unchanged."""

    async def handler(request):
        expected_url = client.make_url("/")
        assert request.host == f"{expected_url.host}:{expected_url.port}"
        assert request.scheme == "http"
        assert not request.secure
        assert request.remote == "127.0.0.1"

        return web.Response()

    application = web.Application()
    application.router.add_get("/", handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    host_only_headers = {X_FORWARDED_HOST: "example.com"}
    response = await client.get("/", headers=host_only_headers)

    assert response.status == HTTPStatus.OK


async def test_x_forwarded_host_with_multiple_headers(aiohttp_client, caplog):
    """Expect HTTP 400 when X-Forwarded-Host is sent more than once."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    # A list of pairs (not a dict) so the same header name can appear twice.
    duplicated_headers = [
        (X_FORWARDED_FOR, "222.222.222.222"),
        (X_FORWARDED_HOST, "example.com"),
        (X_FORWARDED_HOST, "example.spoof"),
    ]
    response = await client.get("/", headers=duplicated_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Too many headers for X-Forwarded-Host" in caplog.text


async def test_x_forwarded_host_with_empty_header(aiohttp_client, caplog):
    """Expect HTTP 400 when X-Forwarded-Host is present but empty."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222", X_FORWARDED_HOST: ""}
    response = await client.get("/", headers=forwarded_headers)

    assert response.status == HTTPStatus.BAD_REQUEST
    assert "Empty value received in X-Forward-Host header" in caplog.text


async def test_x_forwarded_cloud(aiohttp_client, caplog):
    """Check an invalid forwarded request succeeds while flagged as a cloud request."""
    application = web.Application()
    application.router.add_get("/", mock_handler)
    async_setup_forwarded(application, True, [ip_network("127.0.0.1")])

    client = await aiohttp_client(application)
    forwarded_headers = {X_FORWARDED_FOR: "222.222.222.222", X_FORWARDED_HOST: ""}

    # Stand-in whose .get() always returns True while the request is made.
    cloud_request_flag = Mock(get=Mock(return_value=True))
    with patch("hass_nabucasa.remote.is_cloud_request", cloud_request_flag):
        response = await client.get("/", headers=forwarded_headers)

    # This request would normally fail because it's invalid, now it works.
    assert response.status == HTTPStatus.OK