Entity for Tags (#115048)

Co-authored-by: Robert Resch <robert@resch.dev>
Co-authored-by: Erik <erik@montnemery.com>
pull/118393/head
G Johansson 2024-05-29 17:45:19 +02:00 committed by GitHub
parent f37edc207e
commit 9e3e7f5b48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 570 additions and 43 deletions

View File

@ -3,41 +3,55 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, final
import uuid
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.const import CONF_NAME
from homeassistant.core import Context, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import collection
from homeassistant.helpers import collection, entity_registry as er
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
from homeassistant.util.hass_dict import HassKey
from .const import DEVICE_ID, DOMAIN, EVENT_TAG_SCANNED, TAG_ID
from .const import DEFAULT_NAME, DEVICE_ID, DOMAIN, EVENT_TAG_SCANNED, LOGGER, TAG_ID
_LOGGER = logging.getLogger(__name__)
LAST_SCANNED = "last_scanned"
LAST_SCANNED_BY_DEVICE_ID = "last_scanned_by_device_id"
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
STORAGE_VERSION_MINOR = 2
TAG_DATA: HassKey[TagStorageCollection] = HassKey(DOMAIN)
SIGNAL_TAG_CHANGED = "signal_tag_changed"
CREATE_FIELDS = {
vol.Optional(TAG_ID): cv.string,
vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
vol.Optional("description"): cv.string,
vol.Optional(LAST_SCANNED): cv.datetime,
vol.Optional(DEVICE_ID): cv.string,
}
UPDATE_FIELDS = {
vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
vol.Optional("description"): cv.string,
vol.Optional(LAST_SCANNED): cv.datetime,
vol.Optional(DEVICE_ID): cv.string,
}
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
@ -63,12 +77,60 @@ class TagIDManager(collection.IDManager):
return suggestion
def _create_entry(
entity_registry: er.EntityRegistry, tag_id: str, name: str | None
) -> er.RegistryEntry:
"""Create an entity registry entry for a tag."""
entry = entity_registry.async_get_or_create(
DOMAIN,
DOMAIN,
tag_id,
original_name=f"{DEFAULT_NAME} {tag_id}",
suggested_object_id=slugify(name) if name else tag_id,
)
return entity_registry.async_update_entity(entry.entity_id, name=name)
class TagStore(Store[collection.SerializedStorageCollection]):
"""Store tag data."""
async def _async_migrate_func(
self,
old_major_version: int,
old_minor_version: int,
old_data: dict[str, list[dict[str, Any]]],
) -> dict:
"""Migrate to the new version."""
data = old_data
if old_major_version == 1 and old_minor_version < 2:
entity_registry = er.async_get(self.hass)
# Version 1.2 moves name to entity registry
for tag in data["items"]:
# Copy name in tag store to the entity registry
_create_entry(entity_registry, tag[TAG_ID], tag.get(CONF_NAME))
tag["migrated"] = True
if old_major_version > 1:
raise NotImplementedError
return data
class TagStorageCollection(collection.DictStorageCollection):
"""Tag collection stored in storage."""
CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)
def __init__(
self,
store: TagStore,
id_manager: collection.IDManager | None = None,
) -> None:
"""Initialize the storage collection."""
super().__init__(store, id_manager)
self.entity_registry = er.async_get(self.hass)
async def _process_create_data(self, data: dict) -> dict:
"""Validate the config is valid."""
data = self.CREATE_SCHEMA(data)
@ -77,6 +139,10 @@ class TagStorageCollection(collection.DictStorageCollection):
# make last_scanned JSON serializeable
if LAST_SCANNED in data:
data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()
# Create entity in entity_registry when creating the tag
# This is done early to store name only once in entity registry
_create_entry(self.entity_registry, data[TAG_ID], data.get(CONF_NAME))
return data
@callback
@ -87,24 +153,163 @@ class TagStorageCollection(collection.DictStorageCollection):
async def _update_data(self, item: dict, update_data: dict) -> dict:
"""Return a new updated data object."""
data = {**item, **self.UPDATE_SCHEMA(update_data)}
tag_id = data[TAG_ID]
# make last_scanned JSON serializeable
if LAST_SCANNED in update_data:
data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()
if name := data.get(CONF_NAME):
if entity_id := self.entity_registry.async_get_entity_id(
DOMAIN, DOMAIN, tag_id
):
self.entity_registry.async_update_entity(entity_id, name=name)
else:
raise collection.ItemNotFound(tag_id)
return data
def _serialize_item(self, item_id: str, item: dict) -> dict:
"""Return the serialized representation of an item for storing.
We don't store the name, it's stored in the entity registry.
"""
# Preserve the name of migrated entries to allow downgrading to 2024.5
# without losing tag names. This can be removed in HA Core 2025.1.
migrated = item_id in self.data and "migrated" in self.data[item_id]
return {k: v for k, v in item.items() if k != CONF_NAME or migrated}
class TagDictStorageCollectionWebsocket(
collection.StorageCollectionWebsocket[TagStorageCollection]
):
"""Class to expose tag storage collection management over websocket."""
def __init__(
self,
storage_collection: TagStorageCollection,
api_prefix: str,
model_name: str,
create_schema: ConfigType,
update_schema: ConfigType,
) -> None:
"""Initialize a websocket for tag."""
super().__init__(
storage_collection, api_prefix, model_name, create_schema, update_schema
)
self.entity_registry = er.async_get(storage_collection.hass)
@callback
def ws_list_item(
self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
"""List items specifically for tag.
Provides name from entity_registry instead of storage collection.
"""
tag_items = []
for item in self.storage_collection.async_items():
# Make a copy to avoid adding name to the stored entry
item = {k: v for k, v in item.items() if k != "migrated"}
if (
entity_id := self.entity_registry.async_get_entity_id(
DOMAIN, DOMAIN, item[TAG_ID]
)
) and (entity := self.entity_registry.async_get(entity_id)):
item[CONF_NAME] = entity.name or entity.original_name
tag_items.append(item)
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Listing tags %s", tag_items)
connection.send_result(msg["id"], tag_items)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Tag component."""
component = EntityComponent[TagEntity](LOGGER, DOMAIN, hass)
id_manager = TagIDManager()
hass.data[TAG_DATA] = storage_collection = TagStorageCollection(
Store(hass, STORAGE_VERSION, STORAGE_KEY),
TagStore(
hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
),
id_manager,
)
await storage_collection.async_load()
collection.DictStorageCollectionWebsocket(
TagDictStorageCollectionWebsocket(
storage_collection, DOMAIN, DOMAIN, CREATE_FIELDS, UPDATE_FIELDS
).async_setup(hass)
entity_registry = er.async_get(hass)
async def tag_change_listener(
change_type: str, item_id: str, updated_config: dict
) -> None:
"""Tag storage change listener."""
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"%s, item: %s, update: %s", change_type, item_id, updated_config
)
if change_type == collection.CHANGE_ADDED:
# When tags are added to storage
entity = _create_entry(entity_registry, updated_config[TAG_ID], None)
if TYPE_CHECKING:
assert entity.original_name
await component.async_add_entities(
[
TagEntity(
hass,
entity.name or entity.original_name,
updated_config[TAG_ID],
updated_config.get(LAST_SCANNED),
updated_config.get(DEVICE_ID),
)
]
)
elif change_type == collection.CHANGE_UPDATED:
# When tags are changed or updated in storage
async_dispatcher_send(
hass,
SIGNAL_TAG_CHANGED,
updated_config.get(DEVICE_ID),
updated_config.get(LAST_SCANNED),
)
# Deleted tags
elif change_type == collection.CHANGE_REMOVED:
# When tags are removed from storage
entity_id = entity_registry.async_get_entity_id(
DOMAIN, DOMAIN, updated_config[TAG_ID]
)
if entity_id:
entity_registry.async_remove(entity_id)
storage_collection.async_add_listener(tag_change_listener)
entities: list[TagEntity] = []
for tag in storage_collection.async_items():
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Adding tag: %s", tag)
entity_id = entity_registry.async_get_entity_id(DOMAIN, DOMAIN, tag[TAG_ID])
if entity_id := entity_registry.async_get_entity_id(
DOMAIN, DOMAIN, tag[TAG_ID]
):
entity = entity_registry.async_get(entity_id)
else:
entity = _create_entry(entity_registry, tag[TAG_ID], None)
if TYPE_CHECKING:
assert entity
assert entity.original_name
name = entity.name or entity.original_name
entities.append(
TagEntity(
hass,
name,
tag[TAG_ID],
tag.get(LAST_SCANNED),
tag.get(DEVICE_ID),
)
)
await component.async_add_entities(entities)
return True
@ -119,11 +324,13 @@ async def async_scan_tag(
raise HomeAssistantError("tag component has not been set up.")
storage_collection = hass.data[TAG_DATA]
entity_registry = er.async_get(hass)
entity_id = entity_registry.async_get_entity_id(DOMAIN, DOMAIN, tag_id)
# Get name from helper, default value None if not present in data
# Get name from entity registry, default value None if not present
tag_name = None
if tag_data := storage_collection.data.get(tag_id):
tag_name = tag_data.get(CONF_NAME)
if entity_id and (entity := entity_registry.async_get(entity_id)):
tag_name = entity.name or entity.original_name
hass.bus.async_fire(
EVENT_TAG_SCANNED,
@ -131,12 +338,87 @@ async def async_scan_tag(
context=context,
)
extra_kwargs = {}
if device_id:
extra_kwargs[DEVICE_ID] = device_id
if tag_id in storage_collection.data:
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Updating tag %s with extra %s", tag_id, extra_kwargs)
await storage_collection.async_update_item(
tag_id, {LAST_SCANNED: dt_util.utcnow()}
tag_id, {LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
)
else:
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Creating tag %s with extra %s", tag_id, extra_kwargs)
await storage_collection.async_create_item(
{TAG_ID: tag_id, LAST_SCANNED: dt_util.utcnow()}
{TAG_ID: tag_id, LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
)
_LOGGER.debug("Tag: %s scanned by device: %s", tag_id, device_id)
class TagEntity(Entity):
"""Representation of a Tag entity."""
_unrecorded_attributes = frozenset({TAG_ID})
_attr_translation_key = DOMAIN
_attr_should_poll = False
def __init__(
self,
hass: HomeAssistant,
name: str,
tag_id: str,
last_scanned: str | None,
device_id: str | None,
) -> None:
"""Initialize the Tag event."""
self.hass = hass
self._attr_name = name
self._tag_id = tag_id
self._attr_unique_id = tag_id
self._last_device_id: str | None = device_id
self._last_scanned = last_scanned
@callback
def async_handle_event(
self, device_id: str | None, last_scanned: str | None
) -> None:
"""Handle the Tag scan event."""
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"Tag %s scanned by device %s at %s, last scanned at %s",
self._tag_id,
device_id,
last_scanned,
self._last_scanned,
)
self._last_device_id = device_id
self._last_scanned = last_scanned
self.async_write_ha_state()
@property
@final
def state(self) -> str | None:
"""Return the entity state."""
if (
not self._last_scanned
or (last_scanned := dt_util.parse_datetime(self._last_scanned)) is None
):
return None
return last_scanned.isoformat(timespec="milliseconds")
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the sun."""
return {TAG_ID: self._tag_id, LAST_SCANNED_BY_DEVICE_ID: self._last_device_id}
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
await super().async_added_to_hass()
self.async_on_remove(
async_dispatcher_connect(
self.hass,
SIGNAL_TAG_CHANGED,
self.async_handle_event,
)
)

View File

@ -1,6 +1,10 @@
"""Constants for the Tag integration."""
import logging
DEVICE_ID = "device_id"
DOMAIN = "tag"
EVENT_TAG_SCANNED = "tag_scanned"
TAG_ID = "tag_id"
DEFAULT_NAME = "Tag"
LOGGER = logging.getLogger(__package__)

View File

@ -0,0 +1,9 @@
{
"entity": {
"tag": {
"tag": {
"default": "mdi:tag-outline"
}
}
}
}

View File

@ -1,3 +1,17 @@
{
"title": "Tag"
"title": "Tag",
"entity": {
"tag": {
"tag": {
"state_attributes": {
"tag_id": {
"name": "Tag ID"
},
"last_scanned_by_device_id": {
"name": "Last scanned by device ID"
}
}
}
}
}
}

View File

@ -1 +1,5 @@
"""Tests for the Tag integration."""
TEST_TAG_ID = "test tag id"
TEST_TAG_NAME = "test tag name"
TEST_DEVICE_ID = "device id"

View File

@ -0,0 +1,28 @@
# serializer version: 1
# name: test_migration
dict({
'data': dict({
'items': list([
dict({
'id': 'test tag id',
'migrated': True,
'name': 'test tag name',
'tag_id': 'test tag id',
}),
dict({
'device_id': 'some_scanner',
'id': 'new tag',
'last_scanned': '2024-02-29T13:00:00+00:00',
'tag_id': 'new tag',
}),
dict({
'id': '1234567890',
'tag_id': '1234567890',
}),
]),
}),
'key': 'tag',
'minor_version': 2,
'version': 1,
})
# ---

View File

@ -4,18 +4,16 @@ from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.tag import DOMAIN, EVENT_TAG_SCANNED, async_scan_tag
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import TEST_DEVICE_ID, TEST_TAG_ID, TEST_TAG_NAME
from tests.common import async_capture_events
from tests.typing import WebSocketGenerator
TEST_TAG_ID = "test tag id"
TEST_TAG_NAME = "test tag name"
TEST_DEVICE_ID = "device id"
@pytest.fixture
def storage_setup_named_tag(
@ -29,10 +27,21 @@ def storage_setup_named_tag(
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"data": {"items": [{"id": TEST_TAG_ID, CONF_NAME: TEST_TAG_NAME}]},
"minor_version": 2,
"data": {
"items": [
{
"id": TEST_TAG_ID,
"tag_id": TEST_TAG_ID,
}
]
},
}
else:
hass_storage[DOMAIN] = items
entity_registry = er.async_get(hass)
entry = entity_registry.async_get_or_create(DOMAIN, DOMAIN, TEST_TAG_ID)
entity_registry.async_update_entity(entry.entity_id, name=TEST_TAG_NAME)
config = {DOMAIN: {}}
return await async_setup_component(hass, DOMAIN, config)
@ -75,7 +84,8 @@ def storage_setup_unnamed_tag(hass: HomeAssistant, hass_storage):
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"data": {"items": [{"id": TEST_TAG_ID}]},
"minor_version": 2,
"data": {"items": [{"id": TEST_TAG_ID, "tag_id": TEST_TAG_ID}]},
}
else:
hass_storage[DOMAIN] = items
@ -107,6 +117,6 @@ async def test_unnamed_tag_scanned_event(
event = events[0]
event_data = event.data
assert event_data["name"] is None
assert event_data["name"] == "Tag test tag id"
assert event_data["device_id"] == TEST_DEVICE_ID
assert event_data["tag_id"] == TEST_TAG_ID

View File

@ -1,14 +1,21 @@
"""Tests for the tag component."""
import logging
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy import SnapshotAssertion
from homeassistant.components.tag import DOMAIN, async_scan_tag
from homeassistant.components.tag import DOMAIN, _create_entry, async_scan_tag
from homeassistant.const import CONF_NAME, STATE_UNKNOWN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import collection
from homeassistant.helpers import collection, entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import TEST_DEVICE_ID, TEST_TAG_ID, TEST_TAG_NAME
from tests.common import async_fire_time_changed
from tests.typing import WebSocketGenerator
@ -21,7 +28,45 @@ def storage_setup(hass: HomeAssistant, hass_storage):
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"data": {"items": [{"id": "test tag"}]},
"minor_version": 2,
"data": {
"items": [
{
"id": TEST_TAG_ID,
"tag_id": TEST_TAG_ID,
}
]
},
}
else:
hass_storage[DOMAIN] = items
entity_registry = er.async_get(hass)
_create_entry(entity_registry, TEST_TAG_ID, TEST_TAG_NAME)
config = {DOMAIN: {}}
return await async_setup_component(hass, DOMAIN, config)
return _storage
@pytest.fixture
def storage_setup_1_1(hass: HomeAssistant, hass_storage):
"""Storage version 1.1 setup."""
async def _storage(items=None):
if items is None:
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"minor_version": 1,
"data": {
"items": [
{
"id": TEST_TAG_ID,
"tag_id": TEST_TAG_ID,
CONF_NAME: TEST_TAG_NAME,
}
]
},
}
else:
hass_storage[DOMAIN] = items
@ -31,6 +76,49 @@ def storage_setup(hass: HomeAssistant, hass_storage):
return _storage
async def test_migration(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
storage_setup_1_1,
freezer: FrozenDateTimeFactory,
hass_storage,
snapshot: SnapshotAssertion,
) -> None:
"""Test migrating tag store."""
assert await storage_setup_1_1()
client = await hass_ws_client(hass)
freezer.move_to("2024-02-29 13:00")
await client.send_json_auto_id({"type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
assert resp["result"] == [
{"id": TEST_TAG_ID, "name": "test tag name", "tag_id": TEST_TAG_ID}
]
# Scan a new tag
await async_scan_tag(hass, "new tag", "some_scanner")
# Add a new tag through WS
await client.send_json_auto_id(
{
"type": f"{DOMAIN}/create",
"tag_id": "1234567890",
"name": "Kitchen tag",
}
)
resp = await client.receive_json()
assert resp["success"]
# Trigger store
freezer.tick(11)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass_storage[DOMAIN] == snapshot
async def test_ws_list(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, storage_setup
) -> None:
@ -39,14 +127,12 @@ async def test_ws_list(
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": f"{DOMAIN}/list"})
await client.send_json_auto_id({"type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 1
assert "test tag" in result
assert resp["result"] == [
{"id": TEST_TAG_ID, "name": "test tag name", "tag_id": TEST_TAG_ID}
]
async def test_ws_update(
@ -58,21 +144,17 @@ async def test_ws_update(
client = await hass_ws_client(hass)
await client.send_json(
await client.send_json_auto_id(
{
"id": 6,
"type": f"{DOMAIN}/update",
f"{DOMAIN}_id": "test tag",
f"{DOMAIN}_id": TEST_TAG_ID,
"name": "New name",
}
)
resp = await client.receive_json()
assert resp["success"]
item = resp["result"]
assert item["id"] == "test tag"
assert item["name"] == "New name"
assert item == {"id": TEST_TAG_ID, "name": "New name", "tag_id": TEST_TAG_ID}
async def test_tag_scanned(
@ -86,29 +168,37 @@ async def test_tag_scanned(
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": f"{DOMAIN}/list"})
await client.send_json_auto_id({"type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 1
assert "test tag" in result
assert resp["result"] == [
{"id": TEST_TAG_ID, "name": "test tag name", "tag_id": TEST_TAG_ID}
]
now = dt_util.utcnow()
freezer.move_to(now)
await async_scan_tag(hass, "new tag", "some_scanner")
await client.send_json({"id": 7, "type": f"{DOMAIN}/list"})
await client.send_json_auto_id({"type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 2
assert "test tag" in result
assert "new tag" in result
assert result["new tag"]["last_scanned"] == now.isoformat()
assert resp["result"] == [
{"id": TEST_TAG_ID, "name": "test tag name", "tag_id": TEST_TAG_ID},
{
"device_id": "some_scanner",
"id": "new tag",
"last_scanned": now.isoformat(),
"name": "Tag new tag",
"tag_id": "new tag",
},
]
def track_changes(coll: collection.ObservableCollection):
@ -131,8 +221,93 @@ async def test_tag_id_exists(
changes = track_changes(hass.data[DOMAIN])
client = await hass_ws_client(hass)
await client.send_json({"id": 2, "type": f"{DOMAIN}/create", "tag_id": "test tag"})
await client.send_json_auto_id({"type": f"{DOMAIN}/create", "tag_id": TEST_TAG_ID})
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "home_assistant_error"
assert len(changes) == 0
async def test_entity(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
freezer: FrozenDateTimeFactory,
storage_setup,
) -> None:
"""Test tag entity."""
assert await storage_setup()
await hass_ws_client(hass)
entity = hass.states.get("tag.test_tag_name")
assert entity
assert entity.state == STATE_UNKNOWN
now = dt_util.utcnow()
freezer.move_to(now)
await async_scan_tag(hass, TEST_TAG_ID, TEST_DEVICE_ID)
entity = hass.states.get("tag.test_tag_name")
assert entity
assert entity.state == now.isoformat(timespec="milliseconds")
assert entity.attributes == {
"tag_id": "test tag id",
"last_scanned_by_device_id": "device id",
"friendly_name": "test tag name",
}
async def test_entity_created_and_removed(
caplog: pytest.LogCaptureFixture,
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
freezer: FrozenDateTimeFactory,
storage_setup,
entity_registry: er.EntityRegistry,
) -> None:
"""Test tag entity created and removed."""
caplog.at_level(logging.DEBUG)
assert await storage_setup()
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": f"{DOMAIN}/create",
"tag_id": "1234567890",
"name": "Kitchen tag",
}
)
resp = await client.receive_json()
assert resp["success"]
item = resp["result"]
assert item["id"] == "1234567890"
assert item["name"] == "Kitchen tag"
entity = hass.states.get("tag.kitchen_tag")
assert entity
assert entity.state == STATE_UNKNOWN
entity_id = entity.entity_id
assert entity_registry.async_get(entity_id)
now = dt_util.utcnow()
freezer.move_to(now)
await async_scan_tag(hass, "1234567890", TEST_DEVICE_ID)
entity = hass.states.get("tag.kitchen_tag")
assert entity
assert entity.state == now.isoformat(timespec="milliseconds")
await client.send_json_auto_id(
{
"type": f"{DOMAIN}/delete",
"tag_id": "1234567890",
}
)
resp = await client.receive_json()
assert resp["success"]
entity = hass.states.get("tag.kitchen_tag")
assert not entity
assert not entity_registry.async_get(entity_id)

View File

@ -26,7 +26,8 @@ def tag_setup(hass: HomeAssistant, hass_storage):
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"data": {"items": [{"id": "test tag"}]},
"minor_version": 2,
"data": {"items": [{"id": "test tag", "tag_id": "test tag"}]},
}
else:
hass_storage[DOMAIN] = items