Use constants for HTTP headers (#10313)

* Use constants for HTTP headers

* Fix ordering

* Move 'no-cache' to platform
pull/10350/head
Fabian Affolter 2017-11-04 20:04:05 +01:00 committed by Paulus Schoutsen
parent e64803e701
commit de9d19d6f4
36 changed files with 408 additions and 444 deletions

View File

@ -7,25 +7,32 @@ https://home-assistant.io/components/binary_sensor.aurora/
from datetime import timedelta
import logging
from aiohttp.hdrs import USER_AGENT
import requests
import voluptuous as vol
from homeassistant.components.binary_sensor \
import (BinarySensorDevice, PLATFORM_SCHEMA)
from homeassistant.const import (CONF_NAME)
from homeassistant.components.binary_sensor import (
PLATFORM_SCHEMA, BinarySensorDevice)
from homeassistant.const import CONF_NAME, ATTR_ATTRIBUTION
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
CONF_THRESHOLD = "forecast_threshold"
_LOGGER = logging.getLogger(__name__)
CONF_ATTRIBUTION = "Data provided by the National Oceanic and Atmospheric" \
"Administration"
CONF_THRESHOLD = 'forecast_threshold'
DEFAULT_DEVICE_CLASS = 'visible'
DEFAULT_NAME = 'Aurora Visibility'
DEFAULT_DEVICE_CLASS = "visible"
DEFAULT_THRESHOLD = 75
HA_USER_AGENT = "Home Assistant Aurora Tracker v.0.1.0"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
URL = "http://services.swpc.noaa.gov/text/aurora-nowcast-map.txt"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_THRESHOLD, default=DEFAULT_THRESHOLD): cv.positive_int,
@ -43,10 +50,7 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
try:
aurora_data = AuroraData(
hass.config.latitude,
hass.config.longitude,
threshold
)
hass.config.latitude, hass.config.longitude, threshold)
aurora_data.update()
except requests.exceptions.HTTPError as error:
_LOGGER.error(
@ -85,9 +89,9 @@ class AuroraSensor(BinarySensorDevice):
attrs = {}
if self.aurora_data:
attrs["visibility_level"] = self.aurora_data.visibility_level
attrs["message"] = self.aurora_data.is_visible_text
attrs['visibility_level'] = self.aurora_data.visibility_level
attrs['message'] = self.aurora_data.is_visible_text
attrs[ATTR_ATTRIBUTION] = CONF_ATTRIBUTION
return attrs
def update(self):
@ -104,10 +108,7 @@ class AuroraData(object):
self.longitude = longitude
self.number_of_latitude_intervals = 513
self.number_of_longitude_intervals = 1024
self.api_url = \
"http://services.swpc.noaa.gov/text/aurora-nowcast-map.txt"
self.headers = {"User-Agent": "Home Assistant Aurora Tracker v.0.1.0"}
self.headers = {USER_AGENT: HA_USER_AGENT}
self.threshold = int(threshold)
self.is_visible = None
self.is_visible_text = None
@ -132,14 +133,14 @@ class AuroraData(object):
def get_aurora_forecast(self):
"""Get forecast data and parse for given long/lat."""
raw_data = requests.get(self.api_url, headers=self.headers).text
raw_data = requests.get(URL, headers=self.headers, timeout=5).text
forecast_table = [
row.strip(" ").split(" ")
for row in raw_data.split("\n")
if not row.startswith("#")
]
# convert lat and long for data points in table
# Convert lat and long for data points in table
converted_latitude = round((self.latitude / 180)
* self.number_of_latitude_intervals)
converted_longitude = round((self.longitude / 360)

View File

@ -4,16 +4,17 @@ Support for BloomSky weather station.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/bloomsky/
"""
import logging
from datetime import timedelta
import logging
from aiohttp.hdrs import AUTHORIZATION
import requests
import voluptuous as vol
from homeassistant.const import CONF_API_KEY
from homeassistant.helpers import discovery
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
@ -68,7 +69,7 @@ class BloomSky(object):
"""Use the API to retrieve a list of devices."""
_LOGGER.debug("Fetching BloomSky update")
response = requests.get(
self.API_URL, headers={"Authorization": self._api_key}, timeout=10)
self.API_URL, headers={AUTHORIZATION: self._api_key}, timeout=10)
if response.status_code == 401:
raise RuntimeError("Invalid API_KEY")
elif response.status_code != 200:

View File

@ -6,13 +6,14 @@ https://home-assistant.io/components/device_tracker.swisscom/
"""
import logging
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -77,7 +78,7 @@ class SwisscomDeviceScanner(DeviceScanner):
def get_swisscom_data(self):
"""Retrieve data from Swisscom and return parsed result."""
url = 'http://{}/ws'.format(self.host)
headers = {'Content-Type': 'application/x-sah-ws-4-call+json'}
headers = {CONTENT_TYPE: 'application/x-sah-ws-4-call+json'}
data = """
{"service":"Devices", "method":"get",
"parameters":{"expression":"lan and not self"}}"""

View File

@ -5,21 +5,27 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.tplink/
"""
import base64
from datetime import datetime
import hashlib
import logging
import re
from datetime import datetime
from aiohttp.hdrs import (
ACCEPT, COOKIE, PRAGMA, REFERER, CONNECTION, KEEP_ALIVE, USER_AGENT,
CONTENT_TYPE, CACHE_CONTROL, ACCEPT_ENCODING, ACCEPT_LANGUAGE)
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, HTTP_HEADER_X_REQUESTED_WITH)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
HTTP_HEADER_NO_CACHE = 'no-cache'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
@ -78,7 +84,7 @@ class TplinkDeviceScanner(DeviceScanner):
referer = 'http://{}'.format(self.host)
page = requests.get(
url, auth=(self.username, self.password),
headers={'referer': referer}, timeout=4)
headers={REFERER: referer}, timeout=4)
result = self.parse_macs.findall(page.text)
@ -123,7 +129,7 @@ class Tplink2DeviceScanner(TplinkDeviceScanner):
.format(b64_encoded_username_password)
response = requests.post(
url, headers={'referer': referer, 'cookie': cookie},
url, headers={REFERER: referer, COOKIE: cookie},
timeout=4)
try:
@ -174,11 +180,11 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
.format(self.host)
referer = 'http://{}/webpages/login.html'.format(self.host)
# If possible implement rsa encryption of password here.
# If possible implement RSA encryption of password here.
response = requests.post(
url, params={'operation': 'login', 'username': self.username,
'password': self.password},
headers={'referer': referer}, timeout=4)
headers={REFERER: referer}, timeout=4)
try:
self.stok = response.json().get('data').get('stok')
@ -207,11 +213,9 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
'form=statistics').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
response = requests.post(url,
params={'operation': 'load'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth},
timeout=5)
response = requests.post(
url, params={'operation': 'load'}, headers={REFERER: referer},
cookies={'sysauth': self.sysauth}, timeout=5)
try:
json_response = response.json()
@ -248,10 +252,9 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
'form=logout').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
requests.post(url,
params={'operation': 'write'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth})
requests.post(
url, params={'operation': 'write'}, headers={REFERER: referer},
cookies={'sysauth': self.sysauth})
self.stok = ''
self.sysauth = ''
@ -292,7 +295,7 @@ class Tplink4DeviceScanner(TplinkDeviceScanner):
# Create the authorization cookie.
cookie = 'Authorization=Basic {}'.format(self.credentials)
response = requests.get(url, headers={'cookie': cookie})
response = requests.get(url, headers={COOKIE: cookie})
try:
result = re.search(r'window.parent.location.href = '
@ -326,8 +329,8 @@ class Tplink4DeviceScanner(TplinkDeviceScanner):
cookie = 'Authorization=Basic {}'.format(self.credentials)
page = requests.get(url, headers={
'cookie': cookie,
'referer': referer
COOKIE: cookie,
REFERER: referer,
})
mac_results.extend(self.parse_macs.findall(page.text))
@ -361,31 +364,31 @@ class Tplink5DeviceScanner(TplinkDeviceScanner):
base_url = 'http://{}'.format(self.host)
header = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12;"
" rv:53.0) Gecko/20100101 Firefox/53.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "Accept-Language: en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded; "
"charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Referer": "http://" + self.host + "/",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
USER_AGENT:
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12;"
" rv:53.0) Gecko/20100101 Firefox/53.0",
ACCEPT: "application/json, text/javascript, */*; q=0.01",
ACCEPT_LANGUAGE: "Accept-Language: en-US,en;q=0.5",
ACCEPT_ENCODING: "gzip, deflate",
CONTENT_TYPE: "application/x-www-form-urlencoded; charset=UTF-8",
HTTP_HEADER_X_REQUESTED_WITH: "XMLHttpRequest",
REFERER: "http://{}/".format(self.host),
CONNECTION: KEEP_ALIVE,
PRAGMA: HTTP_HEADER_NO_CACHE,
CACHE_CONTROL: HTTP_HEADER_NO_CACHE,
}
password_md5 = hashlib.md5(
self.password.encode('utf')).hexdigest().upper()
# create a session to handle cookie easier
# Create a session to handle cookie easier
session = requests.session()
session.get(base_url, headers=header)
login_data = {"username": self.username, "password": password_md5}
session.post(base_url, login_data, headers=header)
# a timestamp is required to be sent as get parameter
# A timestamp is required to be sent as get parameter
timestamp = int(datetime.now().timestamp() * 1e3)
client_list_url = '{}/data/monitor.client.client.json'.format(
@ -393,18 +396,17 @@ class Tplink5DeviceScanner(TplinkDeviceScanner):
get_params = {
'operation': 'load',
'_': timestamp
'_': timestamp,
}
response = session.get(client_list_url,
headers=header,
params=get_params)
response = session.get(
client_list_url, headers=header, params=get_params)
session.close()
try:
list_of_devices = response.json()
except ValueError:
_LOGGER.error("AP didn't respond with JSON. "
"Check if credentials are correct.")
"Check if credentials are correct")
return False
if list_of_devices:

View File

@ -8,28 +8,28 @@ import asyncio
import logging
import aiohttp
from aiohttp.hdrs import REFERER, USER_AGENT
import async_timeout
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.const import CONF_HOST, HTTP_HEADER_X_REQUESTED_WITH
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['defusedxml==0.5.0']
_LOGGER = logging.getLogger(__name__)
CMD_DEVICES = 123
DEFAULT_IP = '192.168.0.1'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_HOST, default=DEFAULT_IP): cv.string,
})
CMD_DEVICES = 123
@asyncio.coroutine
def async_get_scanner(hass, config):
@ -52,11 +52,11 @@ class UPCDeviceScanner(DeviceScanner):
self.token = None
self.headers = {
'X-Requested-With': 'XMLHttpRequest',
'Referer': "http://{}/index.html".format(self.host),
'User-Agent': ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36")
HTTP_HEADER_X_REQUESTED_WITH: 'XMLHttpRequest',
REFERER: "http://{}/index.html".format(self.host),
USER_AGENT: ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36")
}
self.websession = async_get_clientsession(hass)
@ -95,8 +95,7 @@ class UPCDeviceScanner(DeviceScanner):
with async_timeout.timeout(10, loop=self.hass.loop):
response = yield from self.websession.get(
"http://{}/common_page/login.html".format(self.host),
headers=self.headers
)
headers=self.headers)
yield from response.text()
@ -118,17 +117,15 @@ class UPCDeviceScanner(DeviceScanner):
response = yield from self.websession.post(
"http://{}/xml/getter.xml".format(self.host),
data="token={}&fun={}".format(self.token, function),
headers=self.headers,
allow_redirects=False
)
headers=self.headers, allow_redirects=False)
# error?
# Error?
if response.status != 200:
_LOGGER.warning("Receive http code %d", response.status)
self.token = None
return
# load data, store token for next request
# Load data, store token for next request
self.token = response.cookies['sessionToken'].value
return (yield from response.text())

View File

@ -7,27 +7,24 @@ https://home-assistant.io/components/google_assistant/
import asyncio
import logging
from typing import Any, Dict # NOQA
from aiohttp.hdrs import AUTHORIZATION
from aiohttp.web import Request, Response # NOQA
# Typing imports
# pylint: disable=using-constant-test,unused-import,ungrouped-imports
# if False:
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED
from homeassistant.core import HomeAssistant # NOQA
from aiohttp.web import Request, Response # NOQA
from typing import Dict, Tuple, Any # NOQA
from homeassistant.helpers.entity import Entity # NOQA
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import (HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED)
from .const import (
GOOGLE_ASSISTANT_API_ENDPOINT,
CONF_ACCESS_TOKEN,
DEFAULT_EXPOSE_BY_DEFAULT,
DEFAULT_EXPOSED_DOMAINS,
CONF_EXPOSE_BY_DEFAULT,
CONF_EXPOSED_DOMAINS,
ATTR_GOOGLE_ASSISTANT)
from .smart_home import entity_to_device, query_device, determine_service
CONF_ACCESS_TOKEN, CONF_EXPOSED_DOMAINS, ATTR_GOOGLE_ASSISTANT,
CONF_EXPOSE_BY_DEFAULT, DEFAULT_EXPOSED_DOMAINS, DEFAULT_EXPOSE_BY_DEFAULT,
GOOGLE_ASSISTANT_API_ENDPOINT)
from .smart_home import query_device, entity_to_device, determine_service
_LOGGER = logging.getLogger(__name__)
@ -140,7 +137,7 @@ class GoogleAssistantView(HomeAssistantView):
@asyncio.coroutine
def post(self, request: Request) -> Response:
"""Handle Google Assistant requests."""
auth = request.headers.get('Authorization', None)
auth = request.headers.get(AUTHORIZATION, None)
if 'Bearer {}'.format(self.access_token) != auth:
return self.json_message(
"missing authorization", status_code=HTTP_UNAUTHORIZED)

View File

@ -5,37 +5,43 @@ For more details about this component, please refer to the documentation at
https://home-assistant.io/components/http/
"""
import asyncio
import json
from functools import wraps
import logging
import ssl
from ipaddress import ip_network
import json
import logging
import os
import voluptuous as vol
from aiohttp import web
from aiohttp.web_exceptions import HTTPUnauthorized, HTTPMovedPermanently
import ssl
from aiohttp import web
from aiohttp.hdrs import ACCEPT, ORIGIN, CONTENT_TYPE
from aiohttp.web_exceptions import HTTPUnauthorized, HTTPMovedPermanently
import voluptuous as vol
from homeassistant.const import (
SERVER_PORT, CONTENT_TYPE_JSON, HTTP_HEADER_HA_AUTH,
EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START,
HTTP_HEADER_X_REQUESTED_WITH)
from homeassistant.core import is_callback
import homeassistant.helpers.config_validation as cv
import homeassistant.remote as rem
import homeassistant.util as hass_util
from homeassistant.const import (
SERVER_PORT, CONTENT_TYPE_JSON, ALLOWED_CORS_HEADERS,
EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START)
from homeassistant.core import is_callback
from homeassistant.util.logging import HideSensitiveDataFilter
from .auth import auth_middleware
from .ban import ban_middleware
from .const import (
KEY_USE_X_FORWARDED_FOR, KEY_TRUSTED_NETWORKS, KEY_BANS_ENABLED,
KEY_LOGIN_THRESHOLD, KEY_AUTHENTICATED)
KEY_BANS_ENABLED, KEY_AUTHENTICATED, KEY_LOGIN_THRESHOLD,
KEY_TRUSTED_NETWORKS, KEY_USE_X_FORWARDED_FOR)
from .static import (
staticresource_middleware, CachingFileResponse, CachingStaticResource)
CachingFileResponse, CachingStaticResource, staticresource_middleware)
from .util import get_real_ip
REQUIREMENTS = ['aiohttp_cors==0.5.3']
ALLOWED_CORS_HEADERS = [
ORIGIN, ACCEPT, HTTP_HEADER_X_REQUESTED_WITH, CONTENT_TYPE,
HTTP_HEADER_HA_AUTH]
DOMAIN = 'http'
CONF_API_PASSWORD = 'api_password'

