core/homeassistant/components/gpsd/sensor.py

172 lines
4.8 KiB
Python

"""Sensor platform for GPSD integration."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
import logging
from typing import Any
from gps3.agps3threaded import (
GPSD_PORT as DEFAULT_PORT,
HOST as DEFAULT_HOST,
AGPS3mechanism,
)
import voluptuous as vol
from homeassistant.components.sensor import (
PLATFORM_SCHEMA,
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
ATTR_MODE,
CONF_HOST,
CONF_NAME,
CONF_PORT,
EntityCategory,
)
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
ATTR_CLIMB = "climb"
ATTR_ELEVATION = "elevation"
ATTR_GPS_TIME = "gps_time"
ATTR_SPEED = "speed"
DEFAULT_NAME = "GPS"
_MODE_VALUES = {2: "2d_fix", 3: "3d_fix"}
@dataclass(frozen=True, kw_only=True)
class GpsdSensorDescription(SensorEntityDescription):
"""Class describing GPSD sensor entities."""
value_fn: Callable[[AGPS3mechanism], str | None]
SENSOR_TYPES: tuple[GpsdSensorDescription, ...] = (
GpsdSensorDescription(
key="mode",
translation_key="mode",
name=None,
entity_category=EntityCategory.DIAGNOSTIC,
device_class=SensorDeviceClass.ENUM,
options=list(_MODE_VALUES.values()),
value_fn=lambda agps_thread: _MODE_VALUES.get(agps_thread.data_stream.mode),
),
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
}
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the GPSD component."""
async_add_entities(
[
GpsdSensor(
config_entry.data[CONF_HOST],
config_entry.data[CONF_PORT],
config_entry.entry_id,
description,
)
for description in SENSOR_TYPES
]
)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Initialize gpsd import from config."""
async_create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_yaml_{DOMAIN}",
is_fixable=False,
breaks_in_ha_version="2024.9.0",
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "GPSD",
},
)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=config
)
)
class GpsdSensor(SensorEntity):
"""Representation of a GPS receiver available via GPSD."""
_attr_has_entity_name = True
entity_description: GpsdSensorDescription
def __init__(
self,
host: str,
port: int,
unique_id: str,
description: GpsdSensorDescription,
) -> None:
"""Initialize the GPSD sensor."""
self.entity_description = description
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, unique_id)},
entry_type=DeviceEntryType.SERVICE,
)
self._attr_unique_id = f"{unique_id}-{self.entity_description.key}"
self.agps_thread = AGPS3mechanism()
self.agps_thread.stream_data(host=host, port=port)
self.agps_thread.run_thread()
@property
def native_value(self) -> str | None:
"""Return the state of GPSD."""
return self.entity_description.value_fn(self.agps_thread)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the GPS."""
return {
ATTR_LATITUDE: self.agps_thread.data_stream.lat,
ATTR_LONGITUDE: self.agps_thread.data_stream.lon,
ATTR_ELEVATION: self.agps_thread.data_stream.alt,
ATTR_GPS_TIME: self.agps_thread.data_stream.time,
ATTR_SPEED: self.agps_thread.data_stream.speed,
ATTR_CLIMB: self.agps_thread.data_stream.climb,
ATTR_MODE: self.agps_thread.data_stream.mode,
}