Source code for eletter.decompose

from __future__ import annotations
from datetime import datetime
from email import headerregistry as hr
from email.message import EmailMessage
from functools import partial
import attr
from mailbits import ContentType, parse_addresses
from .classes import (
    Alternative,
    Attachment,
    BytesAttachment,
    EmailAttachment,
    HTMLBody,
    MailItem,
    Mixed,
    Multipart,
    Related,
    TextAttachment,
    TextBody,
)
from .core import compose
from .errors import DecompositionError, MixedContentError, SimplificationError


@attr.s
class Eletter:
    """
    .. versionadded:: 0.5.0

    A decomposed e-mail message: a `MailItem` body plus the message's headers
    """

    #: The message's body
    content: MailItem = attr.ib()

    #: The message's subject line, if any
    subject: str | None = attr.ib(default=None)

    #: The message's :mailheader:`From` addresses
    from_: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`To` addresses
    to: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`CC` addresses
    cc: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`BCC` addresses
    bcc: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`Reply-To` addresses
    reply_to: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`Sender` address, if any
    sender: hr.Address | None = attr.ib(default=None)

    #: The message's :mailheader:`Date` header, if set
    date: datetime | None = attr.ib(default=None)

    #: Any additional headers on the message.  The header names are
    #: lowercase.
    headers: dict[str, list[str]] = attr.ib(factory=dict)

    def compose(self) -> EmailMessage:
        """
        Convert the `Eletter` back into an `~email.message.EmailMessage` by
        calling the ``compose()`` method of `Eletter.content` with the stored
        headers
        """
        return self.content.compose(
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
        )

    def simplify(self, unmix: bool = False) -> SimpleEletter:
        """
        Simplify the `Eletter` into a `SimpleEletter`, breaking down
        `Eletter.content` into a text body, HTML body, and a list of
        attachments.

        By default, a :mimetype:`multipart/mixed` message can only be
        simplified if all of the attachments come after all of the message
        bodies; set ``unmix`` to `True` to separate the attachments from the
        bodies regardless of what order they come in.

        :raises SimplificationError: if the message cannot be simplified
        """
        body = smooth(self.content)
        text: str | None = None
        html: str | None = None
        attachments: list[Attachment] = []
        if not isinstance(body, Alternative):
            text, html, attachments = simplify_alt_part(body, unmix=unmix)
        else:
            # Each alternative must contribute exactly one kind of body, and
            # each kind of body may be contributed at most once.
            for part in body:
                part_text, part_html, part_attachments = simplify_alt_part(
                    part, unmix=unmix
                )
                if part_text is None and part_html is None:
                    raise SimplificationError(
                        "Alternative part contains neither text/plain nor text/html"
                    )
                if part_text is not None and part_html is not None:
                    raise SimplificationError(
                        "Alternative part contains both text/plain and text/html"
                    )
                if part_text is not None:
                    if text is not None:
                        raise SimplificationError(
                            "Multiple text/plain parts in multipart/alternative"
                        )
                    text = part_text
                else:
                    if html is not None:
                        raise SimplificationError(
                            "Multiple text/html parts in multipart/alternative"
                        )
                    html = part_html
                # Collect attachments from every alternative, skipping ones
                # that compare equal to an attachment already collected
                for attachment in part_attachments:
                    if attachment not in attachments:
                        attachments.append(attachment)
        if text is None and html is None:
            raise SimplificationError("No text or HTML bodies in message")
        return SimpleEletter(
            text=text,
            html=html,
            attachments=attachments,
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
        )