View File

@ -12,27 +12,27 @@ import logging
import os
from random import SystemRandom
from aiohttp import web, hdrs
from aiohttp import web
from aiohttp.hdrs import CONTENT_TYPE, CACHE_CONTROL
import async_timeout
import voluptuous as vol
from homeassistant.components.http import KEY_AUTHENTICATED, HomeAssistantView
from homeassistant.config import load_yaml_config_file
from homeassistant.loader import bind_hass
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.config_validation import PLATFORM_SCHEMA # noqa
from homeassistant.components.http import HomeAssistantView, KEY_AUTHENTICATED
from homeassistant.const import (
STATE_OFF, STATE_IDLE, STATE_PLAYING, STATE_UNKNOWN, ATTR_ENTITY_ID,
SERVICE_TOGGLE, SERVICE_TURN_ON, SERVICE_TURN_OFF, SERVICE_VOLUME_UP,
SERVICE_MEDIA_PLAY, SERVICE_MEDIA_SEEK, SERVICE_MEDIA_STOP,
SERVICE_VOLUME_SET, SERVICE_MEDIA_PAUSE, SERVICE_SHUFFLE_SET,
SERVICE_VOLUME_DOWN, SERVICE_VOLUME_MUTE, SERVICE_MEDIA_NEXT_TRACK,
SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PREVIOUS_TRACK)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.config_validation import PLATFORM_SCHEMA # noqa
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.loader import bind_hass
from homeassistant.util.async import run_coroutine_threadsafe
from homeassistant.const import (
STATE_OFF, STATE_UNKNOWN, STATE_PLAYING, STATE_IDLE,
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_TURN_ON,
SERVICE_VOLUME_UP, SERVICE_VOLUME_DOWN, SERVICE_VOLUME_SET,
SERVICE_VOLUME_MUTE, SERVICE_TOGGLE, SERVICE_MEDIA_STOP,
SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PLAY, SERVICE_MEDIA_PAUSE,
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREVIOUS_TRACK, SERVICE_MEDIA_SEEK,
SERVICE_SHUFFLE_SET)
_LOGGER = logging.getLogger(__name__)
_RND = SystemRandom()
@ -53,8 +53,6 @@ ENTITY_IMAGE_CACHE = {
ATTR_CACHE_MAXSIZE: 16
}
CONTENT_TYPE_HEADER = 'Content-Type'
SERVICE_PLAY_MEDIA = 'play_media'
SERVICE_SELECT_SOURCE = 'select_source'
SERVICE_CLEAR_PLAYLIST = 'clear_playlist'
@ -911,7 +909,7 @@ def _async_fetch_image(hass, url):
if response.status == 200:
content = yield from response.read()
content_type = response.headers.get(CONTENT_TYPE_HEADER)
content_type = response.headers.get(CONTENT_TYPE)
if content_type:
content_type = content_type.split(';')[0]
@ -965,8 +963,6 @@ class MediaPlayerImageView(HomeAssistantView):
if data is None:
return web.Response(status=500)
headers = {hdrs.CACHE_CONTROL: 'max-age=3600'}
headers = {CACHE_CONTROL: 'max-age=3600'}
return web.Response(
body=data,
content_type=content_type,
headers=headers)
body=data, content_type=content_type, headers=headers)

