core/homeassistant/components/mobile_app/helpers.py

150 lines
4.8 KiB
Python

"""Helpers for mobile_app."""
import logging
import json
from typing import Callable, Dict, Tuple
from aiohttp.web import json_response, Response
from homeassistant.core import Context
from homeassistant.helpers.typing import HomeAssistantType
from .const import (ATTR_APP_DATA, ATTR_APP_ID, ATTR_APP_NAME,
ATTR_APP_VERSION, ATTR_DEVICE_NAME, ATTR_MANUFACTURER,
ATTR_MODEL, ATTR_OS_VERSION, ATTR_SUPPORTS_ENCRYPTION,
CONF_SECRET, CONF_USER_ID, DATA_BINARY_SENSOR,
DATA_DELETED_IDS, DATA_SENSOR, DOMAIN)
_LOGGER = logging.getLogger(__name__)
def setup_decrypt() -> Tuple[int, Callable]:
"""Return decryption function and length of key.
Async friendly.
"""
from nacl.secret import SecretBox
from nacl.encoding import Base64Encoder
def decrypt(ciphertext, key):
"""Decrypt ciphertext using key."""
return SecretBox(key).decrypt(ciphertext, encoder=Base64Encoder)
return (SecretBox.KEY_SIZE, decrypt)
def setup_encrypt() -> Tuple[int, Callable]:
"""Return encryption function and length of key.
Async friendly.
"""
from nacl.secret import SecretBox
from nacl.encoding import Base64Encoder
def encrypt(ciphertext, key):
"""Encrypt ciphertext using key."""
return SecretBox(key).encrypt(ciphertext, encoder=Base64Encoder)
return (SecretBox.KEY_SIZE, encrypt)
def _decrypt_payload(key: str, ciphertext: str) -> Dict[str, str]:
"""Decrypt encrypted payload."""
try:
keylen, decrypt = setup_decrypt()
except OSError:
_LOGGER.warning(
"Ignoring encrypted payload because libsodium not installed")
return None
if key is None:
_LOGGER.warning(
"Ignoring encrypted payload because no decryption key known")
return None
key = key.encode("utf-8")
key = key[:keylen]
key = key.ljust(keylen, b'\0')
try:
message = decrypt(ciphertext, key)
message = json.loads(message.decode("utf-8"))
_LOGGER.debug("Successfully decrypted mobile_app payload")
return message
except ValueError:
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
return None
def registration_context(registration: Dict) -> Context:
"""Generate a context from a request."""
return Context(user_id=registration[CONF_USER_ID])
def empty_okay_response(headers: Dict = None, status: int = 200) -> Response:
"""Return a Response with empty JSON object and a 200."""
return Response(body='{}', status=status, content_type='application/json',
headers=headers)
def error_response(code: str, message: str, status: int = 400,
headers: dict = None) -> Response:
"""Return an error Response."""
return json_response({
'success': False,
'error': {
'code': code,
'message': message
}
}, status=status, headers=headers)
def supports_encryption() -> bool:
"""Test if we support encryption."""
try:
import nacl # noqa pylint: disable=unused-import
return True
except OSError:
return False
def safe_registration(registration: Dict) -> Dict:
"""Return a registration without sensitive values."""
# Sensitive values: webhook_id, secret, cloudhook_url
return {
ATTR_APP_DATA: registration[ATTR_APP_DATA],
ATTR_APP_ID: registration[ATTR_APP_ID],
ATTR_APP_NAME: registration[ATTR_APP_NAME],
ATTR_APP_VERSION: registration[ATTR_APP_VERSION],
ATTR_DEVICE_NAME: registration[ATTR_DEVICE_NAME],
ATTR_MANUFACTURER: registration[ATTR_MANUFACTURER],
ATTR_MODEL: registration[ATTR_MODEL],
ATTR_OS_VERSION: registration[ATTR_OS_VERSION],
ATTR_SUPPORTS_ENCRYPTION: registration[ATTR_SUPPORTS_ENCRYPTION],
}
def savable_state(hass: HomeAssistantType) -> Dict:
"""Return a clean object containing things that should be saved."""
return {
DATA_BINARY_SENSOR: hass.data[DOMAIN][DATA_BINARY_SENSOR],
DATA_DELETED_IDS: hass.data[DOMAIN][DATA_DELETED_IDS],
DATA_SENSOR: hass.data[DOMAIN][DATA_SENSOR],
}
def webhook_response(data, *, registration: Dict, status: int = 200,
headers: Dict = None) -> Response:
"""Return a encrypted response if registration supports it."""
data = json.dumps(data)
if CONF_SECRET in registration:
keylen, encrypt = setup_encrypt()
key = registration[CONF_SECRET].encode("utf-8")
key = key[:keylen]
key = key.ljust(keylen, b'\0')
enc_data = encrypt(data.encode("utf-8"), key).decode("utf-8")
data = json.dumps({'encrypted': True, 'encrypted_data': enc_data})
return Response(text=data, status=status, content_type='application/json',
headers=headers)