"""Coordinator for imag integration.""" from __future__ import annotations import asyncio from collections.abc import Mapping from datetime import timedelta import email import logging from typing import Any from aioimaplib import AUTH, IMAP4_SSL, SELECTED, AioImapException import async_timeout from homeassistant.config_entries import ConfigEntry, ConfigEntryState from homeassistant.const import ( CONF_PASSWORD, CONF_PORT, CONF_USERNAME, CONTENT_TYPE_TEXT_PLAIN, ) from homeassistant.core import HomeAssistant from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryError from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .const import CONF_CHARSET, CONF_FOLDER, CONF_SEARCH, CONF_SERVER, DOMAIN from .errors import InvalidAuth, InvalidFolder _LOGGER = logging.getLogger(__name__) BACKOFF_TIME = 10 EVENT_IMAP = "imap_content" async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL: """Connect to imap server and return client.""" client = IMAP4_SSL(data[CONF_SERVER], data[CONF_PORT]) await client.wait_hello_from_server() await client.login(data[CONF_USERNAME], data[CONF_PASSWORD]) if client.protocol.state != AUTH: raise InvalidAuth("Invalid username or password") await client.select(data[CONF_FOLDER]) if client.protocol.state != SELECTED: raise InvalidFolder(f"Folder {data[CONF_FOLDER]} is invalid") return client class ImapMessage: """Class to parse an RFC822 email message.""" def __init__(self, raw_message: bytes) -> None: """Initialize IMAP message.""" self.email_message = email.message_from_bytes(raw_message) @property def headers(self) -> dict[str, tuple[str,]]: """Get the email headers.""" header_base: dict[str, tuple[str,]] = {} for key, value in self.email_message.items(): header: tuple[str,] = (str(value),) if header_base.setdefault(key, header) != header: header_base[key] += header # type: ignore[assignment] return header_base @property def sender(self) -> str: """Get the parsed message sender from the email.""" return str(email.utils.parseaddr(self.email_message["From"])[1]) @property def subject(self) -> str: """Decode the message subject.""" decoded_header = email.header.decode_header(self.email_message["Subject"]) header = email.header.make_header(decoded_header) return str(header) @property def text(self) -> str: """Get the message text from the email. Will look for text/plain or use text/html if not found. """ message_text = None message_html = None message_untyped_text = None for part in self.email_message.walk(): if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN: if message_text is None: message_text = part.get_payload() elif part.get_content_type() == "text/html": if message_html is None: message_html = part.get_payload() elif ( part.get_content_type().startswith("text") and message_untyped_text is None ): message_untyped_text = part.get_payload() if message_text is not None: return message_text if message_html is not None: return message_html if message_untyped_text is not None: return message_untyped_text return self.email_message.get_payload() class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]): """Base class for imap client.""" config_entry: ConfigEntry def __init__( self, hass: HomeAssistant, imap_client: IMAP4_SSL, update_interval: timedelta | None, ) -> None: """Initiate imap client.""" self.imap_client = imap_client self._last_message_id: str | None = None super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=update_interval, ) async def async_start(self) -> None: """Start coordinator.""" async def _async_reconnect_if_needed(self) -> None: """Connect to imap server.""" if self.imap_client is None: self.imap_client = await connect_to_server(self.config_entry.data) async def _async_process_event(self, last_message_id: str) -> None: """Send a event for the last message if the last message was changed.""" response = await self.imap_client.fetch(last_message_id, "BODY.PEEK[]") if response.result == "OK": message = ImapMessage(response.lines[1]) data = { "server": self.config_entry.data[CONF_SERVER], "username": self.config_entry.data[CONF_USERNAME], "search": self.config_entry.data[CONF_SEARCH], "folder": self.config_entry.data[CONF_FOLDER], "text": message.text, "sender": message.sender, "subject": message.subject, "headers": message.headers, } self.hass.bus.fire(EVENT_IMAP, data) _LOGGER.debug( "Message processed, sender: %s, subject: %s", message.sender, message.subject, ) async def _async_fetch_number_of_messages(self) -> int | None: """Fetch last message and messages count.""" await self._async_reconnect_if_needed() await self.imap_client.noop() result, lines = await self.imap_client.search( self.config_entry.data[CONF_SEARCH], charset=self.config_entry.data[CONF_CHARSET], ) if result != "OK": raise UpdateFailed( f"Invalid response for search '{self.config_entry.data[CONF_SEARCH]}': {result} / {lines[0]}" ) count: int = len(message_ids := lines[0].split()) last_message_id = ( str(message_ids[-1:][0], encoding=self.config_entry.data[CONF_CHARSET]) if count else None ) if count and last_message_id is not None: self._last_message_id = last_message_id await self._async_process_event(last_message_id) return count async def _cleanup(self, log_error: bool = False) -> None: """Close resources.""" if self.imap_client: try: if self.imap_client.has_pending_idle(): self.imap_client.idle_done() await self.imap_client.stop_wait_server_push() await self.imap_client.close() await self.imap_client.logout() except (AioImapException, asyncio.TimeoutError) as ex: if log_error: self.async_set_update_error(ex) _LOGGER.warning("Error while cleaning up imap connection") self.imap_client = None async def shutdown(self, *_) -> None: """Close resources.""" await self._cleanup(log_error=True) class ImapPollingDataUpdateCoordinator(ImapDataUpdateCoordinator): """Class for imap client.""" def __init__(self, hass: HomeAssistant, imap_client: IMAP4_SSL) -> None: """Initiate imap client.""" super().__init__(hass, imap_client, timedelta(seconds=10)) async def _async_update_data(self) -> int | None: """Update the number of unread emails.""" try: return await self._async_fetch_number_of_messages() except ( AioImapException, UpdateFailed, asyncio.TimeoutError, ) as ex: self.async_set_update_error(ex) await self._cleanup() raise UpdateFailed() from ex except InvalidFolder as ex: _LOGGER.warning("Selected mailbox folder is invalid") self.async_set_update_error(ex) await self._cleanup() raise ConfigEntryError("Selected mailbox folder is invalid.") from ex except InvalidAuth as ex: _LOGGER.warning("Username or password incorrect, starting reauthentication") self.async_set_update_error(ex) await self._cleanup() raise ConfigEntryAuthFailed() from ex class ImapPushDataUpdateCoordinator(ImapDataUpdateCoordinator): """Class for imap client.""" def __init__(self, hass: HomeAssistant, imap_client: IMAP4_SSL) -> None: """Initiate imap client.""" super().__init__(hass, imap_client, None) self._push_wait_task: asyncio.Task[None] | None = None async def _async_update_data(self) -> int | None: """Update the number of unread emails.""" await self.async_start() return None async def async_start(self) -> None: """Start coordinator.""" self._push_wait_task = self.hass.async_create_background_task( self._async_wait_push_loop(), "Wait for IMAP data push" ) async def _async_wait_push_loop(self) -> None: """Wait for data push from server.""" while True: try: number_of_messages = await self._async_fetch_number_of_messages() except InvalidAuth as ex: _LOGGER.warning( "Username or password incorrect, starting reauthentication" ) self.config_entry.async_start_reauth(self.hass) self.async_set_update_error(ex) await self._cleanup() await asyncio.sleep(BACKOFF_TIME) except InvalidFolder as ex: _LOGGER.warning("Selected mailbox folder is invalid") self.config_entry.async_set_state( self.hass, ConfigEntryState.SETUP_ERROR, "Selected mailbox folder is invalid.", ) self.async_set_update_error(ex) await self._cleanup() await asyncio.sleep(BACKOFF_TIME) except ( UpdateFailed, AioImapException, asyncio.TimeoutError, ) as ex: self.async_set_update_error(ex) await self._cleanup() await asyncio.sleep(BACKOFF_TIME) continue else: self.async_set_updated_data(number_of_messages) try: idle: asyncio.Future = await self.imap_client.idle_start() await self.imap_client.wait_server_push() self.imap_client.idle_done() async with async_timeout.timeout(10): await idle except (AioImapException, asyncio.TimeoutError): _LOGGER.warning( "Lost %s (will attempt to reconnect after %s s)", self.config_entry.data[CONF_SERVER], BACKOFF_TIME, ) self.async_set_update_error(UpdateFailed("Lost connection")) await self._cleanup() await asyncio.sleep(BACKOFF_TIME) async def shutdown(self, *_) -> None: """Close resources.""" if self._push_wait_task: self._push_wait_task.cancel() await super().shutdown()