View File

@ -4,33 +4,37 @@ Bluesound.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/media_player.bluesound/
"""
import logging
from datetime import timedelta
from asyncio.futures import CancelledError
import asyncio
import voluptuous as vol
from aiohttp.client_exceptions import ClientError
from asyncio.futures import CancelledError
from datetime import timedelta
import logging
import aiohttp
from aiohttp.client_exceptions import ClientError
from aiohttp.hdrs import CONNECTION, KEEP_ALIVE
import async_timeout
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.core import callback
from homeassistant.util import Throttle
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.util.dt as dt_util
import voluptuous as vol
from homeassistant.components.media_player import (
SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PREVIOUS_TRACK, SUPPORT_SEEK,
SUPPORT_PLAY_MEDIA, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_STOP,
SUPPORT_PLAY, MediaPlayerDevice, PLATFORM_SCHEMA, MEDIA_TYPE_MUSIC,
SUPPORT_CLEAR_PLAYLIST, SUPPORT_SELECT_SOURCE, SUPPORT_VOLUME_STEP)
SUPPORT_PLAY, SUPPORT_SEEK, SUPPORT_STOP, SUPPORT_PAUSE, PLATFORM_SCHEMA,
MEDIA_TYPE_MUSIC, SUPPORT_NEXT_TRACK, SUPPORT_PLAY_MEDIA,
SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_STEP,
SUPPORT_SELECT_SOURCE, SUPPORT_CLEAR_PLAYLIST, SUPPORT_PREVIOUS_TRACK,
MediaPlayerDevice)
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
STATE_PLAYING, STATE_PAUSED, STATE_IDLE, CONF_HOSTS,
CONF_HOST, CONF_PORT, CONF_NAME)
CONF_HOST, CONF_NAME, CONF_PORT, CONF_HOSTS, STATE_IDLE, STATE_PAUSED,
STATE_PLAYING, EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START)
from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util import Throttle
import homeassistant.util.dt as dt_util
REQUIREMENTS = ['xmltodict==0.11.0']
_LOGGER = logging.getLogger(__name__)
STATE_OFFLINE = 'offline'
ATTR_MODEL = 'model'
ATTR_MODEL_NAME = 'model_name'
@ -46,8 +50,6 @@ UPDATE_PRESETS_INTERVAL = timedelta(minutes=30)
NODE_OFFLINE_CHECK_TIMEOUT = 180
NODE_RETRY_INITIATION = timedelta(minutes=3)
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_HOSTS): vol.All(cv.ensure_list, [{
vol.Required(CONF_HOST): cv.string,
@ -80,20 +82,15 @@ def _add_player(hass, async_add_devices, host, port=None, name=None):
def _add_player_cb():
"""Add player after first sync fetch."""
async_add_devices([player])
_LOGGER.info('Added Bluesound device with name: %s', player.name)
_LOGGER.info("Added device with name: %s", player.name)
if hass.is_running:
_start_polling()
else:
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START,
_start_polling
)
EVENT_HOMEASSISTANT_START, _start_polling)
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP,
_stop_polling
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_polling)
player = BluesoundPlayer(hass, host, port, name, _add_player_cb)
hass.data[DATA_BLUESOUND].append(player)
@ -101,10 +98,7 @@ def _add_player(hass, async_add_devices, host, port=None, name=None):
if hass.is_running:
_init_player()
else:
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START,
_init_player
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, _init_player)
@asyncio.coroutine
@ -121,11 +115,9 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
hosts = config.get(CONF_HOSTS, None)
if hosts:
for host in hosts:
_add_player(hass,
async_add_devices,
host.get(CONF_HOST),
host.get(CONF_PORT, None),
host.get(CONF_NAME, None))
_add_player(
hass, async_add_devices, host.get(CONF_HOST),
host.get(CONF_PORT), host.get(CONF_NAME, None))
class BluesoundPlayer(MediaPlayerDevice):
@ -137,7 +129,7 @@ class BluesoundPlayer(MediaPlayerDevice):
self._hass = hass
self._port = port
self._polling_session = async_get_clientsession(hass)
self._polling_task = None # The actuall polling task.
self._polling_task = None # The actual polling task.
self._name = name
self._brand = None
self._model = None
@ -156,7 +148,6 @@ class BluesoundPlayer(MediaPlayerDevice):
if self._port is None:
self._port = DEFAULT_PORT
# Internal methods
@staticmethod
def _try_get_index(string, seach_string):
try:
@ -165,13 +156,12 @@ class BluesoundPlayer(MediaPlayerDevice):
return -1
@asyncio.coroutine
def _internal_update_sync_status(self, on_updated_cb=None,
raise_timeout=False):
def _internal_update_sync_status(
self, on_updated_cb=None, raise_timeout=False):
resp = None
try:
resp = yield from self.send_bluesound_command(
'SyncStatus',
raise_timeout, raise_timeout)
'SyncStatus', raise_timeout, raise_timeout)
except:
raise
@ -193,9 +183,7 @@ class BluesoundPlayer(MediaPlayerDevice):
if on_updated_cb:
on_updated_cb()
return True
# END Internal methods
# Poll functionality
@asyncio.coroutine
def _start_poll_command(self):
""""Loop which polls the status of the player."""
@ -204,14 +192,13 @@ class BluesoundPlayer(MediaPlayerDevice):
yield from self.async_update_status()
except (asyncio.TimeoutError, ClientError):
_LOGGER.info("Bluesound node %s is offline, retrying later",
self._name)
yield from asyncio.sleep(NODE_OFFLINE_CHECK_TIMEOUT,
loop=self._hass.loop)
_LOGGER.info("Node %s is offline, retrying later", self._name)
yield from asyncio.sleep(
NODE_OFFLINE_CHECK_TIMEOUT, loop=self._hass.loop)
self.start_polling()
except CancelledError:
_LOGGER.debug("Stopping bluesound polling of node %s", self._name)
_LOGGER.debug("Stopping the polling of node %s", self._name)
except:
_LOGGER.exception("Unexpected error in %s", self._name)
raise
@ -224,9 +211,7 @@ class BluesoundPlayer(MediaPlayerDevice):
def stop_polling(self):
"""Stop the polling task."""
self._polling_task.cancel()
# END Poll functionality
# Initiator
@asyncio.coroutine
def async_init(self):
"""Initiate the player async."""
@ -235,22 +220,17 @@ class BluesoundPlayer(MediaPlayerDevice):
self._retry_remove()
self._retry_remove = None
yield from self._internal_update_sync_status(self._init_callback,
True)
yield from self._internal_update_sync_status(
self._init_callback, True)
except (asyncio.TimeoutError, ClientError):
_LOGGER.info("Bluesound node %s is offline, retrying later",
self.host)
_LOGGER.info("Node %s is offline, retrying later", self.host)
self._retry_remove = async_track_time_interval(
self._hass,
self.async_init,
NODE_RETRY_INITIATION)
self._hass, self.async_init, NODE_RETRY_INITIATION)
except:
_LOGGER.exception("Unexpected when initiating error in %s",
self.host)
raise
# END Initiator
# Status updates fetchers
@asyncio.coroutine
def async_update(self):
"""Update internal status of the entity."""
@ -275,7 +255,7 @@ class BluesoundPlayer(MediaPlayerDevice):
method = method[1:]
url = "http://{}:{}/{}".format(self.host, self._port, method)
_LOGGER.info("calling URL: %s", url)
_LOGGER.debug("Calling URL: %s", url)
response = None
try:
websession = async_get_clientsession(self._hass)
@ -294,11 +274,10 @@ class BluesoundPlayer(MediaPlayerDevice):
except (asyncio.TimeoutError, aiohttp.ClientError):
if raise_timeout:
_LOGGER.info("Timeout with Bluesound: %s", self.host)
_LOGGER.info("Timeout: %s", self.host)
raise
else:
_LOGGER.debug("Failed communicating with Bluesound: %s",
self.host)
_LOGGER.debug("Failed communicating: %s", self.host)
return None
return data
@ -315,17 +294,17 @@ class BluesoundPlayer(MediaPlayerDevice):
etag = self._status.get('@etag', '')
if etag != '':
url = 'Status?etag='+etag+'&timeout=60.0'
url = 'Status?etag={}&timeout=60.0'.format(etag)
url = "http://{}:{}/{}".format(self.host, self._port, url)
_LOGGER.debug("calling URL: %s", url)
_LOGGER.debug("Calling URL: %s", url)
try:
with async_timeout.timeout(65, loop=self._hass.loop):
response = yield from self._polling_session.get(
url,
headers={'connection': 'keep-alive'})
headers={CONNECTION: KEEP_ALIVE})
if response.status != 200:
_LOGGER.error("Error %s on %s", response.status, url)
@ -350,8 +329,8 @@ class BluesoundPlayer(MediaPlayerDevice):
def async_update_sync_status(self, on_updated_cb=None,
raise_timeout=False):
"""Update sync status."""
yield from self._internal_update_sync_status(on_updated_cb,
raise_timeout=False)
yield from self._internal_update_sync_status(
on_updated_cb, raise_timeout=False)
@asyncio.coroutine
@Throttle(UPDATE_CAPTURE_INTERVAL)
@ -436,9 +415,7 @@ class BluesoundPlayer(MediaPlayerDevice):
_create_service_item(resp['services']['service'])
return self._services_items
# END Status updates fetchers
# Media player (and core) properties
@property
def should_poll(self):
"""No need to poll information."""
@ -611,17 +588,17 @@ class BluesoundPlayer(MediaPlayerDevice):
stream_url = self._status.get('streamUrl', '')
if self._status.get('is_preset', '') == '1' and stream_url != '':
# this check doesn't work with all presets, for example playlists.
# But it works with radio service_items will catch playlists
# This check doesn't work with all presets, for example playlists.
# But it works with radio service_items will catch playlists.
items = [x for x in self._preset_items if 'url2' in x and
parse.unquote(x['url2']) == stream_url]
if len(items) > 0:
return items[0]['title']
# this could be a bit difficult to detect. Bluetooth could be named
# This could be a bit difficult to detect. Bluetooth could be named
# different things and there is not any way to match chooses in
# capture list to current playing. It's a bit of guesswork.
# This method will be needing some tweaking over time
# This method will be needing some tweaking over time.
title = self._status.get('title1', '').lower()
if title == 'bluetooth' or stream_url == 'Capture:hw:2,0/44100/16/2':
items = [x for x in self._capture_items
@ -660,7 +637,7 @@ class BluesoundPlayer(MediaPlayerDevice):
return items[0]['title']
if self._status.get('streamUrl', '') != '':
_LOGGER.debug("Couldn't find source of stream url: %s",
_LOGGER.debug("Couldn't find source of stream URL: %s",
self._status.get('streamUrl', ''))
return None
@ -695,9 +672,7 @@ class BluesoundPlayer(MediaPlayerDevice):
ATTR_MODEL_NAME: self._model_name,
ATTR_BRAND: self._brand,
}
# END Media player (and core) properties
# Media player commands
@asyncio.coroutine
def async_select_source(self, source):
"""Select input source."""
@ -712,8 +687,8 @@ class BluesoundPlayer(MediaPlayerDevice):
return
selected_source = items[0]
url = 'Play?url={}&preset_id&image={}'.format(selected_source['url'],
selected_source['image'])
url = 'Play?url={}&preset_id&image={}'.format(
selected_source['url'], selected_source['image'])
if 'is_raw_url' in selected_source and selected_source['is_raw_url']:
url = selected_source['url']
@ -806,4 +781,3 @@ class BluesoundPlayer(MediaPlayerDevice):
else:
return self.send_bluesound_command(
'Volume?level=' + str(float(self._lastvol) * 100))
# END Media player commands

View File

@ -6,18 +6,18 @@ https://home-assistant.io/components/no_ip/
"""
import asyncio
import base64
import logging
from datetime import timedelta
import logging
import aiohttp
from aiohttp.hdrs import USER_AGENT, AUTHORIZATION
import async_timeout
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_DOMAIN, CONF_PASSWORD, CONF_TIMEOUT, CONF_USERNAME, HTTP_HEADER_AUTH,
HTTP_HEADER_USER_AGENT)
CONF_DOMAIN, CONF_TIMEOUT, CONF_PASSWORD, CONF_USERNAME)
from homeassistant.helpers.aiohttp_client import SERVER_SOFTWARE
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -41,7 +41,7 @@ NO_IP_ERRORS = {
}
UPDATE_URL = 'https://dynupdate.noip.com/nic/update'
USER_AGENT = "{} {}".format(SERVER_SOFTWARE, EMAIL)
HA_USER_AGENT = "{} {}".format(SERVER_SOFTWARE, EMAIL)
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
@ -92,8 +92,8 @@ def _update_no_ip(hass, session, domain, auth_str, timeout):
}
headers = {
HTTP_HEADER_AUTH: "Basic {}".format(auth_str.decode('utf-8')),
HTTP_HEADER_USER_AGENT: USER_AGENT,
AUTHORIZATION: "Basic {}".format(auth_str.decode('utf-8')),
USER_AGENT: HA_USER_AGENT,
}
try:

View File

@ -6,22 +6,22 @@ https://home-assistant.io/components/notify.clicksend/
"""
import json
import logging
import requests
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_USERNAME, CONF_API_KEY, CONF_RECIPIENT, HTTP_HEADER_CONTENT_TYPE,
CONTENT_TYPE_JSON)
from homeassistant.components.notify import (
PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import (
CONF_API_KEY, CONF_USERNAME, CONF_RECIPIENT, CONTENT_TYPE_JSON)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
BASE_API_URL = 'https://rest.clicksend.com/v3'
HEADERS = {HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON}
HEADERS = {CONTENT_TYPE: CONTENT_TYPE_JSON}
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_USERNAME): cv.string,

