Improve typing in Zabbix (#118545)
parent
a4612143e6
commit
51933b0f47
|
@ -1,5 +1,6 @@
|
|||
"""Support for Zabbix."""
|
||||
|
||||
from collections.abc import Callable
|
||||
from contextlib import suppress
|
||||
import json
|
||||
import logging
|
||||
|
@ -24,7 +25,7 @@ from homeassistant.const import (
|
|||
STATE_UNAVAILABLE,
|
||||
STATE_UNKNOWN,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.core import Event, EventStateChangedData, HomeAssistant, callback
|
||||
from homeassistant.helpers import event as event_helper, state as state_helper
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entityfilter import (
|
||||
|
@ -100,7 +101,9 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
|
||||
hass.data[DOMAIN] = zapi
|
||||
|
||||
def event_to_metrics(event, float_keys, string_keys):
|
||||
def event_to_metrics(
|
||||
event: Event, float_keys: set[str], string_keys: set[str]
|
||||
) -> list[ZabbixMetric] | None:
|
||||
"""Add an event to the outgoing Zabbix list."""
|
||||
state = event.data.get("new_state")
|
||||
if state is None or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE):
|
||||
|
@ -158,7 +161,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
|
||||
if publish_states_host:
|
||||
zabbix_sender = ZabbixSender(zabbix_server=conf[CONF_HOST])
|
||||
instance = ZabbixThread(hass, zabbix_sender, event_to_metrics)
|
||||
instance = ZabbixThread(zabbix_sender, event_to_metrics)
|
||||
instance.setup(hass)
|
||||
|
||||
return True
|
||||
|
@ -169,41 +172,47 @@ class ZabbixThread(threading.Thread):
|
|||
|
||||
MAX_TRIES = 3
|
||||
|
||||
def __init__(self, hass, zabbix_sender, event_to_metrics):
|
||||
def __init__(
|
||||
self,
|
||||
zabbix_sender: ZabbixSender,
|
||||
event_to_metrics: Callable[
|
||||
[Event, set[str], set[str]], list[ZabbixMetric] | None
|
||||
],
|
||||
) -> None:
|
||||
"""Initialize the listener."""
|
||||
threading.Thread.__init__(self, name="Zabbix")
|
||||
self.queue = queue.Queue()
|
||||
self.queue: queue.Queue = queue.Queue()
|
||||
self.zabbix_sender = zabbix_sender
|
||||
self.event_to_metrics = event_to_metrics
|
||||
self.write_errors = 0
|
||||
self.shutdown = False
|
||||
self.float_keys = set()
|
||||
self.string_keys = set()
|
||||
self.float_keys: set[str] = set()
|
||||
self.string_keys: set[str] = set()
|
||||
|
||||
def setup(self, hass):
|
||||
def setup(self, hass: HomeAssistant) -> None:
|
||||
"""Set up the thread and start it."""
|
||||
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self._shutdown)
|
||||
self.start()
|
||||
_LOGGER.debug("Started publishing state changes to Zabbix")
|
||||
|
||||
def _shutdown(self, event):
|
||||
def _shutdown(self, event: Event) -> None:
|
||||
"""Shut down the thread."""
|
||||
self.queue.put(None)
|
||||
self.join()
|
||||
|
||||
@callback
|
||||
def _event_listener(self, event):
|
||||
def _event_listener(self, event: Event[EventStateChangedData]) -> None:
|
||||
"""Listen for new messages on the bus and queue them for Zabbix."""
|
||||
item = (time.monotonic(), event)
|
||||
self.queue.put(item)
|
||||
|
||||
def get_metrics(self):
|
||||
def get_metrics(self) -> tuple[int, list[ZabbixMetric]]:
|
||||
"""Return a batch of events formatted for writing."""
|
||||
queue_seconds = QUEUE_BACKLOG_SECONDS + self.MAX_TRIES * RETRY_DELAY
|
||||
|
||||
count = 0
|
||||
metrics = []
|
||||
metrics: list[ZabbixMetric] = []
|
||||
|
||||
dropped = 0
|
||||
|
||||
|
@ -233,7 +242,7 @@ class ZabbixThread(threading.Thread):
|
|||
|
||||
return count, metrics
|
||||
|
||||
def write_to_zabbix(self, metrics):
|
||||
def write_to_zabbix(self, metrics: list[ZabbixMetric]) -> None:
|
||||
"""Write preprocessed events to zabbix, with retry."""
|
||||
|
||||
for retry in range(self.MAX_TRIES + 1):
|
||||
|
@ -254,7 +263,7 @@ class ZabbixThread(threading.Thread):
|
|||
_LOGGER.error("Write error: %s", err)
|
||||
self.write_errors += len(metrics)
|
||||
|
||||
def run(self):
|
||||
def run(self) -> None:
|
||||
"""Process incoming events."""
|
||||
while not self.shutdown:
|
||||
count, metrics = self.get_metrics()
|
||||
|
|
|
@ -2,8 +2,11 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from pyzabbix import ZabbixAPI
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
|
||||
|
@ -11,7 +14,7 @@ from homeassistant.const import CONF_NAME
|
|||
from homeassistant.core import HomeAssistant
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
|
||||
|
||||
from .. import zabbix
|
||||
|
||||
|
@ -88,25 +91,25 @@ def setup_platform(
|
|||
class ZabbixTriggerCountSensor(SensorEntity):
|
||||
"""Get the active trigger count for all Zabbix monitored hosts."""
|
||||
|
||||
def __init__(self, zapi, name="Zabbix"):
|
||||
def __init__(self, zapi: ZabbixAPI, name: str | None = "Zabbix") -> None:
|
||||
"""Initialize Zabbix sensor."""
|
||||
self._name = name
|
||||
self._zapi = zapi
|
||||
self._state = None
|
||||
self._attributes = {}
|
||||
self._state: int | None = None
|
||||
self._attributes: dict[str, Any] = {}
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
def name(self) -> str | None:
|
||||
"""Return the name of the sensor."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def native_value(self):
|
||||
def native_value(self) -> StateType:
|
||||
"""Return the state of the sensor."""
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def native_unit_of_measurement(self):
|
||||
def native_unit_of_measurement(self) -> str:
|
||||
"""Return the units of measurement."""
|
||||
return "issues"
|
||||
|
||||
|
@ -122,7 +125,7 @@ class ZabbixTriggerCountSensor(SensorEntity):
|
|||
self._state = len(triggers)
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self):
|
||||
def extra_state_attributes(self) -> Mapping[str, Any] | None:
|
||||
"""Return the state attributes of the device."""
|
||||
return self._attributes
|
||||
|
||||
|
@ -130,7 +133,9 @@ class ZabbixTriggerCountSensor(SensorEntity):
|
|||
class ZabbixSingleHostTriggerCountSensor(ZabbixTriggerCountSensor):
|
||||
"""Get the active trigger count for a single Zabbix monitored host."""
|
||||
|
||||
def __init__(self, zapi, hostid, name=None):
|
||||
def __init__(
|
||||
self, zapi: ZabbixAPI, hostid: list[str], name: str | None = None
|
||||
) -> None:
|
||||
"""Initialize Zabbix sensor."""
|
||||
super().__init__(zapi, name)
|
||||
self._hostid = hostid
|
||||
|
@ -154,7 +159,9 @@ class ZabbixSingleHostTriggerCountSensor(ZabbixTriggerCountSensor):
|
|||
class ZabbixMultipleHostTriggerCountSensor(ZabbixTriggerCountSensor):
|
||||
"""Get the active trigger count for specified Zabbix monitored hosts."""
|
||||
|
||||
def __init__(self, zapi, hostids, name=None):
|
||||
def __init__(
|
||||
self, zapi: ZabbixAPI, hostids: list[str], name: str | None = None
|
||||
) -> None:
|
||||
"""Initialize Zabbix sensor."""
|
||||
super().__init__(zapi, name)
|
||||
self._hostids = hostids
|
||||
|
|
Loading…
Reference in New Issue