Fix imap crash on email without subject (#94230)

pull/94288/head
Jan Bouwhuis 2023-06-08 11:11:12 +02:00 committed by Paulus Schoutsen
parent 413e1c97d7
commit 0cf3825183
3 changed files with 61 additions and 2 deletions

View File

@ -120,7 +120,7 @@ class ImapMessage:
@property @property
def subject(self) -> str: def subject(self) -> str:
"""Decode the message subject.""" """Decode the message subject."""
decoded_header = decode_header(self.email_message["Subject"]) decoded_header = decode_header(self.email_message["Subject"] or "")
subject_header = make_header(decoded_header) subject_header = make_header(decoded_header)
return str(subject_header) return str(subject_header)

View File

@ -24,7 +24,12 @@ TEST_MESSAGE_HEADERS2 = (
b"Subject: Test subject\r\n" b"Subject: Test subject\r\n"
) )
TEST_MESSAGE_HEADERS3 = b""
TEST_MESSAGE = TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2 TEST_MESSAGE = TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2
TEST_MESSAGE_NO_SUBJECT_TO_FROM = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS3
)
TEST_MESSAGE_ALT = TEST_MESSAGE_HEADERS1 + DATE_HEADER2 + TEST_MESSAGE_HEADERS2 TEST_MESSAGE_ALT = TEST_MESSAGE_HEADERS1 + DATE_HEADER2 + TEST_MESSAGE_HEADERS2
TEST_INVALID_DATE1 = ( TEST_INVALID_DATE1 = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID1 + TEST_MESSAGE_HEADERS2 TEST_MESSAGE_HEADERS1 + DATE_HEADER_INVALID1 + TEST_MESSAGE_HEADERS2
@ -204,4 +209,19 @@ TEST_FETCH_RESPONSE_MULTIPART = (
], ],
) )
TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE_NO_SUBJECT_TO_FROM + TEST_CONTENT_TEXT_PLAIN)).encode(
"utf-8"
)
+ b"}",
bytearray(TEST_MESSAGE_NO_SUBJECT_TO_FROM + TEST_CONTENT_TEXT_PLAIN),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
RESPONSE_BAD = ("BAD", []) RESPONSE_BAD = ("BAD", [])

View File

@ -1,6 +1,6 @@
"""Test the imap entry initialization.""" """Test the imap entry initialization."""
import asyncio import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
@ -22,6 +22,7 @@ from .const import (
TEST_FETCH_RESPONSE_INVALID_DATE2, TEST_FETCH_RESPONSE_INVALID_DATE2,
TEST_FETCH_RESPONSE_INVALID_DATE3, TEST_FETCH_RESPONSE_INVALID_DATE3,
TEST_FETCH_RESPONSE_MULTIPART, TEST_FETCH_RESPONSE_MULTIPART,
TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM,
TEST_FETCH_RESPONSE_TEXT_BARE, TEST_FETCH_RESPONSE_TEXT_BARE,
TEST_FETCH_RESPONSE_TEXT_OTHER, TEST_FETCH_RESPONSE_TEXT_OTHER,
TEST_FETCH_RESPONSE_TEXT_PLAIN, TEST_FETCH_RESPONSE_TEXT_PLAIN,
@ -153,6 +154,44 @@ async def test_receiving_message_successfully(
) )
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize("imap_fetch", [TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM])
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_receiving_message_no_subject_to_from(
hass: HomeAssistant, mock_imap_protocol: MagicMock
) -> None:
"""Test receiving a message successfully without subject, to and from in body."""
event_called = async_capture_events(hass, "imap_content")
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Make sure we have had one update (when polling)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
state = hass.states.get("sensor.imap_email_email_com")
# we should have received one message
assert state is not None
assert state.state == "1"
# we should have received one event
assert len(event_called) == 1
data: dict[str, Any] = event_called[0].data
assert data["server"] == "imap.server.com"
assert data["username"] == "email@email.com"
assert data["search"] == "UnSeen UnDeleted"
assert data["folder"] == "INBOX"
assert data["sender"] == ""
assert data["subject"] == ""
assert data["date"] == datetime(
2023, 3, 24, 13, 52, tzinfo=timezone(timedelta(seconds=3600))
)
assert data["text"] == "Test body\r\n\r\n"
assert data["headers"]["Return-Path"] == ("<john.doe@example.com>",)
assert data["headers"]["Delivered-To"] == ("notify@example.com",)
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"]) @pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
@pytest.mark.parametrize( @pytest.mark.parametrize(
("imap_login_state", "success"), [(AUTH, True), (NONAUTH, False)] ("imap_login_state", "success"), [(AUTH, True), (NONAUTH, False)]