View File

@ -8,22 +8,22 @@ https://home-assistant.io/components/notify.clicksend_tts/
"""
import json
import logging
import requests
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_USERNAME, CONF_API_KEY, CONF_RECIPIENT, HTTP_HEADER_CONTENT_TYPE,
CONTENT_TYPE_JSON)
from homeassistant.components.notify import (
PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import (
CONF_API_KEY, CONF_USERNAME, CONF_RECIPIENT, CONTENT_TYPE_JSON)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
BASE_API_URL = 'https://rest.clicksend.com/v3'
HEADERS = {HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON}
HEADERS = {CONTENT_TYPE: CONTENT_TYPE_JSON}
CONF_LANGUAGE = 'language'
CONF_VOICE = 'voice'

View File

@ -6,14 +6,14 @@ https://home-assistant.io/components/notify.facebook/
"""
import logging
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.notify import (
ATTR_TARGET, ATTR_DATA, PLATFORM_SCHEMA, BaseNotificationService)
ATTR_DATA, ATTR_TARGET, PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import CONTENT_TYPE_JSON
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -70,7 +70,7 @@ class FacebookNotificationService(BaseNotificationService):
import json
resp = requests.post(BASE_URL, data=json.dumps(body),
params=payload,
headers={'Content-Type': CONTENT_TYPE_JSON},
headers={CONTENT_TYPE: CONTENT_TYPE_JSON},
timeout=10)
if resp.status_code != 200:
obj = resp.json()

View File

