# core/tests/components/nest/test_events.py
#
# 295 lines
# 8.8 KiB
# Python

"""Tests for Nest events for the Smart Device Management API.

These tests fake out the subscriber/device manager and do not use a real
pubsub subscriber.
"""
from google_nest_sdm.device import Device
from google_nest_sdm.event import EventMessage
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.util.dt import utcnow
from .common import async_setup_sdm_platform
from tests.common import async_capture_events
# Integration domain name.
# NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
DOMAIN = "nest"
# Name given to the single fake device registered by async_setup_devices.
DEVICE_ID = "some-device-id"
# Platform handed to async_setup_sdm_platform when setting up the fake device.
PLATFORM = "camera"
# Event type the tests capture with async_capture_events.
NEST_EVENT = "nest_event"
# Placeholder session/event ids placed in the fake event payloads.
EVENT_SESSION_ID = "CjY5Y3VKaTZwR3o4Y19YbTVfMF..."
EVENT_ID = "FWWVQVUdGNUlTU2V4MGV2aTNXV..."
async def async_setup_devices(hass, device_type, traits=None):
    """Set up the platform and prerequisites.

    Builds a single fake device named DEVICE_ID with the given SDM device
    type and traits, then sets up PLATFORM through async_setup_sdm_platform.

    Args:
        hass: The Home Assistant test instance.
        device_type: SDM device type string, e.g. "sdm.devices.types.DOORBELL".
        traits: Optional dict of trait name -> trait data; defaults to no traits.

    Returns:
        Whatever async_setup_sdm_platform returns; the tests in this file use
        it as the subscriber that receives fake events.
    """
    # Use a None sentinel instead of a `{}` default so no dict object is
    # shared (and potentially mutated) across calls.
    if traits is None:
        traits = {}
    devices = {
        DEVICE_ID: Device.MakeDevice(
            {
                "name": DEVICE_ID,
                "type": device_type,
                "traits": traits,
            },
            auth=None,
        ),
    }
    return await async_setup_sdm_platform(hass, PLATFORM, devices=devices)
def create_device_traits(event_trait):
    """Create fake traits for a device.

    Args:
        event_trait: Name of the event-producing trait to include, e.g.
            "sdm.devices.traits.DoorbellChime". It is added with empty data.

    Returns:
        A new dict containing an Info trait (custom name "Front"), the given
        event trait, and a CameraLiveStream trait.
    """
    return {
        "sdm.devices.traits.Info": {
            "customName": "Front",
        },
        event_trait: {},
        "sdm.devices.traits.CameraLiveStream": {
            "maxVideoResolution": {
                "width": 640,
                "height": 480,
            },
            "videoCodecs": ["H264"],
            "audioCodecs": ["AAC"],
        },
    }
def create_event(event_type, device_id=DEVICE_ID, timestamp=None):
    """Create an EventMessage for a single event type.

    Args:
        event_type: Full SDM event name used as the key of the events payload.
        device_id: Device name the event is reported for.
        timestamp: Optional datetime for the message; when omitted,
            create_events picks the current time.

    Returns:
        The EventMessage built by create_events.
    """
    events = {
        event_type: {
            "eventSessionId": EVENT_SESSION_ID,
            "eventId": EVENT_ID,
        },
    }
    # Forward the caller's timestamp; previously it was accepted but dropped,
    # so the message always carried a fresh utcnow() instead.
    return create_events(events=events, device_id=device_id, timestamp=timestamp)
def create_events(events, device_id=DEVICE_ID, timestamp=None):
    """Create an EventMessage for events.

    Args:
        events: Dict of SDM event name -> event data for the resource update.
        device_id: Device name placed in the resource update.
        timestamp: Optional datetime for the message; defaults to utcnow().

    Returns:
        An EventMessage whose timestamp is serialized with second precision.
    """
    # Explicit None check: only a missing timestamp should fall back to "now".
    if timestamp is None:
        timestamp = utcnow()
    return EventMessage(
        {
            "eventId": "some-event-id",
            # Sub-second precision is dropped here, which is why the tests
            # compare against timestamp.replace(microsecond=0).
            "timestamp": timestamp.isoformat(timespec="seconds"),
            "resourceUpdate": {
                "name": device_id,
                "events": events,
            },
        },
        auth=None,
    )
