Fix loading filesize coordinator from wrong place (#99547)

* Fix loading filesize coordinator from wrong place

* absolute in executor

* combine into executor
pull/99236/head
G Johansson 2023-09-04 07:33:46 +02:00 committed by GitHub
parent 735b5cf1a0
commit 9d6cab8fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 45 deletions

View File

@ -9,10 +9,11 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from .const import PLATFORMS
from .coordinator import FileSizeCoordinator
def _check_path(hass: HomeAssistant, path: str) -> None:
"""Check if path is valid and allowed."""
def _get_full_path(hass: HomeAssistant, path: str) -> str:
"""Check if path is valid, allowed and return full path."""
get_path = pathlib.Path(path)
if not get_path.exists() or not get_path.is_file():
raise ConfigEntryNotReady(f"Can not access file {path}")
@ -20,10 +21,17 @@ def _check_path(hass: HomeAssistant, path: str) -> None:
if not hass.config.is_allowed_path(path):
raise ConfigEntryNotReady(f"Filepath {path} is not valid or allowed")
return str(get_path.absolute())
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up from a config entry."""
    # Path validation touches the filesystem (blocking I/O), so run it in
    # the executor to keep the event loop responsive.
    full_path = await hass.async_add_executor_job(
        _get_full_path, hass, entry.data[CONF_FILE_PATH]
    )

    coordinator = FileSizeCoordinator(hass, full_path)
    # First refresh raises ConfigEntryNotReady on failure, aborting setup.
    await coordinator.async_config_entry_first_refresh()

    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
    return True

View File

@ -0,0 +1,48 @@
"""Coordinator for monitoring the size of a file."""
from __future__ import annotations
from datetime import datetime, timedelta
import logging
import os
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
import homeassistant.util.dt as dt_util
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class FileSizeCoordinator(DataUpdateCoordinator[dict[str, int | float | datetime]]):
    """Filesize coordinator.

    Polls a single file and exposes its size and last-modified timestamp.
    """

    def __init__(self, hass: HomeAssistant, path: str) -> None:
        """Initialize filesize coordinator."""
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=60),
            always_update=False,
        )
        self._path = path

    async def _async_update_data(self) -> dict[str, float | int | datetime]:
        """Fetch file information."""
        # os.stat is blocking disk I/O; run it in the executor pool.
        try:
            statinfo = await self.hass.async_add_executor_job(os.stat, self._path)
        except OSError as error:
            raise UpdateFailed(f"Can not retrieve file statistics {error}") from error

        file_bytes = statinfo.st_size
        modified = dt_util.utc_from_timestamp(statinfo.st_mtime)
        _LOGGER.debug("size %s, last updated %s", file_bytes, modified)

        return {
            "file": round(file_bytes / 1e6, 2),
            "bytes": file_bytes,
            "last_updated": modified,
        }

View File

@ -1,9 +1,8 @@
"""Sensor for monitoring the size of a file."""
from __future__ import annotations
from datetime import datetime, timedelta
from datetime import datetime
import logging
import os
import pathlib
from homeassistant.components.sensor import (
@ -17,14 +16,10 @@ from homeassistant.const import CONF_FILE_PATH, EntityCategory, UnitOfInformatio
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
import homeassistant.util.dt as dt_util
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import FileSizeCoordinator
_LOGGER = logging.getLogger(__name__)
@ -80,40 +75,6 @@ async def async_setup_entry(
)
class FileSizeCoordinator(DataUpdateCoordinator[dict[str, int | float | datetime]]):
    """Filesize coordinator.

    Parameterizing DataUpdateCoordinator with the update payload type keeps
    `coordinator.data` typed consistently with `_async_update_data`'s
    annotated return instead of falling back to Any.
    """

    def __init__(self, hass: HomeAssistant, path: str) -> None:
        """Initialize filesize coordinator."""
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=60),
            # Only notify listeners when the polled data actually changed.
            always_update=False,
        )
        self._path = path

    async def _async_update_data(self) -> dict[str, float | int | datetime]:
        """Fetch file information.

        Raises UpdateFailed when the file cannot be stat'ed.
        """
        try:
            # stat is blocking disk I/O; keep it off the event loop.
            statinfo = await self.hass.async_add_executor_job(os.stat, self._path)
        except OSError as error:
            raise UpdateFailed(f"Can not retrieve file statistics {error}") from error

        size = statinfo.st_size
        last_updated = dt_util.utc_from_timestamp(statinfo.st_mtime)
        _LOGGER.debug("size %s, last updated %s", size, last_updated)

        data: dict[str, int | float | datetime] = {
            "file": round(size / 1e6, 2),  # size in megabytes, 2 decimals
            "bytes": size,
            "last_updated": last_updated,
        }
        return data
class FilesizeEntity(CoordinatorEntity[FileSizeCoordinator], SensorEntity):
"""Filesize sensor."""