core/homeassistant/components/weheat/coordinator.py

87 lines
2.6 KiB
Python

Add weheat core integration (#123057) * Add empty weheat integration * Add first sensor to weheat integration * Add weheat entity to provide device information * Fixed automatic selection for a single heat pump * Replaced integration specific package and removed status sensor * Update const.py * Add reauthentication support for weheat integration * Add test cases for the config flow of the weheat integration * Changed API and OATH url to weheat production environment * Add empty weheat integration * Add first sensor to weheat integration * Add weheat entity to provide device information * Fixed automatic selection for a single heat pump * Replaced integration specific package and removed status sensor * Add reauthentication support for weheat integration * Update const.py * Add test cases for the config flow of the weheat integration * Changed API and OATH url to weheat production environment * Resolved merge conflict after adding weheat package * Apply suggestions from code review Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com> * Added translation keys, more type info and version bump the weheat package * Adding native property value for weheat sensor * Removed reauth, added weheat sensor description and changed discovery of heat pumps * Added unique ID of user to entity * Replaced string by constants, added test case for duplicate unique id * Removed duplicate constant * Added offline scope * Removed re-auth related code * Simplified oath implementation * Cleanup tests for weheat integration * Added oath scope to tests --------- Co-authored-by: kjell-van-straaten <kjell.van.straaten@wefabricate.com> Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2024-09-06 09:58:01 +00:00
"""Define a custom coordinator for the Weheat heatpump integration."""

from datetime import timedelta

from weheat.abstractions.discovery import HeatPumpDiscovery
from weheat.abstractions.heat_pump import HeatPump
from weheat.exceptions import (
    ApiException,
    BadRequestException,
    ForbiddenException,
    NotFoundException,
    ServiceException,
    UnauthorizedException,
)

from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .const import API_URL, DOMAIN, LOGGER, UPDATE_INTERVAL

EXCEPTIONS = (
    ServiceException,
    NotFoundException,
    ForbiddenException,
    BadRequestException,
    ApiException,
)


class WeheatDataUpdateCoordinator(DataUpdateCoordinator[HeatPump]):
    """A custom coordinator for the Weheat heatpump integration."""

    def __init__(
        self,
        hass: HomeAssistant,
        session: OAuth2Session,
        heat_pump: HeatPumpDiscovery.HeatPumpInfo,
    ) -> None:
        """Initialize the data coordinator."""
        super().__init__(
            hass,
            logger=LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=UPDATE_INTERVAL),
        )
        self.heat_pump_info = heat_pump
        self._heat_pump_data = HeatPump(API_URL, heat_pump.uuid)
        self.session = session

    @property
    def heatpump_id(self) -> str:
        """Return the heat pump id."""
        return self.heat_pump_info.uuid

    @property
    def readable_name(self) -> str | None:
        """Return the readable name of the heat pump."""
        if self.heat_pump_info.name:
            return self.heat_pump_info.name
        return self.heat_pump_info.model

    @property
    def model(self) -> str:
        """Return the model of the heat pump."""
        return self.heat_pump_info.model

    def fetch_data(self) -> HeatPump:
        """Get the data from the API."""
        try:
            self._heat_pump_data.get_status(self.session.token[CONF_ACCESS_TOKEN])
        except UnauthorizedException as error:
            raise ConfigEntryAuthFailed from error
        except EXCEPTIONS as error:
            raise UpdateFailed(error) from error
        return self._heat_pump_data

    async def _async_update_data(self) -> HeatPump:
        """Fetch data from the API."""
        await self.session.async_ensure_token_valid()
        return await self.hass.async_add_executor_job(self.fetch_data)