@ -5,25 +5,26 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/notify.html5/
"""
import asyncio
import os
import logging
import json
import time
import datetime
import json
import logging
import os
import time
import uuid
from aiohttp.hdrs import AUTHORIZATION
import voluptuous as vol
from voluptuous.humanize import humanize_error
from homeassistant.const import (HTTP_BAD_REQUEST, HTTP_INTERNAL_SERVER_ERROR,
HTTP_UNAUTHORIZED, URL_ROOT)
from homeassistant.util import ensure_unique_string
from homeassistant.components.notify import (
ATTR_TARGET, ATTR_TITLE, ATTR_TITLE_DEFAULT, ATTR_DATA,
BaseNotificationService, PLATFORM_SCHEMA)
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.frontend import add_manifest_json_key
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.notify import (
ATTR_DATA, ATTR_TITLE, ATTR_TARGET, PLATFORM_SCHEMA, ATTR_TITLE_DEFAULT,
BaseNotificationService)
from homeassistant.const import (
URL_ROOT, HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, HTTP_INTERNAL_SERVER_ERROR)
from homeassistant.helpers import config_validation as cv
from homeassistant.util import ensure_unique_string
REQUIREMENTS = ['pywebpush==1.1.0', 'PyJWT==1.5.3']
@ -62,24 +63,25 @@ ATTR_JWT = 'jwt'
# is valid.
JWT_VALID_DAYS = 7
KEYS_SCHEMA = vol.All(dict,
vol.Schema({
vol.Required(ATTR_AUTH): cv.string,
vol.Required(ATTR_P256DH): cv.string
}))
KEYS_SCHEMA = vol.All(
dict, vol.Schema({
vol.Required(ATTR_AUTH): cv.string,
vol.Required(ATTR_P256DH): cv.string,
})
)
SUBSCRIPTION_SCHEMA = vol.All(dict,
vol.Schema({
# pylint: disable=no-value-for-parameter
vol.Required(ATTR_ENDPOINT): vol.Url(),
vol.Required(ATTR_KEYS): KEYS_SCHEMA,
vol.Optional(ATTR_EXPIRATIONTIME):
vol.Any(None, cv.positive_int)
}))
SUBSCRIPTION_SCHEMA = vol.All(
dict, vol.Schema({
# pylint: disable=no-value-for-parameter
vol.Required(ATTR_ENDPOINT): vol.Url(),
vol.Required(ATTR_KEYS): KEYS_SCHEMA,
vol.Optional(ATTR_EXPIRATIONTIME): vol.Any(None, cv.positive_int),
})
)
REGISTER_SCHEMA = vol.Schema({
vol.Required(ATTR_SUBSCRIPTION): SUBSCRIPTION_SCHEMA,
vol.Required(ATTR_BROWSER): vol.In(['chrome', 'firefox'])
vol.Required(ATTR_BROWSER): vol.In(['chrome', 'firefox']),
})
CALLBACK_EVENT_PAYLOAD_SCHEMA = vol.Schema({
@ -145,7 +147,7 @@ class JSONBytesDecoder(json.JSONEncoder):
# pylint: disable=method-hidden
def default(self, obj):
"""Decode object if it's a bytes object, else defer to baseclass."""
"""Decode object if it's a bytes object, else defer to base class."""
if isinstance(obj, bytes):
return obj.decode()
return json.JSONEncoder.default(self, obj)
@ -158,7 +160,7 @@ def _save_config(filename, config):
fdesc.write(json.dumps(
config, cls=JSONBytesDecoder, indent=4, sort_keys=True))
except (IOError, TypeError) as error:
_LOGGER.error("Saving config file failed: %s", error)
_LOGGER.error("Saving configuration file failed: %s", error)
return False
return True
@ -266,7 +268,7 @@ class HTML5PushCallbackView(HomeAssistantView):
def check_authorization_header(self, request):
"""Check the authorization header."""
import jwt
auth = request.headers.get('Authorization', None)
auth = request.headers.get(AUTHORIZATION, None)
if not auth:
return self.json_message('Authorization header is expected',
status_code=HTTP_UNAUTHORIZED)
@ -323,8 +325,7 @@ class HTML5PushCallbackView(HomeAssistantView):
event_name = '{}.{}'.format(NOTIFY_CALLBACK_EVENT,
event_payload[ATTR_TYPE])
request.app['hass'].bus.fire(event_name, event_payload)
return self.json({'status': 'ok',
'event': event_payload[ATTR_TYPE]})
return self.json({'status': 'ok', 'event': event_payload[ATTR_TYPE]})
class HTML5NotificationService(BaseNotificationService):
@ -413,6 +414,6 @@ class HTML5NotificationService(BaseNotificationService):
if not _save_config(self.registrations_json_path,
self.registrations):
self.registrations[target] = reg
_LOGGER.error("Error saving registration.")
_LOGGER.error("Error saving registration")
else:
_LOGGER.info("Configuration saved")

View File

@ -7,14 +7,14 @@ https://home-assistant.io/components/notify.instapush/
import json
import logging
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.notify import (
ATTR_TITLE, ATTR_TITLE_DEFAULT, PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import (
CONF_API_KEY, HTTP_HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
ATTR_TITLE, PLATFORM_SCHEMA, ATTR_TITLE_DEFAULT, BaseNotificationService)
from homeassistant.const import CONF_API_KEY, CONTENT_TYPE_JSON
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
_RESOURCE = 'https://api.instapush.im/v1/'
@ -76,7 +76,7 @@ class InstapushNotificationService(BaseNotificationService):
self._headers = {
HTTP_HEADER_APPID: self._api_key,
HTTP_HEADER_APPSECRET: self._app_secret,
HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON,
CONTENT_TYPE: CONTENT_TYPE_JSON,
}
def send_message(self, message="", **kwargs):

View File

@ -10,7 +10,8 @@ import voluptuous as vol
from homeassistant.components.notify import (
ATTR_TITLE, ATTR_TITLE_DEFAULT, PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import (CONF_API_KEY, CONF_SENDER, CONF_RECIPIENT)
from homeassistant.const import (
CONF_API_KEY, CONF_SENDER, CONF_RECIPIENT, CONTENT_TYPE_TEXT_PLAIN)
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['sendgrid==5.3.0']
@ -67,7 +68,7 @@ class SendgridNotificationService(BaseNotificationService):
},
"content": [
{
"type": "text/plain",
"type": CONTENT_TYPE_TEXT_PLAIN,
"value": message
}
]

View File

@ -6,12 +6,13 @@ https://home-assistant.io/components/notify.telstra/
"""
import logging
from aiohttp.hdrs import CONTENT_TYPE, AUTHORIZATION
import requests
import voluptuous as vol
from homeassistant.components.notify import (
BaseNotificationService, ATTR_TITLE, PLATFORM_SCHEMA)
from homeassistant.const import CONTENT_TYPE_JSON, HTTP_HEADER_CONTENT_TYPE
ATTR_TITLE, PLATFORM_SCHEMA, BaseNotificationService)
from homeassistant.const import CONTENT_TYPE_JSON
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -73,8 +74,8 @@ class TelstraNotificationService(BaseNotificationService):
}
message_resource = 'https://api.telstra.com/v1/sms/messages'
message_headers = {
HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON,
'Authorization': 'Bearer ' + token_response['access_token'],
CONTENT_TYPE: CONTENT_TYPE_JSON,
AUTHORIZATION: 'Bearer {}'.format(token_response['access_token']),
}
message_response = requests.post(
message_resource, headers=message_headers, json=message_data,

View File

@ -9,6 +9,7 @@ import time
import requests
import voluptuous as vol
from aiohttp.hdrs import CONTENT_TYPE
from homeassistant.const import CONF_API_KEY, CONF_HOST, CONTENT_TYPE_JSON
import homeassistant.helpers.config_validation as cv
@ -55,8 +56,10 @@ class OctoPrintAPI(object):
def __init__(self, api_url, key, bed, number_of_tools):
"""Initialize OctoPrint API and set headers needed later."""
self.api_url = api_url
self.headers = {'content-type': CONTENT_TYPE_JSON,
'X-Api-Key': key}
self.headers = {
CONTENT_TYPE: CONTENT_TYPE_JSON,
'X-Api-Key': key,
}
self.printer_last_reading = [{}, None]
self.job_last_reading = [{}, None]
self.job_available = False

View File

@ -7,15 +7,15 @@ https://home-assistant.io/components/scene.lifx_cloud/
import asyncio
import logging
import aiohttp
from aiohttp.hdrs import AUTHORIZATION
import async_timeout
import voluptuous as vol
import aiohttp
import async_timeout
from homeassistant.components.scene import Scene
from homeassistant.const import (CONF_PLATFORM, CONF_TOKEN, CONF_TIMEOUT)
from homeassistant.const import CONF_TOKEN, CONF_TIMEOUT, CONF_PLATFORM
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.aiohttp_client import (async_get_clientsession)
_LOGGER = logging.getLogger(__name__)
@ -37,7 +37,7 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
timeout = config.get(CONF_TIMEOUT)
headers = {
"Authorization": "Bearer %s" % token,
AUTHORIZATION: "Bearer {}".format(token),
}
url = LIFX_API_URL.format('scenes')

View File

@ -6,8 +6,8 @@ https://home-assistant.io/components/scene.lutron_caseta/
"""
import logging
from homeassistant.components.scene import Scene
from homeassistant.components.lutron_caseta import LUTRON_CASETA_SMARTBRIDGE
from homeassistant.components.scene import Scene
_LOGGER = logging.getLogger(__name__)

View File

@ -7,24 +7,28 @@ https://home-assistant.io/components/sensor.haveibeenpwned/
from datetime import timedelta
import logging
import voluptuous as vol
from aiohttp.hdrs import USER_AGENT
import requests
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL)
from homeassistant.helpers.entity import Entity
from homeassistant.const import CONF_EMAIL
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import track_point_in_time
from homeassistant.util import Throttle
import homeassistant.util.dt as dt_util
from homeassistant.helpers.event import track_point_in_time
_LOGGER = logging.getLogger(__name__)
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"
USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
HA_USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"
MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
URL = 'https://haveibeenpwned.com/api/v2/breachedaccount/'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]),
@ -33,7 +37,7 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the HaveIBeenPwnedSensor sensor."""
"""Set up the HaveIBeenPwned sensor."""
emails = config.get(CONF_EMAIL)
data = HaveIBeenPwnedData(emails)
@ -50,11 +54,11 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class HaveIBeenPwnedSensor(Entity):
"""Implementation of a HaveIBeenPwnedSensor."""
"""Implementation of a HaveIBeenPwned sensor."""
def __init__(self, data, hass, email):
"""Initialize the HaveIBeenPwnedSensor sensor."""
self._state = STATE_UNKNOWN
"""Initialize the HaveIBeenPwned sensor."""
self._state = None
self._data = data
self._hass = hass
self._email = email
@ -77,7 +81,7 @@ class HaveIBeenPwnedSensor(Entity):
@property
def device_state_attributes(self):
"""Return the atrributes of the sensor."""
"""Return the attributes of the sensor."""
val = {}
if self._email not in self._data.data:
return val
@ -143,17 +147,16 @@ class HaveIBeenPwnedData(object):
def update(self, **kwargs):
"""Get the latest data for current email from REST service."""
try:
url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}". \
format(self._email)
url = "{}{}".format(URL, self._email)
_LOGGER.info("Checking for breaches for email %s", self._email)
_LOGGER.debug("Checking for breaches for email: %s", self._email)
req = requests.get(url, headers={"User-agent": USER_AGENT},
allow_redirects=True, timeout=5)
req = requests.get(
url, headers={USER_AGENT: HA_USER_AGENT}, allow_redirects=True,
timeout=5)
except requests.exceptions.RequestException:
_LOGGER.error("Failed fetching HaveIBeenPwned Data for %s",
self._email)
_LOGGER.error("Failed fetching data for %s", self._email)
return
if req.status_code == 200:
@ -161,7 +164,7 @@ class HaveIBeenPwnedData(object):
key=lambda k: k["AddedDate"],
reverse=True)
# only goto next email if we had data so that
# Only goto next email if we had data so that
# the forced updates try this current email again
self.set_next_email()
@ -173,6 +176,6 @@ class HaveIBeenPwnedData(object):
self.set_next_email()
else:
_LOGGER.error("Failed fetching HaveIBeenPwned Data for %s"
_LOGGER.error("Failed fetching data for %s"
"(HTTP Status_code = %d)", self._email,
req.status_code)

View File

@ -4,19 +4,20 @@ Support for monitoring NZBGet NZB client.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.nzbget/
"""
import logging
from datetime import timedelta
import logging
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_NAME, CONF_PORT,
CONF_SSL, CONTENT_TYPE_JSON, CONF_MONITORED_VARIABLES)
CONF_SSL, CONF_HOST, CONF_NAME, CONF_PORT, CONF_PASSWORD, CONF_USERNAME,
CONTENT_TYPE_JSON, CONF_MONITORED_VARIABLES)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
@ -145,7 +146,7 @@ class NZBGetAPI(object):
"""Initialize NZBGet API and set headers needed later."""
self.api_url = api_url
self.status = None
self.headers = {'content-type': CONTENT_TYPE_JSON}
self.headers = {CONTENT_TYPE: CONTENT_TYPE_JSON}
if username is not None and password is not None:
self.auth = (username, password)
@ -155,7 +156,7 @@ class NZBGetAPI(object):
def post(self, method, params=None):
"""Send a POST request and return the response as a dict."""
payload = {"method": method}
payload = {'method': method}
if params:
payload['params'] = params

