Trigger on not matching to/from states (#69760)
parent
0d6d8a17e3
commit
fb92fc6a8e
|
@ -36,6 +36,8 @@ _LOGGER = logging.getLogger(__name__)
|
|||
CONF_ENTITY_ID = "entity_id"
|
||||
CONF_FROM = "from"
|
||||
CONF_TO = "to"
|
||||
CONF_NOT_FROM = "not_from"
|
||||
CONF_NOT_TO = "not_to"
|
||||
|
||||
BASE_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
|
||||
{
|
||||
|
@ -49,15 +51,19 @@ BASE_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
|
|||
TRIGGER_STATE_SCHEMA = BASE_SCHEMA.extend(
|
||||
{
|
||||
# These are str on purpose. Want to catch YAML conversions
|
||||
vol.Optional(CONF_FROM): vol.Any(str, [str], None),
|
||||
vol.Optional(CONF_TO): vol.Any(str, [str], None),
|
||||
vol.Exclusive(CONF_FROM, CONF_FROM): vol.Any(str, [str], None),
|
||||
vol.Exclusive(CONF_NOT_FROM, CONF_FROM): vol.Any(str, [str], None),
|
||||
vol.Exclusive(CONF_TO, CONF_TO): vol.Any(str, [str], None),
|
||||
vol.Exclusive(CONF_NOT_TO, CONF_TO): vol.Any(str, [str], None),
|
||||
}
|
||||
)
|
||||
|
||||
TRIGGER_ATTRIBUTE_SCHEMA = BASE_SCHEMA.extend(
|
||||
{
|
||||
vol.Optional(CONF_FROM): cv.match_all,
|
||||
vol.Optional(CONF_TO): cv.match_all,
|
||||
vol.Exclusive(CONF_FROM, CONF_FROM): cv.match_all,
|
||||
vol.Exclusive(CONF_NOT_FROM, CONF_FROM): cv.match_all,
|
||||
vol.Exclusive(CONF_TO, CONF_TO): cv.match_all,
|
||||
vol.Exclusive(CONF_NOT_TO, CONF_TO): cv.match_all,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -94,19 +100,30 @@ async def async_attach_trigger(
|
|||
) -> CALLBACK_TYPE:
|
||||
"""Listen for state changes based on configuration."""
|
||||
entity_ids = config[CONF_ENTITY_ID]
|
||||
if (from_state := config.get(CONF_FROM)) is None:
|
||||
from_state = MATCH_ALL
|
||||
if (to_state := config.get(CONF_TO)) is None:
|
||||
to_state = MATCH_ALL
|
||||
|
||||
if (from_state := config.get(CONF_FROM)) is not None:
|
||||
match_from_state = process_state_match(from_state)
|
||||
elif (not_from_state := config.get(CONF_NOT_FROM)) is not None:
|
||||
match_from_state = process_state_match(not_from_state, invert=True)
|
||||
else:
|
||||
match_from_state = process_state_match(MATCH_ALL)
|
||||
|
||||
if (to_state := config.get(CONF_TO)) is not None:
|
||||
match_to_state = process_state_match(to_state)
|
||||
elif (not_to_state := config.get(CONF_NOT_TO)) is not None:
|
||||
match_to_state = process_state_match(not_to_state, invert=True)
|
||||
else:
|
||||
match_to_state = process_state_match(MATCH_ALL)
|
||||
|
||||
time_delta = config.get(CONF_FOR)
|
||||
template.attach(hass, time_delta)
|
||||
# If neither CONF_FROM or CONF_TO are specified,
|
||||
# fire on all changes to the state or an attribute
|
||||
match_all = CONF_FROM not in config and CONF_TO not in config
|
||||
match_all = all(
|
||||
item not in config for item in (CONF_FROM, CONF_NOT_FROM, CONF_NOT_TO, CONF_TO)
|
||||
)
|
||||
unsub_track_same = {}
|
||||
period: dict[str, timedelta] = {}
|
||||
match_from_state = process_state_match(from_state)
|
||||
match_to_state = process_state_match(to_state)
|
||||
attribute = config.get(CONF_ATTRIBUTE)
|
||||
job = HassJob(action)
|
||||
|
||||
|
|
|
@ -1532,17 +1532,17 @@ track_time_change = threaded_listener_factory(async_track_time_change)
|
|||
|
||||
|
||||
def process_state_match(
|
||||
parameter: None | str | Iterable[str],
|
||||
parameter: None | str | Iterable[str], invert: bool = False
|
||||
) -> Callable[[str | None], bool]:
|
||||
"""Convert parameter to function that matches input against parameter."""
|
||||
if parameter is None or parameter == MATCH_ALL:
|
||||
return lambda _: True
|
||||
return lambda _: not invert
|
||||
|
||||
if isinstance(parameter, str) or not hasattr(parameter, "__iter__"):
|
||||
return lambda state: state == parameter
|
||||
return lambda state: invert is not (state == parameter)
|
||||
|
||||
parameter_set = set(parameter)
|
||||
return lambda state: state in parameter_set
|
||||
return lambda state: invert is not (state in parameter_set)
|
||||
|
||||
|
||||
@callback
|
||||
|
|
|
@ -7,7 +7,7 @@ import pytest
|
|||
import homeassistant.components.automation as automation
|
||||
from homeassistant.components.homeassistant.triggers import state as state_trigger
|
||||
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF
|
||||
from homeassistant.core import Context
|
||||
from homeassistant.core import Context, HomeAssistant, ServiceCall
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
from homeassistant.setup import async_setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
@ -21,7 +21,7 @@ from tests.common import (
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def calls(hass):
|
||||
def calls(hass: HomeAssistant) -> list[ServiceCall]:
|
||||
"""Track calls to a mock service."""
|
||||
return async_mock_service(hass, "test", "automation")
|
||||
|
||||
|
@ -164,6 +164,36 @@ async def test_if_fires_on_entity_change_with_from_filter(hass, calls):
|
|||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_not_from_filter(
|
||||
hass: HomeAssistant, calls: list[ServiceCall]
|
||||
) -> None:
|
||||
"""Test for firing on entity change inverse filter."""
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"platform": "state",
|
||||
"entity_id": "test.entity",
|
||||
"not_from": "hello",
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Do not fire from hello
|
||||
hass.states.async_set("test.entity", "world")
|
||||
await hass.async_block_till_done()
|
||||
assert not calls
|
||||
|
||||
hass.states.async_set("test.entity", "universum")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_to_filter(hass, calls):
|
||||
"""Test for firing on entity change with to filter."""
|
||||
assert await async_setup_component(
|
||||
|
@ -187,6 +217,36 @@ async def test_if_fires_on_entity_change_with_to_filter(hass, calls):
|
|||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_not_to_filter(
|
||||
hass: HomeAssistant, calls: list[ServiceCall]
|
||||
) -> None:
|
||||
"""Test for firing on entity change with to filter."""
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"platform": "state",
|
||||
"entity_id": "test.entity",
|
||||
"not_to": "world",
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Do not fire to world
|
||||
hass.states.async_set("test.entity", "world")
|
||||
await hass.async_block_till_done()
|
||||
assert not calls
|
||||
|
||||
hass.states.async_set("test.entity", "universum")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_from_filter_all(hass, calls):
|
||||
"""Test for firing on entity change with filter."""
|
||||
assert await async_setup_component(
|
||||
|
@ -283,6 +343,100 @@ async def test_if_fires_on_entity_change_with_both_filters(hass, calls):
|
|||
assert len(calls) == 1
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_not_from_to(
|
||||
hass: HomeAssistant, calls: list[ServiceCall]
|
||||
) -> None:
|
||||
"""Test for firing if not from doesn't match and to match."""
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"platform": "state",
|
||||
"entity_id": "test.entity",
|
||||
"not_from": ["hello", "galaxy"],
|
||||
"to": ["galaxy", "universe"],
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# We should not trigger from hello
|
||||
hass.states.async_set("test.entity", "world")
|
||||
await hass.async_block_till_done()
|
||||
assert not calls
|
||||
|
||||
# We should not trigger to != galaxy
|
||||
hass.states.async_set("test.entity", "world")
|
||||
await hass.async_block_till_done()
|
||||
assert not calls
|
||||
|
||||
# We should trigger to galaxy
|
||||
hass.states.async_set("test.entity", "galaxy")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
# We should not trigger from milky way
|
||||
hass.states.async_set("test.entity", "milky_way")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
# We should trigger to universe
|
||||
hass.states.async_set("test.entity", "universe")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
|
||||
|
||||
async def test_if_fires_on_entity_change_with_from_not_to(
|
||||
hass: HomeAssistant, calls: list[ServiceCall]
|
||||
) -> None:
|
||||
"""Test for firing if not from doesn't match and to match."""
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"platform": "state",
|
||||
"entity_id": "test.entity",
|
||||
"from": ["hello", "galaxy"],
|
||||
"not_to": ["galaxy", "universe"],
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# We should trigger to world from hello
|
||||
hass.states.async_set("test.entity", "world")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
# Reset back to hello, should not trigger
|
||||
hass.states.async_set("test.entity", "hello")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
# We should not trigger to galaxy
|
||||
hass.states.async_set("test.entity", "galaxy")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
# We should trigger form galaxy to milky way
|
||||
hass.states.async_set("test.entity", "milky_way")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
|
||||
# We should not trigger to universe
|
||||
hass.states.async_set("test.entity", "universe")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
|
||||
|
||||
async def test_if_not_fires_if_to_filter_not_match(hass, calls):
|
||||
"""Test for not firing if to filter is not a match."""
|
||||
assert await async_setup_component(
|
||||
|
|
Loading…
Reference in New Issue