Merge pull request #979 from rmkraus/automation-decorator
Automation Decorator for custom componentspull/811/merge
commit
ad2e2d916b
|
@ -29,9 +29,13 @@ import time
|
|||
import logging
|
||||
|
||||
from homeassistant.const import STATE_HOME, STATE_NOT_HOME, STATE_ON, STATE_OFF
|
||||
import homeassistant.loader as loader
|
||||
from homeassistant.helpers import validate_config
|
||||
from homeassistant.helpers.event_decorators import \
|
||||
track_state_change, track_time_change
|
||||
from homeassistant.helpers.service import service
|
||||
import homeassistant.components as core
|
||||
from homeassistant.components import device_tracker
|
||||
from homeassistant.components import light
|
||||
|
||||
# The domain of your component. Should be equal to the name of your component
|
||||
DOMAIN = "example"
|
||||
|
@ -39,11 +43,14 @@ DOMAIN = "example"
|
|||
# List of component names (string) your component depends upon
|
||||
# We depend on group because group will be loaded after all the components that
|
||||
# initialize devices have been setup.
|
||||
DEPENDENCIES = ['group']
|
||||
DEPENDENCIES = ['group', 'device_tracker', 'light']
|
||||
|
||||
# Configuration key for the entity id we are targetting
|
||||
CONF_TARGET = 'target'
|
||||
|
||||
# Variable for storing configuration parameters
|
||||
TARGET_ID = None
|
||||
|
||||
# Name of the service that we expose
|
||||
SERVICE_FLASH = 'flash'
|
||||
|
||||
|
@ -53,84 +60,89 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
def setup(hass, config):
|
||||
""" Setup example component. """
|
||||
global TARGET_ID
|
||||
|
||||
# Validate that all required config options are given
|
||||
if not validate_config(config, {DOMAIN: [CONF_TARGET]}, _LOGGER):
|
||||
return False
|
||||
|
||||
target_id = config[DOMAIN][CONF_TARGET]
|
||||
TARGET_ID = config[DOMAIN][CONF_TARGET]
|
||||
|
||||
# Validate that the target entity id exists
|
||||
if hass.states.get(target_id) is None:
|
||||
_LOGGER.error("Target entity id %s does not exist", target_id)
|
||||
if hass.states.get(TARGET_ID) is None:
|
||||
_LOGGER.error("Target entity id %s does not exist",
|
||||
TARGET_ID)
|
||||
|
||||
# Tell the bootstrapper that we failed to initialize
|
||||
# Tell the bootstrapper that we failed to initialize and clear the
|
||||
# stored target id so our functions don't run.
|
||||
TARGET_ID = None
|
||||
return False
|
||||
|
||||
# We will use the component helper methods to check the states.
|
||||
device_tracker = loader.get_component('device_tracker')
|
||||
light = loader.get_component('light')
|
||||
|
||||
def track_devices(entity_id, old_state, new_state):
|
||||
""" Called when the group.all devices change state. """
|
||||
|
||||
# If anyone comes home and the core is not on, turn it on.
|
||||
if new_state.state == STATE_HOME and not core.is_on(hass, target_id):
|
||||
|
||||
core.turn_on(hass, target_id)
|
||||
|
||||
# If all people leave the house and the core is on, turn it off
|
||||
elif new_state.state == STATE_NOT_HOME and core.is_on(hass, target_id):
|
||||
|
||||
core.turn_off(hass, target_id)
|
||||
|
||||
# Register our track_devices method to receive state changes of the
|
||||
# all tracked devices group.
|
||||
hass.states.track_change(
|
||||
device_tracker.ENTITY_ID_ALL_DEVICES, track_devices)
|
||||
|
||||
def wake_up(now):
|
||||
""" Turn it on in the morning if there are people home and
|
||||
it is not already on. """
|
||||
|
||||
if device_tracker.is_on(hass) and not core.is_on(hass, target_id):
|
||||
_LOGGER.info('People home at 7AM, turning it on')
|
||||
core.turn_on(hass, target_id)
|
||||
|
||||
# Register our wake_up service to be called at 7AM in the morning
|
||||
hass.track_time_change(wake_up, hour=7, minute=0, second=0)
|
||||
|
||||
def all_lights_off(entity_id, old_state, new_state):
|
||||
""" If all lights turn off, turn off. """
|
||||
|
||||
if core.is_on(hass, target_id):
|
||||
_LOGGER.info('All lights have been turned off, turning it off')
|
||||
core.turn_off(hass, target_id)
|
||||
|
||||
# Register our all_lights_off method to be called when all lights turn off
|
||||
hass.states.track_change(
|
||||
light.ENTITY_ID_ALL_LIGHTS, all_lights_off, STATE_ON, STATE_OFF)
|
||||
|
||||
def flash_service(call):
|
||||
""" Service that will turn the target off for 10 seconds
|
||||
if on and vice versa. """
|
||||
|
||||
if core.is_on(hass, target_id):
|
||||
core.turn_off(hass, target_id)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
core.turn_on(hass, target_id)
|
||||
|
||||
else:
|
||||
core.turn_on(hass, target_id)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
core.turn_off(hass, target_id)
|
||||
|
||||
# Register our service with HASS.
|
||||
hass.services.register(DOMAIN, SERVICE_FLASH, flash_service)
|
||||
|
||||
# Tells the bootstrapper that the component was successfully initialized
|
||||
# Tell the bootstrapper that we initialized successfully
|
||||
return True
|
||||
|
||||
|
||||
@track_state_change(device_tracker.ENTITY_ID_ALL_DEVICES)
|
||||
def track_devices(hass, entity_id, old_state, new_state):
|
||||
""" Called when the group.all devices change state. """
|
||||
# If the target id is not set, return
|
||||
if not TARGET_ID:
|
||||
return
|
||||
|
||||
# If anyone comes home and the entity is not on, turn it on.
|
||||
if new_state.state == STATE_HOME and not core.is_on(hass, TARGET_ID):
|
||||
|
||||
core.turn_on(hass, TARGET_ID)
|
||||
|
||||
# If all people leave the house and the entity is on, turn it off
|
||||
elif new_state.state == STATE_NOT_HOME and core.is_on(hass, TARGET_ID):
|
||||
|
||||
core.turn_off(hass, TARGET_ID)
|
||||
|
||||
|
||||
@track_time_change(hour=7, minute=0, second=0)
|
||||
def wake_up(hass, now):
|
||||
"""
|
||||
Turn it on in the morning (7 AM) if there are people home and
|
||||
it is not already on.
|
||||
"""
|
||||
if not TARGET_ID:
|
||||
return
|
||||
|
||||
if device_tracker.is_on(hass) and not core.is_on(hass, TARGET_ID):
|
||||
_LOGGER.info('People home at 7AM, turning it on')
|
||||
core.turn_on(hass, TARGET_ID)
|
||||
|
||||
|
||||
@track_state_change(light.ENTITY_ID_ALL_LIGHTS, STATE_ON, STATE_OFF)
|
||||
def all_lights_off(hass, entity_id, old_state, new_state):
|
||||
""" If all lights turn off, turn off. """
|
||||
if not TARGET_ID:
|
||||
return
|
||||
|
||||
if core.is_on(hass, TARGET_ID):
|
||||
_LOGGER.info('All lights have been turned off, turning it off')
|
||||
core.turn_off(hass, TARGET_ID)
|
||||
|
||||
|
||||
@service(DOMAIN, SERVICE_FLASH)
|
||||
def flash_service(hass, call):
|
||||
"""
|
||||
Service that will turn the target off for 10 seconds if on and vice versa.
|
||||
"""
|
||||
if not TARGET_ID:
|
||||
return
|
||||
|
||||
if core.is_on(hass, TARGET_ID):
|
||||
core.turn_off(hass, TARGET_ID)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
core.turn_on(hass, TARGET_ID)
|
||||
|
||||
else:
|
||||
core.turn_on(hass, TARGET_ID)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
core.turn_off(hass, TARGET_ID)
|
||||
|
|
|
@ -24,6 +24,7 @@ import homeassistant.config as config_util
|
|||
import homeassistant.loader as loader
|
||||
import homeassistant.components as core_components
|
||||
import homeassistant.components.group as group
|
||||
from homeassistant.helpers import event_decorators, service
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.const import (
|
||||
__version__, EVENT_COMPONENT_LOADED, CONF_LATITUDE, CONF_LONGITUDE,
|
||||
|
@ -199,6 +200,10 @@ def from_config_dict(config, hass=None, config_dir=None, enable_log=True,
|
|||
|
||||
_LOGGER.info('Home Assistant core initialized')
|
||||
|
||||
# give event decorators access to HASS
|
||||
event_decorators.HASS = hass
|
||||
service.HASS = hass
|
||||
|
||||
# Setup the components
|
||||
for domain in loader.load_order_components(components):
|
||||
_setup_component(hass, domain, config)
|
||||
|
|
|
@ -10,7 +10,7 @@ import logging
|
|||
from datetime import timedelta
|
||||
|
||||
from homeassistant.components import sun
|
||||
from homeassistant.helpers.event import track_point_in_utc_time
|
||||
from homeassistant.helpers.event import track_sunrise, track_sunset
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
DEPENDENCIES = ['sun']
|
||||
|
@ -47,9 +47,9 @@ def trigger(hass, config, action):
|
|||
|
||||
# Do something to call action
|
||||
if event == EVENT_SUNRISE:
|
||||
trigger_sunrise(hass, action, offset)
|
||||
track_sunrise(hass, action, offset)
|
||||
else:
|
||||
trigger_sunset(hass, action, offset)
|
||||
track_sunset(hass, action, offset)
|
||||
|
||||
return True
|
||||
|
||||
|
@ -125,44 +125,6 @@ def if_action(hass, config):
|
|||
return time_if
|
||||
|
||||
|
||||
def trigger_sunrise(hass, action, offset):
|
||||
""" Trigger action at next sun rise. """
|
||||
def next_rise():
|
||||
""" Returns next sunrise. """
|
||||
next_time = sun.next_rising_utc(hass) + offset
|
||||
|
||||
while next_time < dt_util.utcnow():
|
||||
next_time = next_time + timedelta(days=1)
|
||||
|
||||
return next_time
|
||||
|
||||
def sunrise_automation_listener(now):
|
||||
""" Called when it's time for action. """
|
||||
track_point_in_utc_time(hass, sunrise_automation_listener, next_rise())
|
||||
action()
|
||||
|
||||
track_point_in_utc_time(hass, sunrise_automation_listener, next_rise())
|
||||
|
||||
|
||||
def trigger_sunset(hass, action, offset):
|
||||
""" Trigger action at next sun set. """
|
||||
def next_set():
|
||||
""" Returns next sunrise. """
|
||||
next_time = sun.next_setting_utc(hass) + offset
|
||||
|
||||
while next_time < dt_util.utcnow():
|
||||
next_time = next_time + timedelta(days=1)
|
||||
|
||||
return next_time
|
||||
|
||||
def sunset_automation_listener(now):
|
||||
""" Called when it's time for action. """
|
||||
track_point_in_utc_time(hass, sunset_automation_listener, next_set())
|
||||
action()
|
||||
|
||||
track_point_in_utc_time(hass, sunset_automation_listener, next_set())
|
||||
|
||||
|
||||
def _parse_offset(raw_offset):
|
||||
if raw_offset is None:
|
||||
return timedelta(0)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""
|
||||
Helpers for listening to events
|
||||
"""
|
||||
from datetime import timedelta
|
||||
import functools as ft
|
||||
|
||||
from ..util import dt as dt_util
|
||||
|
@ -95,6 +96,54 @@ def track_point_in_utc_time(hass, action, point_in_time):
|
|||
return point_in_time_listener
|
||||
|
||||
|
||||
def track_sunrise(hass, action, offset=None):
|
||||
"""
|
||||
Adds a listener that will fire a specified offset from sunrise daily.
|
||||
"""
|
||||
from homeassistant.components import sun
|
||||
offset = offset or timedelta()
|
||||
|
||||
def next_rise():
|
||||
""" Returns next sunrise. """
|
||||
next_time = sun.next_rising_utc(hass) + offset
|
||||
|
||||
while next_time < dt_util.utcnow():
|
||||
next_time = next_time + timedelta(days=1)
|
||||
|
||||
return next_time
|
||||
|
||||
def sunrise_automation_listener(now):
|
||||
""" Called when it's time for action. """
|
||||
track_point_in_utc_time(hass, sunrise_automation_listener, next_rise())
|
||||
action()
|
||||
|
||||
track_point_in_utc_time(hass, sunrise_automation_listener, next_rise())
|
||||
|
||||
|
||||
def track_sunset(hass, action, offset=None):
|
||||
"""
|
||||
Adds a listener that will fire a specified offset from sunset daily.
|
||||
"""
|
||||
from homeassistant.components import sun
|
||||
offset = offset or timedelta()
|
||||
|
||||
def next_set():
|
||||
""" Returns next sunrise. """
|
||||
next_time = sun.next_setting_utc(hass) + offset
|
||||
|
||||
while next_time < dt_util.utcnow():
|
||||
next_time = next_time + timedelta(days=1)
|
||||
|
||||
return next_time
|
||||
|
||||
def sunset_automation_listener(now):
|
||||
""" Called when it's time for action. """
|
||||
track_point_in_utc_time(hass, sunset_automation_listener, next_set())
|
||||
action()
|
||||
|
||||
track_point_in_utc_time(hass, sunset_automation_listener, next_set())
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def track_utc_time_change(hass, action, year=None, month=None, day=None,
|
||||
hour=None, minute=None, second=None, local=False):
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
""" Event Decorators for custom components """
|
||||
|
||||
import functools
|
||||
|
||||
from homeassistant.helpers import event
|
||||
|
||||
HASS = None
|
||||
|
||||
|
||||
def track_state_change(entity_ids, from_state=None, to_state=None):
|
||||
""" Decorator factory to track state changes for entity id """
|
||||
|
||||
def track_state_change_decorator(action):
|
||||
""" Decorator to track state changes """
|
||||
event.track_state_change(HASS, entity_ids,
|
||||
functools.partial(action, HASS),
|
||||
from_state, to_state)
|
||||
return action
|
||||
|
||||
return track_state_change_decorator
|
||||
|
||||
|
||||
def track_sunrise(offset=None):
|
||||
""" Decorator factory to track sunrise events """
|
||||
|
||||
def track_sunrise_decorator(action):
|
||||
""" Decorator to track sunrise events """
|
||||
event.track_sunrise(HASS,
|
||||
functools.partial(action, HASS),
|
||||
offset)
|
||||
return action
|
||||
|
||||
return track_sunrise_decorator
|
||||
|
||||
|
||||
def track_sunset(offset=None):
|
||||
""" Decorator factory to track sunset events """
|
||||
|
||||
def track_sunset_decorator(action):
|
||||
""" Decorator to track sunset events """
|
||||
event.track_sunset(HASS,
|
||||
functools.partial(action, HASS),
|
||||
offset)
|
||||
return action
|
||||
|
||||
return track_sunset_decorator
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def track_time_change(year=None, month=None, day=None, hour=None, minute=None,
|
||||
second=None):
|
||||
""" Decorator factory to track time changes """
|
||||
|
||||
def track_time_change_decorator(action):
|
||||
""" Decorator to track time changes """
|
||||
event.track_time_change(HASS,
|
||||
functools.partial(action, HASS),
|
||||
year, month, day, hour, minute, second)
|
||||
return action
|
||||
|
||||
return track_time_change_decorator
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def track_utc_time_change(year=None, month=None, day=None, hour=None,
|
||||
minute=None, second=None):
|
||||
""" Decorator factory to track time changes """
|
||||
|
||||
def track_utc_time_change_decorator(action):
|
||||
""" Decorator to track time changes """
|
||||
event.track_utc_time_change(HASS,
|
||||
functools.partial(action, HASS),
|
||||
year, month, day, hour, minute, second)
|
||||
return action
|
||||
|
||||
return track_utc_time_change_decorator
|
|
@ -1,10 +1,13 @@
|
|||
"""Service calling related helpers."""
|
||||
import functools
|
||||
import logging
|
||||
|
||||
from homeassistant.const import ATTR_ENTITY_ID
|
||||
from homeassistant.helpers.entity import split_entity_id
|
||||
from homeassistant.loader import get_component
|
||||
|
||||
HASS = None
|
||||
|
||||
CONF_SERVICE = 'service'
|
||||
CONF_SERVICE_ENTITY_ID = 'entity_id'
|
||||
CONF_SERVICE_DATA = 'data'
|
||||
|
@ -12,6 +15,18 @@ CONF_SERVICE_DATA = 'data'
|
|||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def service(domain, service_name):
|
||||
""" Decorator factory to register a service """
|
||||
|
||||
def register_service_decorator(action):
|
||||
""" Decorator to register a service """
|
||||
HASS.services.register(domain, service_name,
|
||||
functools.partial(action, HASS))
|
||||
return action
|
||||
|
||||
return register_service_decorator
|
||||
|
||||
|
||||
def call_from_config(hass, config, blocking=False):
|
||||
"""Call a service based on a config hash."""
|
||||
if not isinstance(config, dict) or CONF_SERVICE not in config:
|
||||
|
@ -19,7 +34,7 @@ def call_from_config(hass, config, blocking=False):
|
|||
return
|
||||
|
||||
try:
|
||||
domain, service = split_entity_id(config[CONF_SERVICE])
|
||||
domain, service_name = split_entity_id(config[CONF_SERVICE])
|
||||
except ValueError:
|
||||
_LOGGER.error('Invalid service specified: %s', config[CONF_SERVICE])
|
||||
return
|
||||
|
@ -41,21 +56,21 @@ def call_from_config(hass, config, blocking=False):
|
|||
elif entity_id is not None:
|
||||
service_data[ATTR_ENTITY_ID] = entity_id
|
||||
|
||||
hass.services.call(domain, service, service_data, blocking)
|
||||
hass.services.call(domain, service_name, service_data, blocking)
|
||||
|
||||
|
||||
def extract_entity_ids(hass, service):
|
||||
def extract_entity_ids(hass, service_call):
|
||||
"""
|
||||
Helper method to extract a list of entity ids from a service call.
|
||||
Will convert group entity ids to the entity ids it represents.
|
||||
"""
|
||||
if not (service.data and ATTR_ENTITY_ID in service.data):
|
||||
if not (service_call.data and ATTR_ENTITY_ID in service_call.data):
|
||||
return []
|
||||
|
||||
group = get_component('group')
|
||||
|
||||
# Entity ID attr can be a list or a string
|
||||
service_ent_id = service.data[ATTR_ENTITY_ID]
|
||||
service_ent_id = service_call.data[ATTR_ENTITY_ID]
|
||||
|
||||
if isinstance(service_ent_id, str):
|
||||
return group.expand_entity_ids(hass, [service_ent_id])
|
||||
|
|
|
@ -9,8 +9,11 @@ Tests event helpers.
|
|||
import unittest
|
||||
from datetime import datetime
|
||||
|
||||
from astral import Astral
|
||||
|
||||
import homeassistant.core as ha
|
||||
from homeassistant.helpers.event import *
|
||||
from homeassistant.components import sun
|
||||
|
||||
|
||||
class TestEventHelpers(unittest.TestCase):
|
||||
|
@ -121,6 +124,98 @@ class TestEventHelpers(unittest.TestCase):
|
|||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(3, len(wildcard_runs))
|
||||
|
||||
def test_track_sunrise(self):
|
||||
""" Test track sunrise """
|
||||
latitude = 32.87336
|
||||
longitude = 117.22743
|
||||
|
||||
# setup sun component
|
||||
self.hass.config.latitude = latitude
|
||||
self.hass.config.longitude = longitude
|
||||
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}})
|
||||
|
||||
# get next sunrise/sunset
|
||||
astral = Astral()
|
||||
utc_now = dt_util.utcnow()
|
||||
|
||||
mod = -1
|
||||
while True:
|
||||
next_rising = (astral.sunrise_utc(utc_now +
|
||||
timedelta(days=mod), latitude, longitude))
|
||||
if next_rising > utc_now:
|
||||
break
|
||||
mod += 1
|
||||
|
||||
# track sunrise
|
||||
runs = []
|
||||
track_sunrise(self.hass, lambda: runs.append(1))
|
||||
|
||||
offset_runs = []
|
||||
offset = timedelta(minutes=30)
|
||||
track_sunrise(self.hass, lambda: offset_runs.append(1), offset)
|
||||
|
||||
# run tests
|
||||
self._send_time_changed(next_rising - offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(0, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_rising)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_rising + offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(2, len(runs))
|
||||
self.assertEqual(1, len(offset_runs))
|
||||
|
||||
def test_track_sunset(self):
|
||||
""" Test track sunset """
|
||||
latitude = 32.87336
|
||||
longitude = 117.22743
|
||||
|
||||
# setup sun component
|
||||
self.hass.config.latitude = latitude
|
||||
self.hass.config.longitude = longitude
|
||||
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}})
|
||||
|
||||
# get next sunrise/sunset
|
||||
astral = Astral()
|
||||
utc_now = dt_util.utcnow()
|
||||
|
||||
mod = -1
|
||||
while True:
|
||||
next_setting = (astral.sunset_utc(utc_now +
|
||||
timedelta(days=mod), latitude, longitude))
|
||||
if next_setting > utc_now:
|
||||
break
|
||||
mod += 1
|
||||
|
||||
# track sunset
|
||||
runs = []
|
||||
track_sunset(self.hass, lambda: runs.append(1))
|
||||
|
||||
offset_runs = []
|
||||
offset = timedelta(minutes=30)
|
||||
track_sunset(self.hass, lambda: offset_runs.append(1), offset)
|
||||
|
||||
# run tests
|
||||
self._send_time_changed(next_setting - offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(0, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_setting)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_setting + offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(2, len(runs))
|
||||
self.assertEqual(1, len(offset_runs))
|
||||
|
||||
def _send_time_changed(self, now):
|
||||
""" Send a time changed event. """
|
||||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
"""
|
||||
tests.helpers.test_event_decorators
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Tests event decorator helpers.
|
||||
"""
|
||||
# pylint: disable=protected-access,too-many-public-methods
|
||||
# pylint: disable=too-few-public-methods
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from astral import Astral
|
||||
|
||||
import homeassistant.core as ha
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.helpers import event_decorators
|
||||
from homeassistant.helpers.event_decorators import (
|
||||
track_time_change, track_utc_time_change, track_state_change,
|
||||
track_sunrise, track_sunset)
|
||||
from homeassistant.components import sun
|
||||
|
||||
|
||||
class TestEventDecoratorHelpers(unittest.TestCase):
|
||||
"""
|
||||
Tests the Home Assistant event helpers.
|
||||
"""
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
""" things to be run when tests are started. """
|
||||
self.hass = ha.HomeAssistant()
|
||||
self.hass.states.set("light.Bowl", "on")
|
||||
self.hass.states.set("switch.AC", "off")
|
||||
|
||||
event_decorators.HASS = self.hass
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
""" Stop down stuff we started. """
|
||||
self.hass.stop()
|
||||
|
||||
def test_track_sunrise(self):
|
||||
""" Test track sunrise decorator """
|
||||
latitude = 32.87336
|
||||
longitude = 117.22743
|
||||
|
||||
# setup sun component
|
||||
self.hass.config.latitude = latitude
|
||||
self.hass.config.longitude = longitude
|
||||
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}})
|
||||
|
||||
# get next sunrise/sunset
|
||||
astral = Astral()
|
||||
utc_now = dt_util.utcnow()
|
||||
|
||||
mod = -1
|
||||
while True:
|
||||
next_rising = (astral.sunrise_utc(utc_now +
|
||||
timedelta(days=mod), latitude, longitude))
|
||||
if next_rising > utc_now:
|
||||
break
|
||||
mod += 1
|
||||
|
||||
# use decorator
|
||||
runs = []
|
||||
decor = track_sunrise()
|
||||
decor(lambda x: runs.append(1))
|
||||
|
||||
offset_runs = []
|
||||
offset = timedelta(minutes=30)
|
||||
decor = track_sunrise(offset)
|
||||
decor(lambda x: offset_runs.append(1))
|
||||
|
||||
# run tests
|
||||
self._send_time_changed(next_rising - offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(0, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_rising)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_rising + offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(2, len(runs))
|
||||
self.assertEqual(1, len(offset_runs))
|
||||
|
||||
def test_track_sunset(self):
|
||||
""" Test track sunset decorator """
|
||||
latitude = 32.87336
|
||||
longitude = 117.22743
|
||||
|
||||
# setup sun component
|
||||
self.hass.config.latitude = latitude
|
||||
self.hass.config.longitude = longitude
|
||||
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}})
|
||||
|
||||
# get next sunrise/sunset
|
||||
astral = Astral()
|
||||
utc_now = dt_util.utcnow()
|
||||
|
||||
mod = -1
|
||||
while True:
|
||||
next_setting = (astral.sunset_utc(utc_now +
|
||||
timedelta(days=mod), latitude, longitude))
|
||||
if next_setting > utc_now:
|
||||
break
|
||||
mod += 1
|
||||
|
||||
# use decorator
|
||||
runs = []
|
||||
decor = track_sunset()
|
||||
decor(lambda x: runs.append(1))
|
||||
|
||||
offset_runs = []
|
||||
offset = timedelta(minutes=30)
|
||||
decor = track_sunset(offset)
|
||||
decor(lambda x: offset_runs.append(1))
|
||||
|
||||
# run tests
|
||||
self._send_time_changed(next_setting - offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(0, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_setting)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(runs))
|
||||
self.assertEqual(0, len(offset_runs))
|
||||
|
||||
self._send_time_changed(next_setting + offset)
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(2, len(runs))
|
||||
self.assertEqual(1, len(offset_runs))
|
||||
|
||||
def test_track_time_change(self):
|
||||
""" Test tracking time change. """
|
||||
wildcard_runs = []
|
||||
specific_runs = []
|
||||
|
||||
decor = track_time_change()
|
||||
decor(lambda x, y: wildcard_runs.append(1))
|
||||
|
||||
decor = track_utc_time_change(second=[0, 30])
|
||||
decor(lambda x, y: specific_runs.append(1))
|
||||
|
||||
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(1, len(wildcard_runs))
|
||||
|
||||
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(2, len(wildcard_runs))
|
||||
|
||||
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(2, len(specific_runs))
|
||||
self.assertEqual(3, len(wildcard_runs))
|
||||
|
||||
def test_track_state_change(self):
|
||||
""" Test track_state_change. """
|
||||
# 2 lists to track how often our callbacks get called
|
||||
specific_runs = []
|
||||
wildcard_runs = []
|
||||
|
||||
decor = track_state_change('light.Bowl', 'on', 'off')
|
||||
decor(lambda a, b, c, d: specific_runs.append(1))
|
||||
|
||||
decor = track_state_change('light.Bowl', ha.MATCH_ALL, ha.MATCH_ALL)
|
||||
decor(lambda a, b, c, d: wildcard_runs.append(1))
|
||||
|
||||
# Set same state should not trigger a state change/listener
|
||||
self.hass.states.set('light.Bowl', 'on')
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(0, len(specific_runs))
|
||||
self.assertEqual(0, len(wildcard_runs))
|
||||
|
||||
# State change off -> on
|
||||
self.hass.states.set('light.Bowl', 'off')
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(1, len(wildcard_runs))
|
||||
|
||||
# State change off -> off
|
||||
self.hass.states.set('light.Bowl', 'off', {"some_attr": 1})
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(2, len(wildcard_runs))
|
||||
|
||||
# State change off -> on
|
||||
self.hass.states.set('light.Bowl', 'on')
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(specific_runs))
|
||||
self.assertEqual(3, len(wildcard_runs))
|
||||
|
||||
def _send_time_changed(self, now):
|
||||
""" Send a time changed event. """
|
||||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
|
|
@ -24,10 +24,23 @@ class TestServiceHelpers(unittest.TestCase):
|
|||
self.hass = get_test_home_assistant()
|
||||
self.calls = mock_service(self.hass, 'test_domain', 'test_service')
|
||||
|
||||
service.HASS = self.hass
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
""" Stop down stuff we started. """
|
||||
self.hass.stop()
|
||||
|
||||
def test_service(self):
|
||||
""" Test service registration decorator. """
|
||||
runs = []
|
||||
|
||||
decor = service.service('test', 'test')
|
||||
decor(lambda x, y: runs.append(1))
|
||||
|
||||
self.hass.services.call('test', 'test')
|
||||
self.hass.pool.block_till_done()
|
||||
self.assertEqual(1, len(runs))
|
||||
|
||||
def test_split_entity_string(self):
|
||||
service.call_from_config(self.hass, {
|
||||
'service': 'test_domain.test_service',
|
||||
|
|
Loading…
Reference in New Issue