350 lines
12 KiB
Python
350 lines
12 KiB
Python
"""Support for monitoring the local system."""
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sys
|
|
|
|
import psutil
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.sensor import PLATFORM_SCHEMA
|
|
from homeassistant.const import (
|
|
CONF_RESOURCES,
|
|
CONF_TYPE,
|
|
DATA_GIBIBYTES,
|
|
DATA_MEBIBYTES,
|
|
DATA_RATE_MEGABYTES_PER_SECOND,
|
|
PERCENTAGE,
|
|
STATE_OFF,
|
|
STATE_ON,
|
|
TEMP_CELSIUS,
|
|
)
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.util import slugify
|
|
import homeassistant.util.dt as dt_util
|
|
|
|
# mypy: allow-untyped-defs, no-check-untyped-defs
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
CONF_ARG = "arg"
|
|
|
|
if sys.maxsize > 2 ** 32:
|
|
CPU_ICON = "mdi:cpu-64-bit"
|
|
else:
|
|
CPU_ICON = "mdi:cpu-32-bit"
|
|
|
|
# Schema: [name, unit of measurement, icon, device class, flag if mandatory arg]
|
|
SENSOR_TYPES = {
|
|
"disk_free": ["Disk free", DATA_GIBIBYTES, "mdi:harddisk", None, False],
|
|
"disk_use": ["Disk use", DATA_GIBIBYTES, "mdi:harddisk", None, False],
|
|
"disk_use_percent": [
|
|
"Disk use (percent)",
|
|
PERCENTAGE,
|
|
"mdi:harddisk",
|
|
None,
|
|
False,
|
|
],
|
|
"ipv4_address": ["IPv4 address", "", "mdi:server-network", None, True],
|
|
"ipv6_address": ["IPv6 address", "", "mdi:server-network", None, True],
|
|
"last_boot": ["Last boot", "", "mdi:clock", "timestamp", False],
|
|
"load_15m": ["Load (15m)", " ", CPU_ICON, None, False],
|
|
"load_1m": ["Load (1m)", " ", CPU_ICON, None, False],
|
|
"load_5m": ["Load (5m)", " ", CPU_ICON, None, False],
|
|
"memory_free": ["Memory free", DATA_MEBIBYTES, "mdi:memory", None, False],
|
|
"memory_use": ["Memory use", DATA_MEBIBYTES, "mdi:memory", None, False],
|
|
"memory_use_percent": [
|
|
"Memory use (percent)",
|
|
PERCENTAGE,
|
|
"mdi:memory",
|
|
None,
|
|
False,
|
|
],
|
|
"network_in": ["Network in", DATA_MEBIBYTES, "mdi:server-network", None, True],
|
|
"network_out": ["Network out", DATA_MEBIBYTES, "mdi:server-network", None, True],
|
|
"packets_in": ["Packets in", " ", "mdi:server-network", None, True],
|
|
"packets_out": ["Packets out", " ", "mdi:server-network", None, True],
|
|
"throughput_network_in": [
|
|
"Network throughput in",
|
|
DATA_RATE_MEGABYTES_PER_SECOND,
|
|
"mdi:server-network",
|
|
None,
|
|
True,
|
|
],
|
|
"throughput_network_out": [
|
|
"Network throughput out",
|
|
DATA_RATE_MEGABYTES_PER_SECOND,
|
|
"mdi:server-network",
|
|
True,
|
|
],
|
|
"process": ["Process", " ", CPU_ICON, None, True],
|
|
"processor_use": ["Processor use (percent)", PERCENTAGE, CPU_ICON, None, False],
|
|
"processor_temperature": [
|
|
"Processor temperature",
|
|
TEMP_CELSIUS,
|
|
CPU_ICON,
|
|
None,
|
|
False,
|
|
],
|
|
"swap_free": ["Swap free", DATA_MEBIBYTES, "mdi:harddisk", None, False],
|
|
"swap_use": ["Swap use", DATA_MEBIBYTES, "mdi:harddisk", None, False],
|
|
"swap_use_percent": ["Swap use (percent)", PERCENTAGE, "mdi:harddisk", None, False],
|
|
}
|
|
|
|
|
|
def check_required_arg(value):
|
|
"""Validate that the required "arg" for the sensor types that need it are set."""
|
|
for sensor in value:
|
|
sensor_type = sensor[CONF_TYPE]
|
|
sensor_arg = sensor.get(CONF_ARG)
|
|
|
|
if sensor_arg is None and SENSOR_TYPES[sensor_type][4]:
|
|
raise vol.RequiredFieldInvalid(
|
|
f"Mandatory 'arg' is missing for sensor type '{sensor_type}'."
|
|
)
|
|
|
|
return value
|
|
|
|
|
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|
{
|
|
vol.Optional(CONF_RESOURCES, default={CONF_TYPE: "disk_use"}): vol.All(
|
|
cv.ensure_list,
|
|
[
|
|
vol.Schema(
|
|
{
|
|
vol.Required(CONF_TYPE): vol.In(SENSOR_TYPES),
|
|
vol.Optional(CONF_ARG): cv.string,
|
|
}
|
|
)
|
|
],
|
|
check_required_arg,
|
|
)
|
|
}
|
|
)
|
|
|
|
IO_COUNTER = {
|
|
"network_out": 0,
|
|
"network_in": 1,
|
|
"packets_out": 2,
|
|
"packets_in": 3,
|
|
"throughput_network_out": 0,
|
|
"throughput_network_in": 1,
|
|
}
|
|
|
|
IF_ADDRS_FAMILY = {"ipv4_address": socket.AF_INET, "ipv6_address": socket.AF_INET6}
|
|
|
|
# There might be additional keys to be added for different
|
|
# platforms / hardware combinations.
|
|
# Taken from last version of "glances" integration before they moved to
|
|
# a generic temperature sensor logic.
|
|
# https://github.com/home-assistant/core/blob/5e15675593ba94a2c11f9f929cdad317e27ce190/homeassistant/components/glances/sensor.py#L199
|
|
CPU_SENSOR_PREFIXES = [
|
|
"amdgpu 1",
|
|
"aml_thermal",
|
|
"Core 0",
|
|
"Core 1",
|
|
"CPU Temperature",
|
|
"CPU",
|
|
"cpu-thermal 1",
|
|
"cpu_thermal 1",
|
|
"exynos-therm 1",
|
|
"Package id 0",
|
|
"Physical id 0",
|
|
"radeon 1",
|
|
"soc-thermal 1",
|
|
"soc_thermal 1",
|
|
]
|
|
|
|
|
|
def setup_platform(hass, config, add_entities, discovery_info=None):
|
|
"""Set up the system monitor sensors."""
|
|
dev = []
|
|
for resource in config[CONF_RESOURCES]:
|
|
# Initialize the sensor argument if none was provided.
|
|
# For disk monitoring default to "/" (root) to prevent runtime errors, if argument was not specified.
|
|
if CONF_ARG not in resource:
|
|
if resource[CONF_TYPE].startswith("disk_"):
|
|
resource[CONF_ARG] = "/"
|
|
else:
|
|
resource[CONF_ARG] = ""
|
|
|
|
# Verify if we can retrieve CPU / processor temperatures.
|
|
# If not, do not create the entity and add a warning to the log
|
|
if resource[CONF_TYPE] == "processor_temperature":
|
|
if SystemMonitorSensor.read_cpu_temperature() is None:
|
|
_LOGGER.warning("Cannot read CPU / processor temperature information.")
|
|
continue
|
|
|
|
dev.append(SystemMonitorSensor(resource[CONF_TYPE], resource[CONF_ARG]))
|
|
|
|
add_entities(dev, True)
|
|
|
|
|
|
class SystemMonitorSensor(Entity):
|
|
"""Implementation of a system monitor sensor."""
|
|
|
|
def __init__(self, sensor_type, argument=""):
|
|
"""Initialize the sensor."""
|
|
self._name = "{} {}".format(SENSOR_TYPES[sensor_type][0], argument)
|
|
self._unique_id = slugify(f"{sensor_type}_{argument}")
|
|
self.argument = argument
|
|
self.type = sensor_type
|
|
self._state = None
|
|
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
|
|
self._available = True
|
|
if sensor_type in ["throughput_network_out", "throughput_network_in"]:
|
|
self._last_value = None
|
|
self._last_update_time = None
|
|
|
|
@property
|
|
def name(self):
|
|
"""Return the name of the sensor."""
|
|
return self._name.rstrip()
|
|
|
|
@property
|
|
def unique_id(self):
|
|
"""Return the unique ID."""
|
|
return self._unique_id
|
|
|
|
@property
|
|
def device_class(self):
|
|
"""Return the class of this sensor."""
|
|
return SENSOR_TYPES[self.type][3]
|
|
|
|
@property
|
|
def icon(self):
|
|
"""Icon to use in the frontend, if any."""
|
|
return SENSOR_TYPES[self.type][2]
|
|
|
|
@property
|
|
def state(self):
|
|
"""Return the state of the device."""
|
|
return self._state
|
|
|
|
@property
|
|
def unit_of_measurement(self):
|
|
"""Return the unit of measurement of this entity, if any."""
|
|
return self._unit_of_measurement
|
|
|
|
@property
|
|
def available(self):
|
|
"""Return True if entity is available."""
|
|
return self._available
|
|
|
|
def update(self):
|
|
"""Get the latest system information."""
|
|
if self.type == "disk_use_percent":
|
|
self._state = psutil.disk_usage(self.argument).percent
|
|
elif self.type == "disk_use":
|
|
self._state = round(psutil.disk_usage(self.argument).used / 1024 ** 3, 1)
|
|
elif self.type == "disk_free":
|
|
self._state = round(psutil.disk_usage(self.argument).free / 1024 ** 3, 1)
|
|
elif self.type == "memory_use_percent":
|
|
self._state = psutil.virtual_memory().percent
|
|
elif self.type == "memory_use":
|
|
virtual_memory = psutil.virtual_memory()
|
|
self._state = round(
|
|
(virtual_memory.total - virtual_memory.available) / 1024 ** 2, 1
|
|
)
|
|
elif self.type == "memory_free":
|
|
self._state = round(psutil.virtual_memory().available / 1024 ** 2, 1)
|
|
elif self.type == "swap_use_percent":
|
|
self._state = psutil.swap_memory().percent
|
|
elif self.type == "swap_use":
|
|
self._state = round(psutil.swap_memory().used / 1024 ** 2, 1)
|
|
elif self.type == "swap_free":
|
|
self._state = round(psutil.swap_memory().free / 1024 ** 2, 1)
|
|
elif self.type == "processor_use":
|
|
self._state = round(psutil.cpu_percent(interval=None))
|
|
elif self.type == "processor_temperature":
|
|
self._state = self.read_cpu_temperature()
|
|
elif self.type == "process":
|
|
for proc in psutil.process_iter():
|
|
try:
|
|
if self.argument == proc.name():
|
|
self._state = STATE_ON
|
|
return
|
|
except psutil.NoSuchProcess as err:
|
|
_LOGGER.warning(
|
|
"Failed to load process with ID: %s, old name: %s",
|
|
err.pid,
|
|
err.name,
|
|
)
|
|
self._state = STATE_OFF
|
|
elif self.type == "network_out" or self.type == "network_in":
|
|
counters = psutil.net_io_counters(pernic=True)
|
|
if self.argument in counters:
|
|
counter = counters[self.argument][IO_COUNTER[self.type]]
|
|
self._state = round(counter / 1024 ** 2, 1)
|
|
else:
|
|
self._state = None
|
|
elif self.type == "packets_out" or self.type == "packets_in":
|
|
counters = psutil.net_io_counters(pernic=True)
|
|
if self.argument in counters:
|
|
self._state = counters[self.argument][IO_COUNTER[self.type]]
|
|
else:
|
|
self._state = None
|
|
elif (
|
|
self.type == "throughput_network_out"
|
|
or self.type == "throughput_network_in"
|
|
):
|
|
counters = psutil.net_io_counters(pernic=True)
|
|
if self.argument in counters:
|
|
counter = counters[self.argument][IO_COUNTER[self.type]]
|
|
now = dt_util.utcnow()
|
|
if self._last_value and self._last_value < counter:
|
|
self._state = round(
|
|
(counter - self._last_value)
|
|
/ 1000 ** 2
|
|
/ (now - self._last_update_time).seconds,
|
|
3,
|
|
)
|
|
else:
|
|
self._state = None
|
|
self._last_update_time = now
|
|
self._last_value = counter
|
|
else:
|
|
self._state = None
|
|
elif self.type == "ipv4_address" or self.type == "ipv6_address":
|
|
addresses = psutil.net_if_addrs()
|
|
if self.argument in addresses:
|
|
for addr in addresses[self.argument]:
|
|
if addr.family == IF_ADDRS_FAMILY[self.type]:
|
|
self._state = addr.address
|
|
else:
|
|
self._state = None
|
|
elif self.type == "last_boot":
|
|
# Only update on initial setup
|
|
if self._state is None:
|
|
self._state = dt_util.as_local(
|
|
dt_util.utc_from_timestamp(psutil.boot_time())
|
|
).isoformat()
|
|
elif self.type == "load_1m":
|
|
self._state = round(os.getloadavg()[0], 2)
|
|
elif self.type == "load_5m":
|
|
self._state = round(os.getloadavg()[1], 2)
|
|
elif self.type == "load_15m":
|
|
self._state = round(os.getloadavg()[2], 2)
|
|
|
|
@staticmethod
|
|
def read_cpu_temperature():
|
|
"""Attempt to read CPU / processor temperature."""
|
|
temps = psutil.sensors_temperatures()
|
|
|
|
for name, entries in temps.items():
|
|
i = 1
|
|
for entry in entries:
|
|
# In case the label is empty (e.g. on Raspberry PI 4),
|
|
# construct it ourself here based on the sensor key name.
|
|
if not entry.label:
|
|
_label = f"{name} {i}"
|
|
else:
|
|
_label = entry.label
|
|
|
|
if _label in CPU_SENSOR_PREFIXES:
|
|
return round(entry.current, 1)
|
|
|
|
i += 1
|