core/homeassistant/components/systemmonitor/sensor.py

348 lines
12 KiB
Python

"""Support for monitoring the local system."""
import logging
import os
import socket
import sys
import psutil
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_RESOURCES,
CONF_TYPE,
DATA_GIBIBYTES,
DATA_MEBIBYTES,
DATA_RATE_MEGABYTES_PER_SECOND,
PERCENTAGE,
STATE_OFF,
STATE_ON,
TEMP_CELSIUS,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
# mypy: allow-untyped-defs, no-check-untyped-defs
_LOGGER = logging.getLogger(__name__)

# Key of the optional per-resource argument in the platform configuration.
CONF_ARG = "arg"

# Pick the CPU icon from the interpreter's word size: sys.maxsize only exceeds
# 2 ** 32 on a 64-bit build.
CPU_ICON = "mdi:cpu-64-bit" if sys.maxsize > 2 ** 32 else "mdi:cpu-32-bit"
# Schema: [name, unit of measurement, icon, device class, flag if mandatory arg]
#
# Every entry MUST have exactly these five fields: the rest of the module
# reads them by position (index 0 name, 1 unit, 2 icon, 3 device class,
# 4 "arg is mandatory" flag).
SENSOR_TYPES = {
    "disk_free": ["Disk free", DATA_GIBIBYTES, "mdi:harddisk", None, False],
    "disk_use": ["Disk use", DATA_GIBIBYTES, "mdi:harddisk", None, False],
    "disk_use_percent": [
        "Disk use (percent)",
        PERCENTAGE,
        "mdi:harddisk",
        None,
        False,
    ],
    "ipv4_address": ["IPv4 address", "", "mdi:server-network", None, True],
    "ipv6_address": ["IPv6 address", "", "mdi:server-network", None, True],
    "last_boot": ["Last boot", "", "mdi:clock", "timestamp", False],
    "load_15m": ["Load (15m)", " ", CPU_ICON, None, False],
    "load_1m": ["Load (1m)", " ", CPU_ICON, None, False],
    "load_5m": ["Load (5m)", " ", CPU_ICON, None, False],
    "memory_free": ["Memory free", DATA_MEBIBYTES, "mdi:memory", None, False],
    "memory_use": ["Memory use", DATA_MEBIBYTES, "mdi:memory", None, False],
    "memory_use_percent": [
        "Memory use (percent)",
        PERCENTAGE,
        "mdi:memory",
        None,
        False,
    ],
    "network_in": ["Network in", DATA_MEBIBYTES, "mdi:server-network", None, True],
    "network_out": ["Network out", DATA_MEBIBYTES, "mdi:server-network", None, True],
    "packets_in": ["Packets in", " ", "mdi:server-network", None, True],
    "packets_out": ["Packets out", " ", "mdi:server-network", None, True],
    "throughput_network_in": [
        "Network throughput in",
        DATA_RATE_MEGABYTES_PER_SECOND,
        "mdi:server-network",
        None,
        True,
    ],
    "throughput_network_out": [
        "Network throughput out",
        DATA_RATE_MEGABYTES_PER_SECOND,
        "mdi:server-network",
        # Device class: previously missing, which shifted the mandatory-arg
        # flag into the device-class slot and left index 4 out of range.
        None,
        True,
    ],
    "process": ["Process", " ", CPU_ICON, None, True],
    "processor_use": ["Processor use (percent)", PERCENTAGE, CPU_ICON, None, False],
    "processor_temperature": [
        "Processor temperature",
        TEMP_CELSIUS,
        CPU_ICON,
        None,
        False,
    ],
    "swap_free": ["Swap free", DATA_MEBIBYTES, "mdi:harddisk", None, False],
    "swap_use": ["Swap use", DATA_MEBIBYTES, "mdi:harddisk", None, False],
    "swap_use_percent": ["Swap use (percent)", PERCENTAGE, "mdi:harddisk", None, False],
}
def check_required_arg(value):
    """Reject resources that omit "arg" although their sensor type needs one.

    ``value`` is the list of resource mappings from the platform
    configuration. It is returned unchanged when every entry is acceptable;
    otherwise ``vol.RequiredFieldInvalid`` is raised for the first offender.
    """
    for resource in value:
        kind = resource[CONF_TYPE]
        if resource.get(CONF_ARG) is not None:
            # An argument was supplied, so there is nothing to enforce.
            continue
        # Index 4 of a SENSOR_TYPES entry is the "arg is mandatory" flag.
        if SENSOR_TYPES[kind][4]:
            raise vol.RequiredFieldInvalid(
                f"Mandatory 'arg' is missing for sensor type '{kind}'."
            )
    return value