View File

@ -4,18 +4,18 @@ Support for monitoring pyLoad.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.pyload/
"""
import logging
from datetime import timedelta
import logging
from aiohttp.hdrs import CONTENT_TYPE
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_NAME, CONF_PORT,
CONF_SSL, HTTP_HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON,
CONF_MONITORED_VARIABLES)
CONF_SSL, CONF_HOST, CONF_NAME, CONF_PORT, CONF_PASSWORD, CONF_USERNAME,
CONTENT_TYPE_JSON, CONF_MONITORED_VARIABLES)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
@ -132,7 +132,7 @@ class PyLoadAPI(object):
"""Initialize pyLoad API and set headers needed later."""
self.api_url = api_url
self.status = None
self.headers = {HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON}
self.headers = {CONTENT_TYPE: CONTENT_TYPE_JSON}
if username is not None and password is not None:
self.payload = {'username': username, 'password': password}

View File

@ -8,15 +8,16 @@ import asyncio
import logging
import aiohttp
from aiohttp.hdrs import ACCEPT, AUTHORIZATION
import async_timeout
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.components.thethingsnetwork import (
DATA_TTN, TTN_APP_ID, TTN_ACCESS_KEY, TTN_DATA_STORAGE_URL)
from homeassistant.const import CONTENT_TYPE_JSON
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__)
@ -122,8 +123,8 @@ class TtnDataStorage(object):
self._url = TTN_DATA_STORAGE_URL.format(
app_id=app_id, endpoint='api/v2/query', device_id=device_id)
self._headers = {
'Accept': CONTENT_TYPE_JSON,
'Authorization': 'key {}'.format(access_key),
ACCEPT: CONTENT_TYPE_JSON,
AUTHORIZATION: 'key {}'.format(access_key),
}
@asyncio.coroutine

View File

@ -5,24 +5,25 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.zamg/
"""
import csv
from datetime import datetime, timedelta
import gzip
import json
import logging
import os
from datetime import datetime, timedelta
from aiohttp.hdrs import USER_AGENT
import pytz
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.weather import (
ATTR_WEATHER_HUMIDITY, ATTR_WEATHER_ATTRIBUTION, ATTR_WEATHER_PRESSURE,
ATTR_WEATHER_TEMPERATURE, ATTR_WEATHER_WIND_BEARING,
ATTR_WEATHER_WIND_SPEED)
ATTR_WEATHER_HUMIDITY, ATTR_WEATHER_PRESSURE, ATTR_WEATHER_WIND_SPEED,
ATTR_WEATHER_ATTRIBUTION, ATTR_WEATHER_TEMPERATURE,
ATTR_WEATHER_WIND_BEARING)
from homeassistant.const import (
CONF_MONITORED_CONDITIONS, CONF_NAME, __version__,
CONF_LATITUDE, CONF_LONGITUDE)
CONF_NAME, CONF_LATITUDE, CONF_LONGITUDE, CONF_MONITORED_CONDITIONS,
__version__)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
@ -30,13 +31,12 @@ _LOGGER = logging.getLogger(__name__)
ATTR_STATION = 'station'
ATTR_UPDATED = 'updated'
ATTRIBUTION = 'Data provided by ZAMG'
ATTRIBUTION = "Data provided by ZAMG"
CONF_STATION_ID = 'station_id'
DEFAULT_NAME = 'zamg'
# Data source updates once per hour, so we do nothing if it's been less time
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=10)
SENSOR_TYPES = {
@ -138,7 +138,7 @@ class ZamgData(object):
API_URL = 'http://www.zamg.ac.at/ogd/'
API_HEADERS = {
'User-Agent': '{} {}'.format('home-assistant.zamg/', __version__),
USER_AGENT: '{} {}'.format('home-assistant.zamg/', __version__),
}
def __init__(self, station_id):
@ -162,8 +162,8 @@ class ZamgData(object):
cls.API_URL, headers=cls.API_HEADERS, timeout=15)
response.raise_for_status()
response.encoding = 'UTF8'
return csv.DictReader(response.text.splitlines(),
delimiter=';', quotechar='"')
return csv.DictReader(
response.text.splitlines(), delimiter=';', quotechar='"')
except requests.exceptions.HTTPError:
_LOGGER.error("While fetching data")

View File

@ -7,11 +7,12 @@ https://home-assistant.io/components/splunk/
import json
import logging
from aiohttp.hdrs import AUTHORIZATION
import requests
import voluptuous as vol
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_PORT, CONF_SSL, CONF_TOKEN, EVENT_STATE_CHANGED)
CONF_SSL, CONF_HOST, CONF_NAME, CONF_PORT, CONF_TOKEN, EVENT_STATE_CHANGED)
from homeassistant.helpers import state as state_helper
import homeassistant.helpers.config_validation as cv
from homeassistant.remote import JSONEncoder
@ -52,7 +53,7 @@ def setup(hass, config):
event_collector = '{}{}:{}/services/collector/event'.format(
uri_scheme, host, port)
headers = {'Authorization': 'Splunk {}'.format(token)}
headers = {AUTHORIZATION: 'Splunk {}'.format(token)}
def splunk_event_listener(event):
"""Listen for new messages on the bus and sends them to Splunk."""

View File

@ -10,6 +10,7 @@ import logging
import async_timeout
from aiohttp.client_exceptions import ClientError
from aiohttp.hdrs import CONNECTION, KEEP_ALIVE
from homeassistant.components.telegram_bot import (
CONF_ALLOWED_CHAT_IDS, BaseTelegramBotEntity,
@ -41,20 +42,14 @@ def async_setup_platform(hass, config):
"""Stop the bot."""
pol.stop_polling()
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START,
_start_bot
)
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP,
_stop_bot
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, _start_bot)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_bot)
return True
class TelegramPoll(BaseTelegramBotEntity):
"""asyncio telegram incoming message handler."""
"""Asyncio telegram incoming message handler."""
def __init__(self, bot, hass, allowed_chat_ids):
"""Initialize the polling instance."""
@ -62,9 +57,9 @@ class TelegramPoll(BaseTelegramBotEntity):
self.update_id = 0
self.websession = async_get_clientsession(hass)
self.update_url = '{0}/getUpdates'.format(bot.base_url)
self.polling_task = None # The actuall polling task.
self.polling_task = None # The actual polling task.
self.timeout = 15 # async post timeout
# polling timeout should always be less than async post timeout.
# Polling timeout should always be less than async post timeout.
self.post_data = {'timeout': self.timeout - 5}
def start_polling(self):
@ -87,7 +82,7 @@ class TelegramPoll(BaseTelegramBotEntity):
with async_timeout.timeout(self.timeout, loop=self.hass.loop):
resp = yield from self.websession.post(
self.update_url, data=self.post_data,
headers={'connection': 'keep-alive'}
headers={CONNECTION: KEEP_ALIVE}
)
if resp.status == 200:
_json = yield from resp.json()

