core/homeassistant/components/sensor/netatmo.py

155 lines
4.9 KiB
Python
Raw Normal View History

"""
homeassistant.components.sensor.netatmo
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
NetAtmo Weather Service service.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/...
"""
import logging
from datetime import timedelta
from homeassistant.components.sensor import DOMAIN
2016-01-11 05:57:31 +00:00
from homeassistant.const import (CONF_API_KEY, CONF_USERNAME, CONF_PASSWORD,
TEMP_CELCIUS)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle
REQUIREMENTS = [
'https://github.com/HydrelioxGitHub/netatmo-api-python/archive/'
'43ff238a0122b0939a0dc4e8836b6782913fb6e2.zip'
'#lnetatmo==0.4.0']
_LOGGER = logging.getLogger(__name__)
2016-01-13 02:45:43 +00:00
SENSOR_TYPES = {
'temperature': ['Temperature', TEMP_CELCIUS],
2016-01-13 03:26:40 +00:00
'co2': ['CO2', 'ppm'],
'pressure': ['Pressure', 'mb'],
'noise': ['Noise', 'dB'],
'humidity': ['Humidity', '%']
}
2016-01-13 02:45:43 +00:00
CONF_SECRET_KEY = 'secret_key'
2016-01-13 03:19:27 +00:00
ATTR_MODULE = 'modules'
2016-01-13 02:45:43 +00:00
# Return cached results if last scan was less then this time ago
2016-01-11 05:57:31 +00:00
# NetAtmo Data is uploaded to server every 10mn
# so this time should not be under
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=600)
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Get the NetAtmo sensor. """
if not validate_config({DOMAIN: config},
{DOMAIN: [CONF_API_KEY,
CONF_USERNAME,
CONF_PASSWORD,
2016-01-13 02:45:43 +00:00
CONF_SECRET_KEY]},
_LOGGER):
return None
2016-01-13 02:45:43 +00:00
import lnetatmo
2016-01-11 05:57:31 +00:00
authorization = lnetatmo.ClientAuth(config.get(CONF_API_KEY, None),
2016-01-13 02:45:43 +00:00
config.get(CONF_SECRET_KEY, None),
2016-01-11 05:57:31 +00:00
config.get(CONF_USERNAME, None),
config.get(CONF_PASSWORD, None))
if not authorization:
_LOGGER.error(
"Connection error "
"Please check your settings for NatAtmo API.")
return False
data = NetAtmoData(authorization)
dev = []
try:
2016-01-11 05:57:31 +00:00
# Iterate each module
2016-01-13 03:19:27 +00:00
for module_name, monitored_conditions in config[ATTR_MODULE].items():
2016-01-11 05:57:31 +00:00
# Test if module exist """
if module_name not in data.get_module_names():
_LOGGER.error('Module name: "%s" not found', module_name)
continue
2016-01-11 05:57:31 +00:00
# Only create sensor for monitored """
for variable in monitored_conditions:
if variable not in SENSOR_TYPES:
_LOGGER.error('Sensor type: "%s" does not exist', variable)
else:
2016-01-11 05:57:31 +00:00
dev.append(
NetAtmoSensor(data, module_name, variable))
except KeyError:
pass
add_devices(dev)
# pylint: disable=too-few-public-methods
class NetAtmoSensor(Entity):
""" Implements a NetAtmo sensor. """
def __init__(self, netatmo_data, module_name, sensor_type):
self._name = "NetAtmo {} {}".format(module_name,
SENSOR_TYPES[sensor_type][0])
self.netatmo_data = netatmo_data
self.module_name = module_name
self.type = sensor_type
self._state = None
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
self.update()
@property
def name(self):
return self._name
@property
def state(self):
""" Returns the state of the device. """
return self._state
@property
def unit_of_measurement(self):
""" Unit of measurement of this entity, if any. """
return self._unit_of_measurement
# pylint: disable=too-many-branches
def update(self):
""" Gets the latest data from NetAtmo API and updates the states. """
self.netatmo_data.update()
data = self.netatmo_data.data[self.module_name]
if self.type == 'temperature':
self._state = round(data['Temperature'], 1)
elif self.type == 'humidity':
self._state = data['Humidity']
elif self.type == 'noise':
self._state = data['Noise']
elif self.type == 'co2':
self._state = data['CO2']
elif self.type == 'pressure':
self._state = round(data['Pressure'], 1)
class NetAtmoData(object):
""" Gets the latest data from NetAtmo. """
def __init__(self, auth):
self.auth = auth
self.data = None
2016-01-11 05:57:31 +00:00
def get_module_names(self):
""" Return all module available on the API as a list. """
self.update()
return self.data.keys()
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
2016-01-11 05:57:31 +00:00
""" Call the NetAtmo API to update the data. """
2016-01-08 05:19:03 +00:00
import lnetatmo
2016-01-11 05:57:31 +00:00
# Gets the latest data from NetAtmo. """
dev_list = lnetatmo.DeviceList(self.auth)
self.data = dev_list.lastData(exclude=3600)