Add new webhook action to allow enabling encryption in an exis… (#31743)

* Add new webhook action to allow enabling encryption in an existing registration

* Harden tests

* Make requested fixes
pull/31073/head
Robbie Trencheny 2020-02-11 23:56:22 -08:00 committed by GitHub
parent f5be9ef7fb
commit 0700d38d1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 25 deletions

View File

@ -52,6 +52,8 @@ ATTR_WEBHOOK_ENCRYPTED = "encrypted"
ATTR_WEBHOOK_ENCRYPTED_DATA = "encrypted_data"
ATTR_WEBHOOK_TYPE = "type"
ERR_ENCRYPTION_ALREADY_ENABLED = "encryption_already_enabled"
ERR_ENCRYPTION_NOT_AVAILABLE = "encryption_not_available"
ERR_ENCRYPTION_REQUIRED = "encryption_required"
ERR_SENSOR_NOT_REGISTERED = "not_registered"
ERR_SENSOR_DUPLICATE_UNIQUE_ID = "duplicate_unique_id"

View File

@ -1,8 +1,10 @@
"""Webhook handlers for mobile_app."""
from functools import wraps
import logging
import secrets
from aiohttp.web import HTTPBadRequest, Request, Response
from aiohttp.web import HTTPBadRequest, Request, Response, json_response
from nacl.secret import SecretBox
import voluptuous as vol
from homeassistant.components.binary_sensor import (
@ -71,6 +73,8 @@ from .const import (
DATA_DELETED_IDS,
DATA_STORE,
DOMAIN,
ERR_ENCRYPTION_ALREADY_ENABLED,
ERR_ENCRYPTION_NOT_AVAILABLE,
ERR_ENCRYPTION_REQUIRED,
ERR_SENSOR_DUPLICATE_UNIQUE_ID,
ERR_SENSOR_NOT_REGISTERED,
@ -84,6 +88,7 @@ from .helpers import (
registration_context,
safe_registration,
savable_state,
supports_encryption,
webhook_response,
)
@ -307,6 +312,34 @@ async def webhook_update_registration(hass, config_entry, data):
)
@WEBHOOK_COMMANDS.register("enable_encryption")
async def webhook_enable_encryption(hass, config_entry, data):
"""Handle a encryption enable webhook."""
if config_entry.data[ATTR_SUPPORTS_ENCRYPTION]:
_LOGGER.warning(
"Refusing to enable encryption for %s because it is already enabled!",
config_entry.data[ATTR_DEVICE_NAME],
)
return error_response(
ERR_ENCRYPTION_ALREADY_ENABLED, "Encryption already enabled"
)
if not supports_encryption():
_LOGGER.warning(
"Unable to enable encryption for %s because libsodium is unavailable!",
config_entry.data[ATTR_DEVICE_NAME],
)
return error_response(ERR_ENCRYPTION_NOT_AVAILABLE, "Encryption is unavailable")
secret = secrets.token_hex(SecretBox.KEY_SIZE)
data = {**config_entry.data, ATTR_SUPPORTS_ENCRYPTION: True, CONF_SECRET: secret}
hass.config_entries.async_update_entry(config_entry, data=data)
return json_response({"secret": secret})
@WEBHOOK_COMMANDS.register("register_sensor")
@validate_schema(
{

View File

@ -1,5 +1,4 @@
"""Webhook tests for mobile_app."""
import logging
import pytest
@ -17,6 +16,53 @@ from tests.common import async_mock_service
_LOGGER = logging.getLogger(__name__)
def encrypt_payload(secret_key, payload):
"""Return a encrypted payload given a key and dictionary of data."""
try:
from nacl.secret import SecretBox
from nacl.encoding import Base64Encoder
except (ImportError, OSError):
pytest.skip("libnacl/libsodium is not installed")
return
import json
keylen = SecretBox.KEY_SIZE
prepped_key = secret_key.encode("utf-8")
prepped_key = prepped_key[:keylen]
prepped_key = prepped_key.ljust(keylen, b"\0")
payload = json.dumps(payload).encode("utf-8")
return (
SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
)
def decrypt_payload(secret_key, encrypted_data):
"""Return a decrypted payload given a key and a string of encrypted data."""
try:
from nacl.secret import SecretBox
from nacl.encoding import Base64Encoder
except (ImportError, OSError):
pytest.skip("libnacl/libsodium is not installed")
return
import json
keylen = SecretBox.KEY_SIZE
prepped_key = secret_key.encode("utf-8")
prepped_key = prepped_key[:keylen]
prepped_key = prepped_key.ljust(keylen, b"\0")
decrypted_data = SecretBox(prepped_key).decrypt(
encrypted_data, encoder=Base64Encoder
)
decrypted_data = decrypted_data.decode("utf-8")
return json.loads(decrypted_data)
async def test_webhook_handle_render_template(create_registrations, webhook_client):
"""Test that we render templates properly."""
resp = await webhook_client.post(
@ -166,23 +212,8 @@ async def test_webhook_returns_error_incorrect_json(
async def test_webhook_handle_decryption(webhook_client, create_registrations):
"""Test that we can encrypt/decrypt properly."""
try:
from nacl.secret import SecretBox
from nacl.encoding import Base64Encoder
except (ImportError, OSError):
pytest.skip("libnacl/libsodium is not installed")
return
import json
keylen = SecretBox.KEY_SIZE
key = create_registrations[0]["secret"].encode("utf-8")
key = key[:keylen]
key = key.ljust(keylen, b"\0")
payload = json.dumps(RENDER_TEMPLATE["data"]).encode("utf-8")
data = SecretBox(key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
key = create_registrations[0]["secret"]
data = encrypt_payload(key, RENDER_TEMPLATE["data"])
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
@ -195,12 +226,9 @@ async def test_webhook_handle_decryption(webhook_client, create_registrations):
webhook_json = await resp.json()
assert "encrypted_data" in webhook_json
decrypted_data = SecretBox(key).decrypt(
webhook_json["encrypted_data"], encoder=Base64Encoder
)
decrypted_data = decrypted_data.decode("utf-8")
decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"])
assert json.loads(decrypted_data) == {"one": "Hello world"}
assert decrypted_data == {"one": "Hello world"}
async def test_webhook_requires_encryption(webhook_client, create_registrations):
@ -219,7 +247,7 @@ async def test_webhook_requires_encryption(webhook_client, create_registrations)
async def test_webhook_update_location(hass, webhook_client, create_registrations):
"""Test that encrypted registrations only accept encrypted data."""
"""Test that location can be updated."""
resp = await webhook_client.post(
"/api/webhook/{}".format(create_registrations[1]["webhook_id"]),
json={
@ -236,3 +264,52 @@ async def test_webhook_update_location(hass, webhook_client, create_registration
assert state.attributes["longitude"] == 2.0
assert state.attributes["gps_accuracy"] == 10
assert state.attributes["altitude"] == -10
async def test_webhook_enable_encryption(hass, webhook_client, create_registrations):
"""Test that encryption can be added to a reg initially created without."""
webhook_id = create_registrations[1]["webhook_id"]
enable_enc_resp = await webhook_client.post(
"/api/webhook/{}".format(webhook_id), json={"type": "enable_encryption"},
)
assert enable_enc_resp.status == 200
enable_enc_json = await enable_enc_resp.json()
assert len(enable_enc_json) == 1
assert CONF_SECRET in enable_enc_json
key = enable_enc_json["secret"]
enc_required_resp = await webhook_client.post(
"/api/webhook/{}".format(webhook_id), json=RENDER_TEMPLATE,
)
assert enc_required_resp.status == 400
enc_required_json = await enc_required_resp.json()
assert "error" in enc_required_json
assert enc_required_json["success"] is False
assert enc_required_json["error"]["code"] == "encryption_required"
enc_data = encrypt_payload(key, RENDER_TEMPLATE["data"])
container = {
"type": "render_template",
"encrypted": True,
"encrypted_data": enc_data,
}
enc_resp = await webhook_client.post(
"/api/webhook/{}".format(webhook_id), json=container
)
assert enc_resp.status == 200
enc_json = await enc_resp.json()
assert "encrypted_data" in enc_json
decrypted_data = decrypt_payload(key, enc_json["encrypted_data"])
assert decrypted_data == {"one": "Hello world"}