View File

@ -9,14 +9,15 @@ import logging
import re
import aiohttp
from aiohttp.hdrs import REFERER, USER_AGENT
import async_timeout
import voluptuous as vol
import yarl
from homeassistant.components.tts import Provider, PLATFORM_SCHEMA, CONF_LANG
from homeassistant.components.tts import CONF_LANG, PLATFORM_SCHEMA, Provider
from homeassistant.helpers.aiohttp_client import async_get_clientsession
REQUIREMENTS = ["gTTS-token==1.1.1"]
REQUIREMENTS = ['gTTS-token==1.1.1']
_LOGGER = logging.getLogger(__name__)
@ -52,10 +53,10 @@ class GoogleProvider(Provider):
self.hass = hass
self._lang = lang
self.headers = {
'Referer': "http://translate.google.com/",
'User-Agent': ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36")
REFERER: "http://translate.google.com/",
USER_AGENT: ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36"),
}
self.name = 'Google'

View File

@ -398,24 +398,7 @@ HTTP_BASIC_AUTHENTICATION = 'basic'
HTTP_DIGEST_AUTHENTICATION = 'digest'
HTTP_HEADER_HA_AUTH = 'X-HA-access'
HTTP_HEADER_ACCEPT_ENCODING = 'Accept-Encoding'
HTTP_HEADER_AUTH = 'Authorization'
HTTP_HEADER_USER_AGENT = 'User-Agent'
HTTP_HEADER_CONTENT_TYPE = 'Content-type'
HTTP_HEADER_CONTENT_ENCODING = 'Content-Encoding'
HTTP_HEADER_VARY = 'Vary'
HTTP_HEADER_CONTENT_LENGTH = 'Content-Length'
HTTP_HEADER_CACHE_CONTROL = 'Cache-Control'
HTTP_HEADER_EXPIRES = 'Expires'
HTTP_HEADER_ORIGIN = 'Origin'
HTTP_HEADER_X_REQUESTED_WITH = 'X-Requested-With'
HTTP_HEADER_ACCEPT = 'Accept'
HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN = 'Access-Control-Allow-Origin'
HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS = 'Access-Control-Allow-Headers'
ALLOWED_CORS_HEADERS = [HTTP_HEADER_ORIGIN, HTTP_HEADER_ACCEPT,
HTTP_HEADER_X_REQUESTED_WITH, HTTP_HEADER_CONTENT_TYPE,
HTTP_HEADER_HA_AUTH]
CONTENT_TYPE_JSON = 'application/json'
CONTENT_TYPE_MULTIPART = 'multipart/x-mixed-replace; boundary={}'

View File

