Add an input_datetime (#9313)
* Initial proposal for the input_datetime * Linting * Further linting, don't define time validation twice * Make pylint *and* flake8 happy at the same time * Move todos to the PR to make lint happy * Actually validate the type of date/time * First testing * Linting * Address code review issues * Code review: Remove forgotten print()s * Make set_datetime a coroutine * Create contains_at_least_one_key_value CV method, use it * Add timestamp to the attributes * Test and fix corner case where restore data is bogus * Add FIXME * Fix date/time setting * Fix Validation * Merge date / time validation, add tests * Simplify service data validation * No default for initial state, allow 'unknown' as state * cleanup * fix schemapull/9621/head
parent
2df433eb0a
commit
236d5f8742
|
@ -0,0 +1,227 @@
|
|||
"""
|
||||
Component to offer a way to select a date and / or a time.
|
||||
|
||||
For more details about this component, please refer to the documentation
|
||||
at https://home-assistant.io/components/input_datetime/
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import (
|
||||
ATTR_ENTITY_ID, CONF_ICON, CONF_NAME, STATE_UNKNOWN)
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.helpers.entity_component import EntityComponent
|
||||
from homeassistant.helpers.restore_state import async_get_last_state
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DOMAIN = 'input_datetime'
|
||||
ENTITY_ID_FORMAT = DOMAIN + '.{}'
|
||||
|
||||
CONF_HAS_DATE = 'has_date'
|
||||
CONF_HAS_TIME = 'has_time'
|
||||
CONF_INITIAL = 'initial'
|
||||
|
||||
ATTR_DATE = 'date'
|
||||
ATTR_TIME = 'time'
|
||||
|
||||
SERVICE_SET_DATETIME = 'set_datetime'
|
||||
|
||||
SERVICE_SET_DATETIME_SCHEMA = vol.Schema({
|
||||
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
|
||||
vol.Optional(ATTR_DATE): cv.date,
|
||||
vol.Optional(ATTR_TIME): cv.time,
|
||||
})
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema({
|
||||
DOMAIN: vol.Schema({
|
||||
cv.slug: vol.All({
|
||||
vol.Optional(CONF_NAME): cv.string,
|
||||
vol.Required(CONF_HAS_DATE): cv.boolean,
|
||||
vol.Required(CONF_HAS_TIME): cv.boolean,
|
||||
vol.Optional(CONF_ICON): cv.icon,
|
||||
vol.Optional(CONF_INITIAL): cv.datetime,
|
||||
}, cv.has_at_least_one_key_value((CONF_HAS_DATE, True),
|
||||
(CONF_HAS_TIME, True)))})
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_set_datetime(hass, entity_id, dt_value):
|
||||
"""Set date and / or time of input_datetime."""
|
||||
yield from hass.services.async_call(DOMAIN, SERVICE_SET_DATETIME, {
|
||||
ATTR_ENTITY_ID: entity_id,
|
||||
ATTR_DATE: dt_value.date(),
|
||||
ATTR_TIME: dt_value.time()
|
||||
})
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_setup(hass, config):
|
||||
"""Set up an input datetime."""
|
||||
component = EntityComponent(_LOGGER, DOMAIN, hass)
|
||||
|
||||
entities = []
|
||||
|
||||
for object_id, cfg in config[DOMAIN].items():
|
||||
name = cfg.get(CONF_NAME)
|
||||
has_time = cfg.get(CONF_HAS_TIME)
|
||||
has_date = cfg.get(CONF_HAS_DATE)
|
||||
icon = cfg.get(CONF_ICON)
|
||||
initial = cfg.get(CONF_INITIAL)
|
||||
entities.append(InputDatetime(object_id, name,
|
||||
has_date, has_time, icon, initial))
|
||||
|
||||
if not entities:
|
||||
return False
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_set_datetime_service(call):
|
||||
"""Handle a call to the input datetime 'set datetime' service."""
|
||||
target_inputs = component.async_extract_from_service(call)
|
||||
|
||||
tasks = []
|
||||
for input_datetime in target_inputs:
|
||||
time = call.data.get(ATTR_TIME)
|
||||
date = call.data.get(ATTR_DATE)
|
||||
if (input_datetime.has_date() and not date) or \
|
||||
(input_datetime.has_time() and not time):
|
||||
_LOGGER.error("Invalid service data for "
|
||||
"input_datetime.set_datetime: %s",
|
||||
str(call.data))
|
||||
continue
|
||||
|
||||
tasks.append(input_datetime.async_set_datetime(date, time))
|
||||
|
||||
if tasks:
|
||||
yield from asyncio.wait(tasks, loop=hass.loop)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN, SERVICE_SET_DATETIME, async_set_datetime_service,
|
||||
schema=SERVICE_SET_DATETIME_SCHEMA)
|
||||
|
||||
yield from component.async_add_entities(entities)
|
||||
return True
|
||||
|
||||
|
||||
class InputDatetime(Entity):
|
||||
"""Representation of a datetime input."""
|
||||
|
||||
def __init__(self, object_id, name, has_date, has_time, icon, initial):
|
||||
"""Initialize a select input."""
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(object_id)
|
||||
self._name = name
|
||||
self._has_date = has_date
|
||||
self._has_time = has_time
|
||||
self._icon = icon
|
||||
self._initial = initial
|
||||
self._current_datetime = None
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_added_to_hass(self):
|
||||
"""Run when entity about to be added."""
|
||||
restore_val = None
|
||||
|
||||
# Priority 1: Initial State
|
||||
if self._initial is not None:
|
||||
restore_val = self._initial
|
||||
|
||||
# Priority 2: Old state
|
||||
if restore_val is None:
|
||||
old_state = yield from async_get_last_state(self.hass,
|
||||
self.entity_id)
|
||||
if old_state is not None:
|
||||
restore_val = dt_util.parse_datetime(old_state.state)
|
||||
|
||||
if restore_val is not None:
|
||||
if not self._has_date:
|
||||
self._current_datetime = restore_val.time()
|
||||
elif not self._has_time:
|
||||
self._current_datetime = restore_val.date()
|
||||
else:
|
||||
self._current_datetime = restore_val
|
||||
|
||||
def has_date(self):
|
||||
"""Return whether the input datetime carries a date."""
|
||||
return self._has_date
|
||||
|
||||
def has_time(self):
|
||||
"""Return whether the input datetime carries a time."""
|
||||
return self._has_time
|
||||
|
||||
@property
|
||||
def should_poll(self):
|
||||
"""If entity should be polled."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the select input."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def icon(self):
|
||||
"""Return the icon to be used for this entity."""
|
||||
return self._icon
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the component."""
|
||||
if self._current_datetime is None:
|
||||
return STATE_UNKNOWN
|
||||
|
||||
return self._current_datetime
|
||||
|
||||
@property
|
||||
def state_attributes(self):
|
||||
"""Return the state attributes."""
|
||||
attrs = {
|
||||
'has_date': self._has_date,
|
||||
'has_time': self._has_time,
|
||||
}
|
||||
|
||||
if self._current_datetime is None:
|
||||
return attrs
|
||||
|
||||
if self._has_date and self._current_datetime is not None:
|
||||
attrs['year'] = self._current_datetime.year
|
||||
attrs['month'] = self._current_datetime.month
|
||||
attrs['day'] = self._current_datetime.day
|
||||
|
||||
if self._has_time and self._current_datetime is not None:
|
||||
attrs['hour'] = self._current_datetime.hour
|
||||
attrs['minute'] = self._current_datetime.minute
|
||||
attrs['second'] = self._current_datetime.second
|
||||
|
||||
if self._current_datetime is not None:
|
||||
if not self._has_date:
|
||||
attrs['timestamp'] = self._current_datetime.hour * 3600 + \
|
||||
self._current_datetime.minute * 60 + \
|
||||
self._current_datetime.second
|
||||
elif not self._has_time:
|
||||
extended = datetime.datetime.combine(self._current_datetime,
|
||||
datetime.time(0, 0))
|
||||
attrs['timestamp'] = extended.timestamp()
|
||||
else:
|
||||
attrs['timestamp'] = self._current_datetime.timestamp()
|
||||
|
||||
return attrs
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_set_datetime(self, date_val, time_val):
|
||||
"""Set a new date / time."""
|
||||
if self._has_date and self._has_time and date_val and time_val:
|
||||
self._current_datetime = datetime.datetime.combine(date_val,
|
||||
time_val)
|
||||
elif self._has_date and not self._has_time and date_val:
|
||||
self._current_datetime = date_val
|
||||
if self._has_time and not self._has_date and time_val:
|
||||
self._current_datetime = time_val
|
||||
|
||||
yield from self.async_update_ha_state()
|
|
@ -1,5 +1,6 @@
|
|||
"""Helpers for config validation using voluptuous."""
|
||||
from datetime import timedelta, datetime as datetime_sys
|
||||
from datetime import (timedelta, datetime as datetime_sys,
|
||||
time as time_sys, date as date_sys)
|
||||
import os
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
|
@ -57,6 +58,21 @@ def has_at_least_one_key(*keys: str) -> Callable:
|
|||
return validate
|
||||
|
||||
|
||||
def has_at_least_one_key_value(*items: list) -> Callable:
|
||||
"""Validate that at least one (key, value) pair exists."""
|
||||
def validate(obj: Dict) -> Dict:
|
||||
"""Test (key,value) exist in dict."""
|
||||
if not isinstance(obj, dict):
|
||||
raise vol.Invalid('expected dictionary')
|
||||
|
||||
for item in obj.items():
|
||||
if item in items:
|
||||
return obj
|
||||
raise vol.Invalid('must contain one of {}.'.format(str(items)))
|
||||
|
||||
return validate
|
||||
|
||||
|
||||
def boolean(value: Any) -> bool:
|
||||
"""Validate and coerce a boolean value."""
|
||||
if isinstance(value, str):
|
||||
|
@ -144,6 +160,38 @@ time_period_dict = vol.All(
|
|||
lambda value: timedelta(**value))
|
||||
|
||||
|
||||
def time(value) -> time_sys:
|
||||
"""Validate and transform a time."""
|
||||
if isinstance(value, time_sys):
|
||||
return value
|
||||
|
||||
try:
|
||||
time_val = dt_util.parse_time(value)
|
||||
except TypeError:
|
||||
raise vol.Invalid('Not a parseable type')
|
||||
|
||||
if time_val is None:
|
||||
raise vol.Invalid('Invalid time specified: {}'.format(value))
|
||||
|
||||
return time_val
|
||||
|
||||
|
||||
def date(value) -> date_sys:
|
||||
"""Validate and transform a date."""
|
||||
if isinstance(value, date_sys):
|
||||
return value
|
||||
|
||||
try:
|
||||
date_val = dt_util.parse_date(value)
|
||||
except TypeError:
|
||||
raise vol.Invalid('Not a parseable type')
|
||||
|
||||
if date_val is None:
|
||||
raise vol.Invalid("Could not parse date")
|
||||
|
||||
return date_val
|
||||
|
||||
|
||||
def time_period_str(value: str) -> timedelta:
|
||||
"""Validate and transform time offset."""
|
||||
if isinstance(value, int):
|
||||
|
@ -297,16 +345,6 @@ def template_complex(value):
|
|||
return template(value)
|
||||
|
||||
|
||||
def time(value):
|
||||
"""Validate time."""
|
||||
time_val = dt_util.parse_time(value)
|
||||
|
||||
if time_val is None:
|
||||
raise vol.Invalid('Invalid time specified: {}'.format(value))
|
||||
|
||||
return time_val
|
||||
|
||||
|
||||
def datetime(value):
|
||||
"""Validate datetime."""
|
||||
if isinstance(value, datetime_sys):
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
"""Tests for the Input slider component."""
|
||||
# pylint: disable=protected-access
|
||||
import asyncio
|
||||
import unittest
|
||||
import datetime
|
||||
|
||||
from homeassistant.core import CoreState, State
|
||||
from homeassistant.setup import setup_component, async_setup_component
|
||||
from homeassistant.components.input_datetime import (
|
||||
DOMAIN, async_set_datetime)
|
||||
|
||||
from tests.common import get_test_home_assistant, mock_restore_cache
|
||||
|
||||
|
||||
class TestInputDatetime(unittest.TestCase):
|
||||
"""Test the input datetime component."""
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def setUp(self):
|
||||
"""Setup things to be run when tests are started."""
|
||||
self.hass = get_test_home_assistant()
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def tearDown(self):
|
||||
"""Stop everything that was started."""
|
||||
self.hass.stop()
|
||||
|
||||
def test_invalid_configs(self):
|
||||
"""Test config."""
|
||||
invalid_configs = [
|
||||
None,
|
||||
{},
|
||||
{'name with space': None},
|
||||
{'test_no_value': {
|
||||
'has_time': False,
|
||||
'has_date': False
|
||||
}},
|
||||
]
|
||||
for cfg in invalid_configs:
|
||||
self.assertFalse(
|
||||
setup_component(self.hass, DOMAIN, {DOMAIN: cfg}))
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_set_datetime(hass):
|
||||
"""Test set_datetime method."""
|
||||
yield from async_setup_component(hass, DOMAIN, {
|
||||
DOMAIN: {
|
||||
'test_datetime': {
|
||||
'has_time': True,
|
||||
'has_date': True
|
||||
},
|
||||
}})
|
||||
|
||||
entity_id = 'input_datetime.test_datetime'
|
||||
|
||||
dt_obj = datetime.datetime(2017, 9, 7, 19, 46)
|
||||
|
||||
yield from async_set_datetime(hass, entity_id, dt_obj)
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == str(dt_obj)
|
||||
assert state.attributes['has_time']
|
||||
assert state.attributes['has_date']
|
||||
|
||||
assert state.attributes['year'] == 2017
|
||||
assert state.attributes['month'] == 9
|
||||
assert state.attributes['day'] == 7
|
||||
assert state.attributes['hour'] == 19
|
||||
assert state.attributes['minute'] == 46
|
||||
assert state.attributes['timestamp'] == dt_obj.timestamp()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_set_datetime_time(hass):
|
||||
"""Test set_datetime method with only time."""
|
||||
yield from async_setup_component(hass, DOMAIN, {
|
||||
DOMAIN: {
|
||||
'test_time': {
|
||||
'has_time': True,
|
||||
'has_date': False
|
||||
}
|
||||
}})
|
||||
|
||||
entity_id = 'input_datetime.test_time'
|
||||
|
||||
dt_obj = datetime.datetime(2017, 9, 7, 19, 46)
|
||||
time_portion = dt_obj.time()
|
||||
|
||||
yield from async_set_datetime(hass, entity_id, dt_obj)
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == str(time_portion)
|
||||
assert state.attributes['has_time']
|
||||
assert not state.attributes['has_date']
|
||||
|
||||
assert state.attributes['timestamp'] == (19 * 3600) + (46 * 60)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_set_invalid(hass):
|
||||
"""Test set_datetime method with only time."""
|
||||
initial = datetime.datetime(2017, 1, 1, 0, 0)
|
||||
yield from async_setup_component(hass, DOMAIN, {
|
||||
DOMAIN: {
|
||||
'test_date': {
|
||||
'has_time': False,
|
||||
'has_date': True,
|
||||
'initial': initial
|
||||
}
|
||||
}})
|
||||
|
||||
entity_id = 'input_datetime.test_date'
|
||||
|
||||
dt_obj = datetime.datetime(2017, 9, 7, 19, 46)
|
||||
time_portion = dt_obj.time()
|
||||
|
||||
yield from hass.services.async_call('input_datetime', 'set_datetime', {
|
||||
'entity_id': 'test_date',
|
||||
'time': time_portion
|
||||
})
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == str(initial.date())
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_set_datetime_date(hass):
|
||||
"""Test set_datetime method with only date."""
|
||||
yield from async_setup_component(hass, DOMAIN, {
|
||||
DOMAIN: {
|
||||
'test_date': {
|
||||
'has_time': False,
|
||||
'has_date': True
|
||||
}
|
||||
}})
|
||||
|
||||
entity_id = 'input_datetime.test_date'
|
||||
|
||||
dt_obj = datetime.datetime(2017, 9, 7, 19, 46)
|
||||
date_portion = dt_obj.date()
|
||||
|
||||
yield from async_set_datetime(hass, entity_id, dt_obj)
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get(entity_id)
|
||||
assert state.state == str(date_portion)
|
||||
assert not state.attributes['has_time']
|
||||
assert state.attributes['has_date']
|
||||
|
||||
date_dt_obj = datetime.datetime(2017, 9, 7)
|
||||
assert state.attributes['timestamp'] == date_dt_obj.timestamp()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_restore_state(hass):
|
||||
"""Ensure states are restored on startup."""
|
||||
mock_restore_cache(hass, (
|
||||
State('input_datetime.test_time', '2017-09-07 19:46:00'),
|
||||
State('input_datetime.test_date', '2017-09-07 19:46:00'),
|
||||
State('input_datetime.test_datetime', '2017-09-07 19:46:00'),
|
||||
State('input_datetime.test_bogus_data', 'this is not a date'),
|
||||
))
|
||||
|
||||
hass.state = CoreState.starting
|
||||
|
||||
initial = datetime.datetime(2017, 1, 1, 23, 42)
|
||||
|
||||
yield from async_setup_component(hass, DOMAIN, {
|
||||
DOMAIN: {
|
||||
'test_time': {
|
||||
'has_time': True,
|
||||
'has_date': False
|
||||
},
|
||||
'test_date': {
|
||||
'has_time': False,
|
||||
'has_date': True
|
||||
},
|
||||
'test_datetime': {
|
||||
'has_time': True,
|
||||
'has_date': True
|
||||
},
|
||||
'test_bogus_data': {
|
||||
'has_time': True,
|
||||
'has_date': True,
|
||||
'initial': str(initial)
|
||||
},
|
||||
}})
|
||||
|
||||
dt_obj = datetime.datetime(2017, 9, 7, 19, 46)
|
||||
state_time = hass.states.get('input_datetime.test_time')
|
||||
assert state_time.state == str(dt_obj.time())
|
||||
|
||||
state_date = hass.states.get('input_datetime.test_date')
|
||||
assert state_date.state == str(dt_obj.date())
|
||||
|
||||
state_datetime = hass.states.get('input_datetime.test_datetime')
|
||||
assert state_datetime.state == str(dt_obj)
|
||||
|
||||
state_bogus = hass.states.get('input_datetime.test_bogus_data')
|
||||
assert state_bogus.state == str(initial)
|
|
@ -405,6 +405,31 @@ def test_time_zone():
|
|||
schema('UTC')
|
||||
|
||||
|
||||
def test_date():
|
||||
"""Test date validation."""
|
||||
schema = vol.Schema(cv.date)
|
||||
|
||||
for value in ['Not a date', '23:42', '2016-11-23T18:59:08']:
|
||||
with pytest.raises(vol.Invalid):
|
||||
schema(value)
|
||||
|
||||
schema(datetime.now().date())
|
||||
schema('2016-11-23')
|
||||
|
||||
|
||||
def test_time():
|
||||
"""Test date validation."""
|
||||
schema = vol.Schema(cv.time)
|
||||
|
||||
for value in ['Not a time', '2016-11-23', '2016-11-23T18:59:08']:
|
||||
with pytest.raises(vol.Invalid):
|
||||
schema(value)
|
||||
|
||||
schema(datetime.now().time())
|
||||
schema('23:42:00')
|
||||
schema('23:42')
|
||||
|
||||
|
||||
def test_datetime():
|
||||
"""Test date time validation."""
|
||||
schema = vol.Schema(cv.datetime)
|
||||
|
@ -447,6 +472,21 @@ def test_has_at_least_one_key():
|
|||
schema(value)
|
||||
|
||||
|
||||
def test_has_at_least_one_key_value():
|
||||
"""Test has_at_least_one_key_value validator."""
|
||||
schema = vol.Schema(cv.has_at_least_one_key_value(('drink', 'beer'),
|
||||
('drink', 'soda'),
|
||||
('food', 'maultaschen')))
|
||||
|
||||
for value in (None, [], {}, {'wine': None}, {'drink': 'water'}):
|
||||
with pytest.raises(vol.MultipleInvalid):
|
||||
schema(value)
|
||||
|
||||
for value in ({'drink': 'beer'}, {'food': 'maultaschen'},
|
||||
{'drink': 'soda', 'food': 'maultaschen'}):
|
||||
schema(value)
|
||||
|
||||
|
||||
def test_enum():
|
||||
"""Test enum validator."""
|
||||
class TestEnum(enum.Enum):
|
||||
|
|
Loading…
Reference in New Issue