WSGI: Hide password in logs (#2164)

* WSGI: Hide password in logs

* Add auth + pw in logs tests
pull/1951/head
Paulus Schoutsen 2016-05-24 23:19:37 -07:00
parent 88bb136813
commit 415cfc2537
4 changed files with 136 additions and 40 deletions

View File

@ -31,8 +31,29 @@ _FINGERPRINT = re.compile(r'^(.+)-[a-z0-9]{32}\.(\w+)$', re.IGNORECASE)
_LOGGER = logging.getLogger(__name__)
class HideSensitiveFilter(logging.Filter):
"""Filter API password calls."""
# pylint: disable=too-few-public-methods
def __init__(self, hass):
"""Initialize sensitive data filter."""
super().__init__()
self.hass = hass
def filter(self, record):
"""Hide sensitive data in messages."""
if self.hass.wsgi.api_password is None:
return True
record.msg = record.msg.replace(self.hass.wsgi.api_password, '*******')
return True
def setup(hass, config):
"""Set up the HTTP API and debug interface."""
_LOGGER.addFilter(HideSensitiveFilter(hass))
conf = config.get(DOMAIN, {})
api_password = util.convert(conf.get(CONF_API_PASSWORD), str)
@ -202,7 +223,7 @@ class HomeAssistantWSGI(object):
"""Register a redirect with the server.
If given this must be either a string or callable. In case of a
callable its called with the url adapter that triggered the match and
callable it's called with the url adapter that triggered the match and
the values of the URL as keyword arguments and has to return the target
for the redirect, otherwise it has to be a string with placeholders in
rule syntax.
@ -245,7 +266,7 @@ class HomeAssistantWSGI(object):
if self.ssl_certificate:
sock = eventlet.wrap_ssl(sock, certfile=self.ssl_certificate,
keyfile=self.ssl_key, server_side=True)
wsgi.server(sock, self)
wsgi.server(sock, self, log=_LOGGER)
def dispatch_request(self, request):
"""Handle incoming request."""
@ -318,9 +339,7 @@ class HomeAssistantView(object):
def handle_request(self, request, **values):
"""Handle request to url."""
from werkzeug.exceptions import (
MethodNotAllowed, Unauthorized, BadRequest,
)
from werkzeug.exceptions import MethodNotAllowed, Unauthorized
try:
handler = getattr(self, request.method.lower())
@ -342,18 +361,6 @@ class HomeAssistantView(object):
self.hass.wsgi.api_password):
authenticated = True
else:
# Do we still want to support passing it in as post data?
try:
json_data = request.json
if (json_data is not None and
hmac.compare_digest(
json_data.get(DATA_API_PASSWORD, ''),
self.hass.wsgi.api_password)):
authenticated = True
except BadRequest:
pass
if self.requires_auth and not authenticated:
raise Unauthorized()

View File

@ -4,5 +4,6 @@ coveralls>=1.1
pytest>=2.9.1
pytest-cov>=2.2.0
pytest-timeout>=1.0.0
pytest-capturelog>=0.7
betamax==0.5.1
pydocstyle>=1.0.0

View File

@ -1,4 +1,4 @@
"""The tests for the Home Assistant HTTP component."""
"""The tests for the Home Assistant API component."""
# pylint: disable=protected-access,too-many-public-methods
# from contextlib import closing
import json
@ -66,28 +66,6 @@ class TestAPI(unittest.TestCase):
"""Stop everything that was started."""
hass.pool.block_till_done()
# TODO move back to http component and test with use_auth.
def test_access_denied_without_password(self):
"""Test access without password."""
req = requests.get(_url(const.URL_API))
self.assertEqual(401, req.status_code)
def test_access_denied_with_wrong_password(self):
"""Test ascces with wrong password."""
req = requests.get(
_url(const.URL_API),
headers={const.HTTP_HEADER_HA_AUTH: 'wrongpassword'})
self.assertEqual(401, req.status_code)
def test_access_with_password_in_url(self):
"""Test access with password in URL."""
req = requests.get(
"{}?api_password={}".format(_url(const.URL_API), API_PASSWORD))
self.assertEqual(200, req.status_code)
def test_api_list_state_entities(self):
"""Test if the debug interface allows us to list state entities."""
req = requests.get(_url(const.URL_API_STATES),

View File

@ -0,0 +1,110 @@
"""The tests for the Home Assistant HTTP component."""
# pylint: disable=protected-access,too-many-public-methods
import logging
import eventlet
import requests
from homeassistant import bootstrap, const
import homeassistant.components.http as http
from tests.common import get_test_instance_port, get_test_home_assistant
API_PASSWORD = "test1234"
SERVER_PORT = get_test_instance_port()
HTTP_BASE_URL = "http://127.0.0.1:{}".format(SERVER_PORT)
HA_HEADERS = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
}
hass = None
def _url(path=""):
"""Helper method to generate URLs."""
return HTTP_BASE_URL + path
def setUpModule(): # pylint: disable=invalid-name
"""Initialize a Home Assistant server."""
global hass
hass = get_test_home_assistant()
hass.bus.listen('test_event', lambda _: _)
hass.states.set('test.test', 'a_state')
bootstrap.setup_component(
hass, http.DOMAIN,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: SERVER_PORT}})
bootstrap.setup_component(hass, 'api')
hass.start()
eventlet.sleep(0.05)
def tearDownModule(): # pylint: disable=invalid-name
"""Stop the Home Assistant server."""
hass.stop()
class TestHttp:
"""Test HTTP component."""
def test_access_denied_without_password(self):
"""Test access without password."""
req = requests.get(_url(const.URL_API))
assert req.status_code == 401
def test_access_denied_with_wrong_password_in_header(self):
"""Test ascces with wrong password."""
req = requests.get(
_url(const.URL_API),
headers={const.HTTP_HEADER_HA_AUTH: 'wrongpassword'})
assert req.status_code == 401
def test_access_with_password_in_header(self, caplog):
"""Test access with password in URL."""
# Hide logging from requests package that we use to test logging
caplog.setLevel(logging.WARNING,
logger='requests.packages.urllib3.connectionpool')
req = requests.get(
_url(const.URL_API),
headers={const.HTTP_HEADER_HA_AUTH: API_PASSWORD})
assert req.status_code == 200
logs = caplog.text()
assert const.URL_API in logs
assert API_PASSWORD not in logs
def test_access_denied_with_wrong_password_in_url(self):
"""Test ascces with wrong password."""
req = requests.get(_url(const.URL_API),
params={'api_password': 'wrongpassword'})
assert req.status_code == 401
def test_access_with_password_in_url(self, caplog):
"""Test access with password in URL."""
# Hide logging from requests package that we use to test logging
caplog.setLevel(logging.WARNING,
logger='requests.packages.urllib3.connectionpool')
req = requests.get(_url(const.URL_API),
params={'api_password': API_PASSWORD})
assert req.status_code == 200
logs = caplog.text()
assert const.URL_API in logs
assert API_PASSWORD not in logs