@attr.s
class SimpleEletter:
    """
    .. versionadded:: 0.5.0

    A decomposed simple e-mail message, consisting of a text body and/or HTML
    body plus some number of attachments and headers
    """

    #: The message's text body, if any
    text: str | None = attr.ib(default=None)

    #: The message's HTML body, if any
    html: str | None = attr.ib(default=None)

    #: Attachments on the message
    attachments: list[Attachment] = attr.ib(factory=list)

    #: The message's subject line, if any
    subject: str | None = attr.ib(default=None)

    #: The message's :mailheader:`From` addresses
    from_: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`To` addresses
    to: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`CC` addresses
    cc: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`BCC` addresses
    bcc: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`Reply-To` addresses
    reply_to: list[hr.Address | hr.Group] = attr.ib(factory=list)

    #: The message's :mailheader:`Sender` address, if any
    sender: hr.Address | None = attr.ib(default=None)

    #: The message's :mailheader:`Date` header, if set
    date: datetime | None = attr.ib(default=None)

    #: Any additional headers on the message.  The header names are
    #: lowercase.
    headers: dict[str, list[str]] = attr.ib(factory=dict)

    def compose(self) -> EmailMessage:
        """
        Convert the `SimpleEletter` back into an
        `~email.message.EmailMessage` by passing the bodies, attachments, and
        headers to the package-level ``compose()`` function
        """
        return compose(
            text=self.text,
            html=self.html,
            attachments=self.attachments,
            subject=self.subject,
            from_=self.from_,
            to=self.to,
            cc=self.cc,
            bcc=self.bcc,
            reply_to=self.reply_to,
            sender=self.sender,
            date=self.date,
            headers=self.headers,
        )
# Lowercase names of the headers that `decompose()` leaves out of
# `Eletter.headers`.
STANDARD_HEADERS = {
    # Headers with dedicated fields on the decomposed message
    "subject",
    "from",
    "to",
    "cc",
    "bcc",
    "reply-to",
    "sender",
    "date",
    # Content description & encoding headers
    "content-type",
    "content-id",
    "content-disposition",
    "content-transfer-encoding",
    "mime-version",
}
def decompose(msg: EmailMessage) -> Eletter:
    """
    .. versionadded:: 0.5.0

    Decompose an `~email.message.EmailMessage` into an `Eletter` instance
    containing a `MailItem` and a collection of headers.

    Only structures that can be represented by ``eletter`` classes are
    supported.  All message parts that are not :mimetype:`text/plain`,
    :mimetype:`text/html`, :mimetype:`multipart/*`, or :mimetype:`message/*`
    are treated as attachments.  Attachments without filenames or an explicit
    "attachment" :mailheader:`Content-Disposition` are treated as inline.

    Any information specific to how the message is encoded is discarded
    (namely, "charset" parameters on :mimetype:`text/*` parts,
    :mailheader:`Content-Transfer-Encoding` headers, and
    :mailheader:`MIME-Version` headers).

    Headers on message sub-parts that do not have representations on
    `MailItem`\\s are discarded (namely, everything other than
    :mailheader:`Content-Type`, :mailheader:`Content-Disposition`, and
    :mailheader:`Content-ID`).

    :raises TypeError:
        if any sub-part of ``msg`` is not an `~email.message.EmailMessage`
        instance
    :raises DecompositionError:
        if ``msg`` contains a part with an unrepresentable
        :mailheader:`Content-Type`
    """
    subject = get_str_header(msg, "Subject")
    from_ = get_address_list(msg, "From")
    to = get_address_list(msg, "To")
    cc = get_address_list(msg, "CC")
    bcc = get_address_list(msg, "BCC")
    reply_to = get_address_list(msg, "Reply-To")

    sender: hr.Address | None
    sender_head = msg.get("Sender")
    if sender_head is None:
        sender = None
    else:
        assert isinstance(sender_head, hr.SingleAddressHeader)
        sender = sender_head.address

    date: datetime | None
    date_head = msg.get("Date")
    if date_head is None:
        date = None
    else:
        assert isinstance(date_head, hr.DateHeader)
        date = date_head.datetime

    # Every header without a dedicated field is kept under its lowercased
    # name, with all of its values stringified.
    headers: dict[str, list[str]] = {}
    for name in msg.keys():
        key = name.lower()
        if key in STANDARD_HEADERS:
            continue
        headers[key] = [str(value) for value in msg.get_all(key, [])]

    content = get_content(msg)
    return Eletter(
        content=content,
        subject=subject,
        from_=from_,
        to=to,
        cc=cc,
        bcc=bcc,
        reply_to=reply_to,
        sender=sender,
        date=date,
        headers=headers,
    )
