Make more max lengths constants and add additional MaxLengthExceeded checks (#50337)

* Add more MaxLengthExceeded checks

* remove some validations to reduce performance impact

* check length of generated entity ID

* dont check entity ID twice and use single context id length constant

* fix test

* add missing test
pull/51091/head
Raman Gupta 2021-05-25 13:58:01 -04:00 committed by GitHub
parent abd6f739e8
commit 58e37435b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 16 deletions

View File

@ -20,7 +20,14 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm.session import Session
from homeassistant.const import MAX_LENGTH_EVENT_TYPE
from homeassistant.const import (
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_DOMAIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
@ -63,14 +70,14 @@ class Events(Base): # type: ignore
}
__tablename__ = TABLE_EVENTS
event_id = Column(Integer, Identity(), primary_key=True)
event_type = Column(String(MAX_LENGTH_EVENT_TYPE))
event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(32))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN))
time_fired = Column(DATETIME_TYPE, index=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
context_id = Column(String(36), index=True)
context_user_id = Column(String(36), index=True)
context_parent_id = Column(String(36), index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
__table_args__ = (
# Used for fetching events at a specific time
@ -130,9 +137,9 @@ class States(Base): # type: ignore
}
__tablename__ = TABLE_STATES
state_id = Column(Integer, Identity(), primary_key=True)
domain = Column(String(64))
entity_id = Column(String(255))
state = Column(String(255))
domain = Column(String(MAX_LENGTH_STATE_DOMAIN))
entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
state = Column(String(MAX_LENGTH_STATE_STATE))
attributes = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
event_id = Column(
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True

View File

@ -26,9 +26,14 @@ ENTITY_MATCH_ALL: Final = "all"
# If no name is specified
DEVICE_DEFAULT_NAME: Final = "Unnamed Device"
# Max characters for an event_type (changing this requires a recorder
# database migration)
MAX_LENGTH_EVENT_TYPE: Final = 64
# Max characters for data stored in the recorder (changes to these limits would require
# a database migration)
MAX_LENGTH_EVENT_EVENT_TYPE: Final = 64
MAX_LENGTH_EVENT_ORIGIN: Final = 32
MAX_LENGTH_EVENT_CONTEXT_ID: Final = 36
MAX_LENGTH_STATE_DOMAIN: Final = 64
MAX_LENGTH_STATE_ENTITY_ID: Final = 255
MAX_LENGTH_STATE_STATE: Final = 255
# Sun events
SUN_EVENT_SUNSET: Final = "sunset"

View File

@ -47,7 +47,8 @@ from homeassistant.const import (
EVENT_TIMER_OUT_OF_SYNC,
LENGTH_METERS,
MATCH_ALL,
MAX_LENGTH_EVENT_TYPE,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_STATE_STATE,
__version__,
)
from homeassistant.exceptions import (
@ -130,7 +131,7 @@ def valid_entity_id(entity_id: str) -> bool:
def valid_state(state: str) -> bool:
"""Test if a state is valid."""
return len(state) < 256
return len(state) <= MAX_LENGTH_STATE_STATE
def callback(func: CALLABLE_T) -> CALLABLE_T:
@ -700,8 +701,10 @@ class EventBus:
This method must be run in the event loop.
"""
if len(event_type) > MAX_LENGTH_EVENT_TYPE:
raise MaxLengthExceeded(event_type, "event_type", MAX_LENGTH_EVENT_TYPE)
if len(event_type) > MAX_LENGTH_EVENT_EVENT_TYPE:
raise MaxLengthExceeded(
event_type, "event_type", MAX_LENGTH_EVENT_EVENT_TYPE
)
listeners = self._listeners.get(event_type, [])

View File

@ -24,6 +24,8 @@ from homeassistant.const import (
ATTR_SUPPORTED_FEATURES,
ATTR_UNIT_OF_MEASUREMENT,
EVENT_HOMEASSISTANT_START,
MAX_LENGTH_STATE_DOMAIN,
MAX_LENGTH_STATE_ENTITY_ID,
STATE_UNAVAILABLE,
)
from homeassistant.core import (
@ -33,6 +35,7 @@ from homeassistant.core import (
split_entity_id,
valid_entity_id,
)
from homeassistant.exceptions import MaxLengthExceeded
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
from homeassistant.loader import bind_hass
@ -201,6 +204,10 @@ class EntityRegistry:
Conflicts checked against registered and currently existing entities.
"""
preferred_string = f"{domain}.{slugify(suggested_object_id)}"
if len(domain) > MAX_LENGTH_STATE_DOMAIN:
raise MaxLengthExceeded(domain, "domain", MAX_LENGTH_STATE_DOMAIN)
test_string = preferred_string
if not known_object_ids:
known_object_ids = {}
@ -214,6 +221,11 @@ class EntityRegistry:
tries += 1
test_string = f"{preferred_string}_{tries}"
if len(test_string) > MAX_LENGTH_STATE_ENTITY_ID:
raise MaxLengthExceeded(
test_string, "generated_entity_id", MAX_LENGTH_STATE_ENTITY_ID
)
return test_string
@callback

View File

@ -6,6 +6,7 @@ import pytest
from homeassistant import config_entries
from homeassistant.const import EVENT_HOMEASSISTANT_START, STATE_UNAVAILABLE
from homeassistant.core import CoreState, callback, valid_entity_id
from homeassistant.exceptions import MaxLengthExceeded
from homeassistant.helpers import device_registry as dr, entity_registry as er
from tests.common import (
@ -904,3 +905,47 @@ async def test_disabled_entities_excluded_from_entity_list(hass, registry):
registry, device_entry.id, include_disabled_entities=True
)
assert entries == [entry1, entry2]
async def test_entity_max_length_exceeded(hass, registry):
"""Test that an exception is raised when the max character length is exceeded."""
long_entity_id_name = (
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890123456789012345678901234567890"
)
with pytest.raises(MaxLengthExceeded) as exc_info:
registry.async_generate_entity_id("sensor", long_entity_id_name)
assert exc_info.value.property_name == "generated_entity_id"
assert exc_info.value.max_length == 255
assert exc_info.value.value == f"sensor.{long_entity_id_name}"
# Try again but against the domain
long_domain_name = long_entity_id_name
with pytest.raises(MaxLengthExceeded) as exc_info:
registry.async_generate_entity_id(long_domain_name, "sensor")
assert exc_info.value.property_name == "domain"
assert exc_info.value.max_length == 64
assert exc_info.value.value == long_domain_name
# Try again but force a number to get added to the entity ID
long_entity_id_name = (
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567890123456789012345678901234567890"
"1234567890123456789012345678901234567"
)
with pytest.raises(MaxLengthExceeded) as exc_info:
registry.async_generate_entity_id(
"sensor", long_entity_id_name, [f"sensor.{long_entity_id_name}"]
)
assert exc_info.value.property_name == "generated_entity_id"
assert exc_info.value.max_length == 255
assert exc_info.value.value == f"sensor.{long_entity_id_name}_2"