Move nested code to class level as static method in imap coordinator (#109665)
* Move _decode_payload to module level in imap coordinator * Make static methodpull/109689/head
parent
c9fd97c6a3
commit
7ab1cdc2b3
|
@ -101,6 +101,23 @@ class ImapMessage:
|
||||||
"""Initialize IMAP message."""
|
"""Initialize IMAP message."""
|
||||||
self.email_message = email.message_from_bytes(raw_message)
|
self.email_message = email.message_from_bytes(raw_message)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _decode_payload(part: Message) -> str:
|
||||||
|
"""Try to decode text payloads.
|
||||||
|
|
||||||
|
Common text encodings are quoted-printable or base64.
|
||||||
|
Falls back to the raw content part if decoding fails.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
decoded_payload: Any = part.get_payload(decode=True)
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
assert isinstance(decoded_payload, bytes)
|
||||||
|
content_charset = part.get_content_charset() or "utf-8"
|
||||||
|
return decoded_payload.decode(content_charset)
|
||||||
|
except ValueError:
|
||||||
|
# return undecoded payload
|
||||||
|
return str(part.get_payload())
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def headers(self) -> dict[str, tuple[str,]]:
|
def headers(self) -> dict[str, tuple[str,]]:
|
||||||
"""Get the email headers."""
|
"""Get the email headers."""
|
||||||
|
@ -158,30 +175,14 @@ class ImapMessage:
|
||||||
message_html: str | None = None
|
message_html: str | None = None
|
||||||
message_untyped_text: str | None = None
|
message_untyped_text: str | None = None
|
||||||
|
|
||||||
def _decode_payload(part: Message) -> str:
|
|
||||||
"""Try to decode text payloads.
|
|
||||||
|
|
||||||
Common text encodings are quoted-printable or base64.
|
|
||||||
Falls back to the raw content part if decoding fails.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
decoded_payload: Any = part.get_payload(decode=True)
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
assert isinstance(decoded_payload, bytes)
|
|
||||||
content_charset = part.get_content_charset() or "utf-8"
|
|
||||||
return decoded_payload.decode(content_charset)
|
|
||||||
except ValueError:
|
|
||||||
# return undecoded payload
|
|
||||||
return str(part.get_payload())
|
|
||||||
|
|
||||||
part: Message
|
part: Message
|
||||||
for part in self.email_message.walk():
|
for part in self.email_message.walk():
|
||||||
if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
|
if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
|
||||||
if message_text is None:
|
if message_text is None:
|
||||||
message_text = _decode_payload(part)
|
message_text = self._decode_payload(part)
|
||||||
elif part.get_content_type() == "text/html":
|
elif part.get_content_type() == "text/html":
|
||||||
if message_html is None:
|
if message_html is None:
|
||||||
message_html = _decode_payload(part)
|
message_html = self._decode_payload(part)
|
||||||
elif (
|
elif (
|
||||||
part.get_content_type().startswith("text")
|
part.get_content_type().startswith("text")
|
||||||
and message_untyped_text is None
|
and message_untyped_text is None
|
||||||
|
|
Loading…
Reference in New Issue