Do not ban supervisor ip if set (#33781)
* Use asynctest patch instead of mock_coro * Add test for supervisor ip ban * Do not ban supervisor ip if set * Extract supervisor ip helper * Check supervisor ip before banning * Remove added blank line * Clean up get supervisor ip Co-Authored-By: Pascal Vizeli <pvizeli@syshack.ch> Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch>pull/33819/head
parent
8d61893c39
commit
c3decc6531
homeassistant/components
hassio
http
tests/components
|
@ -143,6 +143,14 @@ def is_hassio(hass):
|
||||||
return DOMAIN in hass.config.components
|
return DOMAIN in hass.config.components
|
||||||
|
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def get_supervisor_ip():
|
||||||
|
"""Return the supervisor ip address."""
|
||||||
|
if "SUPERVISOR" not in os.environ:
|
||||||
|
return None
|
||||||
|
return os.environ["SUPERVISOR"].partition(":")[0]
|
||||||
|
|
||||||
|
|
||||||
async def async_setup(hass, config):
|
async def async_setup(hass, config):
|
||||||
"""Set up the Hass.io component."""
|
"""Set up the Hass.io component."""
|
||||||
# Check local setup
|
# Check local setup
|
||||||
|
|
|
@ -110,6 +110,12 @@ async def process_wrong_login(request):
|
||||||
|
|
||||||
request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr] += 1
|
request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr] += 1
|
||||||
|
|
||||||
|
# Supervisor IP should never be banned
|
||||||
|
if "hassio" in hass.config.components and hass.components.hassio.get_supervisor_ip() == str(
|
||||||
|
remote_addr
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
if (
|
if (
|
||||||
request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr]
|
request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr]
|
||||||
>= request.app[KEY_LOGIN_THRESHOLD]
|
>= request.app[KEY_LOGIN_THRESHOLD]
|
||||||
|
|
|
@ -1,16 +1,12 @@
|
||||||
"""Fixtures for component testing."""
|
"""Fixtures for component testing."""
|
||||||
from unittest.mock import patch
|
from asynctest import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tests.common import mock_coro
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def prevent_io():
|
def prevent_io():
|
||||||
"""Fixture to prevent certain I/O from happening."""
|
"""Fixture to prevent certain I/O from happening."""
|
||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.http.ban.async_load_ip_bans_config",
|
"homeassistant.components.http.ban.async_load_ip_bans_config", return_value=[],
|
||||||
side_effect=lambda *args: mock_coro([]),
|
|
||||||
):
|
):
|
||||||
yield
|
yield
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
"""The tests for the Home Assistant HTTP component."""
|
"""The tests for the Home Assistant HTTP component."""
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
from unittest.mock import Mock, mock_open, patch
|
import os
|
||||||
|
from unittest.mock import Mock, mock_open
|
||||||
|
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
from aiohttp.web_exceptions import HTTPUnauthorized
|
from aiohttp.web_exceptions import HTTPUnauthorized
|
||||||
from aiohttp.web_middlewares import middleware
|
from aiohttp.web_middlewares import middleware
|
||||||
|
from asynctest import patch
|
||||||
|
import pytest
|
||||||
|
|
||||||
import homeassistant.components.http as http
|
import homeassistant.components.http as http
|
||||||
from homeassistant.components.http import KEY_AUTHENTICATED
|
from homeassistant.components.http import KEY_AUTHENTICATED
|
||||||
|
@ -21,20 +24,31 @@ from homeassistant.setup import async_setup_component
|
||||||
|
|
||||||
from . import mock_real_ip
|
from . import mock_real_ip
|
||||||
|
|
||||||
from tests.common import mock_coro
|
SUPERVISOR_IP = "1.2.3.4"
|
||||||
|
|
||||||
BANNED_IPS = ["200.201.202.203", "100.64.0.2"]
|
BANNED_IPS = ["200.201.202.203", "100.64.0.2"]
|
||||||
|
BANNED_IPS_WITH_SUPERVISOR = BANNED_IPS + [SUPERVISOR_IP]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="hassio_env")
|
||||||
|
def hassio_env_fixture():
|
||||||
|
"""Fixture to inject hassio env."""
|
||||||
|
with patch.dict(os.environ, {"HASSIO": "127.0.0.1"}), patch(
|
||||||
|
"homeassistant.components.hassio.HassIO.is_connected",
|
||||||
|
return_value={"result": "ok", "data": {}},
|
||||||
|
), patch.dict(os.environ, {"HASSIO_TOKEN": "123456"}):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
async def test_access_from_banned_ip(hass, aiohttp_client):
|
async def test_access_from_banned_ip(hass, aiohttp_client):
|
||||||
"""Test accessing to server from banned IP. Both trusted and not."""
|
"""Test accessing to server from banned IP. Both trusted and not."""
|
||||||
app = web.Application()
|
app = web.Application()
|
||||||
|
app["hass"] = hass
|
||||||
setup_bans(hass, app, 5)
|
setup_bans(hass, app, 5)
|
||||||
set_real_ip = mock_real_ip(app)
|
set_real_ip = mock_real_ip(app)
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.http.ban.async_load_ip_bans_config",
|
"homeassistant.components.http.ban.async_load_ip_bans_config",
|
||||||
return_value=mock_coro([IpBan(banned_ip) for banned_ip in BANNED_IPS]),
|
return_value=[IpBan(banned_ip) for banned_ip in BANNED_IPS],
|
||||||
):
|
):
|
||||||
client = await aiohttp_client(app)
|
client = await aiohttp_client(app)
|
||||||
|
|
||||||
|
@ -44,6 +58,48 @@ async def test_access_from_banned_ip(hass, aiohttp_client):
|
||||||
assert resp.status == 403
|
assert resp.status == 403
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"remote_addr, bans, status",
|
||||||
|
list(zip(BANNED_IPS_WITH_SUPERVISOR, [1, 1, 0], [403, 403, 401])),
|
||||||
|
)
|
||||||
|
async def test_access_from_supervisor_ip(
|
||||||
|
remote_addr, bans, status, hass, aiohttp_client, hassio_env
|
||||||
|
):
|
||||||
|
"""Test accessing to server from supervisor IP."""
|
||||||
|
app = web.Application()
|
||||||
|
app["hass"] = hass
|
||||||
|
|
||||||
|
async def unauth_handler(request):
|
||||||
|
"""Return a mock web response."""
|
||||||
|
raise HTTPUnauthorized
|
||||||
|
|
||||||
|
app.router.add_get("/", unauth_handler)
|
||||||
|
setup_bans(hass, app, 1)
|
||||||
|
mock_real_ip(app)(remote_addr)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.http.ban.async_load_ip_bans_config", return_value=[],
|
||||||
|
):
|
||||||
|
client = await aiohttp_client(app)
|
||||||
|
|
||||||
|
assert await async_setup_component(hass, "hassio", {"hassio": {}})
|
||||||
|
|
||||||
|
m_open = mock_open()
|
||||||
|
|
||||||
|
with patch.dict(os.environ, {"SUPERVISOR": SUPERVISOR_IP}), patch(
|
||||||
|
"homeassistant.components.http.ban.open", m_open, create=True
|
||||||
|
):
|
||||||
|
resp = await client.get("/")
|
||||||
|
assert resp.status == 401
|
||||||
|
assert len(app[KEY_BANNED_IPS]) == bans
|
||||||
|
assert m_open.call_count == bans
|
||||||
|
|
||||||
|
# second request should be forbidden if banned
|
||||||
|
resp = await client.get("/")
|
||||||
|
assert resp.status == status
|
||||||
|
assert len(app[KEY_BANNED_IPS]) == bans
|
||||||
|
|
||||||
|
|
||||||
async def test_ban_middleware_not_loaded_by_config(hass):
|
async def test_ban_middleware_not_loaded_by_config(hass):
|
||||||
"""Test accessing to server from banned IP when feature is off."""
|
"""Test accessing to server from banned IP when feature is off."""
|
||||||
with patch("homeassistant.components.http.setup_bans") as mock_setup:
|
with patch("homeassistant.components.http.setup_bans") as mock_setup:
|
||||||
|
@ -77,26 +133,26 @@ async def test_ip_bans_file_creation(hass, aiohttp_client):
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.http.ban.async_load_ip_bans_config",
|
"homeassistant.components.http.ban.async_load_ip_bans_config",
|
||||||
return_value=mock_coro([IpBan(banned_ip) for banned_ip in BANNED_IPS]),
|
return_value=[IpBan(banned_ip) for banned_ip in BANNED_IPS],
|
||||||
):
|
):
|
||||||
client = await aiohttp_client(app)
|
client = await aiohttp_client(app)
|
||||||
|
|
||||||
m = mock_open()
|
m_open = mock_open()
|
||||||
|
|
||||||
with patch("homeassistant.components.http.ban.open", m, create=True):
|
with patch("homeassistant.components.http.ban.open", m_open, create=True):
|
||||||
resp = await client.get("/")
|
resp = await client.get("/")
|
||||||
assert resp.status == 401
|
assert resp.status == 401
|
||||||
assert len(app[KEY_BANNED_IPS]) == len(BANNED_IPS)
|
assert len(app[KEY_BANNED_IPS]) == len(BANNED_IPS)
|
||||||
assert m.call_count == 0
|
assert m_open.call_count == 0
|
||||||
|
|
||||||
resp = await client.get("/")
|
resp = await client.get("/")
|
||||||
assert resp.status == 401
|
assert resp.status == 401
|
||||||
assert len(app[KEY_BANNED_IPS]) == len(BANNED_IPS) + 1
|
assert len(app[KEY_BANNED_IPS]) == len(BANNED_IPS) + 1
|
||||||
m.assert_called_once_with(hass.config.path(IP_BANS_FILE), "a")
|
m_open.assert_called_once_with(hass.config.path(IP_BANS_FILE), "a")
|
||||||
|
|
||||||
resp = await client.get("/")
|
resp = await client.get("/")
|
||||||
assert resp.status == 403
|
assert resp.status == 403
|
||||||
assert m.call_count == 1
|
assert m_open.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
async def test_failed_login_attempts_counter(hass, aiohttp_client):
|
async def test_failed_login_attempts_counter(hass, aiohttp_client):
|
||||||
|
|
Loading…
Reference in New Issue