161 lines
5.6 KiB
Python
161 lines
5.6 KiB
Python
"""
|
|
Support for Speedtest.net based on speedtest-cli.
|
|
|
|
For more details about this platform, please refer to the documentation at
|
|
https://home-assistant.io/components/sensor.speedtest/
|
|
"""
|
|
import logging
|
|
import re
|
|
import sys
|
|
from subprocess import check_output, CalledProcessError
|
|
import voluptuous as vol
|
|
|
|
import homeassistant.util.dt as dt_util
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.components import recorder
|
|
from homeassistant.components.sensor import (DOMAIN, PLATFORM_SCHEMA)
|
|
from homeassistant.const import CONF_MONITORED_CONDITIONS
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.helpers.event import track_time_change
|
|
|
|
REQUIREMENTS = ['speedtest-cli==0.3.4']
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
_SPEEDTEST_REGEX = re.compile(r'Ping:\s(\d+\.\d+)\sms[\r\n]+'
|
|
r'Download:\s(\d+\.\d+)\sMbit/s[\r\n]+'
|
|
r'Upload:\s(\d+\.\d+)\sMbit/s[\r\n]+')
|
|
|
|
CONF_SECOND = 'second'
|
|
CONF_MINUTE = 'minute'
|
|
CONF_HOUR = 'hour'
|
|
CONF_DAY = 'day'
|
|
CONF_SERVER_ID = 'server_id'
|
|
SENSOR_TYPES = {
|
|
'ping': ['Ping', 'ms'],
|
|
'download': ['Download', 'Mbit/s'],
|
|
'upload': ['Upload', 'Mbit/s'],
|
|
}
|
|
|
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
|
vol.Required(CONF_MONITORED_CONDITIONS):
|
|
vol.All(cv.ensure_list, [vol.In(list(SENSOR_TYPES.keys()))]),
|
|
vol.Optional(CONF_SERVER_ID): cv.positive_int,
|
|
vol.Optional(CONF_SECOND, default=[0]):
|
|
vol.All(cv.ensure_list, [vol.All(vol.Coerce(int), vol.Range(0, 59))]),
|
|
vol.Optional(CONF_MINUTE, default=[0]):
|
|
vol.All(cv.ensure_list, [vol.All(vol.Coerce(int), vol.Range(0, 59))]),
|
|
vol.Optional(CONF_HOUR):
|
|
vol.All(cv.ensure_list, [vol.All(vol.Coerce(int), vol.Range(0, 23))]),
|
|
vol.Optional(CONF_DAY):
|
|
vol.All(cv.ensure_list, [vol.All(vol.Coerce(int), vol.Range(1, 31))]),
|
|
})
|
|
|
|
|
|
def setup_platform(hass, config, add_devices, discovery_info=None):
|
|
"""Setup the Speedtest sensor."""
|
|
data = SpeedtestData(hass, config)
|
|
dev = []
|
|
for sensor in config[CONF_MONITORED_CONDITIONS]:
|
|
if sensor not in SENSOR_TYPES:
|
|
_LOGGER.error('Sensor type: "%s" does not exist', sensor)
|
|
else:
|
|
dev.append(SpeedtestSensor(data, sensor))
|
|
|
|
add_devices(dev)
|
|
|
|
def update(call=None):
|
|
"""Update service for manual updates."""
|
|
data.update(dt_util.now())
|
|
for sensor in dev:
|
|
sensor.update()
|
|
|
|
hass.services.register(DOMAIN, 'update_speedtest', update)
|
|
|
|
|
|
# pylint: disable=too-few-public-methods
|
|
class SpeedtestSensor(Entity):
|
|
"""Implementation of a speedtest.net sensor."""
|
|
|
|
def __init__(self, speedtest_data, sensor_type):
|
|
"""Initialize the sensor."""
|
|
self._name = SENSOR_TYPES[sensor_type][0]
|
|
self.speedtest_client = speedtest_data
|
|
self.type = sensor_type
|
|
self._state = None
|
|
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
|
|
|
|
@property
|
|
def name(self):
|
|
"""Return the name of the sensor."""
|
|
return '{} {}'.format('Speedtest', self._name)
|
|
|
|
@property
|
|
def state(self):
|
|
"""Return the state of the device."""
|
|
return self._state
|
|
|
|
@property
|
|
def unit_of_measurement(self):
|
|
"""Return the unit of measurement of this entity, if any."""
|
|
return self._unit_of_measurement
|
|
|
|
def update(self):
|
|
"""Get the latest data and update the states."""
|
|
data = self.speedtest_client.data
|
|
if data is None:
|
|
entity_id = 'sensor.speedtest_' + self._name.lower()
|
|
states = recorder.get_model('States')
|
|
try:
|
|
last_state = recorder.execute(
|
|
recorder.query('States').filter(
|
|
(states.entity_id == entity_id) &
|
|
(states.last_changed == states.last_updated) &
|
|
(states.state != 'unknown')
|
|
).order_by(states.state_id.desc()).limit(1))
|
|
except TypeError:
|
|
return
|
|
except RuntimeError:
|
|
return
|
|
if not last_state:
|
|
return
|
|
self._state = last_state[0].state
|
|
elif self.type == 'ping':
|
|
self._state = data['ping']
|
|
elif self.type == 'download':
|
|
self._state = data['download']
|
|
elif self.type == 'upload':
|
|
self._state = data['upload']
|
|
|
|
|
|
class SpeedtestData(object):
|
|
"""Get the latest data from speedtest.net."""
|
|
|
|
def __init__(self, hass, config):
|
|
"""Initialize the data object."""
|
|
self.data = None
|
|
self._server_id = config.get(CONF_SERVER_ID)
|
|
track_time_change(hass, self.update,
|
|
second=config.get(CONF_SECOND),
|
|
minute=config.get(CONF_MINUTE),
|
|
hour=config.get(CONF_HOUR),
|
|
day=config.get(CONF_DAY))
|
|
|
|
def update(self, now):
|
|
"""Get the latest data from speedtest.net."""
|
|
import speedtest_cli
|
|
|
|
_LOGGER.info('Executing speedtest')
|
|
try:
|
|
args = [sys.executable, speedtest_cli.__file__, '--simple']
|
|
if self._server_id:
|
|
args = args + ['--server', str(self._server_id)]
|
|
|
|
re_output = _SPEEDTEST_REGEX.split(
|
|
check_output(args).decode("utf-8"))
|
|
except CalledProcessError as process_error:
|
|
_LOGGER.error('Error executing speedtest: %s', process_error)
|
|
return
|
|
self.data = {'ping': round(float(re_output[1]), 2),
|
|
'download': round(float(re_output[2]), 2),
|
|
'upload': round(float(re_output[3]), 2)}
|