Implement imap_content event for imap integration (#90242)

pull/90419/head
Jan Bouwhuis 2023-03-28 21:02:43 +02:00 committed by GitHub
parent d21433b6af
commit 24d0d15f38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 671 additions and 9 deletions

View File

@ -518,9 +518,6 @@ omit =
homeassistant/components/ifttt/alarm_control_panel.py
homeassistant/components/iglo/light.py
homeassistant/components/ihc/*
homeassistant/components/imap/__init__.py
homeassistant/components/imap/coordinator.py
homeassistant/components/imap/sensor.py
homeassistant/components/imap_email_content/sensor.py
homeassistant/components/incomfort/*
homeassistant/components/insteon/binary_sensor.py

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
import email
import logging
from typing import Any
@ -11,7 +12,12 @@ from aioimaplib import AUTH, IMAP4_SSL, SELECTED, AioImapException
import async_timeout
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import CONF_PASSWORD, CONF_PORT, CONF_USERNAME
from homeassistant.const import (
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
CONTENT_TYPE_TEXT_PLAIN,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@ -23,6 +29,8 @@ _LOGGER = logging.getLogger(__name__)
BACKOFF_TIME = 10
EVENT_IMAP = "imap_content"
async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
"""Connect to imap server and return client."""
@ -37,6 +45,70 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
return client
class ImapMessage:
"""Class to parse an RFC822 email message."""
def __init__(self, raw_message: bytes) -> None:
"""Initialize IMAP message."""
self.email_message = email.message_from_bytes(raw_message)
@property
def headers(self) -> dict[str, tuple[str,]]:
"""Get the email headers."""
header_base: dict[str, tuple[str,]] = {}
for key, value in self.email_message.items():
header: tuple[str,] = (str(value),)
if header_base.setdefault(key, header) != header:
header_base[key] += header # type: ignore[assignment]
return header_base
@property
def sender(self) -> str:
"""Get the parsed message sender from the email."""
return str(email.utils.parseaddr(self.email_message["From"])[1])
@property
def subject(self) -> str:
"""Decode the message subject."""
decoded_header = email.header.decode_header(self.email_message["Subject"])
header = email.header.make_header(decoded_header)
return str(header)
@property
def text(self) -> str:
"""Get the message text from the email.
Will look for text/plain or use text/html if not found.
"""
message_text = None
message_html = None
message_untyped_text = None
for part in self.email_message.walk():
if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
if message_text is None:
message_text = part.get_payload()
elif part.get_content_type() == "text/html":
if message_html is None:
message_html = part.get_payload()
elif (
part.get_content_type().startswith("text")
and message_untyped_text is None
):
message_untyped_text = part.get_payload()
if message_text is not None:
return message_text
if message_html is not None:
return message_html
if message_untyped_text is not None:
return message_untyped_text
return self.email_message.get_payload()
class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
"""Base class for imap client."""
@ -50,6 +122,7 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
) -> None:
"""Initiate imap client."""
self.imap_client = imap_client
self._last_message_id: str | None = None
super().__init__(
hass,
_LOGGER,
@ -65,8 +138,30 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
if self.imap_client is None:
self.imap_client = await connect_to_server(self.config_entry.data)
async def _async_process_event(self, last_message_id: str) -> None:
"""Send a event for the last message if the last message was changed."""
response = await self.imap_client.fetch(last_message_id, "BODY.PEEK[]")
if response.result == "OK":
message = ImapMessage(response.lines[1])
data = {
"server": self.config_entry.data[CONF_SERVER],
"username": self.config_entry.data[CONF_USERNAME],
"search": self.config_entry.data[CONF_SEARCH],
"folder": self.config_entry.data[CONF_FOLDER],
"text": message.text,
"sender": message.sender,
"subject": message.subject,
"headers": message.headers,
}
self.hass.bus.fire(EVENT_IMAP, data)
_LOGGER.debug(
"Message processed, sender: %s, subject: %s",
message.sender,
message.subject,
)
async def _async_fetch_number_of_messages(self) -> int | None:
"""Fetch number of messages."""
"""Fetch last message and messages count."""
await self._async_reconnect_if_needed()
await self.imap_client.noop()
result, lines = await self.imap_client.search(
@ -77,7 +172,17 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
raise UpdateFailed(
f"Invalid response for search '{self.config_entry.data[CONF_SEARCH]}': {result} / {lines[0]}"
)
return len(lines[0].split())
count: int = len(message_ids := lines[0].split())
last_message_id = (
str(message_ids[-1:][0], encoding=self.config_entry.data[CONF_CHARSET])
if count
else None
)
if count and last_message_id is not None:
self._last_message_id = last_message_id
await self._async_process_event(last_message_id)
return count
async def _cleanup(self, log_error: bool = False) -> None:
"""Close resources."""

View File

@ -1,9 +1,13 @@
"""Test the iamp config flow."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
"""Fixtures for imap tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
from aioimaplib import AUTH, LOGOUT, NONAUTH, SELECTED, STARTED, Response
import pytest
from .const import EMPTY_SEARCH_RESPONSE, TEST_FETCH_RESPONSE_TEXT_PLAIN
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock, None, None]:
@ -12,3 +16,97 @@ def mock_setup_entry() -> Generator[AsyncMock, None, None]:
"homeassistant.components.imap.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry
@pytest.fixture
def imap_has_capability() -> bool:
"""Fixture to set the imap capabilities."""
return True
@pytest.fixture
def imap_login_state() -> str:
"""Fixture to set the imap state after login."""
return AUTH
@pytest.fixture
def imap_select_state() -> str:
"""Fixture to set the imap capabilities."""
return SELECTED
@pytest.fixture
def imap_search() -> tuple[str, list[bytes]]:
"""Fixture to set the imap search response."""
return EMPTY_SEARCH_RESPONSE
@pytest.fixture
def imap_fetch() -> tuple[str, list[bytes | bytearray]]:
"""Fixture to set the imap fetch response."""
return TEST_FETCH_RESPONSE_TEXT_PLAIN
@pytest.fixture
def imap_pending_idle() -> bool:
"""Fixture to set the imap pending idle feature."""
return True
@pytest.fixture
async def mock_imap_protocol(
imap_search: tuple[str, list[bytes]],
imap_fetch: tuple[str, list[bytes | bytearray]],
imap_has_capability: bool,
imap_pending_idle: bool,
imap_login_state: str,
imap_select_state: str,
) -> Generator[MagicMock, None]:
"""Mock the aioimaplib IMAP protocol handler."""
with patch(
"homeassistant.components.imap.coordinator.IMAP4_SSL", autospec=True
) as imap_mock:
imap_mock = imap_mock.return_value
async def login(user: str, password: str) -> Response:
"""Mock imap login."""
imap_mock.protocol.state = imap_login_state
if imap_login_state != AUTH:
return Response("BAD", [])
return Response("OK", [b"CAPABILITY IMAP4rev1 ...", b"Logged in"])
async def close() -> Response:
"""Mock imap close the selected folder."""
imap_mock.protocol.state = imap_login_state
return Response("OK", [])
async def logout() -> Response:
"""Mock imap logout."""
imap_mock.protocol.state = LOGOUT
return Response("OK", [])
async def select(mailbox: str = "INBOX") -> Response:
"""Mock imap folder select."""
imap_mock.protocol.state = imap_select_state
if imap_login_state != SELECTED:
return Response("BAD", [])
return Response("OK", [])
async def wait_hello_from_server() -> None:
"""Mock wait for hello."""
imap_mock.protocol.state = NONAUTH
imap_mock.has_pending_idle.return_value = imap_pending_idle
imap_mock.protocol = MagicMock()
imap_mock.protocol.state = STARTED
imap_mock.has_capability.return_value = imap_has_capability
imap_mock.login.side_effect = login
imap_mock.close.side_effect = close
imap_mock.logout.side_effect = logout
imap_mock.select.side_effect = select
imap_mock.search.return_value = Response(*imap_search)
imap_mock.fetch.return_value = Response(*imap_fetch)
imap_mock.wait_hello_from_server.side_effect = wait_hello_from_server
yield imap_mock

View File

@ -0,0 +1,139 @@
"""Constants for tests imap integration."""
TEST_MESSAGE = (
b"Return-Path: <john.doe@example.com>\r\nDelivered-To: notify@example.com\r\n"
b"Received: from beta.example.com\r\n\tby beta with LMTP\r\n\t"
b"id eLp2M/GcHWQTLxQAho4UZQ\r\n\t(envelope-from <john.doe@example.com>)\r\n\t"
b"for <notify@example.com>; Fri, 24 Mar 2023 13:52:01 +0100\r\n"
b"Received: from localhost (localhost [127.0.0.1])\r\n\t"
b"by beta.example.com (Postfix) with ESMTP id D0FFA61425\r\n\t"
b"for <notify@example.com>; Fri, 24 Mar 2023 13:52:01 +0100 (CET)\r\n"
b"Date: Fri, 24 Mar 2023 13:52:00 +0100\r\n"
b"MIME-Version: 1.0\r\n"
b"To: notify@example.com\r\n"
b"From: John Doe <john.doe@example.com>\r\n"
b"Subject: Test subject\r\n"
)
TEST_CONTENT_TEXT_BARE = b"\r\n" b"Test body\r\n" b"\r\n"
TEST_CONTENT_BINARY = (
b"Content-Type: application/binary\r\n"
b"Content-Transfer-Encoding: base64\r\n"
b"\r\n"
b"VGVzdCBib2R5\r\n"
)
TEST_CONTENT_TEXT_PLAIN = (
b"Content-Type: text/plain; charset=UTF-8; format=flowed\r\n"
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n"
)
TEST_CONTENT_TEXT_OTHER = (
b"Content-Type: text/other; charset=UTF-8\r\n"
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n"
)
TEST_CONTENT_HTML = (
b"Content-Type: text/html; charset=UTF-8\r\n"
b"Content-Transfer-Encoding: 7bit\r\n"
b"\r\n"
b"<html>\r\n"
b" <head>\r\n"
b' <meta http-equiv="content-type" content="text/html; charset=UTF-8">\r\n'
b" </head>\r\n"
b" <body>\r\n"
b" <p>Test body<br>\r\n"
b" </p>\r\n"
b" </body>\r\n"
b"</html>\r\n"
b"\r\n"
)
TEST_CONTENT_MULTIPART = (
b"\r\nThis is a multi-part message in MIME format.\r\n"
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n"
+ TEST_CONTENT_TEXT_PLAIN
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n"
+ TEST_CONTENT_HTML
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI--\r\n"
)
EMPTY_SEARCH_RESPONSE = ("OK", [b"", b"Search completed (0.0001 + 0.000 secs)."])
BAD_RESPONSE = ("BAD", [b"", b"Unexpected error"])
TEST_SEARCH_RESPONSE = ("OK", [b"1", b"Search completed (0.0001 + 0.000 secs)."])
TEST_FETCH_RESPONSE_TEXT_BARE = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_TEXT_BARE)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_TEXT_BARE),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_TEXT_PLAIN = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_TEXT_PLAIN)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_TEXT_PLAIN),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_TEXT_OTHER = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_TEXT_OTHER)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_TEXT_OTHER),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_BINARY = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_BINARY)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_BINARY),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_HTML = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_HTML)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_HTML),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
TEST_FETCH_RESPONSE_MULTIPART = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_MULTIPART)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_MULTIPART),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
RESPONSE_BAD = ("BAD", [])

View File

@ -0,0 +1,323 @@
"""Test the imap entry initialization."""
import asyncio
from datetime import timedelta
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from aioimaplib import AUTH, NONAUTH, SELECTED, AioImapException, Response
import pytest
from homeassistant.components.imap import DOMAIN
from homeassistant.components.imap.errors import InvalidAuth, InvalidFolder
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.util.dt import utcnow
from .const import (
BAD_RESPONSE,
TEST_FETCH_RESPONSE_BINARY,
TEST_FETCH_RESPONSE_HTML,
TEST_FETCH_RESPONSE_MULTIPART,
TEST_FETCH_RESPONSE_TEXT_BARE,
TEST_FETCH_RESPONSE_TEXT_OTHER,
TEST_FETCH_RESPONSE_TEXT_PLAIN,
TEST_SEARCH_RESPONSE,
)
from .test_config_flow import MOCK_CONFIG
from tests.common import MockConfigEntry, async_capture_events, async_fire_time_changed
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_entry_startup_and_unload(
hass: HomeAssistant, mock_imap_protocol: MagicMock
) -> None:
"""Test imap entry startup and unload with push and polling coordinator."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert await config_entry.async_unload(hass)
@pytest.mark.parametrize(
"effect",
[
InvalidAuth,
InvalidFolder,
asyncio.TimeoutError,
],
)
async def test_entry_startup_fails(
hass: HomeAssistant,
mock_imap_protocol: MagicMock,
effect: Exception,
) -> None:
"""Test imap entry startup fails on invalid auth or folder."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.imap.connect_to_server",
side_effect=effect,
):
assert await hass.config_entries.async_setup(config_entry.entry_id) is False
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize(
"imap_fetch",
[
TEST_FETCH_RESPONSE_TEXT_BARE,
TEST_FETCH_RESPONSE_TEXT_PLAIN,
TEST_FETCH_RESPONSE_TEXT_OTHER,
TEST_FETCH_RESPONSE_HTML,
TEST_FETCH_RESPONSE_MULTIPART,
TEST_FETCH_RESPONSE_BINARY,
],
ids=["bare", "plain", "other", "html", "multipart", "binary"],
)
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_receiving_message_successfully(
hass: HomeAssistant, mock_imap_protocol: MagicMock
) -> None:
"""Test receiving a message successfully."""
event_called = async_capture_events(hass, "imap_content")
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we have had one update (when polling)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
# we should have received one message
assert state is not None
assert state.state == "1"
# we should have received one event
assert len(event_called) == 1
data: dict[str, Any] = event_called[0].data
assert data["server"] == "imap.server.com"
assert data["username"] == "email@email.com"
assert data["search"] == "UnSeen UnDeleted"
assert data["folder"] == "INBOX"
assert data["sender"] == "john.doe@example.com"
assert data["subject"] == "Test subject"
assert data["text"]
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
@pytest.mark.parametrize(
("imap_login_state", "success"), [(AUTH, True), (NONAUTH, False)]
)
async def test_initial_authentication_error(
hass: HomeAssistant, mock_imap_protocol: MagicMock, success: bool
) -> None:
"""Test authentication error when starting the entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id) == success
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
assert (state is not None) == success
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
@pytest.mark.parametrize(
("imap_select_state", "success"), [(AUTH, False), (SELECTED, True)]
)
async def test_initial_invalid_folder_error(
hass: HomeAssistant, mock_imap_protocol: MagicMock, success: bool
) -> None:
"""Test invalid folder error when starting the entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id) == success
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
assert (state is not None) == success
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_late_authentication_error(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mock_imap_protocol: MagicMock,
) -> None:
"""Test authentication error handling after a search was failed."""
# Mock an error in waiting for a pushed update
mock_imap_protocol.wait_server_push.side_effect = AioImapException(
"Something went wrong"
)
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
# Mock that the search fails, this will trigger
# that the connection will be restarted
# Then fail selecting the folder
mock_imap_protocol.search.return_value = Response(*BAD_RESPONSE)
mock_imap_protocol.login.side_effect = Response(*BAD_RESPONSE)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
assert "Username or password incorrect, starting reauthentication" in caplog.text
# we still should have an entity with an unavailable state
state = hass.states.get("sensor.imap_email_email_com")
assert state is not None
assert state.state == STATE_UNAVAILABLE
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_late_folder_error(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mock_imap_protocol: MagicMock,
) -> None:
"""Test invalid folder error handling after a search was failed.
Asserting the IMAP push coordinator.
"""
# Mock an error in waiting for a pushed update
mock_imap_protocol.wait_server_push.side_effect = AioImapException(
"Something went wrong"
)
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we have had at least one update (when polling)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
# Mock that the search fails, this will trigger
# that the connection will be restarted
# Then fail selecting the folder
mock_imap_protocol.search.return_value = Response(*BAD_RESPONSE)
mock_imap_protocol.select.side_effect = Response(*BAD_RESPONSE)
# Make sure we have had at least one update (when polling)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=60))
await hass.async_block_till_done()
assert "Selected mailbox folder is invalid" in caplog.text
# we still should have an entity with an unavailable state
state = hass.states.get("sensor.imap_email_email_com")
assert state is not None
assert state.state == STATE_UNAVAILABLE
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
@pytest.mark.parametrize(
"imap_close",
[
AsyncMock(side_effect=AioImapException("Something went wrong")),
AsyncMock(side_effect=asyncio.TimeoutError),
],
ids=["AioImapException", "TimeoutError"],
)
async def test_handle_cleanup_exception(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mock_imap_protocol: MagicMock,
imap_close: Exception,
) -> None:
"""Test handling an excepton during cleaning up."""
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we have had one update (when polling)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
# we should have an entity
assert state is not None
assert state.state == "0"
# Fail cleaning up
mock_imap_protocol.close.side_effect = imap_close
assert await config_entry.async_unload(hass)
await hass.async_block_till_done()
assert "Error while cleaning up imap connection" in caplog.text
state = hass.states.get("sensor.imap_email_email_com")
# we should have an entity with an unavailable state
assert state is not None
assert state.state == STATE_UNAVAILABLE
@pytest.mark.parametrize("imap_has_capability", [True], ids=["push"])
@pytest.mark.parametrize(
"imap_wait_server_push_exception",
[
AioImapException("Something went wrong"),
asyncio.TimeoutError,
],
ids=["AioImapException", "TimeoutError"],
)
async def test_lost_connection_with_imap_push(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mock_imap_protocol: MagicMock,
imap_wait_server_push_exception: AioImapException | asyncio.TimeoutError,
) -> None:
"""Test error handling when the connection is lost."""
# Mock an error in waiting for a pushed update
mock_imap_protocol.wait_server_push.side_effect = imap_wait_server_push_exception
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert "Lost imap.server.com (will attempt to reconnect after 10 s)" in caplog.text
state = hass.states.get("sensor.imap_email_email_com")
# we should have an entity with an unavailable state
assert state is not None
assert state.state == STATE_UNAVAILABLE
@pytest.mark.parametrize("imap_has_capability", [True], ids=["push"])
async def test_fetch_number_of_messages(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mock_imap_protocol: MagicMock,
) -> None:
"""Test _async_fetch_number_of_messages fails with push coordinator."""
# Mock an error in waiting for a pushed update
mock_imap_protocol.search.return_value = Response(*BAD_RESPONSE)
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we wait for the backoff time
async_fire_time_changed(hass, utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
assert "Invalid response for search" in caplog.text
state = hass.states.get("sensor.imap_email_email_com")
# we should have an entity with an unavailable state
assert state is not None
assert state.state == STATE_UNAVAILABLE