Fix PEP257 issues (#3962)

pull/3966/head
Fabian Affolter 2016-10-20 19:10:12 +02:00 committed by GitHub
parent c32afcd961
commit 3aa1b6a3f8
5 changed files with 93 additions and 110 deletions

View File

@ -28,13 +28,13 @@ CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
CONF_EXCLUDE: vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]): vol.All(cv.ensure_list,
[cv.string])
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
}),
CONF_INCLUDE: vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]): vol.All(cv.ensure_list,
[cv.string])
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
})
}),
}, extra=vol.ALLOW_EXTRA)
@ -244,7 +244,7 @@ class Filters(object):
self.included_domains = []
def apply(self, query, entity_ids=None):
"""Apply the Include/exclude filter on domains and entities on query.
"""Apply the include/exclude filter on domains and entities on query.
Following rules apply:
* only the include section is configured - just query the specified
@ -278,8 +278,8 @@ class Filters(object):
filter_query &= (states.domain.in_(self.included_domains) |
states.entity_id.in_(self.included_entities))
else:
filter_query &= (states.domain.in_(self.included_domains) &
~states.domain.in_(self.excluded_domains))
filter_query &= (states.domain.in_(self.included_domains) & ~
states.domain.in_(self.excluded_domains))
# no domain filter just included entities
elif not self.excluded_domains and not self.included_domains and \
self.included_entities:

View File

@ -7,7 +7,10 @@ from homeassistant.components.media_player import cast
class FakeChromeCast(object):
"""A fake Chrome Cast."""
def __init__(self, host, port):
"""Initialize the fake Chrome Cast."""
self.host = host
self.port = port
@ -19,7 +22,6 @@ class TestCastMediaPlayer(unittest.TestCase):
@patch('pychromecast.get_chromecasts')
def test_filter_duplicates(self, mock_get_chromecasts, mock_device):
"""Test filtering of duplicates."""
mock_get_chromecasts.return_value = [
FakeChromeCast('some_host', cast.DEFAULT_PORT)
]

View File

