Extend IP ban / failed login notification information (#39020)
parent
d4f1fd7396
commit
475e70986b
|
@ -3,6 +3,7 @@ from collections import defaultdict
|
|||
from datetime import datetime
|
||||
from ipaddress import ip_address
|
||||
import logging
|
||||
from socket import gethostbyaddr, herror
|
||||
from typing import List, Optional
|
||||
|
||||
from aiohttp.web import middleware
|
||||
|
@ -93,12 +94,25 @@ async def process_wrong_login(request):
|
|||
Increase failed login attempts counter for remote IP address.
|
||||
Add ip ban entry if failed login attempts exceeds threshold.
|
||||
"""
|
||||
remote_addr = ip_address(request.remote)
|
||||
hass = request.app["hass"]
|
||||
|
||||
remote_addr = ip_address(request.remote)
|
||||
remote_host = request.remote
|
||||
try:
|
||||
remote_host, _, _ = await hass.async_add_executor_job(
|
||||
gethostbyaddr, request.remote
|
||||
)
|
||||
except herror:
|
||||
pass
|
||||
|
||||
msg = f"Login attempt or request with invalid authentication from {remote_host} ({remote_addr})"
|
||||
|
||||
user_agent = request.headers.get("user-agent")
|
||||
if user_agent:
|
||||
msg = f"{msg} ({user_agent})"
|
||||
|
||||
msg = f"Login attempt or request with invalid authentication from {remote_addr}"
|
||||
_LOGGER.warning(msg)
|
||||
|
||||
hass = request.app["hass"]
|
||||
hass.components.persistent_notification.async_create(
|
||||
msg, "Login attempt failed", NOTIFICATION_ID_LOGIN
|
||||
)
|
||||
|
|
|
@ -24,6 +24,7 @@ from homeassistant.setup import async_setup_component
|
|||
from . import mock_real_ip
|
||||
|
||||
from tests.async_mock import Mock, mock_open, patch
|
||||
from tests.common import async_mock_service
|
||||
|
||||
SUPERVISOR_IP = "1.2.3.4"
|
||||
BANNED_IPS = ["200.201.202.203", "100.64.0.2"]
|
||||
|
@ -40,6 +41,16 @@ def hassio_env_fixture():
|
|||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def gethostbyaddr_mock():
|
||||
"""Fixture to mock out I/O on getting host by address."""
|
||||
with patch(
|
||||
"homeassistant.components.http.ban.gethostbyaddr",
|
||||
return_value=("example.com", ["0.0.0.0.in-addr.arpa"], ["0.0.0.0"]),
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
async def test_access_from_banned_ip(hass, aiohttp_client):
|
||||
"""Test accessing to server from banned IP. Both trusted and not."""
|
||||
app = web.Application()
|
||||
|
@ -125,6 +136,8 @@ async def test_ban_middleware_loaded_by_default(hass):
|
|||
|
||||
async def test_ip_bans_file_creation(hass, aiohttp_client):
|
||||
"""Testing if banned IP file created."""
|
||||
notification_calls = async_mock_service(hass, "persistent_notification", "create")
|
||||
|
||||
app = web.Application()
|
||||
app["hass"] = hass
|
||||
|
||||
|
@ -159,6 +172,12 @@ async def test_ip_bans_file_creation(hass, aiohttp_client):
|
|||
assert resp.status == HTTP_FORBIDDEN
|
||||
assert m_open.call_count == 1
|
||||
|
||||
assert len(notification_calls) == 3
|
||||
assert (
|
||||
"Login attempt or request with invalid authentication from example.com (200.201.202.204) (Python"
|
||||
in notification_calls[0].data["message"]
|
||||
)
|
||||
|
||||
|
||||
async def test_failed_login_attempts_counter(hass, aiohttp_client):
|
||||
"""Testing if failed login attempts counter increased."""
|
||||
|
|
Loading…
Reference in New Issue