def get_content(msg: EmailMessage) -> MailItem:
    """
    Convert ``msg`` (a whole message or one of its parts) into the `MailItem`
    that represents its content, recursing into the parts of multipart
    messages.

    :raises TypeError:
        if any sub-part of ``msg`` is not an `~email.message.EmailMessage`
        instance
    :raises DecompositionError:
        if the :mailheader:`Content-Type` is a :mimetype:`multipart/*` or
        :mimetype:`message/*` type other than mixed, alternative, related, or
        rfc822
    """
    try:
        declared = msg.get("Content-Type", msg.get_default_type())
        ct = ContentType.parse(str(declared))
    except ValueError:
        # An unparseable Content-Type is handled as text/plain
        ct = ContentType.parse("text/plain")
    disposition = msg.get_content_disposition()
    filename = msg.get_filename(None)
    content_id = get_str_header(msg, "Content-ID")
    if disposition is None and filename is not None:
        # A filename without any disposition implies an attachment
        disposition = "attachment"
    inline = disposition != "attachment"

    if ct.maintype == "multipart":
        container: Multipart
        if ct.subtype == "mixed":
            container = Mixed(content_id=content_id)
        elif ct.subtype == "alternative":
            container = Alternative(content_id=content_id)
        elif ct.subtype == "related":
            container = Related(content_id=content_id, start=ct.params.get("start"))
        else:
            raise DecompositionError(f"Unsupported Content-Type: {ct.content_type}")
        for part in msg.iter_parts():
            if not isinstance(part, EmailMessage):  # pragma: no cover
                raise TypeError("EmailMessage parts must be EmailMessage instances")
            container.append(get_content(part))
        return container

    if ct.maintype == "message":
        if ct.subtype != "rfc822":
            raise DecompositionError(f"Unsupported Content-Type: {ct.content_type}")
        embedded = msg.get_content()
        if not isinstance(embedded, EmailMessage):  # pragma: no cover
            raise TypeError("EmailMessage parts must be EmailMessage instances")
        return EmailAttachment(
            content=embedded,
            filename=filename,
            inline=inline,
            content_id=content_id,
        )

    if ct.maintype == "text":
        text = msg.get_content()
        assert isinstance(text, str)
        # Only unnamed, non-attachment text/plain & text/html parts count as
        # message bodies; all other text parts become attachments.
        is_body = filename is None and inline and ct.subtype in ("plain", "html")
        if not is_body:
            # The charset is an encoding detail, so leave it out of the
            # attachment's content type
            ct.params.pop("charset", None)
            return TextAttachment(
                content=text,
                filename=filename,
                content_type=str(ct),
                inline=inline,
                content_id=content_id,
            )
        if ct.subtype == "plain":
            return TextBody(text, content_id=content_id)
        return HTMLBody(text, content_id=content_id)

    blob = msg.get_content()
    assert isinstance(blob, bytes)
    return BytesAttachment(
        content=blob,
        filename=filename,
        content_type=str(ct),
        content_id=content_id,
        inline=inline,
    )


def get_str_header(msg: EmailMessage, header: str) -> str | None:
    """
    Return the first ``header`` header of ``msg`` converted to a `str`, or
    `None` if ``msg`` has no such header
    """
    value = msg.get(header)
    return None if value is None else str(value)


def get_address_list(msg: EmailMessage, header: str) -> list[hr.Address | hr.Group]:
    """
    Return the addresses & groups from every ``header`` header of ``msg``,
    combined into one list in order of occurrence
    """
    collected = []
    for field in msg.get_all(header, []):
        assert isinstance(field, hr.AddressHeader)
        collected.extend(parse_addresses(field))
    return collected
def decompose_simple(msg: EmailMessage, unmix: bool = False) -> SimpleEletter:
    """
    .. versionadded:: 0.5.0

    Decompose an `~email.message.EmailMessage` into a `SimpleEletter`
    instance consisting of a text body and/or HTML body, some number of
    attachments, and a collection of headers.  The
    `~email.message.EmailMessage` is first decomposed with `decompose()` and
    then simplified by calling `Eletter.simplify()`.

    By default, a :mimetype:`multipart/mixed` message can only be simplified
    if all of the attachments come after all of the message bodies; set
    ``unmix`` to `True` to separate the attachments from the bodies regardless
    of what order they come in.

    :raises TypeError:
        if any sub-part of ``msg`` is not an `~email.message.EmailMessage`
        instance
    :raises DecompositionError:
        if ``msg`` contains a part with an unrepresentable
        :mailheader:`Content-Type`
    :raises SimplificationError: if ``msg`` cannot be simplified
    """
    eletter = decompose(msg)
    return eletter.simplify(unmix=unmix)
