Enforce a Google Photos upload action file size limit (#126437)
* Set a Google Photos upload file size limit * Update homeassistant/components/google_photos/services.py Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com> * Replace strings with constants --------- Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>pull/126456/head^2
parent
9e37c14179
commit
b107b2c7bf
|
@ -32,6 +32,7 @@ UPLOAD_SERVICE_SCHEMA = vol.Schema(
|
||||||
vol.Required(CONF_FILENAME): vol.All(cv.ensure_list, [cv.string]),
|
vol.Required(CONF_FILENAME): vol.All(cv.ensure_list, [cv.string]),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
CONTENT_SIZE_LIMIT = 20 * 1024 * 1024
|
||||||
|
|
||||||
|
|
||||||
def _read_file_contents(
|
def _read_file_contents(
|
||||||
|
@ -53,6 +54,16 @@ def _read_file_contents(
|
||||||
translation_key="filename_does_not_exist",
|
translation_key="filename_does_not_exist",
|
||||||
translation_placeholders={"filename": filename},
|
translation_placeholders={"filename": filename},
|
||||||
)
|
)
|
||||||
|
if filename_path.stat().st_size > CONTENT_SIZE_LIMIT:
|
||||||
|
raise HomeAssistantError(
|
||||||
|
translation_domain=DOMAIN,
|
||||||
|
translation_key="file_too_large",
|
||||||
|
translation_placeholders={
|
||||||
|
"filename": filename,
|
||||||
|
"size": str(filename_path.stat().st_size),
|
||||||
|
"limit": str(CONTENT_SIZE_LIMIT),
|
||||||
|
},
|
||||||
|
)
|
||||||
mime_type, _ = mimetypes.guess_type(filename)
|
mime_type, _ = mimetypes.guess_type(filename)
|
||||||
if mime_type is None or not (mime_type.startswith(("image", "video"))):
|
if mime_type is None or not (mime_type.startswith(("image", "video"))):
|
||||||
raise HomeAssistantError(
|
raise HomeAssistantError(
|
||||||
|
|
|
@ -40,6 +40,9 @@
|
||||||
"filename_does_not_exist": {
|
"filename_does_not_exist": {
|
||||||
"message": "`{filename}` does not exist"
|
"message": "`{filename}` does not exist"
|
||||||
},
|
},
|
||||||
|
"file_too_large": {
|
||||||
|
"message": "`{filename}` is too large ({size} > {limit})"
|
||||||
|
},
|
||||||
"filename_is_not_image": {
|
"filename_is_not_image": {
|
||||||
"message": "`{filename}` is not an image"
|
"message": "`{filename}` is not an image"
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
"""Tests for Google Photos."""
|
"""Tests for Google Photos."""
|
||||||
|
|
||||||
|
from collections.abc import Generator
|
||||||
|
from dataclasses import dataclass
|
||||||
|
import re
|
||||||
from unittest.mock import Mock, patch
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
from google_photos_library_api.exceptions import GooglePhotosApiError
|
from google_photos_library_api.exceptions import GooglePhotosApiError
|
||||||
|
@ -12,12 +15,61 @@ from google_photos_library_api.model import (
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components.google_photos.const import DOMAIN, READ_SCOPE
|
from homeassistant.components.google_photos.const import DOMAIN, READ_SCOPE
|
||||||
|
from homeassistant.components.google_photos.services import (
|
||||||
|
CONF_CONFIG_ENTRY_ID,
|
||||||
|
UPLOAD_SERVICE,
|
||||||
|
)
|
||||||
from homeassistant.config_entries import ConfigEntryState
|
from homeassistant.config_entries import ConfigEntryState
|
||||||
|
from homeassistant.const import CONF_FILENAME
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.exceptions import HomeAssistantError
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
|
|
||||||
from tests.common import MockConfigEntry
|
from tests.common import MockConfigEntry
|
||||||
|
|
||||||
|
TEST_FILENAME = "doorbell_snapshot.jpg"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockUploadFile:
|
||||||
|
"""Dataclass used to configure the test with a fake file behavior."""
|
||||||
|
|
||||||
|
content: bytes = b"image bytes"
|
||||||
|
exists: bool = True
|
||||||
|
is_allowed_path: bool = True
|
||||||
|
size: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="upload_file")
|
||||||
|
def upload_file_fixture() -> None:
|
||||||
|
"""Fixture to set up test configuration with a fake file."""
|
||||||
|
return MockUploadFile()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def mock_upload_file(
|
||||||
|
hass: HomeAssistant, upload_file: MockUploadFile
|
||||||
|
) -> Generator[None]:
|
||||||
|
"""Fixture that mocks out the file calls using the FakeFile fixture."""
|
||||||
|
with (
|
||||||
|
patch(
|
||||||
|
"homeassistant.components.google_photos.services.Path.read_bytes",
|
||||||
|
return_value=upload_file.content,
|
||||||
|
),
|
||||||
|
patch(
|
||||||
|
"homeassistant.components.google_photos.services.Path.exists",
|
||||||
|
return_value=upload_file.exists,
|
||||||
|
),
|
||||||
|
patch.object(
|
||||||
|
hass.config, "is_allowed_path", return_value=upload_file.is_allowed_path
|
||||||
|
),
|
||||||
|
patch("pathlib.Path.stat") as mock_stat,
|
||||||
|
):
|
||||||
|
mock_stat.return_value = Mock()
|
||||||
|
mock_stat.return_value.st_size = (
|
||||||
|
upload_file.size if upload_file.size else len(upload_file.content)
|
||||||
|
)
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_integration")
|
@pytest.mark.usefixtures("setup_integration")
|
||||||
async def test_upload_service(
|
async def test_upload_service(
|
||||||
|
@ -38,27 +90,16 @@ async def test_upload_service(
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
with (
|
response = await hass.services.async_call(
|
||||||
patch(
|
DOMAIN,
|
||||||
"homeassistant.components.google_photos.services.Path.read_bytes",
|
UPLOAD_SERVICE,
|
||||||
return_value=b"image bytes",
|
{
|
||||||
),
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
patch(
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
"homeassistant.components.google_photos.services.Path.exists",
|
},
|
||||||
return_value=True,
|
blocking=True,
|
||||||
),
|
return_response=True,
|
||||||
patch.object(hass.config, "is_allowed_path", return_value=True),
|
)
|
||||||
):
|
|
||||||
response = await hass.services.async_call(
|
|
||||||
DOMAIN,
|
|
||||||
"upload",
|
|
||||||
{
|
|
||||||
"config_entry_id": config_entry.entry_id,
|
|
||||||
"filename": "doorbell_snapshot.jpg",
|
|
||||||
},
|
|
||||||
blocking=True,
|
|
||||||
return_response=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response == {"media_items": [{"media_item_id": "new-media-item-id-1"}]}
|
assert response == {"media_items": [{"media_item_id": "new-media-item-id-1"}]}
|
||||||
|
|
||||||
|
@ -72,10 +113,10 @@ async def test_upload_service_config_entry_not_found(
|
||||||
with pytest.raises(HomeAssistantError, match="not found in registry"):
|
with pytest.raises(HomeAssistantError, match="not found in registry"):
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": "invalid-config-entry-id",
|
CONF_CONFIG_ENTRY_ID: "invalid-config-entry-id",
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -96,10 +137,10 @@ async def test_config_entry_not_loaded(
|
||||||
with pytest.raises(HomeAssistantError, match="not found in registry"):
|
with pytest.raises(HomeAssistantError, match="not found in registry"):
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.unique_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.unique_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -107,21 +148,21 @@ async def test_config_entry_not_loaded(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_integration")
|
@pytest.mark.usefixtures("setup_integration")
|
||||||
|
@pytest.mark.parametrize("upload_file", [MockUploadFile(is_allowed_path=False)])
|
||||||
async def test_path_is_not_allowed(
|
async def test_path_is_not_allowed(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
config_entry: MockConfigEntry,
|
config_entry: MockConfigEntry,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test upload service call with a filename path that is not allowed."""
|
"""Test upload service call with a filename path that is not allowed."""
|
||||||
with (
|
with (
|
||||||
patch.object(hass.config, "is_allowed_path", return_value=False),
|
|
||||||
pytest.raises(HomeAssistantError, match="no access to path"),
|
pytest.raises(HomeAssistantError, match="no access to path"),
|
||||||
):
|
):
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.entry_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -129,22 +170,19 @@ async def test_path_is_not_allowed(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_integration")
|
@pytest.mark.usefixtures("setup_integration")
|
||||||
|
@pytest.mark.parametrize("upload_file", [MockUploadFile(exists=False)])
|
||||||
async def test_filename_does_not_exist(
|
async def test_filename_does_not_exist(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
config_entry: MockConfigEntry,
|
config_entry: MockConfigEntry,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test upload service call with a filename path that does not exist."""
|
"""Test upload service call with a filename path that does not exist."""
|
||||||
with (
|
with pytest.raises(HomeAssistantError, match="does not exist"):
|
||||||
patch.object(hass.config, "is_allowed_path", return_value=True),
|
|
||||||
patch("pathlib.Path.exists", return_value=False),
|
|
||||||
pytest.raises(HomeAssistantError, match="does not exist"),
|
|
||||||
):
|
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.entry_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -161,24 +199,13 @@ async def test_upload_service_upload_content_failure(
|
||||||
|
|
||||||
mock_api.upload_content.side_effect = GooglePhotosApiError()
|
mock_api.upload_content.side_effect = GooglePhotosApiError()
|
||||||
|
|
||||||
with (
|
with pytest.raises(HomeAssistantError, match="Failed to upload content"):
|
||||||
patch(
|
|
||||||
"homeassistant.components.google_photos.services.Path.read_bytes",
|
|
||||||
return_value=b"image bytes",
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"homeassistant.components.google_photos.services.Path.exists",
|
|
||||||
return_value=True,
|
|
||||||
),
|
|
||||||
patch.object(hass.config, "is_allowed_path", return_value=True),
|
|
||||||
pytest.raises(HomeAssistantError, match="Failed to upload content"),
|
|
||||||
):
|
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.entry_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -195,26 +222,15 @@ async def test_upload_service_fails_create(
|
||||||
|
|
||||||
mock_api.create_media_items.side_effect = GooglePhotosApiError()
|
mock_api.create_media_items.side_effect = GooglePhotosApiError()
|
||||||
|
|
||||||
with (
|
with pytest.raises(
|
||||||
patch(
|
HomeAssistantError, match="Google Photos API responded with error"
|
||||||
"homeassistant.components.google_photos.services.Path.read_bytes",
|
|
||||||
return_value=b"image bytes",
|
|
||||||
),
|
|
||||||
patch(
|
|
||||||
"homeassistant.components.google_photos.services.Path.exists",
|
|
||||||
return_value=True,
|
|
||||||
),
|
|
||||||
patch.object(hass.config, "is_allowed_path", return_value=True),
|
|
||||||
pytest.raises(
|
|
||||||
HomeAssistantError, match="Google Photos API responded with error"
|
|
||||||
),
|
|
||||||
):
|
):
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.entry_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
@ -237,10 +253,33 @@ async def test_upload_service_no_scope(
|
||||||
with pytest.raises(HomeAssistantError, match="not granted permission"):
|
with pytest.raises(HomeAssistantError, match="not granted permission"):
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
"upload",
|
UPLOAD_SERVICE,
|
||||||
{
|
{
|
||||||
"config_entry_id": config_entry.entry_id,
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
"filename": "doorbell_snapshot.jpg",
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
|
},
|
||||||
|
blocking=True,
|
||||||
|
return_response=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_integration")
|
||||||
|
@pytest.mark.parametrize("upload_file", [MockUploadFile(size=26 * 1024 * 1024)])
|
||||||
|
async def test_upload_size_limit(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
config_entry: MockConfigEntry,
|
||||||
|
) -> None:
|
||||||
|
"""Test upload service call with a filename path that does not exist."""
|
||||||
|
with pytest.raises(
|
||||||
|
HomeAssistantError,
|
||||||
|
match=re.escape(f"`{TEST_FILENAME}` is too large (27262976 > 20971520)"),
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN,
|
||||||
|
UPLOAD_SERVICE,
|
||||||
|
{
|
||||||
|
CONF_CONFIG_ENTRY_ID: config_entry.entry_id,
|
||||||
|
CONF_FILENAME: TEST_FILENAME,
|
||||||
},
|
},
|
||||||
blocking=True,
|
blocking=True,
|
||||||
return_response=True,
|
return_response=True,
|
||||||
|
|
Loading…
Reference in New Issue