@ -12,18 +12,16 @@ import homeassistant.components.http as http
from tests.common import get_test_instance_port, get_test_home_assistant
API_PASSWORD = "test1234"
API_PASSWORD = 'test1234'
SERVER_PORT = get_test_instance_port()
HTTP_BASE = "127.0.0.1:{}".format(SERVER_PORT)
HTTP_BASE_URL = "http://{}".format(HTTP_BASE)
HTTP_BASE = '127.0.0.1:{}'.format(SERVER_PORT)
HTTP_BASE_URL = 'http://{}'.format(HTTP_BASE)
HA_HEADERS = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
}
# dont' add 127.0.0.1/::1 as trusted, as it may interfere with other test cases
TRUSTED_NETWORKS = ["192.0.2.0/24",
"2001:DB8:ABCD::/48",
'100.64.0.1',
# Don't add 127.0.0.1/::1 as trusted, as it may interfere with other test cases
TRUSTED_NETWORKS = ['192.0.2.0/24', '2001:DB8:ABCD::/48', '100.64.0.1',
'FD01:DB8::1']
CORS_ORIGINS = [HTTP_BASE_URL, HTTP_BASE]
@ -31,12 +29,13 @@ CORS_ORIGINS = [HTTP_BASE_URL, HTTP_BASE]
hass = None
def _url(path=""):
def _url(path=''):
"""Helper method to generate URLs."""
return HTTP_BASE_URL + path
def setUpModule(): # pylint: disable=invalid-name
# pylint: disable=invalid-name
def setUpModule():
"""Initialize a Home Assistant server."""
global hass
@ -46,10 +45,14 @@ def setUpModule(): # pylint: disable=invalid-name
hass.states.set('test.test', 'a_state')
bootstrap.setup_component(
hass, http.DOMAIN,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: SERVER_PORT,
http.CONF_CORS_ORIGINS: CORS_ORIGINS}})
hass, http.DOMAIN, {
http.DOMAIN: {
http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: SERVER_PORT,
http.CONF_CORS_ORIGINS: CORS_ORIGINS,
}
}
)
bootstrap.setup_component(hass, 'api')
@ -85,16 +88,13 @@ class TestHttp:
def test_access_denied_with_untrusted_ip(self, caplog):
"""Test access with an untrusted ip address."""
for remote_addr in ['198.51.100.1',
'2001:DB8:FA1::1',
'127.0.0.1',
for remote_addr in ['198.51.100.1', '2001:DB8:FA1::1', '127.0.0.1',
'::1']:
with patch('homeassistant.components.http.'
'HomeAssistantWSGI.get_real_ip',
return_value=remote_addr):
req = requests.get(_url(const.URL_API),
params={'api_password': ''})
req = requests.get(
_url(const.URL_API), params={'api_password': ''})
assert req.status_code == 401, \
"{} shouldn't be trusted".format(remote_addr)
@ -102,8 +102,8 @@ class TestHttp:
def test_access_with_password_in_header(self, caplog):
"""Test access with password in URL."""
# Hide logging from requests package that we use to test logging
caplog.set_level(logging.WARNING,
logger='requests.packages.urllib3.connectionpool')
caplog.set_level(
logging.WARNING, logger='requests.packages.urllib3.connectionpool')
req = requests.get(
_url(const.URL_API),
@ -118,19 +118,19 @@ class TestHttp:
def test_access_denied_with_wrong_password_in_url(self):
"""Test access with wrong password."""
req = requests.get(_url(const.URL_API),
params={'api_password': 'wrongpassword'})
req = requests.get(
_url(const.URL_API), params={'api_password': 'wrongpassword'})
assert req.status_code == 401
def test_access_with_password_in_url(self, caplog):
"""Test access with password in URL."""
# Hide logging from requests package that we use to test logging
caplog.set_level(logging.WARNING,
logger='requests.packages.urllib3.connectionpool')
caplog.set_level(
logging.WARNING, logger='requests.packages.urllib3.connectionpool')
req = requests.get(_url(const.URL_API),
params={'api_password': API_PASSWORD})
req = requests.get(
_url(const.URL_API), params={'api_password': API_PASSWORD})
assert req.status_code == 200
@ -141,18 +141,16 @@ class TestHttp:
def test_access_with_trusted_ip(self, caplog):
"""Test access with trusted addresses."""
for remote_addr in ['100.64.0.1',
'192.0.2.100',
'FD01:DB8::1',
for remote_addr in ['100.64.0.1', '192.0.2.100', 'FD01:DB8::1',
'2001:DB8:ABCD::1']:
with patch('homeassistant.components.http.'
'HomeAssistantWSGI.get_real_ip',
return_value=remote_addr):
req = requests.get(_url(const.URL_API),
params={'api_password': ''})
req = requests.get(
_url(const.URL_API), params={'api_password': ''})
assert req.status_code == 200, \
"{} should be trusted".format(remote_addr)
'{} should be trusted'.format(remote_addr)
def test_cors_allowed_with_password_in_url(self):
"""Test cross origin resource sharing with password in url."""
@ -162,7 +160,7 @@ class TestHttp:
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
all_allow_headers = ", ".join(const.ALLOWED_CORS_HEADERS)
all_allow_headers = ', '.join(const.ALLOWED_CORS_HEADERS)
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
@ -174,12 +172,11 @@ class TestHttp:
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL
}
req = requests.get(_url(const.URL_API),
headers=headers)
req = requests.get(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
all_allow_headers = ", ".join(const.ALLOWED_CORS_HEADERS)
all_allow_headers = ', '.join(const.ALLOWED_CORS_HEADERS)
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
@ -190,8 +187,7 @@ class TestHttp:
headers = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD
}
req = requests.get(_url(const.URL_API),
headers=headers)
req = requests.get(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
@ -207,12 +203,11 @@ class TestHttp:
'Access-Control-Request-Method': 'GET',
'Access-Control-Request-Headers': 'x-ha-access'
}
req = requests.options(_url(const.URL_API),
headers=headers)
req = requests.options(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
all_allow_headers = ", ".join(const.ALLOWED_CORS_HEADERS)
all_allow_headers = ', '.join(const.ALLOWED_CORS_HEADERS)
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL

View File

@ -34,8 +34,8 @@ class TestInfluxDB(unittest.TestCase):
}
assert setup_component(self.hass, influxdb.DOMAIN, config)
self.assertTrue(self.hass.bus.listen.called)
self.assertEqual(EVENT_STATE_CHANGED,
self.hass.bus.listen.call_args_list[0][0][0])
self.assertEqual(
EVENT_STATE_CHANGED, self.hass.bus.listen.call_args_list[0][0][0])
self.assertTrue(mock_client.return_value.query.called)
def test_setup_config_defaults(self, mock_client):
@ -49,11 +49,11 @@ class TestInfluxDB(unittest.TestCase):
}
assert setup_component(self.hass, influxdb.DOMAIN, config)
self.assertTrue(self.hass.bus.listen.called)
self.assertEqual(EVENT_STATE_CHANGED,
self.hass.bus.listen.call_args_list[0][0][0])
self.assertEqual(
EVENT_STATE_CHANGED, self.hass.bus.listen.call_args_list[0][0][0])
def test_setup_minimal_config(self, mock_client):
"""Tests the setup with minimal configuration."""
"""Test the setup with minimal configuration."""
config = {
'influxdb': {}
}
@ -100,23 +100,22 @@ class TestInfluxDB(unittest.TestCase):
"""Test the event listener."""
self._setup()
valid = {'1': 1,
'1.0': 1.0,
STATE_ON: 1,
STATE_OFF: 0,
'foo': 'foo'}
valid = {
'1': 1,
'1.0': 1.0,
STATE_ON: 1,
STATE_OFF: 0,
'foo': 'foo'
}
for in_, out in valid.items():
attrs = {
'unit_of_measurement': 'foobars',
'longitude': '1.1',
'latitude': '2.2'
}
state = mock.MagicMock(state=in_,
domain='fake',
object_id='entity',
attributes=attrs)
event = mock.MagicMock(data={'new_state': state},
time_fired=12345)
'unit_of_measurement': 'foobars',
'longitude': '1.1',
'latitude': '2.2'
}
state = mock.MagicMock(
state=in_, domain='fake', object_id='entity', attributes=attrs)
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
body = [{
'measurement': 'foobars',
'tags': {
@ -149,13 +148,10 @@ class TestInfluxDB(unittest.TestCase):
attrs = {'unit_of_measurement': unit}
else:
attrs = {}
state = mock.MagicMock(state=1,
domain='fake',
entity_id='entity-id',
object_id='entity',
attributes=attrs)
event = mock.MagicMock(data={'new_state': state},
time_fired=12345)
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity-id',
object_id='entity', attributes=attrs)
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
body = [{
'measurement': 'entity-id',
'tags': {
@ -181,13 +177,10 @@ class TestInfluxDB(unittest.TestCase):
"""Test the event listener for write failures."""
self._setup()
state = mock.MagicMock(state=1,
domain='fake',
entity_id='entity-id',
object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state},
time_fired=12345)
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity-id', object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
mock_client.return_value.write_points.side_effect = \
influx_client.exceptions.InfluxDBClientError('foo')
self.handler_method(event)
@ -197,13 +190,10 @@ class TestInfluxDB(unittest.TestCase):
self._setup()
for state_state in (1, 'unknown', '', 'unavailable'):
state = mock.MagicMock(state=state_state,
domain='fake',
entity_id='entity-id',
object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state},
time_fired=12345)
state = mock.MagicMock(
state=state_state, domain='fake', entity_id='entity-id',
object_id='entity', attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
body = [{
'measurement': 'entity-id',
'tags': {
@ -233,13 +223,10 @@ class TestInfluxDB(unittest.TestCase):
self._setup()
for entity_id in ('ok', 'blacklisted'):
state = mock.MagicMock(state=1,
domain='fake',
entity_id='fake.{}'.format(entity_id),
object_id=entity_id,
attributes={})
event = mock.MagicMock(data={'new_state': state},
time_fired=12345)
state = mock.MagicMock(
state=1, domain='fake', entity_id='fake.{}'.format(entity_id),
object_id=entity_id, attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
body = [{
'measurement': 'fake.{}'.format(entity_id),
'tags': {

View File

@ -12,8 +12,8 @@ import homeassistant.util.package as package
RESOURCE_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'resources'))
TEST_EXIST_REQ = "pip>=7.0.0"
TEST_NEW_REQ = "pyhelloworld3==1.0.0"
TEST_EXIST_REQ = 'pip>=7.0.0'
TEST_NEW_REQ = 'pyhelloworld3==1.0.0'
TEST_ZIP_REQ = 'file://{}#{}' \
.format(os.path.join(RESOURCE_DIR, 'pyhelloworld3.zip'), TEST_NEW_REQ)
@ -48,8 +48,8 @@ class TestPackageUtilInstallPackage(unittest.TestCase):
self.assertEqual(
mock_subprocess.call_args,
call([
mock_sys.executable, '-m', 'pip',
'install', '--quiet', TEST_NEW_REQ
mock_sys.executable, '-m', 'pip', 'install', '--quiet',
TEST_NEW_REQ
])
)
@ -67,8 +67,8 @@ class TestPackageUtilInstallPackage(unittest.TestCase):
self.assertEqual(
mock_subprocess.call_args,
call([
mock_sys.executable, '-m', 'pip', 'install',
'--quiet', TEST_NEW_REQ, '--upgrade'
mock_sys.executable, '-m', 'pip', 'install', '--quiet',
TEST_NEW_REQ, '--upgrade'
])
)
@ -96,9 +96,8 @@ class TestPackageUtilInstallPackage(unittest.TestCase):
@patch('homeassistant.util.package._LOGGER')
@patch('homeassistant.util.package.sys')
def test_install_error(
self, mock_sys, mock_logger, mock_exists, mock_subprocess
):
def test_install_error(self, mock_sys, mock_logger, mock_exists,
mock_subprocess):
"""Test an install with a target."""
mock_exists.return_value = False
mock_subprocess.side_effect = [subprocess.SubprocessError]
@ -112,13 +111,13 @@ class TestPackageUtilCheckPackageExists(unittest.TestCase):
"""Test for homeassistant.util.package module."""
def test_check_package_global(self):
"""Test for a globally-installed package"""
"""Test for a globally-installed package."""
installed_package = list(pkg_resources.working_set)[0].project_name
self.assertTrue(package.check_package_exists(installed_package, None))
def test_check_package_local(self):
"""Test for a locally-installed package"""
"""Test for a locally-installed package."""
lib_dir = get_python_lib()
installed_package = list(pkg_resources.working_set)[0].project_name
@ -127,5 +126,5 @@ class TestPackageUtilCheckPackageExists(unittest.TestCase):
)
def test_check_package_zip(self):
"""Test for an installed zip package"""
"""Test for an installed zip package."""
self.assertFalse(package.check_package_exists(TEST_ZIP_REQ, None))