def smooth(mi: MailItem) -> MailItem:
    """
    Recursively tidy up a `MailItem` tree: nested multiparts of the same type
    as their parent are merged into the parent (except for `Related`), empty
    multiparts are dropped, and a multipart left with exactly one child is
    replaced by that child.  Non-multipart items are returned unchanged.
    """
    if not isinstance(mi, Multipart):
        return mi
    # Flatten nested Mixed and Alternative, but not Related:
    flattenable = not isinstance(mi, Related)
    children: list[MailItem] = []
    for child in mi:
        child = smooth(child)
        if flattenable and type(child) is type(mi):
            assert isinstance(child, Multipart)
            children.extend(child)
        elif not isinstance(child, Multipart) or len(child) != 0:
            children.append(child)
    if len(children) == 1:
        return children[0]
    return type(mi)(children)


def alt2text_html(alt: Alternative) -> tuple[str, str]:
    """
    Return the ``(text, html)`` contents of an `Alternative` that consists of
    exactly one `TextBody` and one `HTMLBody`, in either order.

    :raises SimplificationError: if ``alt`` has any other structure
    """
    if len(alt) == 2:
        first, second = alt[0], alt[1]
        if isinstance(first, TextBody) and isinstance(second, HTMLBody):
            return (first.content, second.content)
        if isinstance(first, HTMLBody) and isinstance(second, TextBody):
            return (second.content, first.content)
    raise SimplificationError(
        "multipart/alternative inside multipart/mixed is not a text/plain part"
        " plus a text/html part"
    )


def simplify_alt_part(
    content: MailItem, unmix: bool = False
) -> tuple[str | None, str | None, list[Attachment]]:
    """
    Break ``content`` down into a text body, an HTML body, and a list of
    attachments.

    A lone `TextBody` or `HTMLBody` yields just that body.  For a `Mixed`,
    successive bodies of the same kind are concatenated (separated by a
    newline when the accumulated body does not already end with one), and
    attachments are collected in order.  Unless ``unmix`` is true, a body
    that comes after an attachment is an error.

    :raises MixedContentError:
        if a body follows an attachment and ``unmix`` is false
    :raises SimplificationError:
        if the text and HTML bodies cannot be paired up, or if ``content`` is
        or contains a `Related`, or if ``content`` is itself an attachment
    :raises TypeError: if an item of an unexpected type is encountered
    """

    def joined(existing: str | None, addition: str) -> str:
        # Append `addition` to `existing`, making sure that the two are
        # separated by a newline
        if existing is None:
            return addition
        if not existing.endswith("\n"):
            existing += "\n"
        return existing + addition

    if isinstance(content, TextBody):
        return (content.content, None, [])
    if isinstance(content, HTMLBody):
        return (None, content.content, [])
    if not isinstance(content, Mixed):
        if isinstance(content, Related):
            raise SimplificationError("Cannot simplify multipart/related")
        if isinstance(content, Attachment):
            raise SimplificationError("Body is an attachment")
        raise TypeError(str(type(content)))  # pragma: no cover

    text: str | None = None
    html: str | None = None
    attachments: list[Attachment] = []

    def require_bodies_first() -> None:
        # Bodies may only follow attachments when unmixing was requested
        if attachments and not unmix:
            raise MixedContentError("Message intersperses attachments with text")

    for item in content:
        if isinstance(item, TextBody):
            require_bodies_first()
            if html is not None:
                raise SimplificationError("No matching HTML alternative for text part")
            text = joined(text, item.content)
        elif isinstance(item, HTMLBody):
            require_bodies_first()
            if text is not None:
                raise SimplificationError("No matching text alternative for HTML part")
            html = joined(html, item.content)
        # A nested Mixed cannot occur here, as smooth() flattens it away
        elif isinstance(item, Alternative):
            # Require the Alt to be only text | html; error on further
            # nesting
            text_part, html_part = alt2text_html(item)
            require_bodies_first()
            if text is not None and html is None:
                raise SimplificationError(
                    "Text + HTML alternative follows text-only body"
                )
            if text is None and html is not None:
                raise SimplificationError(
                    "Text + HTML alternative follows HTML-only body"
                )
            text = joined(text, text_part)
            html = joined(html, html_part)
        elif isinstance(item, Related):
            raise SimplificationError("Cannot simplify multipart/related")
        elif isinstance(item, Attachment):
            attachments.append(item)
        else:
            raise TypeError(str(type(item)))  # pragma: no cover
    return (text, html, attachments)