Add Netatmo data handler (#35571)

* Fix webhook registration

* Only load camera platform with valid scope

* Add initial data handler and netatmo base class

* Update camera to use data handler

* Update init

* Parallelize API calls

* Remove cruft

* Minor tweaks

* Refactor data handler

* Update climate to use data handler

* Fix pylint error

* Fix climate update not getting fresh data

* Update climate data

* update to pyatmo 4.0.0

* Refactor for pyatmo 4.0.0

* Exclude from coverage until tests are written

* Fix typo

* Reduce parallel calls

* Add heating request attr

* Async get_entities

* Undo parallel updates

* Fix camera issue

* Introduce individual scan interval per device class

* Some cleanup

* Add basic webhook support for climate to improve responsiveness

* Replace ClimateDevice by ClimateEntity

* Add support for turning camera on/off

* Update camera state upon webhook events

* Guard data class registration with lock

* Capture errors

* Add light platform

* Add dis-/connect handling

* Fix set schedule service

* Remove extra calls

* Add service to set person(s) home/away

* Add service descriptions

* Improve service descriptions

* Use LightEntity instead of Light

* Add guard if no data is retrieved

* Make services entity based

* Only raise platform not ready if there is a NOC

* Register webhook even during runtime

* Fix turning off event

* Fix linter error

* Fix linter error

* Exclude light platform from coverage

* Change log level

* Refactor public weather sensor to use data handler

* Prevent too short coordinates

* Ignore modules without _id

* Code cleanup

* Fix test

* Exit early if no home data is retrieved

* Prevent discovery if already active

* Add services to (un-)register webhook

* Fix tests

* Not actually a coroutine

* Move methods to base class

* Address pylint comment

* Address pylint complaints

* Address comments

* Address more comments

* Add docstring

* Use single instance allowed

* Extract method

* Remove cruft

* Write state directly

* Fix test

* Add file to coverage

* Move nested function

* Move nested function

* Update docstring

* Clean up code

* Fix webhook bug

* Clean up listeners

* Use deque

* Clean up prints

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update homeassistant/components/netatmo/camera.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Rename data_class variable

* Break when match

* Extract method

* Extract methods

* Rename variable

* Improve comment

* Some refinements

* Extra

* Extract method

* Simplify code

* Improve reability

* Code simplification

* Simplify code

* Simplify code

* Code cleanup

* Fix import

* Clean up

* Clean up magic strings

* Replace data_class_name with CAMERA_DATA_CLASS_NAME

* Replace data_class_name with CAMERA_DATA_CLASS_NAME

* Replace data_class_name with HOMEDATA_DATA_CLASS_NAME

* Replace data_class_name in public weather sensor

* Clean up

* Remove deprecated config options

* Schedule immediate update on camera reconnect

* Use UUID to clearly identify public weather areas

* Use subscription mode

* Move clean up of temporary data classes

* Delay data class removal

* Fix linter complaints

* Adjust test

* Only setup lights if webhook are registered

* Prevent crash with old config entries

* Don't cache home ids

* Remove stale code

* Fix coordinates if entered mixed up by the user

* Move nested function

* Add test case for swapped coordinates

* Only wait for discovery entries

* Only use what I need

* Bring stuff closer to where it's used

* Auto clean up setup data classes

* Code cleanup

* Remove unneccessary lock

* Update homeassistant/components/netatmo/sensor.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update tests/components/netatmo/test_config_flow.py

Co-authored-by: J. Nick Koston <nick@koston.org>

* Clean up dead code

* Fix formating

* Extend coverage

* Extend coverage

Co-authored-by: J. Nick Koston <nick@koston.org>
pull/38330/head
cgtobi 2020-08-04 20:46:46 +02:00 committed by GitHub
parent 0780650015
commit 31dbdff3c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1422 additions and 846 deletions

View File

@ -552,6 +552,10 @@ omit =
homeassistant/components/netatmo/camera.py
homeassistant/components/netatmo/climate.py
homeassistant/components/netatmo/const.py
homeassistant/components/netatmo/data_handler.py
homeassistant/components/netatmo/helper.py
homeassistant/components/netatmo/light.py
homeassistant/components/netatmo/netatmo_entity_base.py
homeassistant/components/netatmo/sensor.py
homeassistant/components/netatmo/webhook.py
homeassistant/components/netdata/sensor.py

View File

@ -15,13 +15,11 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
CONF_DISCOVERY,
CONF_USERNAME,
CONF_WEBHOOK_ID,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow, config_validation as cv
from . import api, config_flow
@ -29,30 +27,25 @@ from .const import (
AUTH,
CONF_CLOUDHOOK_URL,
DATA_DEVICE_IDS,
DATA_HANDLER,
DATA_HOMES,
DATA_PERSONS,
DATA_SCHEDULES,
DOMAIN,
OAUTH2_AUTHORIZE,
OAUTH2_TOKEN,
)
from .data_handler import NetatmoDataHandler
from .webhook import handle_webhook
_LOGGER = logging.getLogger(__name__)
CONF_SECRET_KEY = "secret_key"
CONF_WEBHOOKS = "webhooks"
WAIT_FOR_CLOUD = 5
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_CLIENT_ID): cv.string,
vol.Required(CONF_CLIENT_SECRET): cv.string,
cv.deprecated(CONF_SECRET_KEY): cv.match_all,
cv.deprecated(CONF_USERNAME): cv.match_all,
cv.deprecated(CONF_WEBHOOKS): cv.match_all,
cv.deprecated(CONF_DISCOVERY): cv.match_all,
}
)
},
@ -67,6 +60,8 @@ async def async_setup(hass: HomeAssistant, config: dict):
hass.data[DOMAIN] = {}
hass.data[DOMAIN][DATA_PERSONS] = {}
hass.data[DOMAIN][DATA_DEVICE_IDS] = {}
hass.data[DOMAIN][DATA_SCHEDULES] = {}
hass.data[DOMAIN][DATA_HOMES] = {}
if DOMAIN not in config:
return True
@ -100,27 +95,27 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
AUTH: api.ConfigEntryNetatmoAuth(hass, entry, implementation)
}
data_handler = NetatmoDataHandler(hass, entry)
await data_handler.async_setup()
hass.data[DOMAIN][entry.entry_id][DATA_HANDLER] = data_handler
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)
async def unregister_webhook(event):
async def unregister_webhook(_):
if CONF_WEBHOOK_ID not in entry.data:
return
_LOGGER.debug("Unregister Netatmo webhook (%s)", entry.data[CONF_WEBHOOK_ID])
webhook_unregister(hass, entry.data[CONF_WEBHOOK_ID])
async def register_webhook(event):
# Wait for the cloud integration to be ready
await asyncio.sleep(WAIT_FOR_CLOUD)
if CONF_WEBHOOK_ID not in entry.data:
data = {**entry.data, CONF_WEBHOOK_ID: secrets.token_hex()}
hass.config_entries.async_update_entry(entry, data=data)
if hass.components.cloud.async_active_subscription():
# Wait for cloud connection to be established
await asyncio.sleep(WAIT_FOR_CLOUD)
if CONF_CLOUDHOOK_URL not in entry.data:
webhook_url = await hass.components.cloud.async_create_cloudhook(
entry.data[CONF_WEBHOOK_ID]
@ -134,20 +129,39 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
entry.data[CONF_WEBHOOK_ID]
)
try:
await hass.async_add_executor_job(
hass.data[DOMAIN][entry.entry_id][AUTH].addwebhook, webhook_url
if entry.data["auth_implementation"] == "cloud" and not webhook_url.startswith(
"https://"
):
_LOGGER.warning(
"Webhook not registered - "
"https and port 443 is required to register the webhook"
)
return
try:
webhook_register(
hass, DOMAIN, "Netatmo", entry.data[CONF_WEBHOOK_ID], handle_webhook
)
await hass.async_add_executor_job(
hass.data[DOMAIN][entry.entry_id][AUTH].addwebhook, webhook_url
)
_LOGGER.info("Register Netatmo webhook: %s", webhook_url)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, "light")
)
except pyatmo.ApiError as err:
_LOGGER.error("Error during webhook registration - %s", err)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, unregister_webhook)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, register_webhook)
if hass.state == CoreState.running:
await register_webhook(None)
else:
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, register_webhook)
hass.services.async_register(DOMAIN, "register_webhook", register_webhook)
hass.services.async_register(DOMAIN, "unregister_webhook", unregister_webhook)
return True
@ -157,6 +171,9 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
await hass.async_add_executor_job(
hass.data[DOMAIN][entry.entry_id][AUTH].dropwebhook
)
_LOGGER.info("Unregister Netatmo webhook.")
await hass.data[DOMAIN][entry.entry_id][DATA_HANDLER].async_cleanup()
unload_ok = all(
await asyncio.gather(
@ -175,7 +192,10 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Cleanup when entry is removed."""
if CONF_WEBHOOK_ID in entry.data:
if (
CONF_WEBHOOK_ID in entry.data
and hass.components.cloud.async_active_subscription()
):
try:
_LOGGER.debug(
"Removing Netatmo cloudhook (%s)", entry.data[CONF_WEBHOOK_ID]

View File

@ -10,7 +10,7 @@ from homeassistant.helpers import config_entry_oauth2_flow
_LOGGER = logging.getLogger(__name__)
class ConfigEntryNetatmoAuth(pyatmo.auth.NetatmOAuth2):
class ConfigEntryNetatmoAuth(pyatmo.auth.NetatmoOAuth2):
"""Provide Netatmo authentication tied to an OAuth2 based config entry."""
def __init__(

View File

@ -10,59 +10,113 @@ from homeassistant.components.camera import (
SUPPORT_STREAM,
Camera,
)
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv, entity_platform
from .const import (
ATTR_PERSON,
ATTR_PERSONS,
ATTR_PSEUDO,
AUTH,
DATA_HANDLER,
DATA_PERSONS,
DOMAIN,
MANUFACTURER,
MIN_TIME_BETWEEN_EVENT_UPDATES,
MIN_TIME_BETWEEN_UPDATES,
MODELS,
SERVICE_SETPERSONAWAY,
SERVICE_SETPERSONSHOME,
SIGNAL_NAME,
)
from .data_handler import CAMERA_DATA_CLASS_NAME
from .netatmo_entity_base import NetatmoBase
_LOGGER = logging.getLogger(__name__)
CONF_HOME = "home"
CONF_CAMERAS = "cameras"
CONF_QUALITY = "quality"
DEFAULT_QUALITY = "high"
VALID_QUALITIES = ["high", "medium", "low", "poor"]
SCHEMA_SERVICE_SETPERSONSHOME = vol.Schema(
{
vol.Required(ATTR_ENTITY_ID): cv.entity_domain(CAMERA_DOMAIN),
vol.Required(ATTR_PERSONS): vol.All(cv.ensure_list, [cv.string]),
}
)
_BOOL_TO_STATE = {True: STATE_ON, False: STATE_OFF}
SCHEMA_SERVICE_SETLIGHTAUTO = vol.Schema(
{vol.Optional(ATTR_ENTITY_ID): cv.entity_domain(CAMERA_DOMAIN)}
SCHEMA_SERVICE_SETPERSONAWAY = vol.Schema(
{
vol.Required(ATTR_ENTITY_ID): cv.entity_domain(CAMERA_DOMAIN),
vol.Optional(ATTR_PERSON): cv.string,
}
)
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo camera platform."""
if "access_camera" not in entry.data["token"]["scope"]:
_LOGGER.info(
"Cameras are currently not supported with this authentication method"
)
return
def get_entities():
data_handler = hass.data[DOMAIN][entry.entry_id][DATA_HANDLER]
async def get_entities():
"""Retrieve Netatmo entities."""
await data_handler.register_data_class(
CAMERA_DATA_CLASS_NAME, CAMERA_DATA_CLASS_NAME, None
)
data = data_handler.data
if not data.get(CAMERA_DATA_CLASS_NAME):
return []
data_class = data_handler.data[CAMERA_DATA_CLASS_NAME]
entities = []
try:
camera_data = CameraData(hass, hass.data[DOMAIN][entry.entry_id][AUTH])
for camera in camera_data.get_all_cameras():
_LOGGER.debug("Setting up camera %s %s", camera["id"], camera["name"])
all_cameras = []
for home in data_class.cameras.values():
for camera in home.values():
all_cameras.append(camera)
for camera in all_cameras:
_LOGGER.debug("Adding camera %s %s", camera["id"], camera["name"])
entities.append(
NetatmoCamera(
camera_data, camera["id"], camera["type"], True, DEFAULT_QUALITY
data_handler,
camera["id"],
camera["type"],
camera["home_id"],
DEFAULT_QUALITY,
)
)
camera_data.update_persons()
for person_id, person_data in data_handler.data[
CAMERA_DATA_CLASS_NAME
].persons.items():
hass.data[DOMAIN][DATA_PERSONS][person_id] = person_data.get(
ATTR_PSEUDO
)
except pyatmo.NoDevice:
_LOGGER.debug("No cameras found")
return entities
async_add_entities(await hass.async_add_executor_job(get_entities), True)
async_add_entities(await get_entities(), True)
platform = entity_platform.current_platform.get()
if data_handler.data[CAMERA_DATA_CLASS_NAME] is not None:
platform.async_register_entity_service(
SERVICE_SETPERSONSHOME,
SCHEMA_SERVICE_SETPERSONSHOME,
"_service_setpersonshome",
)
platform.async_register_entity_service(
SERVICE_SETPERSONAWAY,
SCHEMA_SERVICE_SETPERSONAWAY,
"_service_setpersonaway",
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
@ -70,19 +124,26 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
return
class NetatmoCamera(Camera):
class NetatmoCamera(NetatmoBase, Camera):
"""Representation of a Netatmo camera."""
def __init__(self, data, camera_id, camera_type, verify_ssl, quality):
def __init__(
self, data_handler, camera_id, camera_type, home_id, quality,
):
"""Set up for access to the Netatmo camera images."""
super().__init__()
self._data = data
self._camera_id = camera_id
self._camera_name = self._data.camera_data.get_camera(cid=camera_id).get("name")
self._name = f"{MANUFACTURER} {self._camera_name}"
self._camera_type = camera_type
self._unique_id = f"{self._camera_id}-{self._camera_type}"
self._verify_ssl = verify_ssl
Camera.__init__(self)
super().__init__(data_handler)
self._data_classes.append(
{"name": CAMERA_DATA_CLASS_NAME, SIGNAL_NAME: CAMERA_DATA_CLASS_NAME}
)
self._id = camera_id
self._home_id = home_id
self._device_name = self._data.get_camera(camera_id=camera_id).get("name")
self._name = f"{MANUFACTURER} {self._device_name}"
self._model = camera_type
self._unique_id = f"{self._id}-{self._model}"
self._quality = quality
self._vpnurl = None
self._localurl = None
@ -91,6 +152,35 @@ class NetatmoCamera(Camera):
self._alim_status = None
self._is_local = None
async def async_added_to_hass(self) -> None:
"""Entity created."""
await super().async_added_to_hass()
self._listeners.append(
self.hass.bus.async_listen("netatmo_event", self.handle_event)
)
async def handle_event(self, event):
"""Handle webhook events."""
data = event.data["data"]
if not data.get("event_type"):
return
if not data.get("camera_id"):
return
if data["home_id"] == self._home_id and data["camera_id"] == self._id:
if data["push_type"] in ["NACamera-off", "NACamera-disconnection"]:
self.is_streaming = False
self._status = "off"
elif data["push_type"] in ["NACamera-on", "NACamera-connection"]:
self.is_streaming = True
self._status = "on"
self.async_write_ha_state()
return
def camera_image(self):
"""Return a still image response from the camera."""
try:
@ -100,77 +190,46 @@ class NetatmoCamera(Camera):
)
elif self._vpnurl:
response = requests.get(
f"{self._vpnurl}/live/snapshot_720.jpg",
timeout=10,
verify=self._verify_ssl,
f"{self._vpnurl}/live/snapshot_720.jpg", timeout=10, verify=True,
)
else:
_LOGGER.error("Welcome/Presence VPN URL is None")
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.camera_urls(
cid=self._camera_id
(self._vpnurl, self._localurl) = self._data.camera_urls(
camera_id=self._id
)
return None
except requests.exceptions.RequestException as error:
_LOGGER.info("Welcome/Presence URL changed: %s", error)
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.camera_urls(
cid=self._camera_id
)
self._data.update_camera_urls(camera_id=self._id)
(self._vpnurl, self._localurl) = self._data.camera_urls(camera_id=self._id)
return None
return response.content
@property
def should_poll(self) -> bool:
"""Return True if entity has to be polled for state.
False if entity pushes its state to HA.
"""
return True
@property
def name(self):
"""Return the name of this Netatmo camera device."""
return self._name
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._camera_id)},
"name": self._camera_name,
"manufacturer": MANUFACTURER,
"model": MODELS[self._camera_type],
}
@property
def device_state_attributes(self):
"""Return the Netatmo-specific camera state attributes."""
attr = {}
attr["id"] = self._camera_id
attr["status"] = self._status
attr["sd_status"] = self._sd_status
attr["alim_status"] = self._alim_status
attr["is_local"] = self._is_local
attr["vpn_url"] = self._vpnurl
return attr
return {
"id": self._id,
"status": self._status,
"sd_status": self._sd_status,
"alim_status": self._alim_status,
"is_local": self._is_local,
"vpn_url": self._vpnurl,
"local_url": self._localurl,
}
@property
def available(self):
"""Return True if entity is available."""
return bool(self._alim_status == "on")
return bool(self._alim_status == "on" or self._status == "disconnected")
@property
def supported_features(self):
"""Return supported features."""
return SUPPORT_STREAM
@property
def is_recording(self):
"""Return true if the device is recording."""
return bool(self._status == "on")
@property
def brand(self):
"""Return the camera brand."""
@ -186,6 +245,16 @@ class NetatmoCamera(Camera):
"""Return true if on."""
return self.is_streaming
def turn_off(self):
"""Turn off camera."""
self._data.set_state(
home_id=self._home_id, camera_id=self._id, monitoring="off"
)
def turn_on(self):
"""Turn on camera."""
self._data.set_state(home_id=self._home_id, camera_id=self._id, monitoring="on")
async def stream_source(self):
"""Return the stream source."""
url = "{0}/live/files/{1}/index.m3u8"
@ -196,72 +265,48 @@ class NetatmoCamera(Camera):
@property
def model(self):
"""Return the camera model."""
if self._camera_type == "NOC":
return "Presence"
if self._camera_type == "NACamera":
return "Welcome"
return None
return MODELS[self._model]
@property
def unique_id(self):
"""Return the unique ID for this sensor."""
return self._unique_id
def update(self):
"""Update entity status."""
self._data.update()
camera = self._data.camera_data.get_camera(cid=self._camera_id)
self._vpnurl, self._localurl = self._data.camera_data.camera_urls(
cid=self._camera_id
)
@callback
def async_update_callback(self):
"""Update the entity's state."""
camera = self._data.get_camera(self._id)
self._vpnurl, self._localurl = self._data.camera_urls(self._id)
self._status = camera.get("status")
self._sd_status = camera.get("sd_status")
self._alim_status = camera.get("alim_status")
self._is_local = camera.get("is_local")
self.is_streaming = self._alim_status == "on"
self.is_streaming = bool(self._status == "on")
def _service_setpersonshome(self, **kwargs):
"""Service to change current home schedule."""
persons = kwargs.get(ATTR_PERSONS)
person_ids = []
for person in persons:
for pid, data in self._data.persons.items():
if data.get("pseudo") == person:
person_ids.append(pid)
class CameraData:
"""Get the latest data from Netatmo."""
self._data.set_persons_home(person_ids=person_ids, home_id=self._home_id)
_LOGGER.info("Set %s as at home", persons)
def __init__(self, hass, auth):
"""Initialize the data object."""
self._hass = hass
self.auth = auth
self.camera_data = None
def _service_setpersonaway(self, **kwargs):
"""Service to mark a person as away or set the home as empty."""
person = kwargs.get(ATTR_PERSON)
person_id = None
if person:
for pid, data in self._data.persons.items():
if data.get("pseudo") == person:
person_id = pid
def get_all_cameras(self):
"""Return all camera available on the API as a list."""
self.update()
cameras = []
for camera in self.camera_data.cameras.values():
cameras.extend(camera.values())
return cameras
def get_modules(self, camera_id):
"""Return all modules for a given camera."""
return self.camera_data.get_camera(camera_id).get("modules", [])
def get_camera_type(self, camera_id):
"""Return camera type for a camera, cid has preference over camera."""
return self.camera_data.cameraType(cid=camera_id)
def update_persons(self):
"""Gather person data for webhooks."""
for person_id, person_data in self.camera_data.persons.items():
self._hass.data[DOMAIN][DATA_PERSONS][person_id] = person_data.get(
ATTR_PSEUDO
if person_id is not None:
self._data.set_persons_away(
person_id=person_id, home_id=self._home_id,
)
_LOGGER.info("Set %s as away", person)
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Call the Netatmo API to update the data."""
self.camera_data = pyatmo.CameraData(self.auth, size=100)
self.update_persons()
@Throttle(MIN_TIME_BETWEEN_EVENT_UPDATES)
def update_event(self, camera_type):
"""Call the Netatmo API to update the events."""
self.camera_data.updateEvent(devicetype=camera_type)
else:
self._data.set_persons_away(
person_id=person_id, home_id=self._home_id,
)
_LOGGER.info("Set home as empty")

View File

@ -1,13 +1,10 @@
"""Support for Netatmo Smart thermostats."""
from datetime import timedelta
import logging
from typing import List, Optional
import pyatmo
import requests
import voluptuous as vol
from homeassistant.components.climate import ClimateEntity
from homeassistant.components.climate import DOMAIN as CLIMATE_DOMAIN, ClimateEntity
from homeassistant.components.climate.const import (
CURRENT_HVAC_HEAT,
CURRENT_HVAC_IDLE,
@ -22,23 +19,28 @@ from homeassistant.components.climate.const import (
)
from homeassistant.const import (
ATTR_BATTERY_LEVEL,
ATTR_ENTITY_ID,
ATTR_TEMPERATURE,
PRECISION_HALVES,
STATE_OFF,
TEMP_CELSIUS,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.util import Throttle
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv, entity_platform
from .const import (
ATTR_HOME_NAME,
ATTR_HEATING_POWER_REQUEST,
ATTR_SCHEDULE_NAME,
AUTH,
DATA_HANDLER,
DATA_HOMES,
DATA_SCHEDULES,
DOMAIN,
MANUFACTURER,
MODELS,
SERVICE_SETSCHEDULE,
SIGNAL_NAME,
)
from .data_handler import HOMEDATA_DATA_CLASS_NAME, HOMESTATUS_DATA_CLASS_NAME
from .netatmo_entity_base import NetatmoBase
_LOGGER = logging.getLogger(__name__)
@ -88,11 +90,6 @@ HVAC_MAP_NETATMO = {
CURRENT_HVAC_MAP_NETATMO = {True: CURRENT_HVAC_HEAT, False: CURRENT_HVAC_IDLE}
CONF_HOMES = "homes"
CONF_ROOMS = "rooms"
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=300)
DEFAULT_MAX_TEMP = 30
NA_THERM = "NATherm1"
@ -100,54 +97,66 @@ NA_VALVE = "NRV"
SCHEMA_SERVICE_SETSCHEDULE = vol.Schema(
{
vol.Required(ATTR_ENTITY_ID): cv.entity_domain(CLIMATE_DOMAIN),
vol.Required(ATTR_SCHEDULE_NAME): cv.string,
vol.Required(ATTR_HOME_NAME): cv.string,
}
)
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo energy platform."""
auth = hass.data[DOMAIN][entry.entry_id][AUTH]
data_handler = hass.data[DOMAIN][entry.entry_id][DATA_HANDLER]
home_data = HomeData(auth)
await data_handler.register_data_class(
HOMEDATA_DATA_CLASS_NAME, HOMEDATA_DATA_CLASS_NAME, None
)
home_data = data_handler.data.get(HOMEDATA_DATA_CLASS_NAME)
def get_entities():
if not home_data:
return
async def get_entities():
"""Retrieve Netatmo entities."""
entities = []
try:
home_data.setup()
except pyatmo.NoDevice:
return
home_ids = home_data.get_all_home_ids()
for home_id in home_ids:
for home_id in get_all_home_ids(home_data):
_LOGGER.debug("Setting up home %s ...", home_id)
try:
room_data = ThermostatData(auth, home_id)
except pyatmo.NoDevice:
continue
for room_id in room_data.get_room_ids():
room_name = room_data.homedata.rooms[home_id][room_id]["name"]
for room_id in home_data.rooms[home_id].keys():
room_name = home_data.rooms[home_id][room_id]["name"]
_LOGGER.debug("Setting up room %s (%s) ...", room_name, room_id)
entities.append(NetatmoThermostat(room_data, room_id))
signal_name = f"{HOMESTATUS_DATA_CLASS_NAME}-{home_id}"
await data_handler.register_data_class(
HOMESTATUS_DATA_CLASS_NAME, signal_name, None, home_id=home_id
)
home_status = data_handler.data.get(signal_name)
if home_status and room_id in home_status.rooms:
entities.append(NetatmoThermostat(data_handler, home_id, room_id))
hass.data[DOMAIN][DATA_SCHEDULES][home_id] = {
schedule_id: schedule_data.get("name")
for schedule_id, schedule_data in (
data_handler.data[HOMEDATA_DATA_CLASS_NAME]
.schedules[home_id]
.items()
)
}
hass.data[DOMAIN][DATA_HOMES] = {
home_id: home_data.get("name")
for home_id, home_data in (
data_handler.data[HOMEDATA_DATA_CLASS_NAME].homes.items()
)
}
return entities
async_add_entities(await hass.async_add_executor_job(get_entities), True)
async_add_entities(await get_entities(), True)
def _service_setschedule(service):
"""Service to change current home schedule."""
home_name = service.data.get(ATTR_HOME_NAME)
schedule_name = service.data.get(ATTR_SCHEDULE_NAME)
home_data.homedata.switchHomeSchedule(schedule=schedule_name, home=home_name)
_LOGGER.info("Set home (%s) schedule to %s", home_name, schedule_name)
platform = entity_platform.current_platform.get()
if home_data.homedata is not None:
hass.services.async_register(
DOMAIN,
SERVICE_SETSCHEDULE,
_service_setschedule,
schema=SCHEMA_SERVICE_SETSCHEDULE,
if home_data is not None:
platform.async_register_entity_service(
SERVICE_SETSCHEDULE, SCHEMA_SERVICE_SETSCHEDULE, "_service_setschedule",
)
@ -156,16 +165,46 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
return
class NetatmoThermostat(ClimateEntity):
class NetatmoThermostat(NetatmoBase, ClimateEntity):
"""Representation a Netatmo thermostat."""
def __init__(self, data, room_id):
def __init__(self, data_handler, home_id, room_id):
"""Initialize the sensor."""
self._data = data
ClimateEntity.__init__(self)
super().__init__(data_handler)
self._id = room_id
self._home_id = home_id
self._home_status_class = f"{HOMESTATUS_DATA_CLASS_NAME}-{self._home_id}"
self._data_classes.extend(
[
{
"name": HOMEDATA_DATA_CLASS_NAME,
SIGNAL_NAME: HOMEDATA_DATA_CLASS_NAME,
},
{
"name": HOMESTATUS_DATA_CLASS_NAME,
"home_id": self._home_id,
SIGNAL_NAME: self._home_status_class,
},
]
)
self._home_status = self.data_handler.data[self._home_status_class]
self._room_status = self._home_status.rooms[room_id]
self._room_data = self._data.rooms[home_id][room_id]
self._model = NA_VALVE
for module in self._room_data.get("module_ids"):
if self._home_status.thermostats.get(module):
self._model = NA_THERM
break
self._state = None
self._room_id = room_id
self._room_name = self._data.homedata.rooms[self._data.home_id][room_id]["name"]
self._name = f"{MANUFACTURER} {self._room_name}"
self._device_name = self._data.rooms[home_id][room_id]["name"]
self._name = f"{MANUFACTURER} {self._device_name}"
self._current_temperature = None
self._target_temperature = None
self._preset = None
@ -175,41 +214,72 @@ class NetatmoThermostat(ClimateEntity):
self._hvac_mode = None
self._battery_level = None
self._connected = None
self.update_without_throttle = False
self._module_type = self._data.room_status.get(room_id, {}).get(
"module_type", NA_VALVE
)
if self._module_type == NA_THERM:
self._away_temperature = None
self._hg_temperature = None
self._boilerstatus = None
self._setpoint_duration = None
if self._model == NA_THERM:
self._operation_list.append(HVAC_MODE_OFF)
self._unique_id = f"{self._room_id}-{self._module_type}"
self._unique_id = f"{self._id}-{self._model}"
@property
def device_info(self):
"""Return the device info for the thermostat/valve."""
return {
"identifiers": {(DOMAIN, self._room_id)},
"name": self._room_name,
"manufacturer": MANUFACTURER,
"model": MODELS[self._module_type],
}
async def async_added_to_hass(self) -> None:
"""Entity created."""
await super().async_added_to_hass()
@property
def unique_id(self):
"""Return a unique ID."""
return self._unique_id
self._listeners.append(
self.hass.bus.async_listen("netatmo_event", self.handle_event)
)
async def handle_event(self, event):
"""Handle webhook events."""
data = event.data["data"]
if not data.get("event_type"):
return
if not data.get("home"):
return
home = data["home"]
if self._home_id == home["id"] and data["event_type"] == "therm_mode":
self._preset = NETATMO_MAP_PRESET[home["therm_mode"]]
self._hvac_mode = HVAC_MAP_NETATMO[self._preset]
if self._preset == PRESET_FROST_GUARD:
self._target_temperature = self._hg_temperature
elif self._preset == PRESET_AWAY:
self._target_temperature = self._away_temperature
elif self._preset == PRESET_SCHEDULE:
self.async_update_callback()
self.async_write_ha_state()
return
if not home.get("rooms"):
return
for room in home["rooms"]:
if data["event_type"] == "set_point":
if self._id == room["id"]:
if room["therm_setpoint_mode"] == "off":
self._hvac_mode = HVAC_MODE_OFF
else:
self._target_temperature = room["therm_setpoint_temperature"]
self.async_write_ha_state()
break
elif data["event_type"] == "cancel_set_point":
if self._id == room["id"]:
self.async_update_callback()
self.async_write_ha_state()
break
@property
def supported_features(self):
"""Return the list of supported features."""
return self._support_flags
@property
def name(self):
"""Return the name of the thermostat."""
return self._name
@property
def temperature_unit(self):
"""Return the unit of measurement."""
@ -243,15 +313,11 @@ class NetatmoThermostat(ClimateEntity):
@property
def hvac_action(self) -> Optional[str]:
"""Return the current running hvac operation if supported."""
if self._module_type == NA_THERM:
return CURRENT_HVAC_MAP_NETATMO[self._data.boilerstatus]
if self._model == NA_THERM:
return CURRENT_HVAC_MAP_NETATMO[self._boilerstatus]
# Maybe it is a valve
if self._room_id in self._data.room_status:
if (
self._data.room_status[self._room_id].get("heating_power_request", 0)
> 0
):
return CURRENT_HVAC_HEAT
if self._room_status and self._room_status.get("heating_power_request", 0) > 0:
return CURRENT_HVAC_HEAT
return CURRENT_HVAC_IDLE
def set_hvac_mode(self, hvac_mode: str) -> None:
@ -268,33 +334,24 @@ class NetatmoThermostat(ClimateEntity):
def set_preset_mode(self, preset_mode: str) -> None:
"""Set new preset mode."""
if self.target_temperature == 0:
self._data.homestatus.setroomThermpoint(
self._data.home_id, self._room_id, STATE_NETATMO_HOME,
self._home_status.set_room_thermpoint(
self._id, STATE_NETATMO_HOME,
)
if (
preset_mode in [PRESET_BOOST, STATE_NETATMO_MAX]
and self._module_type == NA_VALVE
):
self._data.homestatus.setroomThermpoint(
self._data.home_id,
self._room_id,
STATE_NETATMO_MANUAL,
DEFAULT_MAX_TEMP,
if preset_mode in [PRESET_BOOST, STATE_NETATMO_MAX] and self._model == NA_VALVE:
self._home_status.set_room_thermpoint(
self._id, STATE_NETATMO_MANUAL, DEFAULT_MAX_TEMP,
)
elif preset_mode in [PRESET_BOOST, STATE_NETATMO_MAX]:
self._data.homestatus.setroomThermpoint(
self._data.home_id, self._room_id, PRESET_MAP_NETATMO[preset_mode]
self._home_status.set_room_thermpoint(
self._id, PRESET_MAP_NETATMO[preset_mode]
)
elif preset_mode in [PRESET_SCHEDULE, PRESET_FROST_GUARD, PRESET_AWAY]:
self._data.homestatus.setThermmode(
self._data.home_id, PRESET_MAP_NETATMO[preset_mode]
)
self._home_status.set_thermmode(PRESET_MAP_NETATMO[preset_mode])
else:
_LOGGER.error("Preset mode '%s' not available", preset_mode)
self.update_without_throttle = True
self.schedule_update_ha_state()
self.async_write_ha_state()
@property
def preset_mode(self) -> Optional[str]:
@ -311,12 +368,9 @@ class NetatmoThermostat(ClimateEntity):
temp = kwargs.get(ATTR_TEMPERATURE)
if temp is None:
return
self._data.homestatus.setroomThermpoint(
self._data.home_id, self._room_id, STATE_NETATMO_MANUAL, temp
)
self._home_status.set_room_thermpoint(self._id, STATE_NETATMO_MANUAL, temp)
self.update_without_throttle = True
self.schedule_update_ha_state()
self.async_write_ha_state()
@property
def device_state_attributes(self):
@ -326,241 +380,147 @@ class NetatmoThermostat(ClimateEntity):
if self._battery_level is not None:
attr[ATTR_BATTERY_LEVEL] = self._battery_level
if self._model == NA_VALVE:
attr[ATTR_HEATING_POWER_REQUEST] = self._room_status.get(
"heating_power_request", 0
)
return attr
def turn_off(self):
"""Turn the entity off."""
if self._module_type == NA_VALVE:
self._data.homestatus.setroomThermpoint(
self._data.home_id,
self._room_id,
STATE_NETATMO_MANUAL,
DEFAULT_MIN_TEMP,
if self._model == NA_VALVE:
self._home_status.set_room_thermpoint(
self._id, STATE_NETATMO_MANUAL, DEFAULT_MIN_TEMP,
)
elif self.hvac_mode != HVAC_MODE_OFF:
self._data.homestatus.setroomThermpoint(
self._data.home_id, self._room_id, STATE_NETATMO_OFF
)
self.update_without_throttle = True
self.schedule_update_ha_state()
self._home_status.set_room_thermpoint(self._id, STATE_NETATMO_OFF)
self.async_write_ha_state()
def turn_on(self):
"""Turn the entity on."""
self._data.homestatus.setroomThermpoint(
self._data.home_id, self._room_id, STATE_NETATMO_HOME
)
self.update_without_throttle = True
self.schedule_update_ha_state()
self._home_status.set_room_thermpoint(self._id, STATE_NETATMO_HOME)
self.async_write_ha_state()
@property
def available(self) -> bool:
"""If the device hasn't been able to connect, mark as unavailable."""
return bool(self._connected)
def update(self):
"""Get the latest data from NetAtmo API and updates the states."""
@callback
def async_update_callback(self):
"""Update the entity's state."""
self._home_status = self.data_handler.data[self._home_status_class]
self._room_status = self._home_status.rooms[self._id]
self._room_data = self._data.rooms[self._home_id][self._id]
roomstatus = {"roomID": self._room_status["id"]}
if self._room_status.get("reachable"):
roomstatus.update(self._build_room_status())
self._away_temperature = self._data.get_away_temp(self._home_id)
self._hg_temperature = self._data.get_hg_temp(self._home_id)
self._setpoint_duration = self._data.setpoint_duration[self._home_id]
try:
if self.update_without_throttle:
self._data.update(no_throttle=True)
self.update_without_throttle = False
else:
self._data.update()
except AttributeError:
_LOGGER.error("NetatmoThermostat::update() got exception")
return
try:
if self._module_type is None:
self._module_type = self._data.room_status[self._room_id]["module_type"]
self._current_temperature = self._data.room_status[self._room_id][
"current_temperature"
]
self._target_temperature = self._data.room_status[self._room_id][
"target_temperature"
]
self._preset = NETATMO_MAP_PRESET[
self._data.room_status[self._room_id]["setpoint_mode"]
]
if self._model is None:
self._model = roomstatus["module_type"]
self._current_temperature = roomstatus["current_temperature"]
self._target_temperature = roomstatus["target_temperature"]
self._preset = NETATMO_MAP_PRESET[roomstatus["setpoint_mode"]]
self._hvac_mode = HVAC_MAP_NETATMO[self._preset]
self._battery_level = self._data.room_status[self._room_id].get(
"battery_level"
)
self._battery_level = roomstatus.get("battery_level")
self._connected = True
except KeyError as err:
if self._connected is not False:
if self._connected:
_LOGGER.debug(
"The thermostat in room %s seems to be out of reach. (%s)",
self._room_name,
self._device_name,
err,
)
self._connected = False
self._away = self._hvac_mode == HVAC_MAP_NETATMO[STATE_NETATMO_AWAY]
class HomeData:
"""Representation Netatmo homes."""
def __init__(self, auth, home=None):
"""Initialize the HomeData object."""
self.auth = auth
self.homedata = None
self.home_ids = []
self.home_names = []
self.room_names = []
self.schedules = []
self.home = home
self.home_id = None
def get_all_home_ids(self):
"""Get all the home ids returned by NetAtmo API."""
if self.homedata is None:
return []
for home_id in self.homedata.homes:
if (
"therm_schedules" in self.homedata.homes[home_id]
and "modules" in self.homedata.homes[home_id]
):
self.home_ids.append(self.homedata.homes[home_id]["id"])
return self.home_ids
def setup(self):
"""Retrieve HomeData by NetAtmo API."""
def _build_room_status(self):
"""Construct room status."""
try:
self.homedata = pyatmo.HomeData(self.auth)
self.home_id = self.homedata.gethomeId(self.home)
except TypeError:
_LOGGER.error("Error when getting home data")
except AttributeError:
_LOGGER.error("No default_home in HomeData")
except pyatmo.NoDevice:
_LOGGER.debug("No thermostat devices available")
except pyatmo.InvalidHome:
_LOGGER.debug("Invalid home %s", self.home)
roomstatus = {
"roomname": self._room_data["name"],
"target_temperature": self._room_status["therm_setpoint_temperature"],
"setpoint_mode": self._room_status["therm_setpoint_mode"],
"current_temperature": self._room_status["therm_measured_temperature"],
"module_type": self._data.get_thermostat_type(
home_id=self._home_id, room_id=self._id
),
"module_id": None,
"heating_status": None,
"heating_power_request": None,
}
class ThermostatData:
"""Get the latest data from Netatmo."""
def __init__(self, auth, home_id=None):
"""Initialize the data object."""
self.auth = auth
self.homedata = None
self.homestatus = None
self.room_ids = []
self.room_status = {}
self.schedules = []
self.home_id = home_id
self.home_name = None
self.away_temperature = None
self.hg_temperature = None
self.boilerstatus = None
self.setpoint_duration = None
def get_room_ids(self):
"""Return all module available on the API as a list."""
if not self.setup():
return []
for room in self.homestatus.rooms:
self.room_ids.append(room)
return self.room_ids
def setup(self):
"""Retrieve HomeData and HomeStatus by NetAtmo API."""
try:
self.homedata = pyatmo.HomeData(self.auth)
self.homestatus = pyatmo.HomeStatus(self.auth, home_id=self.home_id)
self.home_name = self.homedata.getHomeName(self.home_id)
self.update()
except TypeError:
_LOGGER.error("ThermostatData::setup() got error")
return False
except pyatmo.exceptions.NoDevice:
_LOGGER.debug(
"No climate devices for %s (%s)", self.home_name, self.home_id
)
return False
return True
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Call the NetAtmo API to update the data."""
try:
self.homestatus = pyatmo.HomeStatus(self.auth, home_id=self.home_id)
except pyatmo.exceptions.NoDevice:
_LOGGER.error("No device found")
return
except TypeError:
_LOGGER.error("Error when getting homestatus")
return
except requests.exceptions.Timeout:
_LOGGER.warning("Timed out when connecting to Netatmo server")
return
for room in self.homestatus.rooms:
try:
roomstatus = {}
homestatus_room = self.homestatus.rooms[room]
homedata_room = self.homedata.rooms[self.home_id][room]
roomstatus["roomID"] = homestatus_room["id"]
if homestatus_room["reachable"]:
roomstatus["roomname"] = homedata_room["name"]
roomstatus["target_temperature"] = homestatus_room[
"therm_setpoint_temperature"
]
roomstatus["setpoint_mode"] = homestatus_room["therm_setpoint_mode"]
roomstatus["current_temperature"] = homestatus_room[
"therm_measured_temperature"
]
roomstatus["module_type"] = self.homestatus.thermostatType(
home_id=self.home_id, rid=room, home=self.home_name
batterylevel = None
for module_id in self._room_data["module_ids"]:
if (
self._data.modules[self._home_id][module_id]["type"] == NA_THERM
or roomstatus["module_id"] is None
):
roomstatus["module_id"] = module_id
if roomstatus["module_type"] == NA_THERM:
self._boilerstatus = self._home_status.boiler_status(
roomstatus["module_id"]
)
roomstatus["heating_status"] = self._boilerstatus
batterylevel = self._home_status.thermostats[
roomstatus["module_id"]
].get("battery_level")
elif roomstatus["module_type"] == NA_VALVE:
roomstatus["heating_power_request"] = self._room_status[
"heating_power_request"
]
roomstatus["heating_status"] = roomstatus["heating_power_request"] > 0
if self._boilerstatus is not None:
roomstatus["heating_status"] = (
self._boilerstatus and roomstatus["heating_status"]
)
roomstatus["module_id"] = None
roomstatus["heating_status"] = None
roomstatus["heating_power_request"] = None
batterylevel = None
for module_id in homedata_room["module_ids"]:
if (
self.homedata.modules[self.home_id][module_id]["type"]
== NA_THERM
or roomstatus["module_id"] is None
):
roomstatus["module_id"] = module_id
if roomstatus["module_type"] == NA_THERM:
self.boilerstatus = self.homestatus.boilerStatus(
rid=roomstatus["module_id"]
)
roomstatus["heating_status"] = self.boilerstatus
batterylevel = self.homestatus.thermostats[
roomstatus["module_id"]
].get("battery_level")
elif roomstatus["module_type"] == NA_VALVE:
roomstatus["heating_power_request"] = homestatus_room[
"heating_power_request"
]
roomstatus["heating_status"] = (
roomstatus["heating_power_request"] > 0
)
if self.boilerstatus is not None:
roomstatus["heating_status"] = (
self.boilerstatus and roomstatus["heating_status"]
)
batterylevel = self.homestatus.valves[
roomstatus["module_id"]
].get("battery_level")
batterylevel = self._home_status.valves[roomstatus["module_id"]].get(
"battery_level"
)
if batterylevel:
batterypct = interpolate(
batterylevel, roomstatus["module_type"]
)
if roomstatus.get("battery_level") is None:
roomstatus["battery_level"] = batterypct
elif batterypct < roomstatus["battery_level"]:
roomstatus["battery_level"] = batterypct
self.room_status[room] = roomstatus
except KeyError as err:
_LOGGER.error("Update of room %s failed. Error: %s", room, err)
self.away_temperature = self.homestatus.getAwaytemp(home_id=self.home_id)
self.hg_temperature = self.homestatus.getHgtemp(home_id=self.home_id)
self.setpoint_duration = self.homedata.setpoint_duration[self.home_id]
if batterylevel:
batterypct = interpolate(batterylevel, roomstatus["module_type"])
if (
not roomstatus.get("battery_level")
or batterypct < roomstatus["battery_level"]
):
roomstatus["battery_level"] = batterypct
return roomstatus
except KeyError as err:
_LOGGER.error("Update of room %s failed. Error: %s", self._id, err)
return {}
def _service_setschedule(self, **kwargs):
schedule_name = kwargs.get(ATTR_SCHEDULE_NAME)
schedule_id = None
for sid, name in self.hass.data[DOMAIN][DATA_SCHEDULES][self._home_id].items():
if name == schedule_name:
schedule_id = sid
if not schedule_id:
_LOGGER.error("You passed an invalid schedule")
return
self._data.switch_home_schedule(home_id=self._home_id, schedule_id=schedule_id)
_LOGGER.info(
"Setting %s schedule to %s (%s)",
self._home_id,
kwargs.get(ATTR_SCHEDULE_NAME),
schedule_id,
)
def interpolate(batterylevel, module_type):
@ -603,3 +563,17 @@ def interpolate(batterylevel, module_type):
/ (levels[i + 1] - levels[i])
)
return int(pct)
def get_all_home_ids(home_data):
"""Get all the home ids returned by NetAtmo API."""
if home_data is None:
return []
return [
home_data.homes[home_id]["id"]
for home_id in home_data.homes
if (
"therm_schedules" in home_data.homes[home_id]
and "modules" in home_data.homes[home_id]
)
]

View File

@ -1,5 +1,6 @@
"""Config flow for Netatmo."""
import logging
import uuid
import voluptuous as vol
@ -16,6 +17,7 @@ from .const import (
CONF_LON_SW,
CONF_NEW_AREA,
CONF_PUBLIC_MODE,
CONF_UUID,
CONF_WEATHER_AREAS,
DOMAIN,
)
@ -66,6 +68,10 @@ class NetatmoFlowHandler(
async def async_step_user(self, user_input=None):
"""Handle a flow start."""
await self.async_set_unique_id(DOMAIN)
if self.hass.config_entries.async_entries(DOMAIN):
return self.async_abort(reason="single_instance_allowed")
return await super().async_step_user(user_input)
async def async_step_homekit(self, homekit_info):
@ -102,7 +108,7 @@ class NetatmoOptionsFlowHandler(config_entries.OptionsFlow):
user_input={CONF_NEW_AREA: new_client}
)
return await self._update_options()
return self._update_options()
weather_areas = list(self.options[CONF_WEATHER_AREAS])
@ -121,7 +127,14 @@ class NetatmoOptionsFlowHandler(config_entries.OptionsFlow):
async def async_step_public_weather(self, user_input=None):
"""Manage configuration of Netatmo public weather sensors."""
if user_input is not None and CONF_NEW_AREA not in user_input:
self.options[CONF_WEATHER_AREAS][user_input[CONF_AREA_NAME]] = user_input
self.options[CONF_WEATHER_AREAS][
user_input[CONF_AREA_NAME]
] = fix_coordinates(user_input)
self.options[CONF_WEATHER_AREAS][user_input[CONF_AREA_NAME]][
CONF_UUID
] = str(uuid.uuid4())
return await self.async_step_public_weather_areas()
orig_options = self.config_entry.options.get(CONF_WEATHER_AREAS, {}).get(
@ -170,8 +183,30 @@ class NetatmoOptionsFlowHandler(config_entries.OptionsFlow):
return self.async_show_form(step_id="public_weather", data_schema=data_schema)
async def _update_options(self):
def _update_options(self):
"""Update config entry options."""
return self.async_create_entry(
title="Netatmo Public Weather", data=self.options
)
def fix_coordinates(user_input):
"""Fix coordinates if they don't comply with the Netatmo API."""
# Ensure coordinates have acceptable length for the Netatmo API
for coordinate in [CONF_LAT_NE, CONF_LAT_SW, CONF_LON_NE, CONF_LON_SW]:
if len(str(user_input[coordinate]).split(".")[1]) < 7:
user_input[coordinate] = user_input[coordinate] + 0.0000001
# Swap coordinates if entered in wrong order
if user_input[CONF_LAT_NE] < user_input[CONF_LAT_SW]:
user_input[CONF_LAT_NE], user_input[CONF_LAT_SW] = (
user_input[CONF_LAT_SW],
user_input[CONF_LAT_NE],
)
if user_input[CONF_LON_NE] < user_input[CONF_LON_SW]:
user_input[CONF_LON_NE], user_input[CONF_LON_SW] = (
user_input[CONF_LON_SW],
user_input[CONF_LON_NE],
)
return user_input

View File

@ -27,6 +27,8 @@ AUTH = "netatmo_auth"
CONF_PUBLIC = "public_sensor_config"
CAMERA_DATA = "netatmo_camera"
HOME_DATA = "netatmo_home_data"
DATA_HANDLER = "netatmo_data_handler"
SIGNAL_NAME = "signal_name"
CONF_CLOUDHOOK_URL = "cloudhook_url"
CONF_WEATHER_AREAS = "weather_areas"
@ -37,12 +39,15 @@ CONF_LON_NE = "lon_ne"
CONF_LAT_SW = "lat_sw"
CONF_LON_SW = "lon_sw"
CONF_PUBLIC_MODE = "mode"
CONF_UUID = "uuid"
OAUTH2_AUTHORIZE = "https://api.netatmo.com/oauth2/authorize"
OAUTH2_TOKEN = "https://api.netatmo.com/oauth2/token"
DATA_DEVICE_IDS = "netatmo_device_ids"
DATA_HOMES = "netatmo_homes"
DATA_PERSONS = "netatmo_persons"
DATA_SCHEDULES = "netatmo_schedules"
NETATMO_WEBHOOK_URL = None
NETATMO_EVENT = "netatmo_event"
@ -55,8 +60,10 @@ ATTR_ID = "id"
ATTR_PSEUDO = "pseudo"
ATTR_NAME = "name"
ATTR_EVENT_TYPE = "event_type"
ATTR_HEATING_POWER_REQUEST = "heating_power_request"
ATTR_HOME_ID = "home_id"
ATTR_HOME_NAME = "home_name"
ATTR_PERSON = "person"
ATTR_PERSONS = "persons"
ATTR_IS_KNOWN = "is_known"
ATTR_FACE_URL = "face_url"
@ -67,3 +74,5 @@ MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
MIN_TIME_BETWEEN_EVENT_UPDATES = timedelta(seconds=5)
SERVICE_SETSCHEDULE = "set_schedule"
SERVICE_SETPERSONSHOME = "set_persons_home"
SERVICE_SETPERSONAWAY = "set_person_away"

View File

@ -0,0 +1,166 @@
"""The Netatmo data handler."""
from collections import deque
from datetime import timedelta
from functools import partial
from itertools import islice
import logging
from time import time
from typing import Deque, Dict, List
import pyatmo
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers.event import async_track_time_interval
from .const import AUTH, DOMAIN, MANUFACTURER
_LOGGER = logging.getLogger(__name__)
CAMERA_DATA_CLASS_NAME = "CameraData"
WEATHERSTATION_DATA_CLASS_NAME = "WeatherStationData"
HOMECOACH_DATA_CLASS_NAME = "HomeCoachData"
HOMEDATA_DATA_CLASS_NAME = "HomeData"
HOMESTATUS_DATA_CLASS_NAME = "HomeStatus"
PUBLICDATA_DATA_CLASS_NAME = "PublicData"
NEXT_SCAN = "next_scan"
DATA_CLASSES = {
WEATHERSTATION_DATA_CLASS_NAME: pyatmo.WeatherStationData,
HOMECOACH_DATA_CLASS_NAME: pyatmo.HomeCoachData,
CAMERA_DATA_CLASS_NAME: pyatmo.CameraData,
HOMEDATA_DATA_CLASS_NAME: pyatmo.HomeData,
HOMESTATUS_DATA_CLASS_NAME: pyatmo.HomeStatus,
PUBLICDATA_DATA_CLASS_NAME: pyatmo.PublicData,
}
MAX_CALLS_1H = 20
BATCH_SIZE = 3
DEFAULT_INTERVALS = {
HOMEDATA_DATA_CLASS_NAME: 900,
HOMESTATUS_DATA_CLASS_NAME: 300,
CAMERA_DATA_CLASS_NAME: 900,
WEATHERSTATION_DATA_CLASS_NAME: 300,
HOMECOACH_DATA_CLASS_NAME: 300,
PUBLICDATA_DATA_CLASS_NAME: 600,
}
SCAN_INTERVAL = 60
class NetatmoDataHandler:
"""Manages the Netatmo data handling."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry):
"""Initialize self."""
self.hass = hass
self._auth = hass.data[DOMAIN][entry.entry_id][AUTH]
self.listeners: List[CALLBACK_TYPE] = []
self._data_classes: Dict = {}
self.data = {}
self._queue: Deque = deque()
self._webhook: bool = False
async def async_setup(self):
"""Set up the Netatmo data handler."""
async_track_time_interval(
self.hass, self.async_update, timedelta(seconds=SCAN_INTERVAL)
)
self.listeners.append(
self.hass.bus.async_listen("netatmo_event", self.handle_event)
)
async def async_update(self, event_time):
"""
Update device.
We do up to BATCH_SIZE calls in one update in order
to minimize the calls on the api service.
"""
for data_class in islice(self._queue, 0, BATCH_SIZE):
if data_class[NEXT_SCAN] > time():
continue
self._data_classes[data_class["name"]][NEXT_SCAN] = (
time() + data_class["interval"]
)
await self.async_fetch_data(
data_class["class"], data_class["name"], **data_class["kwargs"]
)
self._queue.rotate(BATCH_SIZE)
async def async_cleanup(self):
"""Clean up the Netatmo data handler."""
for listener in self.listeners:
listener()
async def handle_event(self, event):
"""Handle webhook events."""
if event.data["data"]["push_type"] == "webhook_activation":
_LOGGER.info("%s webhook successfully registered", MANUFACTURER)
self._webhook = True
elif event.data["data"]["push_type"] == "NACamera-connection":
_LOGGER.debug("%s camera reconnected", MANUFACTURER)
self._data_classes[CAMERA_DATA_CLASS_NAME][NEXT_SCAN] = time()
async def async_fetch_data(self, data_class, data_class_entry, **kwargs):
"""Fetch data and notify."""
try:
self.data[data_class_entry] = await self.hass.async_add_executor_job(
partial(data_class, **kwargs), self._auth,
)
for update_callback in self._data_classes[data_class_entry][
"subscriptions"
]:
if update_callback:
update_callback()
except (pyatmo.NoDevice, pyatmo.ApiError) as err:
_LOGGER.debug(err)
async def register_data_class(
self, data_class_name, data_class_entry, update_callback, **kwargs
):
"""Register data class."""
if data_class_entry not in self._data_classes:
self._data_classes[data_class_entry] = {
"class": DATA_CLASSES[data_class_name],
"name": data_class_entry,
"interval": DEFAULT_INTERVALS[data_class_name],
NEXT_SCAN: time() + DEFAULT_INTERVALS[data_class_name],
"kwargs": kwargs,
"subscriptions": [update_callback],
}
await self.async_fetch_data(
DATA_CLASSES[data_class_name], data_class_entry, **kwargs
)
self._queue.append(self._data_classes[data_class_entry])
_LOGGER.debug("Data class %s added", data_class_entry)
else:
self._data_classes[data_class_entry]["subscriptions"].append(
update_callback
)
async def unregister_data_class(self, data_class_entry, update_callback):
"""Unregister data class."""
if update_callback not in self._data_classes[data_class_entry]["subscriptions"]:
return
self._data_classes[data_class_entry]["subscriptions"].remove(update_callback)
if not self._data_classes[data_class_entry].get("subscriptions"):
self._queue.remove(self._data_classes[data_class_entry])
self._data_classes.pop(data_class_entry)
_LOGGER.debug("Data class %s removed", data_class_entry)
@property
def webhook(self) -> bool:
"""Return the webhook state."""
return self._webhook

View File

@ -0,0 +1,17 @@
"""Helper for Netatmo integration."""
from dataclasses import dataclass
from uuid import uuid4
@dataclass
class NetatmoArea:
"""Class for keeping track of an area."""
area_name: str
lat_ne: float
lon_ne: float
lat_sw: float
lon_sw: float
mode: str
show_on_map: bool
uuid: str = uuid4()

View File

@ -0,0 +1,145 @@
"""Support for the Netatmo camera lights."""
import logging
import pyatmo
from homeassistant.components.light import LightEntity
from homeassistant.core import callback
from homeassistant.exceptions import PlatformNotReady
from .const import DATA_HANDLER, DOMAIN, MANUFACTURER, SIGNAL_NAME
from .data_handler import CAMERA_DATA_CLASS_NAME, NetatmoDataHandler
from .netatmo_entity_base import NetatmoBase
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo camera light platform."""
if "access_camera" not in entry.data["token"]["scope"]:
_LOGGER.info(
"Cameras are currently not supported with this authentication method"
)
return
data_handler = hass.data[DOMAIN][entry.entry_id][DATA_HANDLER]
async def get_entities():
"""Retrieve Netatmo entities."""
await data_handler.register_data_class(
CAMERA_DATA_CLASS_NAME, CAMERA_DATA_CLASS_NAME, None
)
entities = []
try:
all_cameras = []
for home in data_handler.data[CAMERA_DATA_CLASS_NAME].cameras.values():
for camera in home.values():
all_cameras.append(camera)
for camera in all_cameras:
if camera["type"] == "NOC":
if not data_handler.webhook:
raise PlatformNotReady
_LOGGER.debug(
"Adding camera light %s %s", camera["id"], camera["name"]
)
entities.append(
NetatmoLight(
data_handler,
camera["id"],
camera["type"],
camera["home_id"],
)
)
except pyatmo.NoDevice:
_LOGGER.debug("No cameras found")
return entities
async_add_entities(await get_entities(), True)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Netatmo camera platform."""
return
class NetatmoLight(NetatmoBase, LightEntity):
"""Representation of a Netatmo Presence camera light."""
def __init__(
self,
data_handler: NetatmoDataHandler,
camera_id: str,
camera_type: str,
home_id: str,
):
"""Initialize a Netatmo Presence camera light."""
LightEntity.__init__(self)
super().__init__(data_handler)
self._data_classes.append(
{"name": CAMERA_DATA_CLASS_NAME, SIGNAL_NAME: CAMERA_DATA_CLASS_NAME}
)
self._id = camera_id
self._home_id = home_id
self._model = camera_type
self._device_name = self._data.get_camera(camera_id).get("name")
self._name = f"{MANUFACTURER} {self._device_name}"
self._is_on = False
self._unique_id = f"{self._id}-light"
async def async_added_to_hass(self) -> None:
"""Entity created."""
await super().async_added_to_hass()
self._listeners.append(
self.hass.bus.async_listen("netatmo_event", self.handle_event)
)
async def handle_event(self, event):
"""Handle webhook events."""
data = event.data["data"]
if not data.get("event_type"):
return
if not data.get("camera_id"):
return
if (
data["home_id"] == self._home_id
and data["camera_id"] == self._id
and data["push_type"] == "NOC-light_mode"
):
self._is_on = bool(data["sub_type"] == "on")
self.async_write_ha_state()
return
@property
def is_on(self):
"""Return true if light is on."""
return self._is_on
def turn_on(self, **kwargs):
"""Turn camera floodlight on."""
_LOGGER.debug("Turn camera '%s' on", self._name)
self._data.set_state(
home_id=self._home_id, camera_id=self._id, floodlight="on",
)
def turn_off(self, **kwargs):
"""Turn camera floodlight into auto mode."""
_LOGGER.debug("Turn camera '%s' off", self._name)
self._data.set_state(
home_id=self._home_id, camera_id=self._id, floodlight="auto",
)
@callback
def async_update_callback(self):
"""Update the entity's state."""
self._is_on = bool(self._data.get_light_state(self._id) == "on")

View File

@ -3,7 +3,7 @@
"name": "Netatmo",
"documentation": "https://www.home-assistant.io/integrations/netatmo",
"requirements": [
"pyatmo==3.3.1"
"pyatmo==4.0.0"
],
"after_dependencies": [
"cloud"

View File

@ -0,0 +1,113 @@
"""Base class for Netatmo entities."""
import logging
from typing import Dict, List
from homeassistant.core import CALLBACK_TYPE, callback
from homeassistant.helpers.entity import Entity
from .const import DOMAIN, MANUFACTURER, MODELS, SIGNAL_NAME
from .data_handler import NetatmoDataHandler
_LOGGER = logging.getLogger(__name__)
class NetatmoBase(Entity):
"""Netatmo entity base class."""
def __init__(self, data_handler: NetatmoDataHandler) -> None:
"""Set up Netatmo entity base."""
self.data_handler = data_handler
self._data_classes: List[Dict] = []
self._listeners: List[CALLBACK_TYPE] = []
self._device_name = None
self._id = None
self._model = None
self._name = None
self._unique_id = None
async def async_added_to_hass(self) -> None:
"""Entity created."""
_LOGGER.debug("New client %s", self.entity_id)
for data_class in self._data_classes:
signal_name = data_class[SIGNAL_NAME]
if "home_id" in data_class:
await self.data_handler.register_data_class(
data_class["name"],
signal_name,
self.async_update_callback,
home_id=data_class["home_id"],
)
elif data_class["name"] == "PublicData":
await self.data_handler.register_data_class(
data_class["name"],
signal_name,
self.async_update_callback,
LAT_NE=data_class["LAT_NE"],
LON_NE=data_class["LON_NE"],
LAT_SW=data_class["LAT_SW"],
LON_SW=data_class["LON_SW"],
)
else:
await self.data_handler.register_data_class(
data_class["name"], signal_name, self.async_update_callback
)
await self.data_handler.unregister_data_class(signal_name, None)
self.async_update_callback()
async def async_will_remove_from_hass(self):
"""Run when entity will be removed from hass."""
await super().async_will_remove_from_hass()
for listener in self._listeners:
listener()
for data_class in self._data_classes:
await self.data_handler.unregister_data_class(
data_class[SIGNAL_NAME], self.async_update_callback
)
async def async_remove(self):
"""Clean up when removing entity."""
entity_registry = await self.hass.helpers.entity_registry.async_get_registry()
entity_entry = entity_registry.async_get(self.entity_id)
if not entity_entry:
await super().async_remove()
return
entity_registry.async_remove(self.entity_id)
@callback
def async_update_callback(self):
"""Update the entity's state."""
raise NotImplementedError
@property
def _data(self):
"""Return data for this entity."""
return self.data_handler.data[self._data_classes[0]["name"]]
@property
def unique_id(self):
"""Return the unique ID of this entity."""
return self._unique_id
@property
def name(self):
"""Return the name of this entity."""
return self._name
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._id)},
"name": self._device_name,
"manufacturer": MANUFACTURER,
"model": MODELS[self._model],
}

View File

@ -1,15 +1,11 @@
"""Support for the Netatmo Weather Service."""
from datetime import timedelta
import logging
import pyatmo
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONCENTRATION_PARTS_PER_MILLION,
CONF_SHOW_ON_MAP,
DEVICE_CLASS_BATTERY,
DEVICE_CLASS_HUMIDITY,
DEVICE_CLASS_TEMPERATURE,
@ -23,31 +19,18 @@ from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
from .const import (
AUTH,
CONF_AREA_NAME,
CONF_LAT_NE,
CONF_LAT_SW,
CONF_LON_NE,
CONF_LON_SW,
CONF_PUBLIC_MODE,
CONF_WEATHER_AREAS,
DOMAIN,
MANUFACTURER,
MODELS,
from .const import CONF_WEATHER_AREAS, DATA_HANDLER, DOMAIN, MANUFACTURER, SIGNAL_NAME
from .data_handler import (
HOMECOACH_DATA_CLASS_NAME,
PUBLICDATA_DATA_CLASS_NAME,
WEATHERSTATION_DATA_CLASS_NAME,
)
from .helper import NetatmoArea
from .netatmo_entity_base import NetatmoBase
_LOGGER = logging.getLogger(__name__)
# This is the Netatmo data upload interval in seconds
NETATMO_UPDATE_INTERVAL = 600
# NetAtmo Public Data is uploaded to server every 10 minutes
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=NETATMO_UPDATE_INTERVAL)
SUPPORTED_PUBLIC_SENSOR_TYPES = [
"temperature",
"pressure",
@ -76,11 +59,11 @@ SENSOR_TYPES = {
DEVICE_CLASS_HUMIDITY,
],
"rain": ["Rain", "mm", "mdi:weather-rainy", None],
"sum_rain_1": ["sum_rain_1", "mm", "mdi:weather-rainy", None],
"sum_rain_24": ["sum_rain_24", "mm", "mdi:weather-rainy", None],
"sum_rain_1": ["Rain last hour", "mm", "mdi:weather-rainy", None],
"sum_rain_24": ["Rain last 24h", "mm", "mdi:weather-rainy", None],
"battery_vp": ["Battery", "", "mdi:battery", None],
"battery_lvl": ["Battery_lvl", "", "mdi:battery", None],
"battery_percent": ["battery_percent", UNIT_PERCENTAGE, None, DEVICE_CLASS_BATTERY],
"battery_lvl": ["Battery Level", "", "mdi:battery", None],
"battery_percent": ["Battery Percent", UNIT_PERCENTAGE, None, DEVICE_CLASS_BATTERY],
"min_temp": ["Min Temp.", TEMP_CELSIUS, "mdi:thermometer", None],
"max_temp": ["Max Temp.", TEMP_CELSIUS, "mdi:thermometer", None],
"windangle": ["Angle", "", "mdi:compass", None],
@ -101,9 +84,9 @@ SENSOR_TYPES = {
],
"reachable": ["Reachability", "", "mdi:signal", None],
"rf_status": ["Radio", "", "mdi:signal", None],
"rf_status_lvl": ["Radio_lvl", "", "mdi:signal", None],
"rf_status_lvl": ["Radio Level", "", "mdi:signal", None],
"wifi_status": ["Wifi", "", "mdi:wifi", None],
"wifi_status_lvl": ["Wifi_lvl", "dBm", "mdi:wifi", None],
"wifi_status_lvl": ["Wifi Level", "dBm", "mdi:wifi", None],
"health_idx": ["Health", "", "mdi:cloud", None],
}
@ -112,76 +95,110 @@ MODULE_TYPE_WIND = "NAModule2"
MODULE_TYPE_RAIN = "NAModule3"
MODULE_TYPE_INDOOR = "NAModule4"
NETATMO_DEVICE_TYPES = {
"WeatherStationData": "weather station",
"HomeCoachData": "home coach",
BATTERY_VALUES = {
MODULE_TYPE_WIND: {"Full": 5590, "High": 5180, "Medium": 4770, "Low": 4360},
MODULE_TYPE_RAIN: {"Full": 5500, "High": 5000, "Medium": 4500, "Low": 4000},
MODULE_TYPE_INDOOR: {"Full": 5500, "High": 5280, "Medium": 4920, "Low": 4560},
MODULE_TYPE_OUTDOOR: {"Full": 5500, "High": 5000, "Medium": 4500, "Low": 4000},
}
PUBLIC = "public"
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities
):
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo weather and homecoach platform."""
auth = hass.data[DOMAIN][entry.entry_id][AUTH]
device_registry = await hass.helpers.device_registry.async_get_registry()
data_handler = hass.data[DOMAIN][entry.entry_id][DATA_HANDLER]
def find_entities(data):
async def find_entities(data_class_name):
"""Find all entities."""
all_module_infos = data.get_module_infos()
await data_handler.register_data_class(data_class_name, data_class_name, None)
all_module_infos = {}
data = data_handler.data
if not data.get(data_class_name):
return []
data_class = data[data_class_name]
for station_id in data_class.stations:
for module_id in data_class.get_modules(station_id):
all_module_infos[module_id] = data_class.get_module(module_id)
all_module_infos[station_id] = data_class.get_station(station_id)
entities = []
for module in all_module_infos.values():
_LOGGER.debug("Adding module %s %s", module["module_name"], module["id"])
for condition in data.station_data.monitoredConditions(
moduleId=module["id"]
):
entities.append(NetatmoSensor(data, module, condition.lower()))
return entities
def get_entities():
"""Retrieve Netatmo entities."""
entities = []
for data_class in [pyatmo.WeatherStationData, pyatmo.HomeCoachData]:
try:
dc_data = data_class(auth)
_LOGGER.debug("%s detected!", NETATMO_DEVICE_TYPES[data_class.__name__])
data = NetatmoData(auth, dc_data)
except pyatmo.NoDevice:
_LOGGER.debug(
"No %s entities found", NETATMO_DEVICE_TYPES[data_class.__name__]
)
if "_id" not in module:
_LOGGER.debug("Skipping module %s", module.get("module_name"))
continue
entities.extend(find_entities(data))
_LOGGER.debug(
"Adding module %s %s", module.get("module_name"), module.get("_id"),
)
for condition in data_class.get_monitored_conditions(
module_id=module["_id"]
):
entities.append(
NetatmoSensor(
data_handler, data_class_name, module, condition.lower()
)
)
return entities
async_add_entities(await hass.async_add_executor_job(get_entities), True)
for data_class_name in [
WEATHERSTATION_DATA_CLASS_NAME,
HOMECOACH_DATA_CLASS_NAME,
]:
async_add_entities(await find_entities(data_class_name), True)
@callback
def add_public_entities():
async def add_public_entities(update=True):
"""Retrieve Netatmo public weather entities."""
entities = []
for area in entry.options.get(CONF_WEATHER_AREAS, {}).values():
data = NetatmoPublicData(
auth,
lat_ne=area[CONF_LAT_NE],
lon_ne=area[CONF_LON_NE],
lat_sw=area[CONF_LAT_SW],
lon_sw=area[CONF_LON_SW],
entities = {
device.name: device.id
for device in async_entries_for_config_entry(
device_registry, entry.entry_id
)
if device.model == "Public Weather stations"
}
new_entities = []
for area in [
NetatmoArea(**i) for i in entry.options.get(CONF_WEATHER_AREAS, {}).values()
]:
signal_name = f"{PUBLICDATA_DATA_CLASS_NAME}-{area.uuid}"
if area.area_name in entities:
entities.pop(area.area_name)
if update:
async_dispatcher_send(
hass, f"netatmo-config-{area.area_name}", area,
)
continue
await data_handler.register_data_class(
PUBLICDATA_DATA_CLASS_NAME,
signal_name,
None,
LAT_NE=area.lat_ne,
LON_NE=area.lon_ne,
LAT_SW=area.lat_sw,
LON_SW=area.lon_sw,
)
for sensor_type in SUPPORTED_PUBLIC_SENSOR_TYPES:
entities.append(NetatmoPublicSensor(area, data, sensor_type,))
new_entities.append(
NetatmoPublicSensor(data_handler, area, sensor_type)
)
for device in async_entries_for_config_entry(device_registry, entry.entry_id):
if device.model == "Public Weather stations":
device_registry.async_remove_device(device.id)
for device_id in entities.values():
device_registry.async_remove_device(device_id)
if entities:
async_add_entities(entities)
if new_entities:
async_add_entities(new_entities)
async_dispatcher_connect(
hass, f"signal-{DOMAIN}-public-update-{entry.entry_id}", add_public_entities
@ -189,7 +206,7 @@ async def async_setup_entry(
entry.add_update_listener(async_config_entry_updated)
add_public_entities()
await add_public_entities(False)
async def async_config_entry_updated(hass: HomeAssistant, entry: ConfigEntry) -> None:
@ -202,39 +219,42 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
return
class NetatmoSensor(Entity):
class NetatmoSensor(NetatmoBase):
"""Implementation of a Netatmo sensor."""
def __init__(self, netatmo_data, module_info, sensor_type):
def __init__(self, data_handler, data_class_name, module_info, sensor_type):
"""Initialize the sensor."""
self.netatmo_data = netatmo_data
super().__init__(data_handler)
self._data_classes.append(
{"name": data_class_name, SIGNAL_NAME: data_class_name}
)
self._id = module_info["_id"]
self._station_id = module_info.get("main_device", self._id)
station = self._data.get_station(self._station_id)
device = self._data.get_module(self._id)
device = self.netatmo_data.station_data.moduleById(mid=module_info["id"])
if not device:
# Assume it's a station if module can't be found
device = self.netatmo_data.station_data.stationById(sid=module_info["id"])
device = station
if device["type"] == "NHC":
self.module_name = module_info["station_name"]
if device["type"] in ("NHC", "NAMain"):
self._device_name = module_info["station_name"]
else:
self.module_name = (
f"{module_info['station_name']} {module_info['module_name']}"
)
self._device_name = f"{station['station_name']} {module_info.get('module_name', device['type'])}"
self._name = f"{MANUFACTURER} {self.module_name} {SENSOR_TYPES[sensor_type][0]}"
self._name = (
f"{MANUFACTURER} {self._device_name} {SENSOR_TYPES[sensor_type][0]}"
)
self.type = sensor_type
self._state = None
self._device_class = SENSOR_TYPES[self.type][3]
self._icon = SENSOR_TYPES[self.type][2]
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
self._module_type = device["type"]
self._module_id = module_info["id"]
self._unique_id = f"{self._module_id}-{self.type}"
@property
def name(self):
"""Return the name of the sensor."""
return self._name
self._model = device["type"]
self._unique_id = f"{self._id}-{self.type}"
@property
def icon(self):
@ -246,16 +266,6 @@ class NetatmoSensor(Entity):
"""Return the device class of the sensor."""
return self._device_class
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._module_id)},
"name": self.module_name,
"manufacturer": MANUFACTURER,
"model": MODELS[self._module_type],
}
@property
def state(self):
"""Return the state of the device."""
@ -266,34 +276,33 @@ class NetatmoSensor(Entity):
"""Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement
@property
def unique_id(self):
"""Return the unique ID for this sensor."""
return self._unique_id
@property
def available(self):
"""Return True if entity is available."""
"""Return entity availability."""
return self._state is not None
def update(self):
"""Get the latest data from Netatmo API and updates the states."""
self.netatmo_data.update()
if self.netatmo_data.data is None:
@callback
def async_update_callback(self):
"""Update the entity's state."""
if self._data is None:
if self._state is None:
return
_LOGGER.warning("No data from update")
self._state = None
return
data = self.netatmo_data.data.get(self._module_id)
data = self._data.get_last_data(station_id=self._station_id, exclude=3600).get(
self._id
)
if data is None:
if self._state:
_LOGGER.debug(
"No data found for %s (%s)", self.module_name, self._module_id
"No data (%s) found for %s (%s)",
self._data,
self._device_name,
self._id,
)
_LOGGER.debug("data: %s", self.netatmo_data.data)
self._state = None
return
@ -318,50 +327,8 @@ class NetatmoSensor(Entity):
self._state = data["battery_percent"]
elif self.type == "battery_lvl":
self._state = data["battery_vp"]
elif self.type == "battery_vp" and self._module_type == MODULE_TYPE_WIND:
if data["battery_vp"] >= 5590:
self._state = "Full"
elif data["battery_vp"] >= 5180:
self._state = "High"
elif data["battery_vp"] >= 4770:
self._state = "Medium"
elif data["battery_vp"] >= 4360:
self._state = "Low"
elif data["battery_vp"] < 4360:
self._state = "Very Low"
elif self.type == "battery_vp" and self._module_type == MODULE_TYPE_RAIN:
if data["battery_vp"] >= 5500:
self._state = "Full"
elif data["battery_vp"] >= 5000:
self._state = "High"
elif data["battery_vp"] >= 4500:
self._state = "Medium"
elif data["battery_vp"] >= 4000:
self._state = "Low"
elif data["battery_vp"] < 4000:
self._state = "Very Low"
elif self.type == "battery_vp" and self._module_type == MODULE_TYPE_INDOOR:
if data["battery_vp"] >= 5640:
self._state = "Full"
elif data["battery_vp"] >= 5280:
self._state = "High"
elif data["battery_vp"] >= 4920:
self._state = "Medium"
elif data["battery_vp"] >= 4560:
self._state = "Low"
elif data["battery_vp"] < 4560:
self._state = "Very Low"
elif self.type == "battery_vp" and self._module_type == MODULE_TYPE_OUTDOOR:
if data["battery_vp"] >= 5500:
self._state = "Full"
elif data["battery_vp"] >= 5000:
self._state = "High"
elif data["battery_vp"] >= 4500:
self._state = "Medium"
elif data["battery_vp"] >= 4000:
self._state = "Low"
elif data["battery_vp"] < 4000:
self._state = "Very Low"
elif self.type == "battery_vp":
self._state = process_battery(data["battery_vp"], self._model)
elif self.type == "min_temp":
self._state = data["min_temp"]
elif self.type == "max_temp":
@ -369,47 +336,13 @@ class NetatmoSensor(Entity):
elif self.type == "windangle_value":
self._state = data["WindAngle"]
elif self.type == "windangle":
if data["WindAngle"] >= 330:
self._state = "N (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 300:
self._state = "NW (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 240:
self._state = "W (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 210:
self._state = "SW (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 150:
self._state = "S (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 120:
self._state = "SE (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 60:
self._state = "E (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 30:
self._state = "NE (%d\xb0)" % data["WindAngle"]
elif data["WindAngle"] >= 0:
self._state = "N (%d\xb0)" % data["WindAngle"]
self._state = process_angle(data["WindAngle"])
elif self.type == "windstrength":
self._state = data["WindStrength"]
elif self.type == "gustangle_value":
self._state = data["GustAngle"]
elif self.type == "gustangle":
if data["GustAngle"] >= 330:
self._state = "N (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 300:
self._state = "NW (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 240:
self._state = "W (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 210:
self._state = "SW (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 150:
self._state = "S (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 120:
self._state = "SE (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 60:
self._state = "E (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 30:
self._state = "NE (%d\xb0)" % data["GustAngle"]
elif data["GustAngle"] >= 0:
self._state = "N (%d\xb0)" % data["GustAngle"]
self._state = process_angle(data["GustAngle"])
elif self.type == "guststrength":
self._state = data["GustStrength"]
elif self.type == "reachable":
@ -417,90 +350,127 @@ class NetatmoSensor(Entity):
elif self.type == "rf_status_lvl":
self._state = data["rf_status"]
elif self.type == "rf_status":
if data["rf_status"] >= 90:
self._state = "Low"
elif data["rf_status"] >= 76:
self._state = "Medium"
elif data["rf_status"] >= 60:
self._state = "High"
elif data["rf_status"] <= 59:
self._state = "Full"
self._state = process_rf(data["rf_status"])
elif self.type == "wifi_status_lvl":
self._state = data["wifi_status"]
elif self.type == "wifi_status":
if data["wifi_status"] >= 86:
self._state = "Low"
elif data["wifi_status"] >= 71:
self._state = "Medium"
elif data["wifi_status"] >= 56:
self._state = "High"
elif data["wifi_status"] <= 55:
self._state = "Full"
self._state = process_wifi(data["wifi_status"])
elif self.type == "health_idx":
if data["health_idx"] == 0:
self._state = "Healthy"
elif data["health_idx"] == 1:
self._state = "Fine"
elif data["health_idx"] == 2:
self._state = "Fair"
elif data["health_idx"] == 3:
self._state = "Poor"
elif data["health_idx"] == 4:
self._state = "Unhealthy"
self._state = process_health(data["health_idx"])
except KeyError:
if self._state:
_LOGGER.info("No %s data found for %s", self.type, self.module_name)
_LOGGER.debug("No %s data found for %s", self.type, self._device_name)
self._state = None
return
class NetatmoData:
"""Get the latest data from Netatmo."""
def __init__(self, auth, station_data):
"""Initialize the data object."""
self.data = {}
self.station_data = station_data
self.auth = auth
def get_module_infos(self):
"""Return all modules available on the API as a dict."""
return self.station_data.getModules()
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Call the Netatmo API to update the data."""
self.station_data = self.station_data.__class__(self.auth)
data = self.station_data.lastData(exclude=3600, byId=True)
if not data:
_LOGGER.debug("No data received when updating station data")
return
self.data = data
def process_angle(angle: int) -> str:
"""Process angle and return string for display."""
if angle >= 330:
return f"N ({angle}\xb0)"
if angle >= 300:
return f"NW ({angle}\xb0)"
if angle >= 240:
return f"W ({angle}\xb0)"
if angle >= 210:
return f"SW ({angle}\xb0)"
if angle >= 150:
return f"S ({angle}\xb0)"
if angle >= 120:
return f"SE ({angle}\xb0)"
if angle >= 60:
return f"E ({angle}\xb0)"
if angle >= 30:
return f"NE ({angle}\xb0)"
return f"N ({angle}\xb0)"
class NetatmoPublicSensor(Entity):
def process_battery(data: int, model: str) -> str:
"""Process battery data and return string for display."""
values = BATTERY_VALUES[model]
if data >= values["Full"]:
return "Full"
if data >= values["High"]:
return "High"
if data >= values["Medium"]:
return "Medium"
if data >= values["Low"]:
return "Low"
return "Very Low"
def process_health(health):
"""Process health index and return string for display."""
if health == 0:
return "Healthy"
if health == 1:
return "Fine"
if health == 2:
return "Fair"
if health == 3:
return "Poor"
if health == 4:
return "Unhealthy"
def process_rf(strength):
"""Process wifi signal strength and return string for display."""
if strength >= 90:
return "Low"
if strength >= 76:
return "Medium"
if strength >= 60:
return "High"
return "Full"
def process_wifi(strength):
"""Process wifi signal strength and return string for display."""
if strength >= 86:
return "Low"
if strength >= 71:
return "Medium"
if strength >= 56:
return "High"
return "Full"
class NetatmoPublicSensor(NetatmoBase):
"""Represent a single sensor in a Netatmo."""
def __init__(self, area, data, sensor_type):
def __init__(self, data_handler, area, sensor_type):
"""Initialize the sensor."""
self.netatmo_data = data
super().__init__(data_handler)
self._signal_name = f"{PUBLICDATA_DATA_CLASS_NAME}-{area.uuid}"
self._data_classes.append(
{
"name": PUBLICDATA_DATA_CLASS_NAME,
"LAT_NE": area.lat_ne,
"LON_NE": area.lon_ne,
"LAT_SW": area.lat_sw,
"LON_SW": area.lon_sw,
"area_name": area.area_name,
SIGNAL_NAME: self._signal_name,
}
)
self.type = sensor_type
self._mode = area[CONF_PUBLIC_MODE]
self._area_name = area[CONF_AREA_NAME]
self._name = f"{MANUFACTURER} {self._area_name} {SENSOR_TYPES[self.type][0]}"
self.area = area
self._mode = area.mode
self._area_name = area.area_name
self._id = self._area_name
self._device_name = f"{self._area_name}"
self._name = f"{MANUFACTURER} {self._device_name} {SENSOR_TYPES[self.type][0]}"
self._state = None
self._device_class = SENSOR_TYPES[self.type][3]
self._icon = SENSOR_TYPES[self.type][2]
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
self._show_on_map = area[CONF_SHOW_ON_MAP]
self._unique_id = f"{self._name.replace(' ', '-')}"
self._module_type = PUBLIC
@property
def name(self):
"""Return the name of the sensor."""
return self._name
self._show_on_map = area.show_on_map
self._unique_id = f"{self._device_name.replace(' ', '-')}-{self.type}"
self._model = PUBLIC
@property
def icon(self):
@ -512,28 +482,14 @@ class NetatmoPublicSensor(Entity):
"""Return the device class of the sensor."""
return self._device_class
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._area_name)},
"name": self._area_name,
"manufacturer": MANUFACTURER,
"model": MODELS[self._module_type],
}
@property
def device_state_attributes(self):
"""Return the attributes of the device."""
attrs = {}
if self._show_on_map:
attrs[ATTR_LATITUDE] = (
self.netatmo_data.lat_ne + self.netatmo_data.lat_sw
) / 2
attrs[ATTR_LONGITUDE] = (
self.netatmo_data.lon_ne + self.netatmo_data.lon_sw
) / 2
attrs[ATTR_LATITUDE] = (self.area.lat_ne + self.area.lat_sw) / 2
attrs[ATTR_LONGITUDE] = (self.area.lon_ne + self.area.lon_sw) / 2
return attrs
@ -547,46 +503,95 @@ class NetatmoPublicSensor(Entity):
"""Return the unit of measurement of this entity."""
return self._unit_of_measurement
@property
def unique_id(self):
"""Return the unique ID for this sensor."""
return self._unique_id
@property
def available(self):
"""Return True if entity is available."""
return self._state is not None
def update(self):
"""Get the latest data from Netatmo API and updates the states."""
self.netatmo_data.update()
@property
def _data(self):
return self.data_handler.data[self._signal_name]
if self.netatmo_data.data is None:
_LOGGER.info("No data found for %s", self._name)
async def async_added_to_hass(self) -> None:
"""Entity created."""
await super().async_added_to_hass()
self.data_handler.listeners.append(
async_dispatcher_connect(
self.hass,
f"netatmo-config-{self.device_info['name']}",
self.async_config_update_callback,
)
)
@callback
async def async_config_update_callback(self, area):
"""Update the entity's config."""
if self.area == area:
return
await self.data_handler.unregister_data_class(
self._signal_name, self.async_update_callback
)
self.area = area
self._signal_name = f"{PUBLICDATA_DATA_CLASS_NAME}-{area.uuid}"
self._data_classes = [
{
"name": PUBLICDATA_DATA_CLASS_NAME,
"LAT_NE": area.lat_ne,
"LON_NE": area.lon_ne,
"LAT_SW": area.lat_sw,
"LON_SW": area.lon_sw,
"area_name": area.area_name,
SIGNAL_NAME: self._signal_name,
}
]
self._mode = area.mode
self._show_on_map = area.show_on_map
await self.data_handler.register_data_class(
PUBLICDATA_DATA_CLASS_NAME,
self._signal_name,
self.async_update_callback,
LAT_NE=area.lat_ne,
LON_NE=area.lon_ne,
LAT_SW=area.lat_sw,
LON_SW=area.lon_sw,
)
@callback
def async_update_callback(self):
"""Update the entity's state."""
if self._data is None:
if self._state is None:
return
_LOGGER.warning("No data from update")
self._state = None
return
data = None
if self.type == "temperature":
data = self.netatmo_data.data.getLatestTemperatures()
data = self._data.get_latest_temperatures()
elif self.type == "pressure":
data = self.netatmo_data.data.getLatestPressures()
data = self._data.get_latest_pressures()
elif self.type == "humidity":
data = self.netatmo_data.data.getLatestHumidities()
data = self._data.get_latest_humidities()
elif self.type == "rain":
data = self.netatmo_data.data.getLatestRain()
data = self._data.get_latest_rain()
elif self.type == "sum_rain_1":
data = self.netatmo_data.data.get60minRain()
data = self._data.get_60_min_rain()
elif self.type == "sum_rain_24":
data = self.netatmo_data.data.get24hRain()
data = self._data.get_24_h_rain()
elif self.type == "windstrength":
data = self.netatmo_data.data.getLatestWindStrengths()
data = self._data.get_latest_wind_strengths()
elif self.type == "guststrength":
data = self.netatmo_data.data.getLatestGustStrengths()
data = self._data.get_latest_gust_strengths()
if not data:
_LOGGER.warning(
if self._state is None:
return
_LOGGER.debug(
"No station provides %s data in the area %s", self.type, self._area_name
)
self._state = None
@ -597,41 +602,3 @@ class NetatmoPublicSensor(Entity):
self._state = round(sum(values) / len(values), 1)
elif self._mode == "max":
self._state = max(values)
class NetatmoPublicData:
"""Get the latest data from Netatmo."""
def __init__(self, auth, lat_ne, lon_ne, lat_sw, lon_sw):
"""Initialize the data object."""
self.auth = auth
self.data = None
self.lat_ne = lat_ne
self.lon_ne = lon_ne
self.lat_sw = lat_sw
self.lon_sw = lon_sw
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Request an update from the Netatmo API."""
try:
data = pyatmo.PublicData(
self.auth,
LAT_NE=self.lat_ne,
LON_NE=self.lon_ne,
LAT_SW=self.lat_sw,
LON_SW=self.lon_sw,
filtering=True,
)
except pyatmo.NoDevice:
data = None
if not data:
_LOGGER.debug("No data received when updating public station data")
return
if data.CountStationInArea() == 0:
_LOGGER.warning("No Stations available in this area")
return
self.data = data

View File

@ -3,8 +3,34 @@ set_schedule:
description: Set the heating schedule.
fields:
schedule_name:
description: Schedule name.
description: Schedule name
example: Standard
home_name:
description: Home name.
example: MyHome
entity_id:
description: Entity id of the climate device.
example: climate.netatmo_livingroom
set_persons_home:
description: Set a list of persons as at home. Person's name must match a name known by the Welcome Camera.
fields:
persons:
description: List of names
example: Bob
entity_id:
description: Entity id of the camera.
example: camera.netatmo_entrance
set_person_away:
description: Set a person away. If no person is set the home will be marked as empty. Person's name must match a name known by the Welcome Camera.
fields:
person:
description: Person's name (optional)
example: Bob
entity_id:
description: Entity id of the camera.
example: camera.netatmo_entrance
register_webhook:
description: Register webhook
unregister_webhook:
description: Unregister webhook

View File

@ -6,7 +6,7 @@
}
},
"abort": {
"already_setup": "[%key:common::config_flow::abort::single_instance_allowed%]",
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
"authorize_url_timeout": "[%key:common::config_flow::abort::oauth2_authorize_url_timeout%]",
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]"
},

View File

@ -18,6 +18,11 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
EVENT_TYPE_MAP = {
"outdoor": "",
"therm_mode": "",
}
async def handle_webhook(hass, webhook_id, request):
"""Handle webhook callback."""
@ -31,18 +36,13 @@ async def handle_webhook(hass, webhook_id, request):
event_type = data.get(ATTR_EVENT_TYPE)
if event_type == "outdoor":
if event_type in ["outdoor", "therm_mode"]:
hass.bus.async_fire(
event_type=NETATMO_EVENT, event_data={"type": event_type, "data": data}
)
for event_data in data.get("event_list"):
async_evaluate_event(hass, event_data)
elif event_type == "therm_mode":
hass.bus.async_fire(
event_type=NETATMO_EVENT, event_data={"type": event_type, "data": data}
)
for event_data in data.get("data"):
for event_data in data.get(EVENT_TYPE_MAP[event_type], []):
async_evaluate_event(hass, event_data)
else:
async_evaluate_event(hass, data)
@ -65,19 +65,8 @@ def async_evaluate_event(hass, event_data):
event_type=NETATMO_EVENT,
event_data={"type": event_type, "data": person_event_data},
)
elif event_type == "therm_mode":
_LOGGER.debug("therm_mode: %s", event_data)
hass.bus.async_fire(
event_type=NETATMO_EVENT,
event_data={"type": event_type, "data": event_data},
)
elif event_type == "set_point":
_LOGGER.debug("set_point: %s", event_data)
hass.bus.async_fire(
event_type=NETATMO_EVENT,
event_data={"type": event_type, "data": event_data},
)
else:
_LOGGER.debug("%s: %s", event_type, event_data)
hass.bus.async_fire(
event_type=NETATMO_EVENT,
event_data={"type": event_type, "data": event_data},

View File

@ -1223,7 +1223,7 @@ pyarlo==0.2.3
pyatag==0.3.3.4
# homeassistant.components.netatmo
pyatmo==3.3.1
pyatmo==4.0.0
# homeassistant.components.atome
pyatome==0.1.1

View File

@ -583,7 +583,7 @@ pyarlo==0.2.3
pyatag==0.3.3.4
# homeassistant.components.netatmo
pyatmo==3.3.1
pyatmo==4.0.0
# homeassistant.components.blackbird
pyblackbird==0.5

View File

@ -31,7 +31,7 @@ async def test_abort_if_existing_entry(hass):
"netatmo", context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "missing_configuration"
assert result["reason"] == "single_instance_allowed"
result = await hass.config_entries.flow.async_init(
"netatmo",
@ -39,7 +39,7 @@ async def test_abort_if_existing_entry(hass):
data={"host": "0.0.0.0", "properties": {"id": "aa:bb:cc:dd:ee:ff"}},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "missing_configuration"
assert result["reason"] == "single_instance_allowed"
async def test_full_flow(hass, aiohttp_client, aioclient_mock):
@ -108,11 +108,21 @@ async def test_option_flow(hass):
"""Test config flow options."""
valid_option = {
"lat_ne": 32.91336,
"lon_ne": -117.187429,
"lat_sw": 32.83336,
"lon_sw": -117.26743,
"show_on_map": False,
"area_name": "Home",
"lon_ne": -117.187429,
"lat_sw": 32.83336,
"mode": "avg",
}
expected_result = {
"lat_ne": 32.9133601,
"lon_ne": -117.1874289,
"lat_sw": 32.8333601,
"lon_sw": -117.26742990000001,
"show_on_map": False,
"area_name": "Home",
"mode": "avg",
}
@ -145,4 +155,60 @@ async def test_option_flow(hass):
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {CONF_WEATHER_AREAS: {"Home": valid_option}}
for k, v in expected_result.items():
assert config_entry.options[CONF_WEATHER_AREAS]["Home"][k] == v
async def test_option_flow_wrong_coordinates(hass):
"""Test config flow options with mixed up coordinates."""
valid_option = {
"lat_ne": 32.1234567,
"lon_ne": -117.2345678,
"lat_sw": 32.2345678,
"lon_sw": -117.1234567,
"show_on_map": False,
"area_name": "Home",
"mode": "avg",
}
expected_result = {
"lat_ne": 32.2345678,
"lon_ne": -117.1234567,
"lat_sw": 32.1234567,
"lon_sw": -117.2345678,
"show_on_map": False,
"area_name": "Home",
"mode": "avg",
}
config_entry = MockConfigEntry(
domain=DOMAIN, unique_id=DOMAIN, data=VALID_CONFIG, options={},
)
config_entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "public_weather_areas"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={CONF_NEW_AREA: "Home"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "public_weather"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input=valid_option
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "public_weather_areas"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
for k, v in expected_result.items():
assert config_entry.options[CONF_WEATHER_AREAS]["Home"][k] == v