@ -15,22 +15,18 @@ import urllib.parse
from typing import Optional
from aiohttp.hdrs import METH_GET, METH_POST, METH_DELETE, CONTENT_TYPE
import requests
from homeassistant import core as ha
from homeassistant.const import (
HTTP_HEADER_HA_AUTH, SERVER_PORT, URL_API,
URL_API_EVENTS, URL_API_EVENTS_EVENT, URL_API_SERVICES, URL_API_CONFIG,
URL_API_SERVICES_SERVICE, URL_API_STATES, URL_API_STATES_ENTITY,
HTTP_HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
URL_API, SERVER_PORT, URL_API_CONFIG, URL_API_EVENTS, URL_API_STATES,
URL_API_SERVICES, CONTENT_TYPE_JSON, HTTP_HEADER_HA_AUTH,
URL_API_EVENTS_EVENT, URL_API_STATES_ENTITY, URL_API_SERVICES_SERVICE)
from homeassistant.exceptions import HomeAssistantError
_LOGGER = logging.getLogger(__name__)
METHOD_GET = 'get'
METHOD_POST = 'post'
METHOD_DELETE = 'delete'
class APIStatus(enum.Enum):
"""Representation of an API status."""
@ -67,9 +63,7 @@ class API(object):
self.base_url += ':{}'.format(port)
self.status = None
self._headers = {
HTTP_HEADER_CONTENT_TYPE: CONTENT_TYPE_JSON,
}
self._headers = {CONTENT_TYPE: CONTENT_TYPE_JSON}
if api_password is not None:
self._headers[HTTP_HEADER_HA_AUTH] = api_password
@ -89,7 +83,7 @@ class API(object):
url = urllib.parse.urljoin(self.base_url, path)
try:
if method == METHOD_GET:
if method == METH_GET:
return requests.get(
url, params=data, timeout=timeout, headers=self._headers)
@ -144,7 +138,7 @@ class JSONEncoder(json.JSONEncoder):
def validate_api(api):
"""Make a call to validate API."""
try:
req = api(METHOD_GET, URL_API)
req = api(METH_GET, URL_API)
if req.status_code == 200:
return APIStatus.OK
@ -161,7 +155,7 @@ def validate_api(api):
def get_event_listeners(api):
"""List of events that is being listened for."""
try:
req = api(METHOD_GET, URL_API_EVENTS)
req = api(METH_GET, URL_API_EVENTS)
return req.json() if req.status_code == 200 else {}
@ -175,7 +169,7 @@ def get_event_listeners(api):
def fire_event(api, event_type, data=None):
"""Fire an event at remote API."""
try:
req = api(METHOD_POST, URL_API_EVENTS_EVENT.format(event_type), data)
req = api(METH_POST, URL_API_EVENTS_EVENT.format(event_type), data)
if req.status_code != 200:
_LOGGER.error("Error firing event: %d - %s",
@ -188,7 +182,7 @@ def fire_event(api, event_type, data=None):
def get_state(api, entity_id):
"""Query given API for state of entity_id."""
try:
req = api(METHOD_GET, URL_API_STATES_ENTITY.format(entity_id))
req = api(METH_GET, URL_API_STATES_ENTITY.format(entity_id))
# req.status_code == 422 if entity does not exist
@ -205,7 +199,7 @@ def get_state(api, entity_id):
def get_states(api):
"""Query given API for all states."""
try:
req = api(METHOD_GET,
req = api(METH_GET,
URL_API_STATES)
return [ha.State.from_dict(item) for
@ -224,7 +218,7 @@ def remove_state(api, entity_id):
Return True if entity is gone (removed/never existed).
"""
try:
req = api(METHOD_DELETE, URL_API_STATES_ENTITY.format(entity_id))
req = api(METH_DELETE, URL_API_STATES_ENTITY.format(entity_id))
if req.status_code in (200, 404):
return True
@ -250,9 +244,7 @@ def set_state(api, entity_id, new_state, attributes=None, force_update=False):
'force_update': force_update}
try:
req = api(METHOD_POST,
URL_API_STATES_ENTITY.format(entity_id),
data)
req = api(METH_POST, URL_API_STATES_ENTITY.format(entity_id), data)
if req.status_code not in (200, 201):
_LOGGER.error("Error changing state: %d - %s",
@ -280,7 +272,7 @@ def get_services(api):
Each dict has a string "domain" and a list of strings "services".
"""
try:
req = api(METHOD_GET, URL_API_SERVICES)
req = api(METH_GET, URL_API_SERVICES)
return req.json() if req.status_code == 200 else {}
@ -294,7 +286,7 @@ def get_services(api):
def call_service(api, domain, service, service_data=None, timeout=5):
"""Call a service at the remote API."""
try:
req = api(METHOD_POST,
req = api(METH_POST,
URL_API_SERVICES_SERVICE.format(domain, service),
service_data, timeout=timeout)
@ -309,7 +301,7 @@ def call_service(api, domain, service, service_data=None, timeout=5):
def get_config(api):
"""Return configuration."""
try:
req = api(METHOD_GET, URL_API_CONFIG)
req = api(METH_GET, URL_API_CONFIG)
if req.status_code != 200:
return {}

View File

@ -1,33 +1,32 @@
"""The tests for the emulated Hue component."""
import asyncio
import json
from unittest.mock import patch
import pytest
from homeassistant import setup, const, core
from aiohttp.hdrs import CONTENT_TYPE
import pytest
from tests.common import get_test_instance_port
from homeassistant import core, const, setup
import homeassistant.components as core_components
from homeassistant.components import (
emulated_hue, http, light, script, media_player, fan
)
from homeassistant.const import STATE_ON, STATE_OFF
from homeassistant.components.emulated_hue.hue_api import (
HUE_API_STATE_ON, HUE_API_STATE_BRI, HueUsernameView,
HueAllLightsStateView, HueOneLightStateView, HueOneLightChangeView)
fan, http, light, script, emulated_hue, media_player)
from homeassistant.components.emulated_hue import Config
from tests.common import get_test_instance_port
from homeassistant.components.emulated_hue.hue_api import (
HUE_API_STATE_ON, HUE_API_STATE_BRI, HueUsernameView, HueOneLightStateView,
HueAllLightsStateView, HueOneLightChangeView)
from homeassistant.const import STATE_ON, STATE_OFF
HTTP_SERVER_PORT = get_test_instance_port()
BRIDGE_SERVER_PORT = get_test_instance_port()
BRIDGE_URL_BASE = 'http://127.0.0.1:{}'.format(BRIDGE_SERVER_PORT) + '{}'
JSON_HEADERS = {const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON}
JSON_HEADERS = {CONTENT_TYPE: const.CONTENT_TYPE_JSON}
@pytest.fixture
def hass_hue(loop, hass):
"""Setup a hass instance for these tests."""
"""Setup a Home Assistant instance for these tests."""
# We need to do this to get access to homeassistant/turn_(on,off)
loop.run_until_complete(
core_components.async_setup(hass, {core.DOMAIN: {}}))

View File

@ -4,6 +4,7 @@ import json
import unittest
from unittest.mock import patch
import requests
from aiohttp.hdrs import CONTENT_TYPE
from homeassistant import setup, const, core
import homeassistant.components as core_components
@ -16,7 +17,7 @@ HTTP_SERVER_PORT = get_test_instance_port()
BRIDGE_SERVER_PORT = get_test_instance_port()
BRIDGE_URL_BASE = 'http://127.0.0.1:{}'.format(BRIDGE_SERVER_PORT) + '{}'
JSON_HEADERS = {const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON}
JSON_HEADERS = {CONTENT_TYPE: const.CONTENT_TYPE_JSON}
def setup_hass_instance(emulated_hue_config):

View File

@ -1,18 +1,18 @@
"""The tests for the Google Actions component."""
"""The tests for the Google Assistant component."""
# pylint: disable=protected-access
import json
import asyncio
import pytest
import json
from homeassistant import setup, const, core
from homeassistant.components import (
http, async_setup, light, cover, media_player, fan, switch, climate
)
from homeassistant.components import google_assistant as ga
from aiohttp.hdrs import CONTENT_TYPE, AUTHORIZATION
import pytest
from tests.common import get_test_instance_port
from . import DEMO_DEVICES
from homeassistant import core, const, setup
from homeassistant.components import (
fan, http, cover, light, switch, climate, async_setup, media_player)
from homeassistant.components import google_assistant as ga
from . import DEMO_DEVICES
API_PASSWORD = "test1234"
SERVER_PORT = get_test_instance_port()
@ -20,7 +20,7 @@ BASE_API_URL = "http://127.0.0.1:{}".format(SERVER_PORT)
HA_HEADERS = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
CONTENT_TYPE: const.CONTENT_TYPE_JSON,
}
AUTHCFG = {
@ -28,12 +28,12 @@ AUTHCFG = {
'client_id': 'helloworld',
'access_token': 'superdoublesecret'
}
AUTH_HEADER = {'Authorization': 'Bearer {}'.format(AUTHCFG['access_token'])}
AUTH_HEADER = {AUTHORIZATION: 'Bearer {}'.format(AUTHCFG['access_token'])}
@pytest.fixture
def assistant_client(loop, hass_fixture, test_client):
"""Create web client for emulated hue api."""
"""Create web client for the Google Assistant API."""
hass = hass_fixture
web_app = hass.http.app
@ -45,7 +45,7 @@ def assistant_client(loop, hass_fixture, test_client):
@pytest.fixture
def hass_fixture(loop, hass):
"""Set up a hass instance for these tests."""
"""Set up a HOme Assistant instance for these tests."""
# We need to do this to get access to homeassistant/turn_(on,off)
loop.run_until_complete(async_setup(hass, {core.DOMAIN: {}}))

View File

@ -1,19 +1,23 @@
"""The tests for the Home Assistant HTTP component."""
import asyncio
from aiohttp.hdrs import (
ORIGIN, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_ALLOW_HEADERS,
ACCESS_CONTROL_REQUEST_METHOD, ACCESS_CONTROL_REQUEST_HEADERS,
CONTENT_TYPE)
import requests
from homeassistant import setup, const
import homeassistant.components.http as http
from tests.common import get_test_instance_port, get_test_home_assistant
from homeassistant import const, setup
import homeassistant.components.http as http
API_PASSWORD = 'test1234'
SERVER_PORT = get_test_instance_port()
HTTP_BASE = '127.0.0.1:{}'.format(SERVER_PORT)
HTTP_BASE_URL = 'http://{}'.format(HTTP_BASE)
HA_HEADERS = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
CONTENT_TYPE: const.CONTENT_TYPE_JSON,
}
CORS_ORIGINS = [HTTP_BASE_URL, HTTP_BASE]
@ -64,9 +68,9 @@ class TestCors:
"""Test cross origin resource sharing with password in url."""
req = requests.get(_url(const.URL_API),
params={'api_password': API_PASSWORD},
headers={const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL})
headers={ORIGIN: HTTP_BASE_URL})
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_origin = ACCESS_CONTROL_ALLOW_ORIGIN
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
@ -75,11 +79,11 @@ class TestCors:
"""Test cross origin resource sharing with password in header."""
headers = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL
ORIGIN: HTTP_BASE_URL
}
req = requests.get(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_origin = ACCESS_CONTROL_ALLOW_ORIGIN
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
@ -91,8 +95,8 @@ class TestCors:
}
req = requests.get(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
allow_origin = ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = ACCESS_CONTROL_ALLOW_HEADERS
assert req.status_code == 200
assert allow_origin not in req.headers
@ -101,14 +105,14 @@ class TestCors:
def test_cors_preflight_allowed(self):
"""Test cross origin resource sharing preflight (OPTIONS) request."""
headers = {
const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL,
'Access-Control-Request-Method': 'GET',
'Access-Control-Request-Headers': 'x-ha-access'
ORIGIN: HTTP_BASE_URL,
ACCESS_CONTROL_REQUEST_METHOD: 'GET',
ACCESS_CONTROL_REQUEST_HEADERS: 'x-ha-access'
}
req = requests.options(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
allow_origin = ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = ACCESS_CONTROL_ALLOW_HEADERS
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
@ -158,7 +162,7 @@ def test_registering_view_while_running(hass, test_client):
@asyncio.coroutine
def test_api_base_url_with_domain(hass):
"""Test setting api url."""
"""Test setting API URL."""
result = yield from setup.async_setup_component(hass, 'http', {
'http': {
'base_url': 'example.com'

View File

@ -2,6 +2,7 @@
import asyncio
import json
from unittest.mock import patch, MagicMock, mock_open
from aiohttp.hdrs import AUTHORIZATION
from homeassistant.components.notify import html5
@ -278,8 +279,8 @@ class TestHtml5Notify(object):
assert json.loads(handle.write.call_args[0][0]) == config
@asyncio.coroutine
def test_unregister_device_view_handle_unknown_subscription(self, loop,
test_client):
def test_unregister_device_view_handle_unknown_subscription(
self, loop, test_client):
"""Test that the HTML unregister view handles unknown subscriptions."""
hass = MagicMock()
@ -322,8 +323,8 @@ class TestHtml5Notify(object):
assert handle.write.call_count == 0
@asyncio.coroutine
def test_unregistering_device_view_handles_json_safe_error(self, loop,
test_client):
def test_unregistering_device_view_handles_json_safe_error(
self, loop, test_client):
"""Test that the HTML unregister view handles JSON write errors."""
hass = MagicMock()
@ -423,8 +424,8 @@ class TestHtml5Notify(object):
assert len(hass.mock_calls) == 3
with patch('pywebpush.WebPusher') as mock_wp:
service.send_message('Hello', target=['device'],
data={'icon': 'beer.png'})
service.send_message(
'Hello', target=['device'], data={'icon': 'beer.png'})
assert len(mock_wp.mock_calls) == 3
@ -453,7 +454,7 @@ class TestHtml5Notify(object):
resp = yield from client.post(PUBLISH_URL, data=json.dumps({
'type': 'push',
}), headers={'Authorization': bearer_token})
}), headers={AUTHORIZATION: bearer_token})
assert resp.status == 200
body = yield from resp.json()

View File

@ -4,6 +4,7 @@ import json
import unittest
import requests
from aiohttp.hdrs import CONTENT_TYPE
from homeassistant.core import callback
from homeassistant import setup, const
@ -18,7 +19,7 @@ INTENTS_API_URL = "{}{}".format(BASE_API_URL, dialogflow.INTENTS_API_ENDPOINT)
HA_HEADERS = {
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
CONTENT_TYPE: const.CONTENT_TYPE_JSON,
}
SESSION_ID = "a9b84cec-46b6-484e-8f31-f65dba03ae6d"