core/tests/components/ozw/conftest.py

91 lines
2.8 KiB
Python
Raw Normal View History

2020-05-03 00:54:16 +00:00
"""Helpers for tests."""
import json
import pytest
from .common import MQTTMessage
from tests.async_mock import patch
from tests.common import load_fixture
@pytest.fixture(name="generic_data", scope="session")
def generic_data_fixture():
"""Load generic MQTT data and return it."""
2020-05-14 20:56:04 +00:00
return load_fixture("ozw/generic_network_dump.csv")
2020-05-03 00:54:16 +00:00
2020-05-07 21:52:54 +00:00
@pytest.fixture(name="light_data", scope="session")
def light_data_fixture():
"""Load light dimmer MQTT data and return it."""
2020-05-14 20:56:04 +00:00
return load_fixture("ozw/light_network_dump.csv")
2020-05-07 21:52:54 +00:00
2020-05-03 00:54:16 +00:00
@pytest.fixture(name="sent_messages")
def sent_messages_fixture():
"""Fixture to capture sent messages."""
sent_messages = []
with patch(
"homeassistant.components.mqtt.async_publish",
side_effect=lambda hass, topic, payload: sent_messages.append(
{"topic": topic, "payload": json.loads(payload)}
),
):
yield sent_messages
2020-05-07 21:52:54 +00:00
@pytest.fixture(name="light_msg")
async def light_msg_fixture(hass):
"""Return a mock MQTT msg with a light actuator message."""
light_json = json.loads(
2020-05-14 20:56:04 +00:00
await hass.async_add_executor_job(load_fixture, "ozw/light.json")
2020-05-07 21:52:54 +00:00
)
message = MQTTMessage(topic=light_json["topic"], payload=light_json["payload"])
message.encode()
return message
2020-05-03 00:54:16 +00:00
@pytest.fixture(name="switch_msg")
async def switch_msg_fixture(hass):
"""Return a mock MQTT msg with a switch actuator message."""
switch_json = json.loads(
2020-05-14 20:56:04 +00:00
await hass.async_add_executor_job(load_fixture, "ozw/switch.json")
2020-05-03 00:54:16 +00:00
)
message = MQTTMessage(topic=switch_json["topic"], payload=switch_json["payload"])
message.encode()
return message
@pytest.fixture(name="sensor_msg")
async def sensor_msg_fixture(hass):
"""Return a mock MQTT msg with a sensor change message."""
sensor_json = json.loads(
2020-05-14 20:56:04 +00:00
await hass.async_add_executor_job(load_fixture, "ozw/sensor.json")
)
message = MQTTMessage(topic=sensor_json["topic"], payload=sensor_json["payload"])
message.encode()
return message
@pytest.fixture(name="binary_sensor_msg")
async def binary_sensor_msg_fixture(hass):
"""Return a mock MQTT msg with a binary_sensor change message."""
sensor_json = json.loads(
2020-05-14 20:56:04 +00:00
await hass.async_add_executor_job(load_fixture, "ozw/binary_sensor.json")
)
message = MQTTMessage(topic=sensor_json["topic"], payload=sensor_json["payload"])
message.encode()
return message
@pytest.fixture(name="binary_sensor_alt_msg")
async def binary_sensor_alt_msg_fixture(hass):
"""Return a mock MQTT msg with a binary_sensor change message."""
sensor_json = json.loads(
2020-05-14 20:56:04 +00:00
await hass.async_add_executor_job(load_fixture, "ozw/binary_sensor_alt.json")
)
message = MQTTMessage(topic=sensor_json["topic"], payload=sensor_json["payload"])
message.encode()
return message