diff --git a/homeassistant/components/mobile_app/const.py b/homeassistant/components/mobile_app/const.py index a2a4e15ee72..ba81a0484cf 100644 --- a/homeassistant/components/mobile_app/const.py +++ b/homeassistant/components/mobile_app/const.py @@ -28,6 +28,7 @@ ATTR_CONFIG_ENTRY_ID = "entry_id" ATTR_DEVICE_NAME = "device_name" ATTR_MANUFACTURER = "manufacturer" ATTR_MODEL = "model" +ATTR_NO_LEGACY_ENCRYPTION = "no_legacy_encryption" ATTR_OS_NAME = "os_name" ATTR_OS_VERSION = "os_version" ATTR_PUSH_WEBSOCKET_CHANNEL = "push_websocket_channel" diff --git a/homeassistant/components/mobile_app/helpers.py b/homeassistant/components/mobile_app/helpers.py index b7d38357a78..545c3511fc9 100644 --- a/homeassistant/components/mobile_app/helpers.py +++ b/homeassistant/components/mobile_app/helpers.py @@ -7,7 +7,7 @@ import json import logging from aiohttp.web import Response, json_response -from nacl.encoding import Base64Encoder +from nacl.encoding import Base64Encoder, HexEncoder, RawEncoder from nacl.secret import SecretBox from homeassistant.const import ATTR_DEVICE_ID, CONTENT_TYPE_JSON @@ -23,6 +23,7 @@ from .const import ( ATTR_DEVICE_NAME, ATTR_MANUFACTURER, ATTR_MODEL, + ATTR_NO_LEGACY_ENCRYPTION, ATTR_OS_VERSION, ATTR_SUPPORTS_ENCRYPTION, CONF_SECRET, @@ -34,7 +35,7 @@ from .const import ( _LOGGER = logging.getLogger(__name__) -def setup_decrypt() -> tuple[int, Callable]: +def setup_decrypt(key_encoder) -> tuple[int, Callable]: """Return decryption function and length of key. Async friendly. @@ -42,12 +43,14 @@ def setup_decrypt() -> tuple[int, Callable]: def decrypt(ciphertext, key): """Decrypt ciphertext using key.""" - return SecretBox(key).decrypt(ciphertext, encoder=Base64Encoder) + return SecretBox(key, encoder=key_encoder).decrypt( + ciphertext, encoder=Base64Encoder + ) return (SecretBox.KEY_SIZE, decrypt) -def setup_encrypt() -> tuple[int, Callable]: +def setup_encrypt(key_encoder) -> tuple[int, Callable]: """Return encryption function and length of key. Async friendly. @@ -55,15 +58,22 @@ def setup_encrypt() -> tuple[int, Callable]: def encrypt(ciphertext, key): """Encrypt ciphertext using key.""" - return SecretBox(key).encrypt(ciphertext, encoder=Base64Encoder) + return SecretBox(key, encoder=key_encoder).encrypt( + ciphertext, encoder=Base64Encoder + ) return (SecretBox.KEY_SIZE, encrypt) -def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None: +def _decrypt_payload_helper( + key: str | None, + ciphertext: str, + get_key_bytes: Callable[[str, int], str | bytes], + key_encoder, +) -> dict[str, str] | None: """Decrypt encrypted payload.""" try: - keylen, decrypt = setup_decrypt() + keylen, decrypt = setup_decrypt(key_encoder) except OSError: _LOGGER.warning("Ignoring encrypted payload because libsodium not installed") return None @@ -72,18 +82,33 @@ def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None: _LOGGER.warning("Ignoring encrypted payload because no decryption key known") return None - key_bytes = key.encode("utf-8") - key_bytes = key_bytes[:keylen] - key_bytes = key_bytes.ljust(keylen, b"\0") + key_bytes = get_key_bytes(key, keylen) - try: - msg_bytes = decrypt(ciphertext, key_bytes) - message = json.loads(msg_bytes.decode("utf-8")) - _LOGGER.debug("Successfully decrypted mobile_app payload") - return message - except ValueError: - _LOGGER.warning("Ignoring encrypted payload because unable to decrypt") - return None + msg_bytes = decrypt(ciphertext, key_bytes) + message = json.loads(msg_bytes.decode("utf-8")) + _LOGGER.debug("Successfully decrypted mobile_app payload") + return message + + +def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None: + """Decrypt encrypted payload.""" + + def get_key_bytes(key: str, keylen: int) -> str: + return key + + return _decrypt_payload_helper(key, ciphertext, get_key_bytes, HexEncoder) + + +def _decrypt_payload_legacy(key: str | None, ciphertext: str) -> dict[str, str] | None: + """Decrypt encrypted payload.""" + + def get_key_bytes(key: str, keylen: int) -> bytes: + key_bytes = key.encode("utf-8") + key_bytes = key_bytes[:keylen] + key_bytes = key_bytes.ljust(keylen, b"\0") + return key_bytes + + return _decrypt_payload_helper(key, ciphertext, get_key_bytes, RawEncoder) def registration_context(registration: dict) -> Context: @@ -158,11 +183,16 @@ def webhook_response( data = json.dumps(data, cls=JSONEncoder) if registration[ATTR_SUPPORTS_ENCRYPTION]: - keylen, encrypt = setup_encrypt() + keylen, encrypt = setup_encrypt( + HexEncoder if ATTR_NO_LEGACY_ENCRYPTION in registration else RawEncoder + ) - key = registration[CONF_SECRET].encode("utf-8") - key = key[:keylen] - key = key.ljust(keylen, b"\0") + if ATTR_NO_LEGACY_ENCRYPTION in registration: + key = registration[CONF_SECRET] + else: + key = registration[CONF_SECRET].encode("utf-8") + key = key[:keylen] + key = key.ljust(keylen, b"\0") enc_data = encrypt(data.encode("utf-8"), key).decode("utf-8") data = json.dumps({"encrypted": True, "encrypted_data": enc_data}) diff --git a/homeassistant/components/mobile_app/webhook.py b/homeassistant/components/mobile_app/webhook.py index 221c4eef733..860b8ef7b53 100644 --- a/homeassistant/components/mobile_app/webhook.py +++ b/homeassistant/components/mobile_app/webhook.py @@ -7,6 +7,7 @@ import logging import secrets from aiohttp.web import HTTPBadRequest, Request, Response, json_response +from nacl.exceptions import CryptoError from nacl.secret import SecretBox import voluptuous as vol @@ -58,6 +59,7 @@ from .const import ( ATTR_EVENT_TYPE, ATTR_MANUFACTURER, ATTR_MODEL, + ATTR_NO_LEGACY_ENCRYPTION, ATTR_OS_VERSION, ATTR_SENSOR_ATTRIBUTES, ATTR_SENSOR_DEVICE_CLASS, @@ -97,6 +99,7 @@ from .const import ( ) from .helpers import ( _decrypt_payload, + _decrypt_payload_legacy, empty_okay_response, error_response, registration_context, @@ -191,7 +194,27 @@ async def handle_webhook( if req_data[ATTR_WEBHOOK_ENCRYPTED]: enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA] - webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data) + try: + webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data) + if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data: + data = {**config_entry.data, ATTR_NO_LEGACY_ENCRYPTION: True} + hass.config_entries.async_update_entry(config_entry, data=data) + except CryptoError: + if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data: + try: + webhook_payload = _decrypt_payload_legacy( + config_entry.data[CONF_SECRET], enc_data + ) + except CryptoError: + _LOGGER.warning( + "Ignoring encrypted payload because unable to decrypt" + ) + except ValueError: + _LOGGER.warning("Ignoring invalid encrypted payload") + else: + _LOGGER.warning("Ignoring encrypted payload because unable to decrypt") + except ValueError: + _LOGGER.warning("Ignoring invalid encrypted payload") if webhook_type not in WEBHOOK_COMMANDS: _LOGGER.error( diff --git a/tests/components/mobile_app/test_http_api.py b/tests/components/mobile_app/test_http_api.py index 5d92418bba2..4c4e9b54ccf 100644 --- a/tests/components/mobile_app/test_http_api.py +++ b/tests/components/mobile_app/test_http_api.py @@ -1,4 +1,5 @@ """Tests for the mobile_app HTTP API.""" +from binascii import unhexlify from http import HTTPStatus import json from unittest.mock import patch @@ -75,6 +76,49 @@ async def test_registration_encryption(hass, hass_client): assert resp.status == HTTPStatus.CREATED register_json = await resp.json() + key = unhexlify(register_json[CONF_SECRET]) + + payload = json.dumps(RENDER_TEMPLATE["data"]).encode("utf-8") + + data = SecretBox(key).encrypt(payload, encoder=Base64Encoder).decode("utf-8") + + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + + resp = await api_client.post( + f"/api/webhook/{register_json[CONF_WEBHOOK_ID]}", json=container + ) + + assert resp.status == HTTPStatus.OK + + webhook_json = await resp.json() + assert "encrypted_data" in webhook_json + + decrypted_data = SecretBox(key).decrypt( + webhook_json["encrypted_data"], encoder=Base64Encoder + ) + decrypted_data = decrypted_data.decode("utf-8") + + assert json.loads(decrypted_data) == {"one": "Hello world"} + + +async def test_registration_encryption_legacy(hass, hass_client): + """Test that registrations happen.""" + try: + from nacl.encoding import Base64Encoder + from nacl.secret import SecretBox + except (ImportError, OSError): + pytest.skip("libnacl/libsodium is not installed") + return + + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + api_client = await hass_client() + + resp = await api_client.post("/api/mobile_app/registrations", json=REGISTER) + + assert resp.status == HTTPStatus.CREATED + register_json = await resp.json() + keylen = SecretBox.KEY_SIZE key = register_json[CONF_SECRET].encode("utf-8") key = key[:keylen] diff --git a/tests/components/mobile_app/test_webhook.py b/tests/components/mobile_app/test_webhook.py index 48b61988de2..5f220cf0ebe 100644 --- a/tests/components/mobile_app/test_webhook.py +++ b/tests/components/mobile_app/test_webhook.py @@ -1,4 +1,5 @@ """Webhook tests for mobile_app.""" +from binascii import unhexlify from http import HTTPStatus from unittest.mock import patch @@ -22,7 +23,29 @@ from .const import CALL_SERVICE, FIRE_EVENT, REGISTER_CLEARTEXT, RENDER_TEMPLATE from tests.common import async_mock_service -def encrypt_payload(secret_key, payload): +def encrypt_payload(secret_key, payload, encode_json=True): + """Return a encrypted payload given a key and dictionary of data.""" + try: + from nacl.encoding import Base64Encoder + from nacl.secret import SecretBox + except (ImportError, OSError): + pytest.skip("libnacl/libsodium is not installed") + return + + import json + + prepped_key = unhexlify(secret_key) + + if encode_json: + payload = json.dumps(payload) + payload = payload.encode("utf-8") + + return ( + SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8") + ) + + +def encrypt_payload_legacy(secret_key, payload, encode_json=True): """Return a encrypted payload given a key and dictionary of data.""" try: from nacl.encoding import Base64Encoder @@ -38,7 +61,9 @@ def encrypt_payload(secret_key, payload): prepped_key = prepped_key[:keylen] prepped_key = prepped_key.ljust(keylen, b"\0") - payload = json.dumps(payload).encode("utf-8") + if encode_json: + payload = json.dumps(payload) + payload = payload.encode("utf-8") return ( SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8") @@ -56,6 +81,27 @@ def decrypt_payload(secret_key, encrypted_data): import json + prepped_key = unhexlify(secret_key) + + decrypted_data = SecretBox(prepped_key).decrypt( + encrypted_data, encoder=Base64Encoder + ) + decrypted_data = decrypted_data.decode("utf-8") + + return json.loads(decrypted_data) + + +def decrypt_payload_legacy(secret_key, encrypted_data): + """Return a decrypted payload given a key and a string of encrypted data.""" + try: + from nacl.encoding import Base64Encoder + from nacl.secret import SecretBox + except (ImportError, OSError): + pytest.skip("libnacl/libsodium is not installed") + return + + import json + keylen = SecretBox.KEY_SIZE prepped_key = secret_key.encode("utf-8") prepped_key = prepped_key[:keylen] @@ -273,6 +319,181 @@ async def test_webhook_handle_decryption(webhook_client, create_registrations): assert decrypted_data == {"one": "Hello world"} +async def test_webhook_handle_decryption_legacy(webhook_client, create_registrations): + """Test that we can encrypt/decrypt properly.""" + key = create_registrations[0]["secret"] + data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"]) + + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + + webhook_json = await resp.json() + assert "encrypted_data" in webhook_json + + decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"]) + + assert decrypted_data == {"one": "Hello world"} + + +async def test_webhook_handle_decryption_fail( + webhook_client, create_registrations, caplog +): + """Test that we can encrypt/decrypt properly.""" + key = create_registrations[0]["secret"] + + # Send valid data + data = encrypt_payload(key, RENDER_TEMPLATE["data"]) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"]) + assert decrypted_data == {"one": "Hello world"} + caplog.clear() + + # Send invalid JSON data + data = encrypt_payload(key, "{not_valid", encode_json=False) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + assert decrypt_payload(key, webhook_json["encrypted_data"]) == {} + assert "Ignoring invalid encrypted payload" in caplog.text + caplog.clear() + + # Break the key, and send JSON data + data = encrypt_payload(key[::-1], RENDER_TEMPLATE["data"]) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + assert decrypt_payload(key, webhook_json["encrypted_data"]) == {} + assert "Ignoring encrypted payload because unable to decrypt" in caplog.text + + +async def test_webhook_handle_decryption_legacy_fail( + webhook_client, create_registrations, caplog +): + """Test that we can encrypt/decrypt properly.""" + key = create_registrations[0]["secret"] + + # Send valid data using legacy method + data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"]) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"]) + assert decrypted_data == {"one": "Hello world"} + caplog.clear() + + # Send invalid JSON data + data = encrypt_payload_legacy(key, "{not_valid", encode_json=False) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {} + assert "Ignoring invalid encrypted payload" in caplog.text + caplog.clear() + + # Break the key, and send JSON data + data = encrypt_payload_legacy(key[::-1], RENDER_TEMPLATE["data"]) + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + webhook_json = await resp.json() + assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {} + assert "Ignoring encrypted payload because unable to decrypt" in caplog.text + + +async def test_webhook_handle_decryption_legacy_upgrade( + webhook_client, create_registrations +): + """Test that we can encrypt/decrypt properly.""" + key = create_registrations[0]["secret"] + + # Send using legacy method + data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"]) + + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + + webhook_json = await resp.json() + assert "encrypted_data" in webhook_json + + decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"]) + + assert decrypted_data == {"one": "Hello world"} + + # Send using new method + data = encrypt_payload(key, RENDER_TEMPLATE["data"]) + + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + + webhook_json = await resp.json() + assert "encrypted_data" in webhook_json + + decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"]) + + assert decrypted_data == {"one": "Hello world"} + + # Send using legacy method - no longer possible + data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"]) + + container = {"type": "render_template", "encrypted": True, "encrypted_data": data} + + resp = await webhook_client.post( + "/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container + ) + + assert resp.status == HTTPStatus.OK + + webhook_json = await resp.json() + assert "encrypted_data" in webhook_json + + # The response should be empty, encrypted with the new method + with pytest.raises(Exception): + decrypt_payload_legacy(key, webhook_json["encrypted_data"]) + decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"]) + + assert decrypted_data == {} + + async def test_webhook_requires_encryption(webhook_client, create_registrations): """Test that encrypted registrations only accept encrypted data.""" resp = await webhook_client.post(