# Platform configuration: an optional list of resources. Each resource is a
# mapping with a required sensor type (one of the SENSOR_TYPES keys) and an
# optional string "arg". When no resources are configured, a single
# "disk_use" resource is used as the default.
# The validators run in order: cv.ensure_list (presumably wraps a single
# mapping into a list -- confirm against the helper), then the per-resource
# schema, then check_required_arg to enforce mandatory arguments.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_RESOURCES, default={CONF_TYPE: "disk_use"}): vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_TYPE): vol.In(SENSOR_TYPES),
vol.Optional(CONF_ARG): cv.string,
}
)
],
check_required_arg,
)
}
)
# Position of the relevant counter inside the per-interface tuple returned by
# psutil.net_io_counters(pernic=True), keyed by sensor type. Per psutil's
# documented field order: 0 = bytes_sent, 1 = bytes_recv, 2 = packets_sent,
# 3 = packets_recv. The throughput sensors reuse the byte counters.
IO_COUNTER = {
"network_out": 0,
"network_in": 1,
"packets_out": 2,
"packets_in": 3,
"throughput_network_out": 0,
"throughput_network_in": 1,
}
# Socket address family that an interface address must have to be reported by
# the corresponding IP address sensor type.
IF_ADDRS_FAMILY = {"ipv4_address": socket.AF_INET, "ipv6_address": socket.AF_INET6}
# Labels of temperature sensors that are treated as the CPU / processor
# temperature. Despite the name, read_cpu_temperature() matches these by exact
# membership, not by prefix. Entries ending in " 1" correspond to sensors that
# have no label of their own, for which "<sensor key name> <index>" is built.
# There might be additional keys to be added for different
# platforms / hardware combinations.
# Taken from the last version of the "glances" integration before it moved to
# a generic temperature sensor logic.
# https://github.com/home-assistant/core/blob/5e15675593ba94a2c11f9f929cdad317e27ce190/homeassistant/components/glances/sensor.py#L199
CPU_SENSOR_PREFIXES = [
"amdgpu 1",
"aml_thermal",
"Core 0",
"Core 1",
"CPU Temperature",
"CPU",
"cpu-thermal 1",
"cpu_thermal 1",
"exynos-therm 1",
"Package id 0",
"Physical id 0",
"radeon 1",
"soc-thermal 1",
"soc_thermal 1",
]
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Create one SystemMonitorSensor per configured resource and register them.

    Resources without an explicit argument get a default written back into
    the resource mapping: "/" (root) for the disk sensors, so that disk
    monitoring does not fail at runtime, and an empty string otherwise.
    """
    entities = []
    for resource in config[CONF_RESOURCES]:
        sensor_type = resource[CONF_TYPE]
        fallback_arg = "/" if sensor_type.startswith("disk_") else ""
        argument = resource.setdefault(CONF_ARG, fallback_arg)

        # Skip the processor temperature sensor, with a warning in the log,
        # when no CPU temperature can be read on this system.
        if (
            sensor_type == "processor_temperature"
            and SystemMonitorSensor.read_cpu_temperature() is None
        ):
            _LOGGER.warning("Cannot read CPU / processor temperature information.")
            continue

        entities.append(SystemMonitorSensor(sensor_type, argument))

    # The second argument requests an update before the entities are added.
    add_entities(entities, True)
class SystemMonitorSensor(Entity):
    """Implementation of a system monitor sensor.

    Each instance reports a single metric. The metric is selected by
    ``sensor_type`` (a key of SENSOR_TYPES) and, where applicable, narrowed by
    ``argument`` (a path for the disk sensors, an interface name for the
    network and address sensors, a process name for the process sensor).
    """

    def __init__(self, sensor_type, argument=""):
        """Initialize the sensor.

        sensor_type: key of SENSOR_TYPES selecting the metric.
        argument: optional qualifier for the metric; empty when unused.
        """
        self._name = f"{SENSOR_TYPES[sensor_type][0]} {argument}"
        self._unique_id = slugify(f"{sensor_type}_{argument}")
        self.argument = argument
        self.type = sensor_type
        self._state = None
        self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
        self._available = True
        if sensor_type in ("throughput_network_out", "throughput_network_in"):
            # Previous counter reading and its timestamp; the throughput is
            # derived from the difference between two consecutive updates.
            self._last_value = None
            self._last_update_time = None

    @property
    def name(self):
        """Return the name of the sensor."""
        # Strips the trailing space left behind when the argument is empty.
        return self._name.rstrip()

    @property
    def unique_id(self):
        """Return the unique ID."""
        return self._unique_id

    @property
    def device_class(self):
        """Return the class of this sensor."""
        return SENSOR_TYPES[self.type][3]

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return SENSOR_TYPES[self.type][2]

    @property
    def state(self):
        """Return the state of the device."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement of this entity, if any."""
        return self._unit_of_measurement

    @property
    def available(self):
        """Return True if entity is available."""
        return self._available

    def update(self):
        """Get the latest system information.

        Reads the metric selected by ``self.type`` and stores it in the
        state. Disk values are scaled by 1024 ** 3, memory, swap and network
        byte counters by 1024 ** 2, and throughput by 1000 ** 2 per second.
        Network sensors report None when the configured interface is unknown.
        """
        if self.type == "disk_use_percent":
            self._state = psutil.disk_usage(self.argument).percent
        elif self.type == "disk_use":
            self._state = round(psutil.disk_usage(self.argument).used / 1024 ** 3, 1)
        elif self.type == "disk_free":
            self._state = round(psutil.disk_usage(self.argument).free / 1024 ** 3, 1)
        elif self.type == "memory_use_percent":
            self._state = psutil.virtual_memory().percent
        elif self.type == "memory_use":
            virtual_memory = psutil.virtual_memory()
            self._state = round(
                (virtual_memory.total - virtual_memory.available) / 1024 ** 2, 1
            )
        elif self.type == "memory_free":
            self._state = round(psutil.virtual_memory().available / 1024 ** 2, 1)
        elif self.type == "swap_use_percent":
            self._state = psutil.swap_memory().percent
        elif self.type == "swap_use":
            self._state = round(psutil.swap_memory().used / 1024 ** 2, 1)
        elif self.type == "swap_free":
            self._state = round(psutil.swap_memory().free / 1024 ** 2, 1)
        elif self.type == "processor_use":
            self._state = round(psutil.cpu_percent(interval=None))
        elif self.type == "processor_temperature":
            self._state = self.read_cpu_temperature()
        elif self.type == "process":
            # STATE_ON as soon as any process carries the configured name.
            for proc in psutil.process_iter():
                try:
                    if self.argument == proc.name():
                        self._state = STATE_ON
                        return
                except psutil.NoSuchProcess as err:
                    # The process vanished while being inspected; log it and
                    # carry on with the remaining processes.
                    _LOGGER.warning(
                        "Failed to load process with id: %s, old name: %s",
                        err.pid,
                        err.name,
                    )
            self._state = STATE_OFF
        elif self.type in ("network_out", "network_in"):
            counters = psutil.net_io_counters(pernic=True)
            if self.argument in counters:
                counter = counters[self.argument][IO_COUNTER[self.type]]
                self._state = round(counter / 1024 ** 2, 1)
            else:
                self._state = None
        elif self.type in ("packets_out", "packets_in"):
            counters = psutil.net_io_counters(pernic=True)
            if self.argument in counters:
                self._state = counters[self.argument][IO_COUNTER[self.type]]
            else:
                self._state = None
        elif self.type in ("throughput_network_out", "throughput_network_in"):
            counters = psutil.net_io_counters(pernic=True)
            if self.argument in counters:
                counter = counters[self.argument][IO_COUNTER[self.type]]
                now = dt_util.utcnow()
                # A rate needs an earlier, smaller reading. The first update
                # and a counter that went backwards both yield no value.
                if self._last_value and self._last_value < counter:
                    # total_seconds() keeps sub-second precision and whole
                    # days; timedelta.seconds drops both and is 0 for updates
                    # less than a second apart, which used to raise
                    # ZeroDivisionError.
                    elapsed = (now - self._last_update_time).total_seconds()
                    if elapsed > 0:
                        self._state = round(
                            (counter - self._last_value) / 1000 ** 2 / elapsed,
                            3,
                        )
                    else:
                        self._state = None
                else:
                    self._state = None
                self._last_update_time = now
                self._last_value = counter
            else:
                self._state = None
        elif self.type in ("ipv4_address", "ipv6_address"):
            addresses = psutil.net_if_addrs()
            if self.argument in addresses:
                # Every address of the wanted family overwrites the state, so
                # the last one listed wins. If none matches, the previous
                # state is kept.
                for addr in addresses[self.argument]:
                    if addr.family == IF_ADDRS_FAMILY[self.type]:
                        self._state = addr.address
            else:
                self._state = None
        elif self.type == "last_boot":
            self._state = dt_util.as_local(
                dt_util.utc_from_timestamp(psutil.boot_time())
            ).isoformat()
        elif self.type == "load_1m":
            self._state = round(os.getloadavg()[0], 2)
        elif self.type == "load_5m":
            self._state = round(os.getloadavg()[1], 2)
        elif self.type == "load_15m":
            self._state = round(os.getloadavg()[2], 2)

    @staticmethod
    def read_cpu_temperature():
        """Attempt to read CPU / processor temperature.

        Returns the current value, rounded to one decimal, of the first
        temperature entry whose label is listed in CPU_SENSOR_PREFIXES, or
        None when no entry matches.
        """
        temps = psutil.sensors_temperatures()

        for name, entries in temps.items():
            for index, entry in enumerate(entries, start=1):
                # In case the label is empty (e.g. on Raspberry PI 4),
                # construct it ourself here based on the sensor key name.
                label = entry.label if entry.label else f"{name} {index}"

                if label in CPU_SENSOR_PREFIXES:
                    return round(entry.current, 1)

        return None