async def test_doorbell_chime_event(hass):
    """Test a pubsub message for a doorbell event."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.DoorbellChime"),
    )

    # The camera entity must be registered with the expected identity.
    registry = er.async_get(hass)
    entry = registry.async_get("camera.front")
    assert entry is not None
    assert entry.unique_id == "some-device-id-camera"
    assert entry.original_name == "Front"
    assert entry.domain == "camera"

    # Fail with a clear assertion rather than an AttributeError if the
    # device is missing from the device registry.
    device_registry = dr.async_get(hass)
    device = device_registry.async_get(entry.device_id)
    assert device is not None
    assert device.name == "Front"
    assert device.model == "Doorbell"
    assert device.identifiers == {("nest", DEVICE_ID)}

    timestamp = utcnow()
    await subscriber.async_receive_event(
        create_event("sdm.devices.events.DoorbellChime.Chime", timestamp=timestamp)
    )
    await hass.async_block_till_done()

    # Event timestamps are serialized with second precision (see
    # create_events), so compare against the truncated value.
    event_time = timestamp.replace(microsecond=0)
    assert len(events) == 1
    assert events[0].data == {
        "device_id": entry.device_id,
        "type": "doorbell_chime",
        "timestamp": event_time,
    }
async def test_camera_motion_event(hass):
    """Test a pubsub message for a camera motion event."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.CAMERA",
        create_device_traits("sdm.devices.traits.CameraMotion"),
    )

    registry = er.async_get(hass)
    entry = registry.async_get("camera.front")
    assert entry is not None

    timestamp = utcnow()
    await subscriber.async_receive_event(
        create_event("sdm.devices.events.CameraMotion.Motion", timestamp=timestamp)
    )
    await hass.async_block_till_done()

    # Event timestamps are serialized with second precision (see
    # create_events), so compare against the truncated value.
    event_time = timestamp.replace(microsecond=0)
    assert len(events) == 1
    assert events[0].data == {
        "device_id": entry.device_id,
        "type": "camera_motion",
        "timestamp": event_time,
    }
async def test_camera_sound_event(hass):
    """Test a pubsub message for a camera sound event."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.CAMERA",
        create_device_traits("sdm.devices.traits.CameraSound"),
    )

    registry = er.async_get(hass)
    entry = registry.async_get("camera.front")
    assert entry is not None

    timestamp = utcnow()
    await subscriber.async_receive_event(
        create_event("sdm.devices.events.CameraSound.Sound", timestamp=timestamp)
    )
    await hass.async_block_till_done()

    # Event timestamps are serialized with second precision (see
    # create_events), so compare against the truncated value.
    event_time = timestamp.replace(microsecond=0)
    assert len(events) == 1
    assert events[0].data == {
        "device_id": entry.device_id,
        "type": "camera_sound",
        "timestamp": event_time,
    }
async def test_camera_person_event(hass):
    """Test a pubsub message for a camera person event."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.CameraEventImage"),
    )

    registry = er.async_get(hass)
    entry = registry.async_get("camera.front")
    assert entry is not None

    timestamp = utcnow()
    await subscriber.async_receive_event(
        create_event("sdm.devices.events.CameraPerson.Person", timestamp=timestamp)
    )
    await hass.async_block_till_done()

    # Event timestamps are serialized with second precision (see
    # create_events), so compare against the truncated value.
    event_time = timestamp.replace(microsecond=0)
    assert len(events) == 1
    assert events[0].data == {
        "device_id": entry.device_id,
        "type": "camera_person",
        "timestamp": event_time,
    }
async def test_camera_multiple_event(hass):
    """Test a pubsub message that carries multiple camera events."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.CameraEventImage"),
    )

    registry = er.async_get(hass)
    entry = registry.async_get("camera.front")
    assert entry is not None

    # One message containing both a motion and a person event.
    event_map = {
        "sdm.devices.events.CameraMotion.Motion": {
            "eventSessionId": EVENT_SESSION_ID,
            "eventId": EVENT_ID,
        },
        "sdm.devices.events.CameraPerson.Person": {
            "eventSessionId": EVENT_SESSION_ID,
            "eventId": EVENT_ID,
        },
    }

    timestamp = utcnow()
    await subscriber.async_receive_event(create_events(event_map, timestamp=timestamp))
    await hass.async_block_till_done()

    # Event timestamps are serialized with second precision (see
    # create_events), so compare against the truncated value.
    event_time = timestamp.replace(microsecond=0)
    assert len(events) == 2
    assert events[0].data == {
        "device_id": entry.device_id,
        "type": "camera_motion",
        "timestamp": event_time,
    }
    assert events[1].data == {
        "device_id": entry.device_id,
        "type": "camera_person",
        "timestamp": event_time,
    }
async def test_unknown_event(hass):
    """Test a pubsub message for an unknown event type."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.DoorbellChime"),
    )

    # "some-event-id" is not a real SDM event name.
    await subscriber.async_receive_event(create_event("some-event-id"))
    await hass.async_block_till_done()

    # No nest_event should have been fired.
    assert len(events) == 0
async def test_unknown_device_id(hass):
    """Test a pubsub message for an unknown device id."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.DoorbellChime"),
    )

    # A valid event type, but reported for a device that was never set up.
    await subscriber.async_receive_event(
        create_event(
            "sdm.devices.events.DoorbellChime.Chime",
            device_id="invalid-device-id",
        )
    )
    await hass.async_block_till_done()

    # No nest_event should have been fired.
    assert len(events) == 0
async def test_event_message_without_device_event(hass):
    """Test a pubsub message that contains no device event."""
    events = async_capture_events(hass, NEST_EVENT)
    subscriber = await async_setup_devices(
        hass,
        "sdm.devices.types.DOORBELL",
        create_device_traits("sdm.devices.traits.DoorbellChime"),
    )

    # Build the message by hand so it has no "resourceUpdate" section at all.
    timestamp = utcnow()
    event = EventMessage(
        {
            "eventId": "some-event-id",
            "timestamp": timestamp.isoformat(timespec="seconds"),
        },
        auth=None,
    )
    await subscriber.async_receive_event(event)
    await hass.async_block_till_done()

    # No nest_event should have been fired.
    assert len(events) == 0